次の方法で共有


パイプラインを使用して履歴データを埋め戻す

データ エンジニアリングでは、 バックフィル とは、現在のデータまたはストリーミング データを処理するように設計されたデータ パイプラインを通じて履歴データをさかのぼって処理するプロセスを指します。

通常、これは既存のテーブルにデータを送信する別のフローです。 次の図は、パイプライン内のブロンズ テーブルに履歴データを送信するバックフィル フローを示しています。

履歴データを既存のワークフローに追加するバックフィル フロー

バックフィルが必要なシナリオには、次のような場合があります。

  • 従来のシステムからの履歴データを処理して、機械学習 (ML) モデルをトレーニングするか、履歴傾向分析ダッシュボードを作成します。
  • アップストリーム データ ソースのデータ品質の問題が原因で、データのサブセットを再処理します。
  • ビジネス要件が変更され、最初のパイプラインでカバーされていない別の期間のデータをバックフィルする必要があります。
  • ビジネス ロジックが変更され、履歴データと現在のデータの両方を再処理する必要があります。

Lakeflow Spark 宣言パイプラインのバックフィルは、 ONCE オプションを使用する特殊な追加フローでサポートされています。 オプションの詳細については、append_flowまたは ONCE を参照してください。

履歴データをストリーミング テーブルにバックフィルするときの考慮事項

  • 通常は、ブロンズ ストリーミング テーブルにデータを追加します。 ダウンストリームのシルバーレイヤーとゴールドレイヤーは、ブロンズレイヤーから新しいデータを取得します。
  • 同じデータが複数回追加された場合に備え、パイプラインで重複データを適切に処理できることを確認します。
  • 履歴データ スキーマが現在のデータ スキーマと互換性があることを確認します。
  • データ ボリュームのサイズと必要な処理時間の SLA を考慮し、それに応じてクラスターとバッチ サイズを構成します。

例: 既存のパイプラインへのバックフィルの追加

この例では、2025 年 1 月 1 日から、クラウド ストレージ ソースから生のイベント登録データを取り込むパイプラインがあるとします。 その後、ダウンストリームレポートと分析のユースケースのために、過去 3 年間の履歴データをバックフィルする必要があることに気付きます。 すべてのデータは、年、月、日ごとに JSON 形式でパーティション分割された 1 つの場所にあります。

初期パイプライン

クラウド ストレージから生イベント登録データを増分的に取り込む開始パイプライン コードを次に示します。

Python

from pyspark import pipelines as dp

source_root_path = spark.conf.get("registration_events_source_root_path")
begin_year = spark.conf.get("begin_year")
incremental_load_path = f"{source_root_path}/*/*/*"

# create a streaming table and the default flow to ingest streaming events
@dp.table(name="registration_events_raw", comment="Raw registration events")
def ingest():
    return (
        spark
        .readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.maxFilesPerTrigger", 100)
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .option("modifiedAfter", "2025-01-01T00:00:00.000+00:00")
        .load(incremental_load_path)
        .where(f"year(timestamp) >= {begin_year}") # safeguard to not process data before begin_year
    )

SQL

-- create a streaming table and the default flow to ingest streaming events
CREATE OR REFRESH STREAMING LIVE TABLE registration_events_raw AS
SELECT * FROM read_files(
  "/Volumes/gc/demo/apps_raw/event_registration/*/*/*",
  format => "json",
  inferColumnTypes => true,
  maxFilesPerTrigger => 100,
  schemaEvolutionMode => "addNewColumns",
  modifiedAfter => "2024-12-31T23:59:59.999+00:00"
)
WHERE year(timestamp) >= '2025'; -- safeguard to not process data before begin_year

ここでは、 modifiedAfter 自動ローダー オプションを使用して、クラウド ストレージ パスのすべてのデータが処理されていないことを確認します。 増分処理は、その境界でカットオフされます。

ヒント

Kafka、Kinesis、Azure Event Hubs などの他のデータ ソースには、同じ動作を実現するための同等のリーダー オプションがあります。

過去 3 年間のデータをバックフィルする

次に、1 つ以上のフローを追加して、前のデータをバックフィルします。 この例では、次の手順を実行します。

  • append once フローを使用します。 1 回限りのバックフィルが実行され、その後は実行しません。 コードはパイプラインに残り、パイプラインが完全に更新された場合は、バックフィルが再実行されます。
  • 年ごとに 1 つずつ、3 つのバックフィル フローを作成します (この場合、データはパス内で年単位で分割されます)。 Python の場合、フローの作成をパラメーター化しますが、SQL では、フローごとに 1 回、コードを 3 回繰り返します。

独自のプロジェクトで作業していて、サーバーレス コンピューティングを使用していない場合は、パイプラインの最大ワーカー数を更新できます。 最大ワーカー数を増やすと、予想される SLA 内で現在のストリーミング データを引き続き処理しながら履歴データを処理するためのリソースが確保されます。

ヒント

自動スケールが強化されたサーバーレス コンピューティング (既定) を使用する場合、負荷が増加すると、クラスターのサイズが自動的に増加します。

Python

from pyspark import pipelines as dp

source_root_path = spark.conf.get("registration_events_source_root_path")
begin_year = spark.conf.get("begin_year")
backfill_years = spark.conf.get("backfill_years") # e.g. "2024,2023,2022"
incremental_load_path = f"{source_root_path}/*/*/*"

# meta programming to create append once flow for a given year (called later)
def setup_backfill_flow(year):
    backfill_path = f"{source_root_path}/year={year}/*/*"
    @dp.append_flow(
        target="registration_events_raw",
        once=True,
        name=f"flow_registration_events_raw_backfill_{year}",
        comment=f"Backfill {year} Raw registration events")
    def backfill():
        return (
            spark
            .read
            .format("json")
            .option("inferSchema", "true")
            .load(backfill_path)
        )

# create the streaming table
dp.create_streaming_table(name="registration_events_raw", comment="Raw registration events")

# append the original incremental, streaming flow
@dp.append_flow(
        target="registration_events_raw",
        name="flow_registration_events_raw_incremental",
        comment="Raw registration events")
def ingest():
    return (
        spark
        .readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.maxFilesPerTrigger", 100)
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .option("modifiedAfter", "2024-12-31T23:59:59.999+00:00")
        .load(incremental_load_path)
        .where(f"year(timestamp) >= {begin_year}")
    )

# parallelize one time multi years backfill for faster processing
# split backfill_years into array
for year in backfill_years.split(","):
    setup_backfill_flow(year) # call the previously defined append_flow for each year

SQL

-- create the streaming table
CREATE OR REFRESH STREAMING TABLE registration_events_raw;

-- append the original incremental, streaming flow
CREATE FLOW
  registration_events_raw_incremental
AS INSERT INTO
  registration_events_raw BY NAME
SELECT * FROM STREAM read_files(
  "/Volumes/gc/demo/apps_raw/event_registration/*/*/*",
  format => "json",
  inferColumnTypes => true,
  maxFilesPerTrigger => 100,
  schemaEvolutionMode => "addNewColumns",
  modifiedAfter => "2024-12-31T23:59:59.999+00:00"
)
WHERE year(timestamp) >= '2025';


-- one time backfill 2024
CREATE FLOW
  registration_events_raw_backfill_2024
AS INSERT INTO ONCE
  registration_events_raw BY NAME
SELECT * FROM read_files(
  "/Volumes/gc/demo/apps_raw/event_registration/year=2024/*/*",
  format => "json",
  inferColumnTypes => true
);

-- one time backfill 2023
CREATE FLOW
  registration_events_raw_backfill_2023
AS INSERT INTO ONCE
  registration_events_raw BY NAME
SELECT * FROM read_files(
  "/Volumes/gc/demo/apps_raw/event_registration/year=2023/*/*",
  format => "json",
  inferColumnTypes => true
);

-- one time backfill 2022
CREATE FLOW
  registration_events_raw_backfill_2022
AS INSERT INTO ONCE
  registration_events_raw BY NAME
SELECT * FROM read_files(
  "/Volumes/gc/demo/apps_raw/event_registration/year=2022/*/*",
  format => "json",
  inferColumnTypes => true
);

この実装では、いくつかの重要なパターンが強調表示されています。

懸念事項の分離

  • 増分処理は、バックフィル操作とは無関係です。
  • 各フローには、独自の構成と最適化の設定があります。
  • 増分操作とバックフィル操作には明確な違いがあります。

制御された実行

  • ONCE オプションを使用すると、各バックフィルが 1 回だけ実行されます。
  • バックフィル フローはパイプライン グラフに残りますが、完了するとアイドル状態になります。 完全更新時に自動的に使用する準備が整いました。
  • パイプライン定義には、バックフィル操作の明確な監査証跡があります。

処理の最適化

  • 処理を高速化したり、処理を制御したりするために、大きなバックフィルを複数の小さなバックフィルに分割できます。
  • 拡張自動スケーリングを使用すると、現在のクラスターの負荷に基づいてクラスター サイズが動的にスケーリングされます。

スキーマの進化

  • schemaEvolutionMode="addNewColumns"を使用すると、スキーマの変更が適切に処理されます。
  • 履歴データと現在のデータ間で一貫したスキーマ推論が行われます。
  • 新しいデータでは、新しい列が安全に処理されます。

その他のリソース