データ エンジニアリングでは、 バックフィル とは、現在のデータまたはストリーミング データを処理するように設計されたデータ パイプラインを通じて履歴データをさかのぼって処理するプロセスを指します。
通常、これは既存のテーブルにデータを送信する別のフローです。 次の図は、パイプライン内のブロンズ テーブルに履歴データを送信するバックフィル フローを示しています。
バックフィルが必要なシナリオには、次のような場合があります。
- 従来のシステムからの履歴データを処理して、機械学習 (ML) モデルをトレーニングするか、履歴傾向分析ダッシュボードを作成します。
- アップストリーム データ ソースのデータ品質の問題が原因で、データのサブセットを再処理します。
- ビジネス要件が変更され、最初のパイプラインでカバーされていない別の期間のデータをバックフィルする必要があります。
- ビジネス ロジックが変更され、履歴データと現在のデータの両方を再処理する必要があります。
Lakeflow Spark 宣言パイプラインのバックフィルは、 ONCE オプションを使用する特殊な追加フローでサポートされています。
オプションの詳細については、append_flowまたは ONCE を参照してください。
履歴データをストリーミング テーブルにバックフィルするときの考慮事項
- 通常は、ブロンズ ストリーミング テーブルにデータを追加します。 ダウンストリームのシルバーレイヤーとゴールドレイヤーは、ブロンズレイヤーから新しいデータを取得します。
- 同じデータが複数回追加された場合に備え、パイプラインで重複データを適切に処理できることを確認します。
- 履歴データ スキーマが現在のデータ スキーマと互換性があることを確認します。
- データ ボリュームのサイズと必要な処理時間の SLA を考慮し、それに応じてクラスターとバッチ サイズを構成します。
例: 既存のパイプラインへのバックフィルの追加
この例では、2025 年 1 月 1 日から、クラウド ストレージ ソースから生のイベント登録データを取り込むパイプラインがあるとします。 その後、ダウンストリームレポートと分析のユースケースのために、過去 3 年間の履歴データをバックフィルする必要があることに気付きます。 すべてのデータは、年、月、日ごとに JSON 形式でパーティション分割された 1 つの場所にあります。
初期パイプライン
クラウド ストレージから生イベント登録データを増分的に取り込む開始パイプライン コードを次に示します。
Python
from pyspark import pipelines as dp
source_root_path = spark.conf.get("registration_events_source_root_path")
begin_year = spark.conf.get("begin_year")
incremental_load_path = f"{source_root_path}/*/*/*"
# create a streaming table and the default flow to ingest streaming events
@dp.table(name="registration_events_raw", comment="Raw registration events")
def ingest():
return (
spark
.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.maxFilesPerTrigger", 100)
.option("cloudFiles.schemaEvolutionMode", "addNewColumns")
.option("modifiedAfter", "2025-01-01T00:00:00.000+00:00")
.load(incremental_load_path)
.where(f"year(timestamp) >= {begin_year}") # safeguard to not process data before begin_year
)
SQL
-- create a streaming table and the default flow to ingest streaming events
CREATE OR REFRESH STREAMING LIVE TABLE registration_events_raw AS
SELECT * FROM read_files(
"/Volumes/gc/demo/apps_raw/event_registration/*/*/*",
format => "json",
inferColumnTypes => true,
maxFilesPerTrigger => 100,
schemaEvolutionMode => "addNewColumns",
modifiedAfter => "2024-12-31T23:59:59.999+00:00"
)
WHERE year(timestamp) >= '2025'; -- safeguard to not process data before begin_year
ここでは、 modifiedAfter 自動ローダー オプションを使用して、クラウド ストレージ パスのすべてのデータが処理されていないことを確認します。 増分処理は、その境界でカットオフされます。
ヒント
Kafka、Kinesis、Azure Event Hubs などの他のデータ ソースには、同じ動作を実現するための同等のリーダー オプションがあります。
過去 3 年間のデータをバックフィルする
次に、1 つ以上のフローを追加して、前のデータをバックフィルします。 この例では、次の手順を実行します。
-
append onceフローを使用します。 1 回限りのバックフィルが実行され、その後は実行しません。 コードはパイプラインに残り、パイプラインが完全に更新された場合は、バックフィルが再実行されます。 - 年ごとに 1 つずつ、3 つのバックフィル フローを作成します (この場合、データはパス内で年単位で分割されます)。 Python の場合、フローの作成をパラメーター化しますが、SQL では、フローごとに 1 回、コードを 3 回繰り返します。
独自のプロジェクトで作業していて、サーバーレス コンピューティングを使用していない場合は、パイプラインの最大ワーカー数を更新できます。 最大ワーカー数を増やすと、予想される SLA 内で現在のストリーミング データを引き続き処理しながら履歴データを処理するためのリソースが確保されます。
ヒント
自動スケールが強化されたサーバーレス コンピューティング (既定) を使用する場合、負荷が増加すると、クラスターのサイズが自動的に増加します。
Python
from pyspark import pipelines as dp
source_root_path = spark.conf.get("registration_events_source_root_path")
begin_year = spark.conf.get("begin_year")
backfill_years = spark.conf.get("backfill_years") # e.g. "2024,2023,2022"
incremental_load_path = f"{source_root_path}/*/*/*"
# meta programming to create append once flow for a given year (called later)
def setup_backfill_flow(year):
backfill_path = f"{source_root_path}/year={year}/*/*"
@dp.append_flow(
target="registration_events_raw",
once=True,
name=f"flow_registration_events_raw_backfill_{year}",
comment=f"Backfill {year} Raw registration events")
def backfill():
return (
spark
.read
.format("json")
.option("inferSchema", "true")
.load(backfill_path)
)
# create the streaming table
dp.create_streaming_table(name="registration_events_raw", comment="Raw registration events")
# append the original incremental, streaming flow
@dp.append_flow(
target="registration_events_raw",
name="flow_registration_events_raw_incremental",
comment="Raw registration events")
def ingest():
return (
spark
.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.maxFilesPerTrigger", 100)
.option("cloudFiles.schemaEvolutionMode", "addNewColumns")
.option("modifiedAfter", "2024-12-31T23:59:59.999+00:00")
.load(incremental_load_path)
.where(f"year(timestamp) >= {begin_year}")
)
# parallelize one time multi years backfill for faster processing
# split backfill_years into array
for year in backfill_years.split(","):
setup_backfill_flow(year) # call the previously defined append_flow for each year
SQL
-- create the streaming table
CREATE OR REFRESH STREAMING TABLE registration_events_raw;
-- append the original incremental, streaming flow
CREATE FLOW
registration_events_raw_incremental
AS INSERT INTO
registration_events_raw BY NAME
SELECT * FROM STREAM read_files(
"/Volumes/gc/demo/apps_raw/event_registration/*/*/*",
format => "json",
inferColumnTypes => true,
maxFilesPerTrigger => 100,
schemaEvolutionMode => "addNewColumns",
modifiedAfter => "2024-12-31T23:59:59.999+00:00"
)
WHERE year(timestamp) >= '2025';
-- one time backfill 2024
CREATE FLOW
registration_events_raw_backfill_2024
AS INSERT INTO ONCE
registration_events_raw BY NAME
SELECT * FROM read_files(
"/Volumes/gc/demo/apps_raw/event_registration/year=2024/*/*",
format => "json",
inferColumnTypes => true
);
-- one time backfill 2023
CREATE FLOW
registration_events_raw_backfill_2023
AS INSERT INTO ONCE
registration_events_raw BY NAME
SELECT * FROM read_files(
"/Volumes/gc/demo/apps_raw/event_registration/year=2023/*/*",
format => "json",
inferColumnTypes => true
);
-- one time backfill 2022
CREATE FLOW
registration_events_raw_backfill_2022
AS INSERT INTO ONCE
registration_events_raw BY NAME
SELECT * FROM read_files(
"/Volumes/gc/demo/apps_raw/event_registration/year=2022/*/*",
format => "json",
inferColumnTypes => true
);
この実装では、いくつかの重要なパターンが強調表示されています。
懸念事項の分離
- 増分処理は、バックフィル操作とは無関係です。
- 各フローには、独自の構成と最適化の設定があります。
- 増分操作とバックフィル操作には明確な違いがあります。
制御された実行
-
ONCEオプションを使用すると、各バックフィルが 1 回だけ実行されます。 - バックフィル フローはパイプライン グラフに残りますが、完了するとアイドル状態になります。 完全更新時に自動的に使用する準備が整いました。
- パイプライン定義には、バックフィル操作の明確な監査証跡があります。
処理の最適化
- 処理を高速化したり、処理を制御したりするために、大きなバックフィルを複数の小さなバックフィルに分割できます。
- 拡張自動スケーリングを使用すると、現在のクラスターの負荷に基づいてクラスター サイズが動的にスケーリングされます。
スキーマの進化
-
schemaEvolutionMode="addNewColumns"を使用すると、スキーマの変更が適切に処理されます。 - 履歴データと現在のデータ間で一貫したスキーマ推論が行われます。
- 新しいデータでは、新しい列が安全に処理されます。