この記事では、パイプライン間でストリーミング テーブルと具体化されたビューを移動する方法について説明します。 移動後は、フローを移動した先のパイプラインによってテーブルが更新され、元のパイプラインでは更新されません。 これは、次のような多くのシナリオで役立ちます。
- 大きなパイプラインをより小さなパイプラインに分割します。
- 複数のパイプラインを 1 つの大きなパイプラインにマージします。
- パイプライン内の一部のテーブルの更新頻度を変更します。
- レガシ発行モードを使用するパイプラインから既定の発行モードにテーブルを移動します。 従来の発行モードの詳細については、パイプラインの従来の発行モードに関するページを参照してください。 パイプライン全体の発行モードを一度に移行する方法については、「パイプライン で既定の発行モードを有効にする」を参照してください。
- 異なるワークスペース内のパイプライン間でテーブルを移動します。
Requirements
パイプライン間でテーブルを移動するための要件を次に示します。
ALTER ...コマンドを実行するときは Databricks Runtime 16.3 以降を使用し、ワークスペース間のテーブル移動には Databricks Runtime 17.2 を使用する必要があります。ソースパイプラインと宛先パイプラインはどちらも次の要件を満たさなければなりません。
- 操作を実行している Azure Databricks ユーザー アカウントまたはサービス プリンシパルによって所有されている
- メタストアを共有するワークスペース内。 メタストアを確認するには、
current_metastore関数を参照してください。
宛先パイプラインでは、既定の発行モードを使用する必要があります。 これにより、複数のカタログとスキーマにテーブルを発行できます。
または、両方のパイプラインで従来の発行モードを使用する必要があり、両方とも設定で同じカタログ値とターゲット値を持っている必要があります。 従来の発行モードの詳細については、 LIVE スキーマ (レガシ) を参照してください。
注
この機能では、既定の発行モードを使用するパイプラインを、従来の発行モードを使用したパイプラインへの移動はサポートされていません。
パイプライン間でテーブルを移動する
次の手順では、ストリーミング テーブルまたは具体化されたビューをパイプライン間で移動する方法について説明します。
ソースパイプラインが実行中の場合は停止します。 完全に停止するまで待ちます。
ソース パイプラインのコードからテーブルの定義を削除し、後で参照するためにどこかに格納します。
パイプラインを正しく実行するために必要なサポート クエリまたはコードを含めます。
ノートブックまたは SQL エディターから、次の SQL コマンドを実行して、ソース パイプラインからコピー先パイプラインにテーブルを再割り当てします。
ALTER [MATERIALIZED VIEW | STREAMING TABLE | TABLE] <table-name> SET TBLPROPERTIES("pipelines.pipelineId"="<destination-pipeline-id>");SQL コマンドは、ソース パイプラインのワークスペースから実行する必要があることに注意してください。
このコマンドでは、Unity Catalog マネージド マテリアライズド ビューとストリーミング テーブルに対してそれぞれ
ALTER MATERIALIZED VIEWとALTER STREAMING TABLEが使用されます。 Hive メタストア テーブルに対して同じアクションを実行するには、ALTER TABLEを使用します。たとえば、
salesという名前のストリーミング テーブルを IDabcd1234-ef56-ab78-cd90-1234efab5678を持つパイプラインに移動する場合は、次のコマンドを実行します。ALTER STREAMING TABLE sales SET TBLPROPERTIES("pipelines.pipelineId"="abcd1234-ef56-ab78-cd90-1234efab5678");注
pipelineIdは有効なパイプライン識別子である必要があります。null値は使用できません。テーブルの定義を宛先パイプラインのコードに追加します。
注
ソースと宛先の間でカタログまたはターゲット スキーマが異なる場合、クエリを正確にコピーできない可能性があります。 定義内の部分的に修飾されたテーブルは、異なる方法で解決できます。 テーブル名を完全に修飾するには、移動中に定義の更新が必要になる場合があります。
注
コピー先パイプラインのコードから、追加の 1 回のフロー (Python では 、append_flow (once=True)、SQL では INSERT INTO ONCE を使用するクエリ) を削除またはコメント アウトします。 詳細については、「 制限事項」を参照してください。
移動が完了しました。 これで、ソース パイプラインと宛先パイプラインの両方を実行できるようになりました。 送信先パイプラインによってテーブルが更新されます。
トラブルシューティング
次の表では、パイプライン間でテーブルを移動するときに発生する可能性があるエラーについて説明します。
| エラー | Description |
|---|---|
DESTINATION_PIPELINE_NOT_IN_DIRECT_PUBLISHING_MODE |
ソース パイプラインは既定の発行モードであり、宛先は LIVE スキーマ (レガシ) モードを使用します。 これはサポートされていません。 ソースが既定の発行モードを使用する場合は、移行先も必要です。 |
PIPELINE_TYPE_NOT_WORKSPACE_PIPELINE_TYPE |
パイプライン間でのテーブルの移動のみがサポートされています。 Databricks SQL で作成されたストリーミング テーブルと具体化されたビューの移動はサポートされていません。 |
DESTINATION_PIPELINE_NOT_FOUND |
pipelines.pipelineId は有効なパイプラインである必要があります。
pipelineId を null にすることはできません。 |
| 移動後、移動先でテーブルの更新に失敗します。 | この場合をすばやく軽減するには、同じ手順に従ってテーブルをソース パイプラインに戻します。 |
PIPELINE_PERMISSION_DENIED_NOT_OWNER |
移動操作を実行するユーザーは、ソース パイプラインとターゲット パイプラインの両方を所有している必要があります。 |
TABLE_ALREADY_EXISTS |
エラー メッセージに一覧表示されているテーブルは既に存在します。 これは、パイプラインのバッキング テーブルが既に存在する場合に発生する可能性があります。 この場合は、エラーに一覧表示されているテーブルを DROP します。 |
パイプライン内の複数のテーブルの例
パイプラインには、複数のテーブルを含めることができます。 パイプライン間で一度に 1 つのテーブルを移動することもできます。 このシナリオでは、ソース パイプライン内で相互に順番に読み取る 3 つのテーブル (table_a、 table_b、 table_c) があります。 1 つのテーブル ( table_b) を別のパイプラインに移動します。
最初のソース パイプライン コード:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table
def table_a():
return spark.read.table("source_table")
# Table to be moved to new pipeline:
@dp.table
def table_b():
return (
spark.read.table("table_a")
.select(col("column1"), col("column2"))
)
@dp.table
def table_c():
return (
spark.read.table("table_b")
.groupBy(col("column1"))
.agg(sum("column2").alias("sum_column2"))
)
table_bを別のパイプラインに移動するには、ソースからテーブル定義をコピーして削除し、table_bの pipelineId を更新します。
まず、スケジュールを一時停止し、ソース パイプラインとターゲット パイプラインの両方で更新が完了するまで待ちます。 次に、移動するテーブルのコードを削除するようにソース パイプラインを変更します。 更新されたソース パイプラインのサンプル コードは次のようになります。
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table
def table_a():
return spark.read.table("source_table")
# Removed, to be in new pipeline:
# @dp.table
# def table_b():
# return (
# spark.read.table("table_a")
# .select(col("column1"), col("column2"))
# )
@dp.table
def table_c():
return (
spark.read.table("table_b")
.groupBy(col("column1"))
.agg(sum("column2").alias("sum_column2"))
)
SQL エディターに移動して、 ALTER pipelineId コマンドを実行します。
ALTER MATERIALIZED VIEW table_b
SET TBLPROPERTIES("pipelines.pipelineId"="<new-pipeline-id>");
次に、宛先パイプラインに移動し、 table_bの定義を追加します。 既定のカタログとスキーマがパイプライン設定で同じ場合、コードの変更は必要ありません。
ターゲット パイプライン コード:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table(name="table_b")
def table_b():
return (
spark.read.table("table_a")
.select(col("column1"), col("column2"))
)
既定のカタログとスキーマがパイプライン設定で異なる場合は、パイプラインのカタログとスキーマを使用して完全修飾名を追加する必要があります。
たとえば、ターゲット パイプライン コードは次のようになります。
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table(name="source_catalog.source_schema.table_b")
def table_b():
return (
spark.read.table("source_catalog.source_schema.table_a")
.select(col("column1"), col("column2"))
)
ソース パイプラインとターゲット パイプラインの両方に対してスケジュールを実行 (または再有効化) します。
パイプラインが不整合になりました。 ターゲットパイプライン内の table_cから読み取ったものと、table_b ソースパイプライン内の table_b から読み取った table_a のクエリ。 ソース パイプラインでトリガーされた実行を実行すると、ソース パイプラインによって管理されなくなったため、 table_b は更新されません。 ソース パイプラインは、 table_b をパイプラインの外部のテーブルとして扱います。 これは、パイプラインで管理されていない Unity カタログの Delta テーブルからの具体化されたビューの読み取りを定義することに相当します。
制限事項
パイプライン間でテーブルを移動する場合の制限事項を次に示します。
- Databricks SQL で作成された具体化されたビューとストリーミング テーブルはサポートされていません。
- 追加 1 回のフロー (Python append_flow(once=True) フローと SQL INSERT INTO ONCE フロー) はサポートされていません。 実行状態は保持されず、送信先パイプラインで再度実行される可能性があります。 宛先パイプラインから一度だけ追加されるフローを削除またはコメントアウトして、再実行を防ぐためにします。
- プライベート テーブルまたはビューはサポートされていません。
- ソース パイプラインと宛先パイプラインはパイプラインである必要があります。 Null パイプラインはサポートされていません。
- ソースと宛先のパイプラインは、同じワークスペース内にあるか、同じメタストアを共有する異なるワークスペース内にある必要があります。
- 移動操作を実行しているユーザーは、ソース パイプラインとターゲット パイプラインの両方を所有している必要があります。
- ソース パイプラインで既定の発行モードを使用する場合は、ターゲット パイプラインも既定の発行モードを使用している必要があります。 既定の発行モードを使用するパイプラインから、LIVE スキーマ (レガシ) を使用するパイプラインにテーブルを移動することはできません。 LIVE スキーマ (レガシー)を参照してください。
- ソース パイプラインと宛先パイプラインの両方が LIVE スキーマ (レガシ) を使用している場合は、設定で同じ
catalog値とtarget値を持つ必要があります。