Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Możesz za pomocą operacji SQL MERGE upsertować dane z tabeli źródłowej, widoku lub ramy danych do docelowej tabeli Delta. Usługa Delta Lake obsługuje wstawianie, aktualizacje i usuwanie w MERGE oraz obsługuje rozszerzoną składnię wykraczającą poza standardy SQL, aby ułatwić zaawansowane przypadki użycia.
Mając tabelę źródłową o nazwie people10mupdates lub ścieżkę źródłową /tmp/delta/people-10m-updates, które zawierają nowe dane dla docelowej tabeli o nazwie people10m lub ścieżki docelowej pod adresem /tmp/delta/people-10m. Niektóre z tych nowych rekordów mogą już znajdować się w danych docelowych. Aby scalić nowe dane, chcesz zaktualizować wiersze, w których osoba z id już jest obecna, oraz wstawić nowe wiersze, w których dopasowany id nie jest obecny. Możesz uruchomić następujące zapytanie:
SQL
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
Python
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
Skala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
Ważne
Tylko jeden wiersz z tabeli źródłowej może być zgodny z danym wierszem w tabeli docelowej. W środowisku Databricks Runtime 16.0 lub nowszym, system ocenia warunki określone w klauzulach MERGE oraz WHEN MATCHED, aby określić zduplikowane dopasowania. W Databricks Runtime 15.4 LTS i wcześniejszych, MERGE operacje uwzględniają tylko warunki określone w klauzuli ON.
Zapoznaj się z dokumentacją interfejsu API usługi Delta Lake, aby uzyskać szczegółowe informacje o składni języka Scala i Python. Aby uzyskać szczegółowe informacje o składni SQL, zobacz MERGE INTO
Modyfikowanie wszystkich niedopasowanych wierszy przy użyciu scalania
W Databricks SQL i Databricks Runtime 12.2 LTS i nowszych można użyć WHEN NOT MATCHED BY SOURCE klauzuli do UPDATE lub DELETE rekordów w tabeli docelowej, które nie mają odpowiednich rekordów w tabeli źródłowej. Usługa Databricks zaleca dodanie opcjonalnej klauzuli warunkowej, aby uniknąć całkowitego ponownego zapisywania tabeli docelowej.
Poniższy przykład kodu przedstawia podstawową składnię używania tej metody do usuwania, zastępowanie tabeli docelowej zawartością tabeli źródłowej i usuwanie niezgodnych rekordów w tabeli docelowej. Aby uzyskać bardziej skalowalny wzorzec dla tabel, w których aktualizacje źródłowe i usunięcia są powiązane czasowo, zobacz Incrementally sync Delta table with source (Przyrostowa synchronizacja tabeli różnicowej ze źródłem).
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Skala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Poniższy przykład dodaje warunki do klauzuli WHEN NOT MATCHED BY SOURCE i określa wartości, które mają być aktualizowane w niedopasowanych wierszach docelowych.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
)
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
}
)
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
)
.execute()
)
Skala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
)
)
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
Semantyka operacji scalania
Poniżej przedstawiono szczegółowy opis merge semantyki operacji programowych.
Może istnieć dowolna liczba klauzul
whenMatchediwhenNotMatched.whenMatchedKlauzule są wykonywane, gdy wiersz źródłowy pasuje do wiersza tabeli docelowej na podstawie warunku dopasowania. Te klauzule mają następującą semantyka.whenMatchedKlauzule mogą zawierać co najwyżej jednąupdateakcję i jednądelete. Akcjaupdatew programiemergeaktualizuje tylko określone kolumny (podobne doupdateoperacji) dopasowanego wiersza docelowego. Akcjadeleteusuwa dopasowany wiersz.Każda klauzula
whenMatchedmoże mieć opcjonalny warunek. Jeśli ten warunek klauzuli istnieje, akcjaupdatelubdeletejest wykonywana dla dowolnej pasującej pary wierszy źródłowych docelowych tylko wtedy, gdy warunek klauzuli jest spełniony.Jeśli istnieje wiele
whenMatchedklauzul, są one oceniane w kolejności, w której są określone. WszystkiewhenMatchedklauzule, z wyjątkiem ostatniego, muszą mieć warunki.Jeśli żadna z
whenMatchedwarunków nie zwróci wartości true dla pary wierszy źródłowych i docelowych pasujących do warunku scalania, wiersz docelowy pozostanie niezmieniony.Aby zaktualizować wszystkie kolumny docelowej tabeli delty z odpowiednimi kolumnami źródłowego zestawu danych, użyj polecenia
whenMatched(...).updateAll(). Jest to odpowiednik:whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))dla wszystkich kolumn docelowej tabeli Delta. W związku z tym ta akcja zakłada, że tabela źródłowa ma te same kolumny co w tabeli docelowej. W przeciwnym razie zapytanie zgłasza błąd analizy.
Uwaga
To zachowanie zmienia się, gdy włączono automatyczną ewolucję schematu. Aby uzyskać szczegółowe informacje, zobacz automatyczną ewolucję schematu.
whenNotMatchedKlauzule są wykonywane, gdy wiersz źródłowy nie pasuje do żadnego wiersza docelowego na podstawie warunku dopasowania. Te klauzule mają następującą semantyka.whenNotMatchedKlauzule mogą mieć tylkoinsertdziałanie. Nowy wiersz jest generowany na podstawie określonej kolumny i odpowiednich wyrażeń. Nie trzeba określać wszystkich kolumn w tabeli docelowej. W przypadku kolumn docelowych, które nie są określone, wstawiany jestNULL.Każda klauzula
whenNotMatchedmoże mieć opcjonalny warunek. Jeśli warunek klauzuli jest obecny, wiersz źródłowy jest wstawiany tylko wtedy, gdy ten warunek jest spełniony dla tego wiersza. W przeciwnym razie kolumna źródłowa jest ignorowana.Jeśli istnieje wiele
whenNotMatchedklauzul, są one oceniane w kolejności, w której są określone. WszystkiewhenNotMatchedklauzule, z wyjątkiem ostatniego, muszą mieć warunki.Aby wstawić wszystkie kolumny docelowej tabeli delty z odpowiednimi kolumnami źródłowego zestawu danych, użyj polecenia
whenNotMatched(...).insertAll(). Jest to odpowiednik:whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))dla wszystkich kolumn docelowej tabeli Delta. W związku z tym ta akcja zakłada, że tabela źródłowa ma te same kolumny co w tabeli docelowej. W przeciwnym razie zapytanie zgłasza błąd analizy.
Uwaga
To zachowanie zmienia się, gdy włączono automatyczną ewolucję schematu. Aby uzyskać szczegółowe informacje, zobacz automatyczną ewolucję schematu.
whenNotMatchedBySourceKlauzule są wykonywane, gdy wiersz docelowy nie pasuje do żadnego wiersza źródłowego na podstawie warunku scalania. Te klauzule mają następującą semantyka.-
whenNotMatchedBySourceklauzule mogą określaćdeleteiupdateakcje. - Każda klauzula
whenNotMatchedBySourcemoże mieć opcjonalny warunek. Jeśli warunek klauzuli jest obecny, wiersz docelowy jest modyfikowany tylko wtedy, gdy ten warunek jest spełniony dla tego wiersza. W przeciwnym razie wiersz docelowy pozostanie niezmieniony. - Jeśli istnieje wiele
whenNotMatchedBySourceklauzul, są one oceniane w kolejności, w której są określone. WszystkiewhenNotMatchedBySourceklauzule, z wyjątkiem ostatniego, muszą mieć warunki. - Z definicji
whenNotMatchedBySourceklauzule nie mają wiersza źródłowego do ściągania wartości kolumn z, więc nie można odwoływać się do kolumn źródłowych. Dla każdej kolumny do zmodyfikowania można określić literał lub wykonać akcję w kolumnie docelowej, na przykładSET target.deleted_count = target.deleted_count + 1.
-
Ważne
- Operacja
mergemoże zakończyć się niepowodzeniem, jeśli wiele wierszy źródłowego zestawu danych pasuje, a scalanie próbuje zaktualizować te same wiersze w docelowej Tabeli Delta. Według semantyki SQL scalania taka operacja aktualizacji jest niejednoznaczna, ponieważ nie jest jasne, który wiersz źródłowy powinien być używany do aktualizowania dopasowanego wiersza docelowego. Możesz wstępnie przetworzyć tabelę źródłową, aby wyeliminować możliwość wielu dopasowań. - Operację SQL można zastosować w widoku SQL
MERGEtylko wtedy, gdy widok został zdefiniowany jakoCREATE VIEW viewName AS SELECT * FROM deltaTable.
Deduplikacja danych podczas zapisywania w tabelach Delta
Typowy przypadek użycia ETL polega na zbieraniu dzienników do tabeli Delta poprzez ich dodawanie. Jednak często źródła mogą generować zduplikowane rekordy dziennika, a kroki deduplikacji podrzędnej są potrzebne do ich obsługi. W programie mergemożna uniknąć wstawiania zduplikowanych rekordów.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
Skala
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
Java
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
Uwaga
Zestaw danych zawierający nowe dzienniki musi być deduplikowany wewnętrznie. Semantyka scalania w SQL dopasowuje nowe dane do istniejących i usuwa duplikaty, ale jeśli w nowym zestawie danych występują duplikaty, zostaną one wstawione. Dlatego usuń duplikaty nowych danych przed scaleniem z tabelą.
Jeśli wiesz, że możesz uzyskać zduplikowane rekordy tylko przez kilka dni, możesz zoptymalizować zapytanie jeszcze bardziej, poprzez partycjonowanie tabeli według daty, a następnie określając zakres dat tabeli docelowej, aby się z nim zgadzał.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
Skala
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
Java
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
Jest to bardziej wydajne niż poprzednie polecenie, ponieważ wyszukuje duplikaty tylko w ciągu ostatnich 7 dni dzienników, a nie w całej tabeli. Ponadto można użyć scalania tylko dodającego w połączeniu ze Strukturami Strumieniowymi do ciągłej deduplikacji dzienników.
- W zapytaniu strumieniowym można użyć operacji scalania w
foreachBatch, aby stale zapisywać dane strumieniowe do tabeli Delty z deduplikacją. Aby uzyskać więcej informacji, zobacz poniższy przykład przesyłania strumieniowego dotyczącyforeachBatch. - W innym zapytaniu przesyłanym strumieniowo można stale odczytywać deduplikowane dane z tej tabeli delty. Jest to możliwe, ponieważ scalanie z tylko dodawaniem dołącza nowe dane do Tabeli Delta.
Powolne zmienianie danych (SCD) i przechwytywanie zmian w danych (CDC) z Delta Lake
Potoki deklaratywne Spark Lakeflow mają natywną obsługę śledzenia i stosowania typów SCD Type 1 i Type 2. Użyj funkcji AUTO CDC ... INTO z potokami deklaratywnymi Lakeflow Spark, aby upewnić się, że rekordy poza kolejnością są prawidłowo obsługiwane podczas przetwarzania strumieni danych CDC. Zobacz Interfejsy API auto cdC: upraszczanie przechwytywania danych zmian za pomocą potoków.
Przyrostowa synchronizacja tabeli delty ze źródłem
W Databricks SQL i Databricks Runtime 12.2 LTS i nowszych można użyć WHEN NOT MATCHED BY SOURCE do tworzenia dowolnych warunków w celu atomowego usunięcia i zastąpienia części tabeli. Może to być szczególnie przydatne, gdy masz tabelę źródłową, w której rekordy mogą ulec zmianie lub zostaną usunięte przez kilka dni po początkowym wpisie danych, ale ostatecznie osiedlą się w stanie końcowym.
Poniższe zapytanie pokazuje użycie tego wzorca do wybrania 5 dni rekordów ze źródła, zaktualizowanie pasujących rekordów w obiekcie docelowym, wstawienie nowych rekordów ze źródła do miejsca docelowego i usunięcie wszystkich niezgodnych rekordów z ostatnich 5 dni w obiekcie docelowym.
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
Udostępniając ten sam filtr logiczny w tabelach źródłowych i docelowych, można dynamicznie propagować zmiany ze źródła do tabel docelowych, w tym usuwania.
Uwaga
Chociaż ten wzorzec może być używany bez żadnych klauzul warunkowych, może to prowadzić do pełnego ponownego zapisania tabeli docelowej, która może być kosztowna.