共用方式為


從串流檢查點失敗中復原管線

本頁說明如何在串流檢查點無效或損毀時,在 Lakeflow Spark 宣告式管線中復原管線。

什麼是串流檢查點?

在 Apache Spark 結構化串流中,檢查點是用來保存串流查詢狀態的機制。 此狀態包括:

  • 進度資訊:已處理來源的偏移量。
  • 中繼狀態:需要跨微批次維護以進行具狀態作業的資料 (例如,彙總、 mapGroupsWithState)。
  • 中繼資料:串流查詢執行的相關資訊。

檢查點對於確保串流應用程式中的容錯和資料一致性至關重要:

  • 容錯:如果串流應用程式失敗 (例如,由於節點故障、應用程式當機),檢查點可讓應用程式從上次成功的檢查點狀態重新啟動,而不是從頭開始重新處理所有資料。 這可以防止資料遺失並確保增量處理。
  • 精確一次處理:對於許多串流來源,檢查點與冪等接收器結合,可啟用精確一次處理,保證每筆記錄只處理一次,即使面對失敗,也可防止重複或遺漏。
  • 狀態管理:對於有狀態轉換,檢查點會保留這些操作的內部狀態,讓串流查詢能夠根據累積的歷史狀態正確地繼續處理新資料。

管線檢查點

管線以結構化串流為基礎,並抽象化大部分基礎檢查點管理,提供宣告式方法。 當您在管線中定義串流資料表時,每個寫入串流資料表的流程都有檢查點狀態。 這些檢查點位置位於管線內部,使用者無法存取。

您通常不需要管理或瞭解串流資料表的基礎檢查點,但下列情況除外:

  • 倒帶和重播:如果您想要重新處理特定時間點的資料,同時保留資料表的目前狀態,則必須重設串流資料表的檢查點。
  • 從檢查點失敗或損毀中復原:如果寫入串流資料表的查詢因檢查點相關錯誤而失敗,則會導致硬性失敗,且查詢無法進一步進行。 您可以使用三種方法從此類失敗中恢復:
    • 完整表格重新整理:這會重設表格並清除現有資料。
    • 使用備份和回填的完整表格重新整理:在執行完整表格重新整理和回填舊資料之前,您會先備份表格,但這非常昂貴,應該是最後的手段。
    • 重設檢查點並以累加方式繼續:如果您無法承受遺失現有資料的承受能力,則必須針對受影響的串流流程執行選擇性檢查點重設。

範例:由於程式碼變更而導致管道失敗

假設您有一個數據管線,這個管線可以處理來自 Amazon S3 等雲端儲存系統上的變更資料流,並將初始表格快照寫入 SCD-1 串流資料表。

管線有兩個串流流程:

  • customers_incremental_flow:以累加方式讀取 customer 來源資料表 CDC 摘要,篩選掉重複的記錄,並將它們更新插入到目標資料表中。
  • customers_snapshot_flow:一次性讀取來源資料表的 customers 初始快照集,並將記錄更新插入目標資料表。

從檢查點失敗中復原的管線 CDC 範例

@dp.temporary_view(name="customers_incremental_view")
  def query():
    return (
    spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.includeExistingFiles", "true")
        .load(customers_incremental_path)
        .dropDuplicates(["customer_id"])
    )

@dp.temporary_view(name="customers_snapshot_view")
def full_orders_snapshot():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.includeExistingFiles", "true")
        .option("cloudFiles.inferColumnTypes", "true")
        .load(customers_snapshot_path)
        .select("*")
    )

dp.create_streaming_table("customers")

dp.create_auto_cdc_flow(
    flow_name = "customers_incremental_flow",
    target = "customers",
    source = "customers_incremental_view",
    keys = ["customer_id"],
    sequence_by = col("sequenceNum"),
    apply_as_deletes = expr("operation = 'DELETE'"),
    apply_as_truncates = expr("operation = 'TRUNCATE'"),
    except_column_list = ["operation", "sequenceNum"],
    stored_as_scd_type = 1
)
dp.create_auto_cdc_flow(
    flow_name = "customers_snapshot_flow",
    target = "customers",
    source = "customers_snapshot_view",
    keys = ["customer_id"],
    sequence_by = lit(0),
    stored_as_scd_type = 1,
    once = True
)

部署此管線之後,它會成功執行,並開始處理變更資料摘要和初始快照集。

稍後,您意識到查詢中的 customers_incremental_view 重複資料刪除邏輯是多餘的,並導致效能瓶頸。 您可以移除 以 dropDuplicates() 改善效能:

@dp.temporary_view(name="customers_raw_view")
  def query():
    return (
    spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.includeExistingFiles", "true")
        .load()
        # .dropDuplicates()
    )

移除 dropDuplicates() API 並重新部署管線之後,更新會失敗,並出現下列錯誤:

Streaming stateful operator name does not match with the operator in state metadata.
This is likely to happen when a user adds/removes/changes stateful operators of existing streaming query.
Stateful operators in the metadata: [(OperatorId: 0 -> OperatorName: dedupe)];
Stateful operators in current batch: []. SQLSTATE: 42K03 SQLSTATE: XXKST

此錯誤表示由於檢查點狀態與目前查詢定義不相符,不允許變更,從而阻止管道進一步進行。

與檢查點相關的故障可能由於各種原因而發生,而不僅僅是刪除 dropDuplicates API。 常見情況包括:

  • 在現有的串流查詢中新增或移除可設定狀態運算子 (例如,引進或卸除 dropDuplicates() 或彙總)。
  • 在先前檢查點的查詢中新增、移除或合併串流來源 (例如,將現有的串流查詢與新的聯集查詢聯集,或從現有聯集作業中新增/移除來源)。
  • 修改具狀態串流作業的狀態結構描述 (例如變更用於重複資料刪除或彙總的資料行)。

如需支援和不支援變更的完整清單,請參閱 Spark 結構化串流指南結構化串流查詢中的變更類型

復原選項

有三種復原策略,視您的資料持久性需求和資源限制而定:

Methods 複雜性 費用 潛在資料遺失 潛在的資料重複 需要初始快照 完整表格重設
完整表格重新整理 Low 中等 是 (如果沒有可用的初始快照,或原始檔案已在來源刪除。 否 (適用於套用變更目標資料表。 Yes Yes
具有備份和回填的完整表格重新整理 中等 High 否 (對於冪等接收器。例如,自動 CDC。
重設表格檢查點 Medium-High (提供不可變位移的僅附加來源的媒介。 Low 否(需要仔細考慮。 否(對於冪等作家。例如,僅自動 CDC 至目標資料表。

Medium-High 複雜度取決於串流來源型別和查詢的複雜性。

Recommendations

  • 如果您不想處理檢查點重設的複雜性,請使用完整表格重新整理,而且可以重新計算整個表格。 這也將為您提供進行程式碼變更的選項。
  • 如果您不想處理檢查點重設的複雜性,並且可以接受備份和回填歷史資料的額外成本,請使用具有備份和回填的完整表格重新整理。
  • 如果您必須保留表格中的現有資料,並繼續以累加方式處理新資料,請使用重設表格檢查點。 不過,此方法需要仔細處理檢查點重設,以檢查資料表中的現有資料是否遺失,以及管線是否可以繼續處理新資料。

重設檢查點並以增量方式繼續

若要重設檢查點並繼續以增量方式處理,請遵循下列步驟:

  1. 停止管線:確定管線沒有正在執行的作用中更新。

  2. 確定新檢查點的起始位置:確定您要繼續處理的最後一個成功偏移或時間戳記。 這通常是失敗發生前成功處理的最新偏移。

    在上述範例中,因為您是使用自動載入器讀取 JSON 檔案,因此您可以使用選項 modifiedAfter 來指定新檢查點的起始位置。 此選項可讓您設定自動載入器何時開始處理新檔案的時間戳記。

    對於 Kafka 來源,您可以使用選項 startingOffsets 來指定串流查詢應從中開始處理新資料的位移量。

    針對 Delta Lake 來源,您可以使用選項 startingVersion 來指定串流查詢應該從中開始處理新資料的版本。

  3. 進行代碼更改: 您可以修改流式查詢以刪除 dropDuplicates() API 或進行其他更改。 此外,並檢查您是否已將該 modifiedAfter 選項添加到自動加載器讀取路徑。

    @dp.temporary_view(name="customers_incremental_view")
    def query():
        return (
        spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .option("cloudFiles.includeExistingFiles", "true")
            .option("modifiedAfter", "2025-04-09T06:15:00")
            .load(customers_incremental_path)
            # .dropDuplicates(["customer_id"])
        )
    

    備註

    提供不正確的 modifiedAfter 時間戳記可能會導致資料遺失或重複。 檢查時間戳記是否設定正確,以避免再次處理舊資料或遺失新資料。

    如果您的查詢具有串流聯結或串流聯集,您必須針對所有參與的串流來源套用上述策略。 例如:

    cdc_1 = spark.readStream.format("cloudFiles")...
    cdc_2 = spark.readStream.format("cloudFiles")...
    cdc_source = cdc_1..union(cdc_2)
    
  4. 識別與您要重設檢查點之串流資料表相關聯的流程名稱。 在範例中,它是 customers_incremental_flow。 您可以在管線程式碼中找到流程名稱,或檢查管線 UI 或管線事件記錄檔。

  5. 重設檢查點:建立 Python 筆記本,並將它附加至 Azure Databricks 叢集。

    您需要下列資訊才能重設檢查點:

    • Azure Databricks 工作區 URL
    • 管線標識碼
    • 您要重設檢查點的流程名稱 ()
    import requests
    import json
    
    # Define your Databricks instance and pipeline ID
    databricks_instance = "<DATABRICKS_URL>"
    token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
    pipeline_id = "<YOUR_PIPELINE_ID>"
    flows_to_reset = ["<YOUR_FLOW_NAME>"]
    # Set up the API endpoint
    endpoint = f"{databricks_instance}/api/2.0/pipelines/{pipeline_id}/updates"
    
    
    # Set up the request headers
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    
    # Define the payload
    payload = {
        "reset_checkpoint_selection": flows_to_reset
    }
    
    # Make the POST request
    response = requests.post(endpoint, headers=headers, data=json.dumps(payload))
    
    # Check the response
    if response.status_code == 200:
        print("Pipeline update started successfully.")
    else:
        print(f"Error: {response.status_code}, {response.text}")
    
  6. 執行管道:管道從指定的起始位置開始處理新數據,並使用新的檢查點,保留現有的表格數據,同時繼續增量處理。

最佳做法

  • 避免在生產環境中使用私人預覽功能。
  • 在生產環境中進行變更之前,先測試您的變更。
    • 建立測試管線,最好是在較低的環境中。 如果無法做到這一點,請嘗試使用不同的目錄和結構描述進行測試。
    • 重現錯誤。
    • 套用變更。
    • 驗證結果並在 go/no-go做出決定。
    • 將變更推出至您的生產管線。