次の方法で共有


ストリーミング チェックポイントエラーからパイプラインを復旧する

このページでは、ストリーミング チェックポイントが無効になったり破損したりしたときに、Lakeflow Spark 宣言パイプラインでパイプラインを復旧する方法について説明します。

ストリーミング チェックポイントとは

Apache Spark 構造化ストリーミングでは、チェックポイントはストリーミング クエリの状態を保持するために使用されるメカニズムです。 この状態には、次のものが含まれます。

  • 進行状況情報: ソースからのオフセットが処理されました。
  • 中間状態: ステートフル操作 (集計、 mapGroupsWithStateなど) のためにマイクロ バッチ間で保持する必要があるデータ。
  • メタデータ: ストリーミング クエリの実行に関する情報。

チェックポイントは、ストリーミング アプリケーションでフォールト トレランスとデータの一貫性を確保するために不可欠です。

  • フォールト トレランス: ストリーミング アプリケーションが失敗した場合 (たとえば、ノードの障害、アプリケーションのクラッシュなど)、チェックポイントを使用すると、最初からすべてのデータを再処理するのではなく、最後に正常に完了したチェックポイント状態からアプリケーションを再起動できます。 これにより、データが失われなくなり、増分処理が保証されます。
  • 厳密に 1 回の処理: 多くのストリーミング ソースの場合、チェックポイントはべき等シンクと組み合わせて、正確に 1 回の処理を有効にすると、障害が発生した場合でも各レコードが 1 回だけ処理され、重複や省略が防止されます。
  • 状態管理: ステートフル変換の場合、チェックポイントはこれらの操作の内部状態を保持するため、ストリーミング クエリは、蓄積された履歴状態に基づいて新しいデータの処理を正しく続行できます。

パイプライン チェックポイント

パイプラインは構造化ストリーミング上に構築され、基になるチェックポイント管理の大部分を抽象化し、宣言型アプローチを提供します。 パイプラインでストリーミング テーブルを定義すると、ストリーミング テーブルに書き込むフローごとにチェックポイント状態が発生します。 これらのチェックポイントの場所はパイプラインの内部にあり、ユーザーはアクセスできません。

通常、次の場合を除き、ストリーミング テーブルの基になるチェックポイントを管理または理解する必要はありません。

  • 巻き戻しと再生: テーブルの現在の状態を維持しながら特定の時点からデータを再処理する場合は、ストリーミング テーブルのチェックポイントをリセットする必要があります。
  • チェックポイントエラーまたは破損からの復旧: チェックポイント関連のエラーが原因でストリーミング テーブルへのクエリ書き込みが失敗した場合、ハード エラーが発生し、クエリをさらに進めることはできません。 このクラスの障害から復旧するには、次の 3 つの方法を使用できます。
    • テーブルの完全更新: テーブルがリセットされ、既存のデータがワイプされます。
    • バックアップとバックフィルを使用したテーブルの完全な更新: テーブルの完全な更新と古いデータのバックフィルを実行する前に、テーブルのバックアップを作成しますが、これは非常にコストがかかり、最後の手段である必要があります。
    • チェックポイントをリセットして増分的に続行する: 既存のデータを失う余裕がない場合は、影響を受けるストリーミング フローに対して選択的チェックポイント リセットを実行する必要があります。

例: コード変更によるパイプラインエラー

Amazon S3 などのクラウド ストレージ システムからの初期テーブル スナップショットと共に変更データ フィードを処理し、SCD-1 ストリーミング テーブルに書き込むパイプラインがあるシナリオを考えてみましょう。

パイプラインには、次の 2 つのストリーミング フローがあります。

  • customers_incremental_flow: customer ソース テーブルの CDC フィードを増分読み取りし、重複するレコードを除外して、ターゲット テーブルにアップサートします。
  • customers_snapshot_flow: customers ソース テーブルの初期スナップショットを 1 回読み取り、レコードをターゲット テーブルにアップサートします。

チェックポイントエラーからの復旧のためのパイプライン CDC の例

@dp.temporary_view(name="customers_incremental_view")
  def query():
    return (
    spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.includeExistingFiles", "true")
        .load(customers_incremental_path)
        .dropDuplicates(["customer_id"])
    )

@dp.temporary_view(name="customers_snapshot_view")
def full_orders_snapshot():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.includeExistingFiles", "true")
        .option("cloudFiles.inferColumnTypes", "true")
        .load(customers_snapshot_path)
        .select("*")
    )

dp.create_streaming_table("customers")

dp.create_auto_cdc_flow(
    flow_name = "customers_incremental_flow",
    target = "customers",
    source = "customers_incremental_view",
    keys = ["customer_id"],
    sequence_by = col("sequenceNum"),
    apply_as_deletes = expr("operation = 'DELETE'"),
    apply_as_truncates = expr("operation = 'TRUNCATE'"),
    except_column_list = ["operation", "sequenceNum"],
    stored_as_scd_type = 1
)
dp.create_auto_cdc_flow(
    flow_name = "customers_snapshot_flow",
    target = "customers",
    source = "customers_snapshot_view",
    keys = ["customer_id"],
    sequence_by = lit(0),
    stored_as_scd_type = 1,
    once = True
)

このパイプラインをデプロイすると、正常に実行され、変更データ フィードと初期スナップショットの処理が開始されます。

後で、 customers_incremental_view クエリの重複除去ロジックが冗長であり、パフォーマンスのボトルネックの原因になっていることに気付きます。 パフォーマンスを向上させるには、 dropDuplicates() を削除します。

@dp.temporary_view(name="customers_raw_view")
  def query():
    return (
    spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.includeExistingFiles", "true")
        .load()
        # .dropDuplicates()
    )

dropDuplicates() API を削除してパイプラインを再デプロイした後、更新は次のエラーで失敗します。

Streaming stateful operator name does not match with the operator in state metadata.
This is likely to happen when a user adds/removes/changes stateful operators of existing streaming query.
Stateful operators in the metadata: [(OperatorId: 0 -> OperatorName: dedupe)];
Stateful operators in current batch: []. SQLSTATE: 42K03 SQLSTATE: XXKST

このエラーは、チェックポイントの状態と現在のクエリ定義の不一致が原因で変更が許可されていないことを示し、パイプラインがさらに進行するのを防ぎます。

チェックポイント関連のエラーは、 dropDuplicates API を削除するだけでなく、さまざまな理由で発生する可能性があります。 一般的なシナリオは、次のとおりです。

  • 既存のストリーミング クエリでステートフル演算子 ( dropDuplicates() や集計の導入や削除など) を追加または削除する。
  • 以前にチェックポイントが設定されたクエリでストリーミング ソースを追加、削除、または結合する (たとえば、既存のストリーミング クエリを新しいクエリと結合する、既存の共用体操作からソースを追加/削除するなど)。
  • ステートフル ストリーミング操作の状態スキーマの変更 (重複除去や集計に使用される列の変更など)。

サポートされている変更とサポートされていない変更の包括的な一覧については、 Spark 構造化ストリーミング ガイドと Structured Streamingクエリでの変更の種類を参照してください

回復オプション

データの持続性の要件とリソースの制約に応じて、次の 3 つの復旧戦略があります。

Methods 複雑さ 費用 データ損失の可能性 潜在的なデータの重複 初期スナップショットが必要 テーブルの完全なリセット
テーブルの完全更新 Low ミディアム はい (初期スナップショットが使用できない場合、またはソースで生ファイルが削除されている場合)。 いいえ (変更を適用するターゲット テーブルの場合)。 イエス イエス
バックアップとバックフィルを使用したテーブルの完全な更新 ミディアム High いいえ いいえ (べき等シンクの場合)。たとえば、自動 CDC など)。 いいえ いいえ
テーブル チェックポイントのリセット Medium-High (変更できないオフセットを提供する追加専用ソースの場合は中)。 Low いいえ (慎重に検討する必要があります)。 いいえ (べき等ライターの場合)。たとえば、ターゲット テーブルへの自動 CDC のみ)。 いいえ いいえ

Medium-High 複雑さは、ストリーミング ソースの種類とクエリの複雑さによって異なります。

推奨事項

  • チェックポイントのリセットの複雑さに対処せず、テーブル全体を再計算できる場合は、テーブルの完全更新を使用します。 これにより、コードを変更するオプションも提供されます。
  • チェックポイントリセットの複雑さに対処する必要がない場合は、バックアップとバックフィルでテーブルの完全更新を使用します。バックアップを作成して履歴データをバックフィルする追加コストは問題ありません。
  • テーブル内の既存のデータを保持し、新しいデータの増分処理を続行する必要がある場合は、テーブルのリセット チェックポイントを使用します。 ただし、この方法では、テーブル内の既存のデータが失われず、パイプラインで新しいデータの処理を続行できることを確認するために、チェックポイント リセットを慎重に処理する必要があります。

チェックポイントをリセットして増分的に続行する

チェックポイントをリセットし、増分処理を続行するには、次の手順に従います。

  1. パイプラインを停止する: パイプラインにアクティブな更新プログラムが実行されていないことを確認します。

  2. 新しいチェックポイントの開始位置を決定する: 処理を続行する最後に成功したオフセットまたはタイムスタンプを特定します。 これは通常、エラーが発生する前に正常に処理された最新のオフセットです。

    上記の例では、自動ローダーを使用して JSON ファイルを読み取るため、 modifiedAfter オプションを使用して新しいチェックポイントの開始位置を指定できます。 このオプションを使用すると、自動ローダーが新しいファイルの処理を開始するタイミングのタイムスタンプを設定できます。

    Kafka ソースの場合は、 startingOffsets オプションを使用して、ストリーミング クエリが新しいデータの処理を開始するオフセットを指定できます。

    Delta Lake ソースの場合は、 startingVersion オプションを使用して、ストリーミング クエリが新しいデータの処理を開始するバージョンを指定できます。

  3. コードを変更する: ストリーミング クエリを変更して、 dropDuplicates() API を削除したり、他の変更を加えたりすることができます。 また、オートローダーの読み取りパスに modifiedAfter オプションが追加されていることを確認します。

    @dp.temporary_view(name="customers_incremental_view")
    def query():
        return (
        spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .option("cloudFiles.includeExistingFiles", "true")
            .option("modifiedAfter", "2025-04-09T06:15:00")
            .load(customers_incremental_path)
            # .dropDuplicates(["customer_id"])
        )
    

    正しくない modifiedAfter タイムスタンプを指定すると、データの損失や重複につながる可能性があります。 古いデータを再度処理したり、新しいデータが見つからないりしないように、タイムスタンプが正しく設定されていることを確認します。

    クエリにストリーム ストリーム結合またはストリーム ストリーム共用体がある場合は、参加しているすべてのストリーミング ソースに上記の戦略を適用する必要があります。 例えば次が挙げられます。

    cdc_1 = spark.readStream.format("cloudFiles")...
    cdc_2 = spark.readStream.format("cloudFiles")...
    cdc_source = cdc_1..union(cdc_2)
    
  4. チェックポイントをリセットするストリーミング テーブルに関連付けられているフロー名を特定します。 この例では、 customers_incremental_flowです。 パイプライン コードで、またはパイプライン UI またはパイプライン イベント ログを確認して、フロー名を見つけることができます。

  5. チェックポイントをリセットする: Python ノートブックを作成し、Azure Databricks クラスターにアタッチします。

    チェックポイントをリセットするには、次の情報が必要です。

    • Azure Databricks ワークスペースの URL
    • パイプライン ID
    • チェックポイントをリセットするフロー名
    import requests
    import json
    
    # Define your Databricks instance and pipeline ID
    databricks_instance = "<DATABRICKS_URL>"
    token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
    pipeline_id = "<YOUR_PIPELINE_ID>"
    flows_to_reset = ["<YOUR_FLOW_NAME>"]
    # Set up the API endpoint
    endpoint = f"{databricks_instance}/api/2.0/pipelines/{pipeline_id}/updates"
    
    
    # Set up the request headers
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    
    # Define the payload
    payload = {
        "reset_checkpoint_selection": flows_to_reset
    }
    
    # Make the POST request
    response = requests.post(endpoint, headers=headers, data=json.dumps(payload))
    
    # Check the response
    if response.status_code == 200:
        print("Pipeline update started successfully.")
    else:
        print(f"Error: {response.status_code}, {response.text}")
    
  6. パイプラインの実行: パイプラインは、指定された開始位置から新しいデータの処理を新しいチェックポイントで開始し、増分処理を続行しながら既存のテーブル データを保持します。

ベスト プラクティス

  • 運用環境ではプライベート プレビュー機能を使用しないでください。
  • 運用環境で変更を行う前に、変更をテストします。
    • 低い環境でテスト パイプラインを作成するのが理想的です。 これが不可能な場合は、テストに別のカタログとスキーマを使用してみてください。
    • エラーを再現します。
    • 変更を適用します。
    • 結果を検証し、外出先/no-goで決定します。
    • 運用環境のパイプラインに対する変更をロールアウトします。