Important
sink API はパブリック プレビュー段階です。
このページでは、Lakeflow Spark 宣言型パイプライン sink API と、それを フロー と共に使用して、パイプラインによって変換されたレコードを外部データ シンクに書き込む方法について説明します。 外部データ シンクには、Unity カタログのマネージド テーブルと外部テーブル、および Apache Kafka や Azure Event Hubs などのイベント ストリーミング サービスが含まれます。 データ シンクを使用して、そのデータ ソースの Python コードを記述することで、カスタム データ ソースに書き込むこともできます。
注
-
sinkAPI は Python でのみ使用できます。 - ForEachBatch API を使用してカスタム シンクを作成できます。 「ForEachBatch を使用してパイプライン内の任意のデータ シンクに書き込む」を参照してください。
シンクとは
シンクは、パイプライン内のフローのターゲットです。 既定では、パイプライン フローはストリーミング テーブルまたは具体化されたビュー ターゲットにデータを出力します。 これらはどちらも Azure Databricks マネージド Delta テーブルです。 シンクは、Apache Kafka や Azure Event Hubs などのイベント ストリーミング サービスや、Unity カタログによって管理される外部テーブルなどのターゲットに変換されたデータを書き込む際に使用する代替ターゲットです。 シンクを使用すると、パイプラインの出力を保持するためのオプションが追加されました。
シンクを使用する必要がある場合
Databricks では、次の必要がある場合にシンクを使用することをお勧めします。
- 不正行為の検出、リアルタイム分析、顧客の推奨事項などの運用上のユース ケースを構築します。 運用上のユース ケースでは、通常、Apache Kafka トピックなどのメッセージ バスからデータを読み取り、待ち時間の短いデータを処理し、処理されたレコードをメッセージ バスに書き戻します。 この方法では、クラウド ストレージからの書き込みや読み取りを行わないと、待機時間を短縮できます。
- Unity カタログのマネージド テーブルや外部テーブルなど、外部 Delta インスタンスによって管理されるテーブルに、フローから変換されたデータを書き込みます。
- Databricks の外部にある Apache Kafka のトピックなどのシンクに対して、逆 ETL(Extract-Transform-Load)を実行します。 この方法を使用すると、Unity カタログ テーブルやその他の Databricks で管理されているストレージの外部でデータを読み取ったり使用したりする必要があるユース ケースを効果的にサポートできます。
- Azure Databricks で直接サポートされていないデータ形式に書き込む必要があります。 Python カスタム データ ソースを使用すると、カスタム Python コードを使用して任意のデータ ソースに書き込むシンクを作成できます。 PySpark カスタム データ ソースを参照してください。
シンクを使用する方法
イベント データがストリーミング ソースからパイプラインに取り込まれると、パイプライン内の変換でこのデータを処理および調整します。 次に、追加フロー処理を使用して、変換されたデータ レコードをシンクにストリーミングします。 このシンクは、create_sink() 関数を使用して作成します。
create_sink関数の詳細については、シンク API リファレンスを参照してください。
ストリーミング イベント データを作成または処理し、書き込み用のデータ レコードを準備するパイプラインがある場合は、シンクを使用する準備が整います。
シンクの実装は、次の 2 つの手順で構成されます。
- シンクを作成します。
- 準備されたレコードをシンクに書き込むには、追加フロー を使用します。
シンクを作成する
Databricks では、ストリーム データから処理されたレコードを書き込む複数の種類の宛先シンクがサポートされています。
- デルタ テーブル シンク (Unity カタログのマネージド テーブルと外部テーブルを含む)
- Apache Kafka シンク
- Azure Event Hubs シンク
- Python カスタム データ ソースを使用して Python で記述されたカスタム シンク
Delta、Kafka、Azure Event Hubs シンク、Python カスタム データ ソースの構成の例を次に示します。
デルタシンク
ファイル パスで差分シンクを作成するには:
dp.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
完全修飾カタログとスキーマ パスを用いて、テーブル名によりデルタシンクを作成するには:
dp.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "catalog_name.schema_name.table_name" }
)
Kafka および Azure Event Hubs シンク
このコードは、Apache Kafka シンクと Azure Event Hubs シンクの両方で機能します。
credential_name = "<service-credential>"
eh_namespace_name = "dp-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
topic_name = "dp-sink"
dp.create_sink(
name = "eh_sink",
format = "kafka",
options = {
"databricks.serviceCredential": credential_name,
"kafka.bootstrap.servers": bootstrap_servers,
"topic": topic_name
}
)
credential_nameは、Unity カタログ サービスの資格情報への参照です。 詳細については、「 Unity カタログ サービスの資格情報を使用して外部クラウド サービスに接続する」を参照してください。
Python カスタム データ ソース
python カスタム データ ソースが my_custom_datasource として登録されていると仮定すると、次のコードはそのデータ ソースに書き込むことができます。
from pyspark import pipelines as dp
# Assume `my_custom_datasource` is a custom Python streaming
# data source that writes data to your system.
# Create LDP sink using my_custom_datasource
dp.create_sink(
name="custom_sink",
format="my_custom_datasource",
options={
<options-needed-for-custom-datasource>
}
)
# Create append flow to send data to RequestBin
@dp.append_flow(name="flow_to_custom_sink", target="custom_sink")
def flow_to_custom_sink():
return read_stream("my_source_data")
Python でのカスタム データ ソースの作成の詳細については、「 PySpark カスタム データ ソース」を参照してください。
create_sink 関数の使用方法の詳細については、シンク API リファレンスを参照してください。
シンクが作成されたら、処理されたレコードをシンクにストリーミングし始めることができます。
追加フローを使用してシンクに書き込む
シンクを作成したら、次に、追加フローによって出力されるレコードのターゲットとして指定してから、それに処理されたレコードを書き込みます。 これを行うには、シンクを target デコレーターの append_flow 値として指定します。
- Unity カタログのマネージド テーブルと外部テーブルの場合は、
delta形式を使用し、オプションでパスまたはテーブル名を指定します。 Unity カタログを使用するようにパイプラインを構成する必要があります。 - Apache Kafka トピックの場合は、
kafka形式を使用し、オプションでトピック名、接続情報、および認証情報を指定します。 これらは、Spark Structured Streaming Kafka シンクでサポートされているのと同じオプションです。 Kafka 構造化ストリーミング ライター の構成を参照してください。 - Azure Event Hubs の場合は、
kafka形式を使用し、オプションで Event Hubs の名前、接続情報、および認証情報を指定します。 これらは、Kafka インターフェイスを使用する Spark Structured Streaming Event Hubs シンクでサポートされているのと同じオプションです。 「Microsoft Entra ID と Azure Event Hubs を使用したサービス プリンシパル認証」を参照してください。
パイプラインによって処理されたレコードを使用して Delta、Kafka、Azure Event Hubs シンクに書き込むフローを設定する方法の例を次に示します。
デルタシンク
@dp.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
Kafka および Azure Event Hubs シンク
@dp.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
Azure Event Hubs シンクには、value パラメーターが必須です。
key、partition、headers、topic などの追加パラメーターは省略可能です。
append_flowデコレーターの詳細については、「複数のフローを使用して 1 つのターゲットに書き込む」を参照してください。
制限事項
Python API のみがサポートされています。 SQL はサポートされていません。
ストリーミング クエリのみがサポートされています。 バッチ クエリはサポートされていません。
シンクへの書き込みには、
append_flowのみを使用できます。create_auto_cdc_flowなどの他のフローはサポートされていないため、パイプライン データセット定義でシンクを使用することはできません。 たとえば、以下はサポートされていません。@table("from_sink_table") def fromSink(): return read_stream("my_sink")Delta シンクの場合、テーブル名は完全修飾名にする必要があります。 具体的には、Unity カタログのマネージド外部テーブルの場合、テーブル名は
<catalog>.<schema>.<table>形式である必要があります。 Hive メタストアの場合は、<schema>.<table>形式である必要があります。完全更新 を実行しても、シンク内の以前に計算された結果データはクリーンアップされません。 つまり、再処理されたデータはシンクに追加され、既存のデータは変更されません。
パイプラインの期待値はサポートされていません。