您可以使用 SQL 作業,將源數據表、檢視或 DataFrame 中的數據向上插入目標 Delta 數據表 MERGE 。 Delta Lake 支援 在 中 MERGE插入、更新和刪除 ,並支援超越 SQL 標準的擴充語法,以利進階使用案例。
假設您有名為 people10mupdates 的源數據表或 來源 /tmp/delta/people-10m-updates 路徑,其中包含名為 people10m 的目標數據表的新數據,或位於 /tmp/delta/people-10m的目標路徑。 其中一些新記錄可能已存在於目標數據中。 若要合併新數據,您需要更新人員 id 已存在的數據列,並在無相符 id 的情況下插入新的數據列。 您可以執行下列查詢:
SQL
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
Python
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
程式語言 Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
重要
只有源數據表中的單一數據列可以比對目標數據表中的指定數據列。 在 Databricks Runtime 16.0 及更高版本中,MERGE評估WHEN MATCHED和ON子句中指定的條件,以確定重複項。 在版本 15.4 LTS 及以下的 Databricks Runtime 中,MERGE 運算只會考慮指定於子句中的 ON 條件。
如需 Scala 和 Python 語法詳細數據, 請參閱 Delta Lake API 檔 。 如需 SQL 語法詳細數據,請參閱 MERGE INTO
使用合併修改所有不相符的數據列
在 Databricks SQL 和 Databricks Runtime 12.2 LTS 及更高版本中,您可以使用 WHEN NOT MATCHED BY SOURCE 子句,對於那些在源數據表中沒有相應記錄的目標數據表中的記錄執行UPDATE或DELETE操作。 Databricks 建議新增選擇性條件子句,以避免完全重寫目標數據表。
下列程式代碼範例示範使用這個 進行刪除的基本語法、以源數據表的內容覆寫目標數據表,以及刪除目標數據表中不相符的記錄。 如需對來源更新和刪除進行時間約束的表格之更具擴充性的模式,請參閱增量同步 Delta 表與來源。
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
程式語言 Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
下列範例會在 WHEN NOT MATCHED BY SOURCE 子句中新增條件,並指定要更新不相符目標列的值。
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
)
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
}
)
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
)
.execute()
)
程式語言 Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
)
)
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
合併作業語意
以下是程式設計作業語意的詳細描述 merge 。
可以有任意數目的
whenMatched和whenNotMatched子句。whenMatched當符合比對條件時,源資料列與目標表格列匹配,則執行子句。 這些子句具有下列語意。whenMatched子句最多可以有一個update和一個delete動作。update中的merge動作只會更新相符目標數據列的指定數據行(類似於update作業)。 動作delete會刪除相符的數據列。每個
whenMatched子句都可以有選擇性條件。 如果這個子句條件存在,則只有在子句條件為 true 時,update才會針對任何相符的來源目標數據列組執行 或delete動作。如果有多個
whenMatched子句,則會依照指定的順序來評估它們。 除了最後一個子句之外,所有whenMatched子句都必須有條件。如果沒有任何
whenMatched條件對符合合併條件的來源和目標數據列組評估為 true,則目標數據列將保持不變。若要使用來源資料集的對應資料列來更新目標 Delta 資料表的所有資料列,請使用
whenMatched(...).updateAll()。 這相當於:whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))針對目標 Delta 資料表的所有欄。 因此,此動作假設源數據表的數據行與目標數據表中的數據行相同,否則查詢會擲回分析錯誤。
注意
啟用自動架構演進時,此行為會變更。 如需詳細資訊,請參閱 自動架構演進 。
whenNotMatched當來源數據列不符合任何以比對條件為基礎的目標數據列時,就會執行 子句。 這些子句具有下列語意。whenNotMatched子句只能有insert動作。 新的數據列會根據指定的數據行和對應的表達式產生。 您不需要指定目標資料表中的所有資料列。 針對未指定的目標欄位,會插入NULL。每個
whenNotMatched子句都可以有選擇性條件。 如果子句條件存在,則只有在該數據列的條件為 true 時,才會插入源數據列。 否則,會忽略源欄。如果有多個
whenNotMatched子句,則會依照指定的順序來評估它們。 除了最後一個子句之外,所有whenNotMatched子句都必須有條件。若要使用來源資料集的對應資料列插入目標 Delta 資料表的所有資料列,請使用
whenNotMatched(...).insertAll()。 這相當於:whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))針對目標 Delta 資料表的所有欄。 因此,此動作假設源數據表的數據行與目標數據表中的數據行相同,否則查詢會擲回分析錯誤。
注意
啟用自動架構演進時,此行為會變更。 如需詳細資訊,請參閱 自動架構演進 。
whenNotMatchedBySource子句會在目標資料行不符合合併條件的任何源資料行時執行。 這些子句具有下列語意。-
whenNotMatchedBySource子句可以指定delete和update動作。 - 每個
whenNotMatchedBySource子句都可以有選擇性條件。 如果子句條件存在,只有當該數據列的條件為 true 時,才會修改目標數據列。 否則,目標數據列會維持不變。 - 如果有多個
whenNotMatchedBySource子句,則會依照指定的順序來評估它們。 除了最後一個子句之外,所有whenNotMatchedBySource子句都必須有條件。 - 根據定義,
whenNotMatchedBySource子句沒有從中提取數據行值的來源數據列,因此無法參考源數據行。 若要修改每個資料行,您可以指定常值或對該資料行執行動作,例如SET target.deleted_count = target.deleted_count + 1。
-
重要
-
merge如果源數據集的多個數據列相符,且合併嘗試更新目標 Delta 數據表的相同數據列,作業可能會失敗。 根據合併的 SQL 語意,這類更新作業模棱兩可,因為不清楚應該使用哪個來源數據列來更新相符的目標數據列。 您可以預先處理來源表格,以排除多個相符項目的可能性。 - 只有當檢視已定義為
MERGE時,才可以在 SQL VIEW 上套用 SQLCREATE VIEW viewName AS SELECT * FROM deltaTable作業。
寫入 Delta 資料表時資料去重
常見的 ETL 使用案例是將記錄收集後附加到表格中,並將其匯聚至 Delta 數據表。 不過,通常來源可能會產生重複的記錄檔記錄,而需要下游重複數據刪除步驟才能處理它們。 使用 merge時,您可以避免插入重複的記錄。
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
程式語言 Scala
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
JAVA
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
注意
包含新記錄的數據集必須自行去重。 根據合併的 SQL 語意,它會對照並去除數據表中現有數據與新數據之間的重複數據,但如果新數據集中有重複數據,這些重複數據仍會被插入。 因此,將新資料去重後再合併至資料表。
如果您知道只有幾天可能會取得重複的記錄,您可以藉由依日期分割數據表,然後指定要比對的目標數據表日期範圍,進一步優化查詢。
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
程式語言 Scala
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
JAVA
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
這比上一個命令更有效率,因為它只會在過去 7 天的記錄中尋找重複專案,而不是整個數據表。 此外,您可以使用這個僅限插入的合併與結構化串流來持續執行日誌的去重。
- 在串流查詢中,您可以使用
foreachBatch中的合併作業,以持續將任何串流數據寫入 Delta 資料表並進行重複資料刪除。 如需有關的詳細資訊,請參閱下列foreachBatch。 - 在另一個串流查詢中,您可以持續讀取此 Delta 資料表的去重資料。 這是可能的,因為僅插入合併只會將新數據附加至 Delta 數據表。
使用 Delta Lake 緩時變資料 (SCD) 和異動資料擷取 (CDC)
Lakeflow Spark 宣告式管線具備原生支援功能,以追蹤和套用 SCD 第 1 類型和第 2 類型。 與 Lakeflow Spark 宣告式管線搭配使用 AUTO CDC ... INTO,以確保在處理 CDC 資料流時能正確處理非排序記錄。 請參閱 AUTOTO CDC API:使用管線簡化變更資料擷取。
增量同步 Delta 表和來源端
在 Databricks SQL 和 Databricks Runtime 12.2 LTS 及以上版本中,您可以使用 WHEN NOT MATCHED BY SOURCE 來建立任意條件,以原子方式刪除並替換資料表的一部分。 當您有一個源表時,記錄可能在初始數據輸入後的數天內變更或被刪除,但最終會達到一個最終狀態,這將特別有幫助。
下列查詢顯示使用此模式從來源選取 5 天的記錄、更新目標中的相符記錄、將來源的新記錄插入目標,以及從目標中刪除過去 5 天的所有不相符記錄。
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
藉由在來源和目標數據表上提供相同的布爾篩選,您就可以動態地將變更從來源傳播到目標數據表,包括刪除。
注意
雖然此模式可以在沒有任何條件子句的情況下使用,但這會導致重寫目標表,這可能會非常昂貴。