Compartilhar via


Exemplos de fluxos nos Pipelines Declarativos do Lakeflow Spark

Exemplo: gravar em uma tabela de streaming de vários tópicos do Kafka

O exemplo a seguir cria uma tabela de streaming nomeada kafka_target e grava nessa tabela de streaming a partir de dois tópicos do Kafka:

Python

from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dp.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

Para saber mais sobre a read_kafka() função com valor de tabela usada nas consultas SQL, consulte read_kafka na referência de linguagem SQL.

No Python, você pode criar programaticamente vários fluxos direcionados a uma única tabela. O exemplo a seguir mostra esse padrão para uma lista de tópicos do Kafka.

Observação

Esse padrão tem os mesmos requisitos que usar um for loop para criar tabelas. Você deve passar explicitamente um valor do Python para a função que define o fluxo. Consulte Criar tabelas em um for loop.

from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

  @dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
  def topic_flow(topic=topic_name):
    return (
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,...")
        .option("subscribe", topic)
        .load()
    )

Exemplo: executar um backfill de dados único

Se você quiser executar uma consulta para acrescentar dados a uma tabela de streaming existente, use append_flow.

Depois de acrescentar um conjunto de dados existentes, você terá várias opções:

  • Se você quiser que a consulta acrescente novos dados ao chegarem no diretório de backfill, deixe a consulta no lugar.
  • Se você quiser que isso seja um backfill único e nunca mais seja executado, remova a consulta depois de executar o pipeline uma vez.
  • Se você quiser que a consulta seja executada uma vez e só seja executada novamente nos casos em que os dados estão sendo totalmente atualizados, defina o once parâmetro como True no fluxo de acréscimo. No SQL, utilize INSERT INTO ONCE.

Os exemplos a seguir executam uma consulta para acrescentar dados históricos a uma tabela de streaming:

Python

from pyspark import pipelines as dp

@dp.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dp.append_flow(
  target = "csv_target",
  once = True)
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  read_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO ONCE
  csv_target BY NAME
SELECT * FROM
  read_files(
    "path/to/backfill/data/dir",
    "csv"
  );

Para obter um exemplo mais aprofundado, consulte Repreenchimento de dados históricos com pipelines.

Exemplo: usar o processamento de fluxo de anexação em vez de UNION

Em vez de usar uma consulta com uma cláusula UNION, você pode usar consultas de fluxo de anexação para combinar várias fontes e gravar em uma única tabela de streaming. Usar consultas de fluxo para acréscimo em vez de UNION permite adicionar a uma tabela de streaming a partir de várias fontes sem executar uma atualização completa.

O exemplo do Python a seguir inclui uma consulta que combina várias fontes de dados com uma UNION cláusula:

@dp.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")
  )

  raw_orders_eu = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")
  )

  return raw_orders_us.union(raw_orders_eu)

Os exemplos a seguir substituem a UNION consulta por consultas de fluxo de acréscimo:

Python

dp.create_streaming_table("raw_orders")

@dp.append_flow(target="raw_orders")
def raw_orders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dp.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/us",
    format => "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/eu",
    format => "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/apac",
    format => "csv"
  );