Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Change Data Capture (CDC) ist ein Datenintegrationsmuster, das Änderungen an Daten in einem Quellsystem erfasst, z. B. Einfügungen, Aktualisierungen und Löschungen. Diese Änderungen, dargestellt als Liste, werden häufig als CDC-Feed bezeichnet. Sie können Ihre Daten viel schneller verarbeiten, wenn Sie mit einem CDC-Feed arbeiten, anstatt das gesamte Quelldatenset zu lesen. Transaktionsdatenbanken wie SQL Server, MySQL und Oracle generieren CDC-Feeds. Delta-Tabellen generieren ihren eigenen CDC-Feed, der als Änderungsdatenfeed (CDF) bezeichnet wird.
Das folgende Diagramm zeigt, dass beim Aktualisieren einer Zeile in einer Quelltabelle, die Mitarbeiterdaten enthält, ein neuer Satz von Zeilen in einem CDC-Feed generiert wird, der nur die Änderungen enthält. Jede Zeile des CDC-Feeds enthält in der Regel zusätzliche Metadaten, einschließlich des Vorgangs wie UPDATE z. B. und einer Spalte, die verwendet werden kann, um jede Zeile im CDC-Feed deterministisch zu ordnen, sodass Sie Out-of-Order-Updates verarbeiten können. Die sequenceNum-Spalte im folgenden Diagramm bestimmt die Reihenfolge der Zeilen im CDC-Feed.
Verarbeiten eines Änderungsdatenfeeds: Behalten Sie nur die neuesten Daten im Vergleich zu historischen Versionen von Daten bei
Die Verarbeitung eines geänderten Datenfeeds wird als langsam ändernde Dimensionen (SCD) bezeichnet. Wenn Sie einen CDC-Feed verarbeiten, haben Sie folgende Wahl:
- Behalten Sie nur die neuesten Daten bei (d. a. vorhandene Daten überschreiben)? Dies wird als SCD-Typ 1 bezeichnet.
- Oder behalten Sie einen Verlauf der Änderungen an den Daten bei? Dies wird als SCD-Typ 2 bezeichnet.
Die SCD-Typ 1-Verarbeitung umfasst das Überschreiben alter Daten mit neuen Daten, wenn eine Änderung erfolgt. Dies bedeutet, dass keine Historie der Änderungen beibehalten wird. Nur die neueste Version der Daten ist verfügbar. Es ist ein einfacher Ansatz und wird häufig verwendet, wenn der Verlauf von Änderungen nicht wichtig ist, z. B. Das Korrigieren von Fehlern oder das Aktualisieren nicht kritischer Felder wie Kunden-E-Mail-Adressen.
Die SCD Typ 2-Verarbeitung verwaltet einen historischen Datensatz von Datenänderungen, indem zusätzliche Datensätze erstellt werden, um unterschiedliche Versionen der Daten im Laufe der Zeit zu erfassen. Jede Version der Daten ist zeitstempelt oder mit Metadaten markiert, mit denen Benutzer nachverfolgen können, wann eine Änderung erfolgt ist. Dies ist nützlich, wenn es wichtig ist, die Entwicklung von Daten nachzuverfolgen, z. B. das Nachverfolgen von Kundenadressenänderungen im Laufe der Zeit für Analysezwecke.
Beispiele für die SCD-Typ 1- und Typ 2-Verarbeitung mit Lakeflow Spark Declarative Pipelines
In den Beispielen in diesem Abschnitt erfahren Sie, wie Sie SCD Type 1 und Type 2 verwenden.
Schritt 1: Vorbereiten von Beispieldaten
In diesem Beispiel generieren Sie einen CDC-Beispielfeed. Erstellen Sie zunächst ein Notizbuch, und fügen Sie den folgenden Code in das Notizbuch ein. Aktualisieren Sie die Variablen am Anfang des Codeblocks auf einen Katalog und ein Schema, in dem Sie über die Berechtigung zum Erstellen von Tabellen und Ansichten verfügen.
Dieser Code erstellt eine neue Delta-Tabelle, die mehrere Änderungsdatensätze enthält. Das Schema lautet wie folgt:
-
id- Ganzzahl, eindeutiger Bezeichner dieses Mitarbeiters -
name- String, Name des Mitarbeiters -
role- Zeichenfolge, Rolle des Mitarbeiters -
country- Zeichenfolge, Ländercode, wo Mitarbeiter arbeiten -
operation- Änderungstyp (z. B.INSERT,UPDATE, oderDELETE) -
sequenceNum- Ganze Zahl, identifiziert die logische Reihenfolge von CDC-Ereignissen in den Quelldaten. Lakeflow Spark Declarative Pipelines verwendet diese Sequenzierung, um Änderungsereignisse zu behandeln, die außerhalb der Reihenfolge eingehen.
# update these to the catalog and schema where you have permissions
# to create tables and views.
catalog = "mycatalog"
schema = "myschema"
employees_cdf_table = "employees_cdf"
def write_employees_cdf_to_delta():
data = [
(1, "Alex", "chef", "FR", "INSERT", 1),
(2, "Jessica", "owner", "US", "INSERT", 2),
(3, "Mikhail", "security", "UK", "INSERT", 3),
(4, "Gary", "cleaner", "UK", "INSERT", 4),
(5, "Chris", "owner", "NL", "INSERT", 6),
# out of order update, this should be dropped from SCD Type 1
(5, "Chris", "manager", "NL", "UPDATE", 5),
(6, "Pat", "mechanic", "NL", "DELETE", 8),
(6, "Pat", "mechanic", "NL", "INSERT", 7)
]
columns = ["id", "name", "role", "country", "operation", "sequenceNum"]
df = spark.createDataFrame(data, columns)
df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.{employees_cdf_table}")
write_employees_cdf_to_delta()
Sie können diese Daten mithilfe des folgenden SQL-Befehls in der Vorschau anzeigen:
SELECT *
FROM mycatalog.myschema.employees_cdf
Schritt 2: Verwenden des SCD-Typs 1, um nur die neuesten Daten beizubehalten
Es wird empfohlen, die AUTO CDC API in einer Lakeflow Spark Declarative Pipelines zu verwenden, um einen Änderungsdatenfeed in eine SCD-Typ 1-Tabelle zu verarbeiten.
- Erstellen eines neuen Notebooks.
- Fügen Sie den folgenden Code in ihn ein.
- Erstellen und Herstellen einer Verbindung mit einer Pipeline
Die employees_cdf Funktion liest die Tabelle, die wir soeben als Datenstrom erstellt haben, da die create_auto_cdc_flow API, die Sie für die Verarbeitung der Änderungsdatenerfassung verwenden werden, einen Datenstrom von Änderungen als Eingabe erwartet. Sie umschließen es mit einem Dekorateur @dp.temporary_view, da Sie diesen Datenfluss nicht in eine Tabelle materialisieren möchten.
Anschließend verwenden Sie dp.create_target_table, um eine Streamingtabelle zu erstellen, die das Ergebnis der Verarbeitung dieses Änderungsdatenfeeds enthält.
Schließlich verwenden Sie dp.create_auto_cdc_flow für die Verarbeitung des Änderungsdatenfeeds. Sehen wir uns jedes Argument an:
-
target– Die Zielstreamingtabelle, die Sie zuvor definiert haben. -
source– Die Ansicht über den Datenstrom von Änderungsdatensätzen, die Sie zuvor definiert haben. -
keys– Identifiziert eindeutige Zeilen im Änderungsfeed. Da Sieidals eindeutigen Bezeichner verwenden, stellen Sie einfach nuridals die einzige identifizierende Spalte bereit. -
sequence_by- Der Spaltenname, der die logische Reihenfolge von CDC-Ereignissen in den Quelldaten angibt. Sie benötigen diese Sequenzierung, um Änderungsereignisse zu behandeln, die außerhalb der Reihenfolge ankommen. Verwenden SiesequenceNumals Sequenzierungsspalte. -
apply_as_deletes- Da die Beispieldaten Löschvorgänge enthalten, verwenden Sieapply_as_deletes, um anzugeben, wann ein CDC-Ereignis alsDELETEanstelle eines Upserts behandelt werden soll. -
except_column_list- Enthält eine Liste von Spalten, die Sie nicht in die Zieltabelle aufnehmen möchten. In diesem Beispiel verwenden Sie dieses Argument, um auszuschließensequenceNumundoperation. -
stored_as_scd_type– Gibt den SCD-Typ an, den Sie verwenden möchten.
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr, lit, when
from pyspark.sql.types import StringType, ArrayType
catalog = "mycatalog"
schema = "myschema"
employees_cdf_table = "employees_cdf"
employees_table_current = "employees_current"
employees_table_historical = "employees_historical"
@dp.temporary_view
def employees_cdf():
return spark.readStream.format("delta").table(f"{catalog}.{schema}.{employees_cdf_table}")
dp.create_target_table(f"{catalog}.{schema}.{employees_table_current}")
dp.create_auto_cdc_flow(
target=f"{catalog}.{schema}.{employees_table_current}",
source=employees_cdf_table,
keys=["id"],
sequence_by=col("sequenceNum"),
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
Führen Sie diese Pipeline aus, indem Sie auf "Start" klicken.
Führen Sie dann die folgende Abfrage im SQL-Editor aus, um zu überprüfen, ob die Änderungsdatensätze ordnungsgemäß verarbeitet wurden:
SELECT *
FROM mycatalog.myschema.employees_current
Hinweis
Das verfrühte Update für Mitarbeiter Chris wurde ordnungsgemäß gelöscht, da seine Rolle weiterhin auf "Besitzer" statt auf "Manager" festgelegt ist.
Schritt 3: Verwenden von SCD-Typ 2, um verlaufsgeschichtliche Daten beizubehalten
In diesem Beispiel erstellen Sie eine zweite Zieltabelle namens employees_historical", die einen vollständigen Verlauf der Änderungen an Mitarbeiterdatensätzen enthält.
Fügen Sie diesen Code zu Ihrer Pipeline hinzu. Der einzige Unterschied hier ist, dass stored_as_scd_type anstelle von 1 auf 2 festgelegt ist.
dp.create_target_table(f"{catalog}.{schema}.{employees_table_historical}")
dp.create_auto_cdc_flow(
target=f"{catalog}.{schema}.{employees_table_historical}",
source=employees_cdf_table,
keys=["id"],
sequence_by=col("sequenceNum"),
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 2
)
Führen Sie diese Pipeline aus, indem Sie auf "Start" klicken.
Führen Sie dann die folgende Abfrage im SQL-Editor aus, um zu überprüfen, ob die Änderungsdatensätze ordnungsgemäß verarbeitet wurden:
SELECT *
FROM mycatalog.myschema.employees_historical
Sie sehen alle Änderungen an Mitarbeitern, einschließlich der Mitarbeiter, die gelöscht wurden, z. B. Pat.
Schritt 4: Bereinigen von Ressourcen
Wenn Sie fertig sind, bereinigen Sie Ressourcen, indem Sie die folgenden Schritte ausführen:
Löschen Sie die Pipeline:
Hinweis
Wenn Sie die Pipeline löschen, werden die Tabellen
employeesundemployees_historicalautomatisch gelöscht.- Klicken Sie auf Aufträge und Pipelines, und suchen Sie dann den Namen der zu löschenden Pipeline.
- Klicken Sie auf das
Klicken Sie in derselben Zeile wie der Pipelinename, und klicken Sie dann auf "Löschen".
Löschen Sie das Notizbuch.
Löschen Sie die Tabelle, die den Änderungsdatenfeed enthält:
- Klicken Sie auf "Neue > Abfrage".
- Fügen Sie den folgenden SQL-Code ein und führen Sie ihn aus, indem Sie den Katalog und das Schema entsprechend anpassen.
DROP TABLE mycatalog.myschema.employees_cdf
Nachteile der Verwendung MERGE INTO und foreachBatch für die Änderungsdatenerfassung
Databricks stellt einen MERGE INTO SQL-Befehl bereit, den Sie mit der foreachBatch API verwenden können, um Zeilen in eine Delta-Tabelle zu upsert. In diesem Abschnitt wird erläutert, wie diese Technik für einfache Anwendungsfälle verwendet werden kann, aber diese Methode wird immer komplexer und zerbrechlicher, wenn sie auf reale Szenarien angewendet wird.
In diesem Beispiel verwenden Sie denselben Beispieländerungsdatenfeed, der in den vorherigen Beispielen verwendet wird.
Naive Implementierung mit MERGE INTO und foreachBatch
Erstellen Sie ein Notizbuch, und kopieren Sie den folgenden Code in das Notizbuch. Ändern Sie die Variablen catalog, schema und employees_table entsprechend. Die Variablen catalog und schema sollten auf Speicherorte im Unity-Katalog festgelegt werden, wo Sie Tabellen erstellen können.
Wenn Sie das Notizbuch ausführen, wird Folgendes ausgeführt:
- Erstellt die Zieltabelle in der
create_table. Im Gegensatz zumcreate_auto_cdc_flowautomatischen Behandeln dieses Schritts müssen Sie das Schema angeben. - Liest den Änderungsdatenfeed als Datenstrom. Jeder Mikrobatch wird mithilfe der
upsertToDeltaMethode verarbeitet, die einenMERGE INTOBefehl ausführt.
catalog = "jobs"
schema = "myschema"
employees_cdf_table = "employees_cdf"
employees_table = "employees_merge"
def upsertToDelta(microBatchDF, batchId):
microBatchDF.createOrReplaceTempView("updates")
microBatchDF.sparkSession.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
def create_table():
spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{employees_table}")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{employees_table}
(id INT, name STRING, age INT, country STRING)
""")
create_table()
cdcData = spark.readStream.table(f"{catalog}.{schema}.{employees_cdf_table}")
cdcData.writeStream \
.foreachBatch(upsertToDelta) \
.outputMode("append") \
.start()
Führen Sie die folgende SQL-Abfrage aus, um die Ergebnisse anzuzeigen:
SELECT *
FROM mycatalog.myschema.employees_merge
Leider sind die Ergebnisse falsch, wie gezeigt:
Mehrere Aktualisierungen desselben Schlüssels in derselben Mikrobatch
Das erste Problem besteht darin, dass der Code nicht mehrere Aktualisierungen desselben Schlüssels in derselben Mikrobatch behandelt. Sie verwenden INSERT z. B. zum Einfügen des Mitarbeiters Chris und anschließendes Aktualisieren ihrer Rolle von Besitzer zu Manager. Dies sollte zu einer Zeile führen, aber stattdessen gibt es zwei Zeilen.
Welche Änderung gewinnt, wenn mehrere Updates für denselben Schlüssel in einem Mikrobatch vorhanden sind?
Die Logik wird komplexer. Das folgende Codebeispiel ruft die neueste Zeile anhand von sequenceNum ab und führt nur diese Daten wie folgt in die Zieltabelle zusammen:
- Gruppiert nach dem Primärschlüssel,
id. - Nimmt alle Spalten der Zeile, die im Batch für diesen Schlüssel den maximalen Wert von
sequenceNumhat. - Explodiert die Zeile wieder aus.
Aktualisieren Sie die upsertToDelta Methode wie folgt, und führen Sie dann den Code aus:
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
Wenn Sie die Zieltabelle abfragen, sehen Sie, dass der Mitarbeiter namens Chris die richtige Rolle hat, aber es gibt noch weitere Probleme zu lösen, da Sie weiterhin gelöschte Datensätze in der Zieltabelle angezeigt haben.
Out-of-Order-Updates über Mikrobatches hinweg
In diesem Abschnitt wird das Problem von nicht ordnungsgemäßen Aktualisierungen bei Mikrobatches untersucht. Das folgende Diagramm veranschaulicht das Problem: Was geschieht, wenn die Zeile für Chris einen UPDATE Vorgang in der ersten Mikrobatch hat, gefolgt von einem INSERT in einem nachfolgenden Mikrobatch? Der Code behandelt dies nicht ordnungsgemäß.
Welche Änderung gewinnt, wenn es nicht geordnete Aktualisierungen desselben Schlüssels über mehrere Mikro-Batches hinweg gibt?
Um dies zu beheben, erweitern Sie den Code, um eine Version in jeder Zeile wie folgt zu speichern:
- Speichern Sie den Zeitpunkt der
sequenceNumletzten Aktualisierung einer Zeile. - Überprüfen Sie für jede neue Zeile, ob der Zeitstempel größer als der gespeicherte ist, und wenden Sie dann die folgende Logik an:
- Wenn größer, verwenden Sie die neuen Daten aus dem Ziel.
- Behalten Sie andernfalls die Daten in der Quelle bei.
Aktualisieren Sie zunächst die createTable Methode, um die sequenceNum Methode zu speichern, da Sie sie für die Versionsverwaltung für jede Zeile verwenden:
def create_table():
spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{employees_table}")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{employees_table}
(id INT, name STRING, age INT, country STRING, sequenceNum INT)
""")
Aktualisieren Sie upsertToDelta, um Zeilenversionen zu verarbeiten. Die UPDATE SET Klausel von MERGE INTO muss jede Spalte separat behandeln.
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED THEN UPDATE SET
name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
WHEN NOT MATCHED THEN INSERT *
""")
Behandeln von Löschvorgangen
Leider hat der Code noch ein Problem. Es behandelt DELETE keine Vorgänge, wie dadurch belegt wird, dass der Mitarbeiter Pat noch in der Zieltabelle ist.
Nehmen wir an, dass Löschungen im gleichen Mikrobatch eintreffen. Um sie zu behandeln, aktualisieren Sie die upsertToDelta Methode erneut, um die Zeile zu löschen, wenn der Änderungsdatensatz den Löschvorgang wie folgt anzeigt:
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED AND s.operation = 'DELETE' THEN DELETE
WHEN MATCHED THEN UPDATE SET
name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
WHEN NOT MATCHED THEN INSERT *
""")
Behandeln von Updates, die nach Löschungen außer der Reihenfolge eintreffen.
Leider ist der obige Code immer noch nicht ganz korrekt, da er Fälle nicht behandelt, in denen ein DELETE von einem UPDATE folgt, der über Mikrochargen hinweg außerhalb der Reihenfolge ist.
Der Algorithmus zur Bearbeitung dieses Falls muss sich an Löschungen erinnern, damit er nachfolgende Updates in falscher Reihenfolge verarbeiten kann. Um dies zu tun:
- Anstatt Zeilen sofort zu löschen, entfernen Sie sie vorübergehend mit einem Zeitstempel oder
sequenceNum. Vorläufig gelöschte Zeilen werden versteinert. - Leiten Sie alle Ihre Benutzer zu einer Ansicht um, die Grabsteine herausfiltert.
- Erstellen Sie einen Bereinigungsauftrag, der die Grabsteine im Laufe der Zeit entfernt.
Verwenden Sie den folgenden Code:
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED AND s.operation = 'DELETE' THEN UPDATE SET DELETED_AT=now()
WHEN MATCHED THEN UPDATE SET
name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
WHEN NOT MATCHED THEN INSERT *
""")
Ihre Benutzer können die Zieltabelle nicht direkt verwenden. Erstellen Sie daher eine Ansicht, die sie abfragen können:
CREATE VIEW employees_v AS
SELECT * FROM employees_merge
WHERE DELETED_AT = NULL
Erstellen Sie schließlich einen Bereinigungsauftrag, der in regelmäßigen Abständen versteinerte Zeilen entfernt:
DELETE FROM employees_merge
WHERE DELETED_AT < now() - INTERVAL 1 DAY