本頁說明如何在串流檢查點無效或損毀時,在 Lakeflow Spark 宣告式管線中復原管線。
什麼是串流檢查點?
在 Apache Spark 結構化串流中,檢查點是用來保存串流查詢狀態的機制。 此狀態包括:
- 進度資訊:已處理來源的偏移量。
-
中繼狀態:需要跨微批次維護以進行具狀態作業的資料 (例如,彙總、
mapGroupsWithState)。 - 中繼資料:串流查詢執行的相關資訊。
檢查點對於確保串流應用程式中的容錯和資料一致性至關重要:
- 容錯:如果串流應用程式失敗 (例如,由於節點故障、應用程式當機),檢查點可讓應用程式從上次成功的檢查點狀態重新啟動,而不是從頭開始重新處理所有資料。 這可以防止資料遺失並確保增量處理。
- 精確一次處理:對於許多串流來源,檢查點與冪等接收器結合,可啟用精確一次處理,保證每筆記錄只處理一次,即使面對失敗,也可防止重複或遺漏。
- 狀態管理:對於有狀態轉換,檢查點會保留這些操作的內部狀態,讓串流查詢能夠根據累積的歷史狀態正確地繼續處理新資料。
管線檢查點
管線以結構化串流為基礎,並抽象化大部分基礎檢查點管理,提供宣告式方法。 當您在管線中定義串流資料表時,每個寫入串流資料表的流程都有檢查點狀態。 這些檢查點位置位於管線內部,使用者無法存取。
您通常不需要管理或瞭解串流資料表的基礎檢查點,但下列情況除外:
- 倒帶和重播:如果您想要重新處理特定時間點的資料,同時保留資料表的目前狀態,則必須重設串流資料表的檢查點。
-
從檢查點失敗或損毀中復原:如果寫入串流資料表的查詢因檢查點相關錯誤而失敗,則會導致硬性失敗,且查詢無法進一步進行。 您可以使用三種方法從此類失敗中恢復:
- 完整表格重新整理:這會重設表格並清除現有資料。
- 使用備份和回填的完整表格重新整理:在執行完整表格重新整理和回填舊資料之前,您會先備份表格,但這非常昂貴,應該是最後的手段。
- 重設檢查點並以累加方式繼續:如果您無法承受遺失現有資料的承受能力,則必須針對受影響的串流流程執行選擇性檢查點重設。
範例:由於程式碼變更而導致管道失敗
假設您有一個數據管線,這個管線可以處理來自 Amazon S3 等雲端儲存系統上的變更資料流,並將初始表格快照寫入 SCD-1 串流資料表。
管線有兩個串流流程:
-
customers_incremental_flow:以累加方式讀取customer來源資料表 CDC 摘要,篩選掉重複的記錄,並將它們更新插入到目標資料表中。 -
customers_snapshot_flow:一次性讀取來源資料表的customers初始快照集,並將記錄更新插入目標資料表。
@dp.temporary_view(name="customers_incremental_view")
def query():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.includeExistingFiles", "true")
.load(customers_incremental_path)
.dropDuplicates(["customer_id"])
)
@dp.temporary_view(name="customers_snapshot_view")
def full_orders_snapshot():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(customers_snapshot_path)
.select("*")
)
dp.create_streaming_table("customers")
dp.create_auto_cdc_flow(
flow_name = "customers_incremental_flow",
target = "customers",
source = "customers_incremental_view",
keys = ["customer_id"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
dp.create_auto_cdc_flow(
flow_name = "customers_snapshot_flow",
target = "customers",
source = "customers_snapshot_view",
keys = ["customer_id"],
sequence_by = lit(0),
stored_as_scd_type = 1,
once = True
)
部署此管線之後,它會成功執行,並開始處理變更資料摘要和初始快照集。
稍後,您意識到查詢中的 customers_incremental_view 重複資料刪除邏輯是多餘的,並導致效能瓶頸。 您可以移除 以 dropDuplicates() 改善效能:
@dp.temporary_view(name="customers_raw_view")
def query():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.includeExistingFiles", "true")
.load()
# .dropDuplicates()
)
移除 dropDuplicates() API 並重新部署管線之後,更新會失敗,並出現下列錯誤:
Streaming stateful operator name does not match with the operator in state metadata.
This is likely to happen when a user adds/removes/changes stateful operators of existing streaming query.
Stateful operators in the metadata: [(OperatorId: 0 -> OperatorName: dedupe)];
Stateful operators in current batch: []. SQLSTATE: 42K03 SQLSTATE: XXKST
此錯誤表示由於檢查點狀態與目前查詢定義不相符,不允許變更,從而阻止管道進一步進行。
與檢查點相關的故障可能由於各種原因而發生,而不僅僅是刪除 dropDuplicates API。 常見情況包括:
- 在現有的串流查詢中新增或移除可設定狀態運算子 (例如,引進或卸除
dropDuplicates()或彙總)。 - 在先前檢查點的查詢中新增、移除或合併串流來源 (例如,將現有的串流查詢與新的聯集查詢聯集,或從現有聯集作業中新增/移除來源)。
- 修改具狀態串流作業的狀態結構描述 (例如變更用於重複資料刪除或彙總的資料行)。
如需支援和不支援變更的完整清單,請參閱 Spark 結構化串流指南 和 結構化串流查詢中的變更類型。
復原選項
有三種復原策略,視您的資料持久性需求和資源限制而定:
| Methods | 複雜性 | 費用 | 潛在資料遺失 | 潛在的資料重複 | 需要初始快照 | 完整表格重設 |
|---|---|---|---|---|---|---|
| 完整表格重新整理 | Low | 中等 | 是 (如果沒有可用的初始快照,或原始檔案已在來源刪除。 | 否 (適用於套用變更目標資料表。 | Yes | Yes |
| 具有備份和回填的完整表格重新整理 | 中等 | High | 否 | 否 (對於冪等接收器。例如,自動 CDC。 | 否 | 否 |
| 重設表格檢查點 | Medium-High (提供不可變位移的僅附加來源的媒介。 | Low | 否(需要仔細考慮。 | 否(對於冪等作家。例如,僅自動 CDC 至目標資料表。 | 否 | 否 |
Medium-High 複雜度取決於串流來源型別和查詢的複雜性。
Recommendations
- 如果您不想處理檢查點重設的複雜性,請使用完整表格重新整理,而且可以重新計算整個表格。 這也將為您提供進行程式碼變更的選項。
- 如果您不想處理檢查點重設的複雜性,並且可以接受備份和回填歷史資料的額外成本,請使用具有備份和回填的完整表格重新整理。
- 如果您必須保留表格中的現有資料,並繼續以累加方式處理新資料,請使用重設表格檢查點。 不過,此方法需要仔細處理檢查點重設,以檢查資料表中的現有資料是否遺失,以及管線是否可以繼續處理新資料。
重設檢查點並以增量方式繼續
若要重設檢查點並繼續以增量方式處理,請遵循下列步驟:
停止管線:確定管線沒有正在執行的作用中更新。
確定新檢查點的起始位置:確定您要繼續處理的最後一個成功偏移或時間戳記。 這通常是失敗發生前成功處理的最新偏移。
在上述範例中,因為您是使用自動載入器讀取 JSON 檔案,因此您可以使用選項
modifiedAfter來指定新檢查點的起始位置。 此選項可讓您設定自動載入器何時開始處理新檔案的時間戳記。對於 Kafka 來源,您可以使用選項
startingOffsets來指定串流查詢應從中開始處理新資料的位移量。針對 Delta Lake 來源,您可以使用選項
startingVersion來指定串流查詢應該從中開始處理新資料的版本。進行代碼更改: 您可以修改流式查詢以刪除
dropDuplicates()API 或進行其他更改。 此外,並檢查您是否已將該modifiedAfter選項添加到自動加載器讀取路徑。@dp.temporary_view(name="customers_incremental_view") def query(): return ( spark.readStream.format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", "true") .option("cloudFiles.includeExistingFiles", "true") .option("modifiedAfter", "2025-04-09T06:15:00") .load(customers_incremental_path) # .dropDuplicates(["customer_id"]) )備註
提供不正確的
modifiedAfter時間戳記可能會導致資料遺失或重複。 檢查時間戳記是否設定正確,以避免再次處理舊資料或遺失新資料。如果您的查詢具有串流聯結或串流聯集,您必須針對所有參與的串流來源套用上述策略。 例如:
cdc_1 = spark.readStream.format("cloudFiles")... cdc_2 = spark.readStream.format("cloudFiles")... cdc_source = cdc_1..union(cdc_2)識別與您要重設檢查點之串流資料表相關聯的流程名稱。 在範例中,它是
customers_incremental_flow。 您可以在管線程式碼中找到流程名稱,或檢查管線 UI 或管線事件記錄檔。重設檢查點:建立 Python 筆記本,並將它附加至 Azure Databricks 叢集。
您需要下列資訊才能重設檢查點:
- Azure Databricks 工作區 URL
- 管線標識碼
- 您要重設檢查點的流程名稱 ()
import requests import json # Define your Databricks instance and pipeline ID databricks_instance = "<DATABRICKS_URL>" token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() pipeline_id = "<YOUR_PIPELINE_ID>" flows_to_reset = ["<YOUR_FLOW_NAME>"] # Set up the API endpoint endpoint = f"{databricks_instance}/api/2.0/pipelines/{pipeline_id}/updates" # Set up the request headers headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json" } # Define the payload payload = { "reset_checkpoint_selection": flows_to_reset } # Make the POST request response = requests.post(endpoint, headers=headers, data=json.dumps(payload)) # Check the response if response.status_code == 200: print("Pipeline update started successfully.") else: print(f"Error: {response.status_code}, {response.text}")執行管道:管道從指定的起始位置開始處理新數據,並使用新的檢查點,保留現有的表格數據,同時繼續增量處理。
最佳做法
- 避免在生產環境中使用私人預覽功能。
- 在生產環境中進行變更之前,先測試您的變更。
- 建立測試管線,最好是在較低的環境中。 如果無法做到這一點,請嘗試使用不同的目錄和結構描述進行測試。
- 重現錯誤。
- 套用變更。
- 驗證結果並在 go/no-go做出決定。
- 將變更推出至您的生產管線。