次の方法で共有


Lakeflow Spark 宣言パイプラインのフローの例

例: 複数の Kafka トピックからストリーミング テーブルに書き込む

次の例では、 kafka_target という名前のストリーミング テーブルを作成し、2 つの Kafka トピックからそのストリーミング テーブルに書き込みます。

Python

from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dp.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

SQL クエリで使用 read_kafka() テーブル値関数の詳細については、SQL 言語リファレンスの read_kafka を参照してください。

Python では、1 つのテーブルを対象とする複数のフローをプログラムで作成できます。 次の例は、Kafka トピックの一覧に対するこのパターンを示しています。

このパターンには、 for ループを使用してテーブルを作成する場合と同じ要件があります。 フローを定義する関数に Python 値を明示的に渡す必要があります。 for ループでのテーブルの作成」を参照してください。

from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

  @dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
  def topic_flow(topic=topic_name):
    return (
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,...")
        .option("subscribe", topic)
        .load()
    )

例: 1 回限りデータ バックフィルを実行する

既存のストリーミング テーブルにデータを追加するクエリを実行する場合は、 append_flowを使用します。

既存のデータのセットを追加した後、複数のオプションがあります。

  • クエリがバックフィル ディレクトリに到着した場合に新しいデータを追加する場合は、クエリをそのまま使用します。
  • これを 1 回限りのバックフィルにし、もう一度実行しない場合は、パイプラインを 1 回実行した後にクエリを削除します。
  • クエリを 1 回実行し、データが完全に更新されている場合にのみ再度実行する場合は、 once パラメーターを追加フローで True に設定します。 SQL では、 INSERT INTO ONCEを使用します。

次の例では、ストリーミング テーブルに履歴データを追加するクエリを実行します。

Python

from pyspark import pipelines as dp

@dp.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dp.append_flow(
  target = "csv_target",
  once = True)
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  read_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO ONCE
  csv_target BY NAME
SELECT * FROM
  read_files(
    "path/to/backfill/data/dir",
    "csv"
  );

詳細な例については、「パイプラインを使用 した履歴データのバックフィル」を参照してください。

例: 追加フロー処理ではなく、追加フロー処理を使用する UNION

UNION句でクエリを使用する代わりに、追加フロー クエリを使用して複数のソースを結合し、1 つのストリーミング テーブルに書き込むことができます。 UNIONの代わりに追加フロー クエリを使用すると、完全な更新を実行せずに、複数のソースからストリーミング テーブルに追加できます。

次の Python の例には、複数のデータ ソースと UNION 句を組み合わせたクエリが含まれています。

@dp.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")
  )

  raw_orders_eu = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")
  )

  return raw_orders_us.union(raw_orders_eu)

次の例では、 UNION クエリを追加フロー クエリに置き換えます。

Python

dp.create_streaming_table("raw_orders")

@dp.append_flow(target="raw_orders")
def raw_orders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dp.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/us",
    format => "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/eu",
    format => "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/apac",
    format => "csv"
  );