次の方法で共有


ForEachBatch を使用してパイプライン内の任意のデータ シンクに書き込む

Important

foreach_batch_sink API はパブリック プレビュー段階です

ForEachBatch シンクを使用すると、ストリームを一連のマイクロバッチとして処理できます。 各バッチは、Apache Spark 構造化ストリーミングの foreachBatchと同様のカスタム ロジックを使用して Python で処理できます。 Lakeflow Spark 宣言パイプライン (SDP) ForEachBatch シンクを使用すると、ストリーミング 書き込みをネイティブにサポートしていない 1 つ以上のターゲットにストリーミング データを変換、マージ、または書き込むことができます。 このページでは、ForEachBatch シンクの設定について説明し、例を示し、主な考慮事項について説明します。

ForEachBatch シンクには、次の機能があります。

  • 各マイクロバッチのカスタム ロジック: ForEachBatch は柔軟なストリーミング シンクです。 Python コードを使用して、任意のアクション (外部テーブルへのマージ、複数の宛先への書き込み、アップサートの実行など) を適用できます。
  • 完全更新のサポート: パイプラインはフローごとにチェックポイントを管理するため、パイプラインの完全な更新を実行するとチェックポイントが自動的にリセットされます。 ForEachBatch シンクでは、このような場合にダウンストリーム データリセットを管理する必要があります。
  • Unity カタログのサポート: ForEachBatch シンクでは、Unity カタログのボリュームまたはテーブルの読み取りや書き込みなど、すべての Unity カタログ機能がサポートされます。
  • 限定的なハウスキーピング: パイプラインは ForEachBatch シンクから書き込まれたデータを追跡しないため、そのデータをクリーンアップできません。 ダウンストリームのデータ管理は、お客様が行う必要があります。
  • イベント ログ エントリ: パイプライン イベント ログには、各 ForEachBatch シンクの作成と使用状況が記録されます。 Python 関数がシリアル化できない場合は、イベント ログに警告エントリが表示され、追加の候補が表示されます。

  • ForEachBatch シンクは、 append_flowなどのストリーミング クエリ用に設計されています。 バッチのみのパイプラインや AutoCDC セマンティクスを対象としたものではありません。
  • このページで説明する ForEachBatch シンクはパイプライン用です。 Apache Spark Structured Streaming では、 foreachBatchもサポートされています。 構造化ストリーミング foreachBatchの詳細については、「 foreachBatch を使用して任意のデータ シンクに書き込む」を参照してください。

ForEachBatch シンクを使用するタイミング

パイプラインで、 deltakafkaなどの組み込みのシンク形式では使用できない機能が必要な場合は常に、ForEachBatch シンクを使用します。 一般的なユース ケースは次のとおりです。

  • Delta Lake テーブルへのマージまたはアップサート: マイクロバッチごとにカスタム マージ ロジックを実行します (たとえば、更新されたレコードの処理)。
  • 複数またはサポートされていない宛先への書き込み: ストリーミング書き込みをサポートしていない複数のテーブルまたは外部ストレージ システム (特定の JDBC シンクなど) に各バッチの出力を書き込みます。
  • カスタム ロジックまたは変換の適用: Python でデータを直接操作します (たとえば、特殊なライブラリや高度な変換を使用)。

組み込みのシンクの詳細、または Python を使用したカスタム シンクの作成については、「 Lakeflow Spark 宣言型パイプラインのシンク」を参照してください。

構文

@dp.foreach_batch_sink()装飾を使用して、ForEachBatch シンクを生成します。 その後、これをフロー定義の target として参照できます (たとえば、 @dp.append_flow)。

from pyspark import pipelines as dp

@dp.foreach_batch_sink(name="<name>")
def batch_handler(df, batch_id):
    """
    Required:
      - `df`: a Spark DataFrame representing the rows of this micro-batch.
      - `batch_id`: unique integer ID for each micro-batch in the query.
    """
    # Your custom write or transformation logic here
    # Example:
    # df.write.format("some-target-system").save("...")
    #
    # To access the sparkSession inside the batch handler, use df.sparkSession.
パラメーター Description
name Optional. パイプライン内のシンクを識別する一意の名前。 含まれていない場合は、自動的に UDF の名前が既定として使用されます。
batch_handler これは、マイクロバッチごとに呼び出されるユーザー定義関数 (UDF) です。
df 現在のマイクロバッチのデータを含む Spark DataFrame。
batch_id マイクロバッチの整数 ID。 Spark は、トリガー間隔ごとにこの ID をインクリメントします。
batch_id0は、ストリームの開始または完全な更新の開始を表します。 foreach_batch_sink コードは、ダウンストリーム データ ソースの完全な更新を適切に処理する必要があります。 詳細については、次のセクションを参照してください。

完全更新

ForEachBatch はストリーミング クエリを使用するため、パイプラインは各フローのチェックポイント ディレクトリを追跡します。 完全更新時:

  • チェックポイント ディレクトリがリセットされます。
  • シンク関数 (foreach_batch_sink UDF) には、0 から始まるまったく新しい batch_id サイクルが表示されます。
  • ターゲット システム内のデータは、パイプラインによって自動的にクリーンアップ されません (データが書き込まれる場所がパイプラインで認識されないため)。 クリーン スレート シナリオが必要な場合は、ForEachBatch シンクが設定する外部テーブルまたは場所を手動で削除または切り捨てる必要があります。

Unity カタログ機能の使用

Spark Structured Streaming foreach_batch_sink の既存の Unity カタログ機能はすべて引き続き使用できます。

これには、マネージドまたは外部の Unity カタログ テーブルへの書き込みが含まれます。 マイクロバッチは、Apache Spark Structured Streaming ジョブの場合とまったく同じように、Unity カタログのマネージド テーブルまたは外部テーブルに書き込むことができます。

イベント ログ エントリ

ForEachBatch シンクを作成すると、SinkDefinitionを含む"format": "foreachBatch" イベントがパイプラインのイベント ログに追加されます。

これにより、ForEachBatch シンクの使用状況を追跡し、シンクに関する警告を確認できます。

Databricks Connect での使用

指定した関数が シリアル化できない 場合 (Databricks Connect の重要な要件)、イベント ログには、Databricks Connect のサポートが必要な場合にコードを簡略化またはリファクタリングすることを推奨する WARN エントリが含まれます。

たとえば、 dbutils を使用して ForEachBatch UDF 内のパラメーターを取得する場合は、代わりに引数を取得してから UDF で使用できます。

# Instead of accessing parameters within the UDF...
def foreach_batch(df, batchId):
  value = dbutils.widgets.get ("X") + str (i)

# ...get the parameters first, and use them within the UDF:
argX = dbutils.widgets.get ("X")

def foreach_batch(df, batchId):
  value = argX + str (i)

ベスト プラクティス

  1. ForEachBatch 関数を簡潔に保つ: スレッド処理、大量のライブラリ依存関係、または大規模なメモリ内データ操作を回避します。 複雑またはステートフルなロジックは、シリアル化エラーやパフォーマンスのボトルネックにつながる可能性があります。
  2. チェックポイント フォルダーを監視する: ストリーミング クエリの場合、SDP はシンクではなくフローによってチェックポイントを管理します。 パイプラインに複数のフローがある場合、各フローには独自のチェックポイント ディレクトリがあります。
  3. 外部依存関係を検証する: 外部システムまたはライブラリに依存している場合は、それらがすべてのクラスター ノードまたはコンテナーにインストールされていることを確認します。
  4. Databricks Connect に注意してください。環境が将来 Databricks Connect に移行する可能性がある場合は、コードがシリアル化可能であり、dbutils UDF 内のforeach_batch_sinkに依存していないことを確認してください。

制限事項

  • ForEachBatch のハウスキーピングなし: カスタム Python コードはどこでもデータを書き込む可能性があるため、パイプラインではそのデータをクリーンアップしたり追跡したりできません。 書き込む宛先の独自のデータ管理または保持ポリシーを処理する必要があります。
  • マイクロバッチのメトリック: パイプラインはストリーミング メトリックを収集しますが、一部のシナリオでは、ForEachBatch を使用すると不完全または異常なメトリックが発生する可能性があります。 これは、ForEachBatch の基礎となる柔軟性により、データ フローと行の追跡がシステムで困難になります。
  • 複数の読み取りなしで複数の宛先への書き込みをサポートする: 一部のお客様は、ForEachBatch を使用してソースから 1 回読み取った後、複数の宛先に書き込む場合があります。 これを実現するには、ForEachBatch 関数内に df.persist または df.cache を含める必要があります。 これらのオプションを使用すると、Azure Databricks は 1 回だけデータの準備を試みます。 これらのオプションがないと、クエリは複数の読み取りになります。 これは、次のコード例には含まれていません。
  • Databricks Connect での使用: パイプラインが Databricks Connect で実行されている場合、 foreachBatch ユーザー定義関数 (UDF) はシリアル化可能である必要があり、 dbutilsを使用できません。 シリアル化できない UDF が検出された場合、パイプラインは警告を生成しますが、パイプラインは失敗しません。
  • シリアル化できないロジック: ローカル オブジェクト、クラス、または選択できないリソースを参照するコードは、Databricks Connect コンテキストで中断する可能性があります。 純粋な Python モジュールを使用し、Databricks Connect が要件である場合は参照 ( dbutils など) が使用されていないことを確認します。

例示

基本構文例

from pyspark import pipelines as dp

# Create a ForEachBatch sink
@dp.foreach_batch_sink(name = "my_foreachbatch_sink")
def feb_sink(df, batch_id):
  # Custom logic here. You can perform merges,
  # write to multiple destinations, etc.
  return

# Create source data for example:
@dp.table()
def example_source_data():
  return spark.range(5)

# Add sink to an append flow:
@dp.append_flow(
    target="my_foreachbatch_sink",
)
def my_flow():
  return spark.readStream.format("delta").table("example_source_data")

単純なパイプラインにサンプル データを使用する

この例では、NYC タクシーのサンプルを使用します。 ワークスペース管理者が Databricks パブリック データセット カタログを有効にしていることを前提としています。 シンクの場合は、 my_catalog.my_schema を、アクセス権を持つカタログとスキーマに変更します。

from pyspark import pipelines as dp
from pyspark.sql.functions import current_timestamp

# Create foreachBatch sink
@dp.foreach_batch_sink(name = "my_foreach_sink")
def my_foreach_sink(df, batch_id):
    # Custom logic here. You can perform merges,
    # write to multiple destinations, etc.
    # For this example, we are adding a timestamp column.
    enriched = df.withColumn("processed_timestamp", current_timestamp())
    # Write to a Delta location
    enriched.write \
      .format("delta") \
      .mode("append") \
      .saveAsTable("my_catalog.my_schema.trips_sink_delta")
    # Return is optional here, but generally not used for the sink
    return

# Create an append flow that reads sample data,
# and sends it to the ForEachBatch sink
@dp.append_flow(
    target="my_foreach_sink",
)
def taxi_source():
  df = spark.readStream.table("samples.nyctaxi.trips")
  return df

複数の宛先への書き込み

この例では、複数の宛先に書き込みます。 txnVersiontxnAppIdを使用して Delta Lake テーブルに冪等性を持つ書き込みを行う方法を示します。 詳細については、foreachBatchでのべき等テーブルの書き込みを参照してください。

table_atable_bの 2 つのテーブルに書き込んでいるとします。バッチ内では、table_aへの書き込みが失敗する間、table_bへの書き込みが成功するとします。 バッチを再実行すると、(txnVersiontxnAppId) ペアを使用すると、Delta は table_aへの重複する書き込みを無視し、バッチを table_bにのみ書き込みます。

from pyspark import pipelines as dp

app_id = "my-app-name" # different applications that write to the same table should have unique txnAppId

# Create the ForEachBatch sink
@dp.foreach_batch_sink(name="user_events_feb")
def user_events_handler(df, batch_id):
    # Optionally do transformations, logging, or merging logic
    # ...

    # Write to a Delta table
    df.write \
     .format("delta") \
     .mode("append") \
     .option("txnVersion", batch_id) \
     .option("txnAppId", app_id) \
     .saveAsTable("my_catalog.my_schema.example_table_1")

    # Also write to a JSON file location
    df.write \
      .format("json") \
      .mode("append") \
      .option("txnVersion", batch_id) \
      .option("txnAppId", app_id) \
      .save("/tmp/json_target")
    return

# Create source data for example
@dp.table()
def example_source():
  return spark.range(5)


# Create the append flow, and target the ForEachBatch sink
@dp.append_flow(target="user_events_feb", name="user_events_flow")
def read_user_events():
    return spark.readStream.format("delta").table("example_source")

spark.sql() の使用

次の例のように、forEachBatch シンクで spark.sql() を使用できます。

from pyspark import pipelines as dp
from pyspark.sql import Row

@dp.foreach_batch_sink(name = "example_sink")
def feb_sink(df, batch_id):
  df.createOrReplaceTempView("df_view")
  df.sparkSession.sql("MERGE INTO target_table AS tgt " +
            "USING df_view AS src ON tgt.id = src.id " +
            "WHEN MATCHED THEN UPDATE SET tgt.id = src.id * 10 " +
            "WHEN NOT MATCHED THEN INSERT (id) VALUES (id)"
          )
  return

# Create target delta table
spark.range(5).write.format("delta").mode("overwrite").saveAsTable("target_table")

# Create source table
@dp.table()
def src_table():
  return spark.range(5)

@dp.append_flow(
    target="example_sink",
)
def example_flow():
  return spark.readStream.format("delta").table("source_table")

よく寄せられる質問 (FAQ)

ForEachBatch シンクで dbutils を使用できますか?

Databricks Connect 以外の環境でパイプラインを実行する予定がある場合は、 dbutils が機能する可能性があります。 ただし、Databricks Connect を使用する場合、 dbutilsforeachBatch 関数内ではアクセスできません。 パイプラインは、破損を回避するために dbutils 使用状況を検出した場合に警告を発生させる可能性があります。

1 つの ForEachBatch シンクで複数のフローを使用できますか?

Yes. 同じシンク名をターゲットとする複数のフロー ( @dp.append_flow) を定義できますが、それぞれが独自のチェックポイントを保持します。

パイプラインはターゲットのデータ保持またはクリーンアップを処理しますか?

No. ForEachBatch シンクは任意の場所またはシステムに書き込むことができるため、パイプラインでは、そのターゲット内のデータを自動的に管理または削除することはできません。 これらの操作は、カスタム コードまたは外部プロセスの一部として処理する必要があります。

ForEachBatch 関数でのシリアル化エラーまたは失敗をトラブルシューティングする方法

クラスター ドライバー ログまたはパイプライン イベント ログを確認します。 Spark Connect 関連のシリアル化の問題の場合は、関数がシリアル化可能な Python オブジェクトにのみ依存し、許可されていないオブジェクト (開いているファイル ハンドルや dbutilsなど) を参照していないことを確認します。