Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
U kunt gegevens uit een brontabel, weergave of DataFrame invoegen of bijwerken in een doeltabel van Delta met behulp van de MERGE SQL-bewerking. Delta Lake biedt ondersteuning voor invoegingen, updates en verwijderingen in MERGE, en het ondersteunt uitgebreide syntaxis buiten de SQL-standaarden om geavanceerde gebruiksvoorbeelden mogelijk te maken.
Stel dat u een brontabel hebt met de naam people10mupdates of een bronpad waarop nieuwe gegevens voor een doeltabel met de naam /tmp/delta/people-10m-updates of het doelpad people10m staan/tmp/delta/people-10m. Sommige van deze nieuwe records zijn mogelijk al aanwezig in de doelgegevens. Om de nieuwe gegevens samen te voegen, wilt u rijen bijwerken waar de persoon id al bestaat en de nieuwe rijen invoegen waar geen overeenkomende id aanwezig is. U kunt de volgende query uitvoeren:
SQL
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
Python
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
Belangrijk
Slechts één rij uit de brontabel kan overeenkomen met een bepaalde rij in de doeltabel. In Databricks Runtime 16.0 en hoger evalueert MERGE de voorwaarden die zijn opgegeven in de WHEN MATCHED en ON clausules om dubbele overeenkomsten te bepalen. In Databricks Runtime 15.4 LTS en oudere versies worden alleen voorwaarden voor bewerkingen overwogen die zijn opgegeven in de MERGE clausule.
Zie de Delta Lake API-documentatie voor de details over de syntaxis van Scala en Python. Zie MERGE INTO voor meer informatie over de SQL-syntaxis
Alle niet-overeenkomende rijen wijzigen met behulp van samenvoegen
In Databricks SQL en Databricks Runtime 12.2 LTS en hoger kunt u de WHEN NOT MATCHED BY SOURCE component gebruiken voor UPDATE of DELETE records in de doeltabel zonder bijbehorende records in de brontabel. Databricks raadt aan een optionele voorwaardelijke component toe te voegen om te voorkomen dat de doeltabel volledig wordt herschreven.
In het volgende codevoorbeeld ziet u de basissyntaxis van het gebruik hiervan voor verwijderingen, waarbij de doeltabel wordt overschreven met de inhoud van de brontabel en niet-overeenkomende records in de doeltabel worden verwijderd. Zie Incrementeel Delta-tabel synchroniseren met bron voor een meer schaalbaar patroon voor tabellen waarin bronupdates en verwijderingen tijdsgebonden zijn.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
In het volgende voorbeeld worden voorwaarden aan de WHEN NOT MATCHED BY SOURCE component toegevoegd en worden waarden opgegeven die moeten worden bijgewerkt in niet-overeenkomende doelrijen.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
)
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
}
)
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
)
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
)
)
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
Samenvoegoperatiesemantiek
Hier volgt een gedetailleerde beschrijving van de semantiek van de merge programmatische bewerking.
Er kan een willekeurig aantal
whenMatchedenwhenNotMatchedclausules zijn.whenMatchedclausules worden uitgevoerd wanneer een bronrij overeenkomt met een doelrij op basis van de overeenkomstvoorwaarde. Deze clausules hebben de volgende semantiek.whenMatchedcomponenten kunnen maximaal éénupdateen ééndeleteactie hebben. Metupdatede actie wordenmergealleen de opgegeven kolommen (vergelijkbaar met deupdatebewerking) van de overeenkomende doelrij bijgewerkt. Metdeletede actie wordt de overeenkomende rij verwijderd.Elke
whenMatchedcomponent kan een optionele voorwaarde hebben. Als deze componentvoorwaarde bestaat, wordt deupdateofdeleteactie alleen uitgevoerd voor een overeenkomend brondoelrijpaar wanneer aan de componentvoorwaarde wordt voldaan.Als er meerdere
whenMatchedcomponenten zijn, worden ze geëvalueerd in de volgorde waarin ze zijn opgegeven. AllewhenMatchedcomponenten, met uitzondering van de laatste, moeten voorwaarden hebben.Als geen van de
whenMatchedvoorwaarden waar is voor een bron- en doelrijpaar dat overeenkomt met de samenvoegvoorwaarde, blijft de doelrij ongewijzigd.Als u alle kolommen van de doel-Delta-tabel wilt bijwerken met de bijbehorende kolommen van de brongegevensset, gebruikt u
whenMatched(...).updateAll(). Dit komt overeen met:whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))voor alle kolommen van de doel-Delta-tabel. Daarom gaat deze actie ervan uit dat de brontabel dezelfde kolommen heeft als die in de doeltabel, anders genereert de query een analysefout.
Notitie
Dit gedrag verandert wanneer automatische schemaontwikkeling is ingeschakeld. Zie de automatische schemaontwikkeling voor meer informatie.
whenNotMatchedclausules worden uitgevoerd wanneer een bronrij niet overeenkomt met een doelrij op basis van een overeenkomstvoorwaarde. Deze clausules hebben de volgende semantiek.whenNotMatchedclausules kunnen alleen deinsertactie hebben. De nieuwe rij wordt gegenereerd op basis van de opgegeven kolom en bijbehorende expressies. U hoeft niet alle kolommen in de doeltabel op te geven. Voor niet-opgegeven doelkolommen wordtNULLingevoegd.Elke
whenNotMatchedcomponent kan een optionele voorwaarde hebben. Als de clausulevoorwaarde aanwezig is, wordt er alleen een bronrij ingevoegd als die voorwaarde waar is voor die rij. Anders wordt de bronkolom genegeerd.Als er meerdere
whenNotMatchedcomponenten zijn, worden ze geëvalueerd in de volgorde waarin ze zijn opgegeven. AllewhenNotMatchedcomponenten, met uitzondering van de laatste, moeten voorwaarden hebben.Als u alle kolommen van de doel-Delta-tabel wilt invoegen met de bijbehorende kolommen van de brongegevensset, gebruikt u
whenNotMatched(...).insertAll(). Dit komt overeen met:whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))voor alle kolommen van de doel-Delta-tabel. Daarom gaat deze actie ervan uit dat de brontabel dezelfde kolommen heeft als die in de doeltabel, anders genereert de query een analysefout.
Notitie
Dit gedrag verandert wanneer automatische schemaontwikkeling is ingeschakeld. Zie de automatische schemaontwikkeling voor meer informatie.
whenNotMatchedBySourceclausules worden uitgevoerd wanneer een doelrij niet overeenkomt met een bronrij op basis van de samengaanvoorwaarde. Deze clausules hebben de volgende semantiek.-
whenNotMatchedBySourceclausules kunnendeleteenupdateacties specificeren. - Elke
whenNotMatchedBySourcecomponent kan een optionele voorwaarde hebben. Als de clausulevoorwaarde aanwezig is, wordt een doelrij alleen aangepast als die voorwaarde waar is voor die rij. Anders blijft de doelrij ongewijzigd. - Als er meerdere
whenNotMatchedBySourcecomponenten zijn, worden ze geëvalueerd in de volgorde waarin ze zijn opgegeven. AllewhenNotMatchedBySourcecomponenten, met uitzondering van de laatste, moeten voorwaarden hebben. - Clausules hebben per definitie
whenNotMatchedBySourcegeen bronrij om kolomwaarden uit op te halen, waardoor er niet naar bronkolommen kan worden verwezen. Voor elke kolom die moet worden gewijzigd, kunt u een letterlijke waarde opgeven of een actie uitvoeren op de doelkolom, zoalsSET target.deleted_count = target.deleted_count + 1.
-
Belangrijk
- Een
mergebewerking kan mislukken als meerdere rijen van de brongegevensset overeenkomen en de samenvoegbewerking probeert dezelfde rijen van de delta-doeltabel bij te werken. Volgens de SQL-semantiek van samenvoegen is een dergelijke updatebewerking dubbelzinnig omdat het onduidelijk is welke bronrij moet worden gebruikt om de overeenkomende doelrij bij te werken. U kunt de brontabel vooraf verwerken om te voorkomen dat er meerdere overeenkomsten zijn. - U kunt een SQL-bewerking
MERGEalleen toepassen op een SQL-WEERGAVE als de weergave is gedefinieerd alsCREATE VIEW viewName AS SELECT * FROM deltaTable.
Gegevensontdubbeling bij het schrijven naar Delta-tabellen
Een veelvoorkomend ETL-gebruiksvoorbeeld is het verzamelen van logboeken in de Delta-tabel door ze toe te voegen aan een tabel. Vaak kunnen de bronnen echter dubbele logboekrecords genereren en zijn ontdubbelingsstappen in een latere fase nodig om ze aan te pakken. Met mergekunt u voorkomen dat u de dubbele records invoegt.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
Scala
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
Java
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
Notitie
De gegevensset met de nieuwe logboeken moet op zichzelf worden ontdubbeld. Door de SQL-semantiek van samenvoegen worden de nieuwe gegevens gematcht en ontdubbeld met de bestaande gegevens in de tabel, maar als er dubbele gegevens in de nieuwe gegevensset zijn, wordt deze ingevoegd. Ontdubbel de nieuwe gegevens daarom voordat u deze samenvoegt in de tabel.
Als u weet dat u slechts een paar dagen dubbele records krijgt, kunt u de query verder optimaliseren door de tabel op datum te partitioneren en vervolgens het datumbereik van de doeltabel op te geven waarop moet worden vergeleken.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
Scala
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
Java
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
Dit is efficiënter dan de vorige opdracht, omdat er alleen in de afgelopen zeven dagen aan logboeken naar duplicaten wordt gezocht, niet de hele tabel. Bovendien kunt u deze invoegbewerking alleen gebruiken met Structured Streaming om continue ontdubbeling van de logboeken uit te voeren.
- In een streamingquery kunt u de samenvoegbewerking
foreachBatchgebruiken om continu streaminggegevens naar een Delta-tabel te schrijven met ontdubbeling. Zie het volgende streamingvoorbeeld voor meer informatie overforeachBatch. - In een andere streamingquery kunt u continu ontdubbelde gegevens uit deze Delta-tabel lezen. Dit is mogelijk omdat met een samenvoegbewerking alleen nieuwe gegevens worden toegevoegd aan de Delta-tabel.
Langzaam veranderende gegevens (SCD) en gegevensveranderingsdetectie (CDC) met Delta Lake
Lakeflow Spark-declaratieve pijplijnen biedt systeemeigen ondersteuning voor het bijhouden en toepassen van SCD-type 1 en Type 2. Gebruik AUTO CDC ... INTO in combinatie met declaratieve pipelines van Lakeflow Spark om ervoor te zorgen dat gegevens buiten volgorde correct worden afgehandeld bij het verwerken van CDC-feeds. Zie de AUTO CDC-API's: Het vastleggen van wijzigingsgegevens vereenvoudigen met pijplijnen.
Delta-tabel incrementeel synchroniseren met bron
In Databricks SQL en Databricks Runtime 12.2 LTS en hoger kunt u met WHEN NOT MATCHED BY SOURCE willekeurige voorwaarden maken om op atomaire wijze een deel van een tabel te verwijderen en te vervangen. Dit kan met name handig zijn wanneer u een brontabel hebt waarin records enkele dagen na de initiële gegevensinvoer kunnen worden gewijzigd of verwijderd, maar uiteindelijk een definitieve status hebben.
In de volgende query ziet u hoe u dit patroon gebruikt om vijf dagen records uit de bron te selecteren, overeenkomende records in het doel bij te werken, nieuwe records van de bron in te voegen aan het doel en alle niet-overeenkomende records uit de afgelopen 5 dagen in het doel te verwijderen.
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
Door hetzelfde Booleaanse filter op te geven voor de bron- en doeltabellen, kunt u wijzigingen van uw bron dynamisch doorgeven aan doeltabellen, inclusief verwijderingen.
Notitie
Hoewel dit patroon kan worden gebruikt zonder voorwaardelijke componenten, zou dit leiden tot het volledig herschrijven van de doeltabel die duur kan zijn.