Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Przechwytywanie zmian danych (CDC) to wzorzec integracji danych, który przechwytuje zmiany wprowadzone w systemie źródłowym, takie jak wstawianie, aktualizacje i usuwanie. Te zmiany, reprezentowane jako lista, są często określane jako źródło danych CDC. Dane można przetwarzać znacznie szybciej, jeśli pracujesz na kanale informacyjnym CDC, zamiast odczytywać cały źródłowy zestaw danych. Transakcyjne bazy danych, takie jak SQL Server, MySQL i Oracle, generują źródła danych CDC. Tabele Delta generują własny strumień danych CDC, znany jako strumień zmian danych (CDF).
Na poniższym diagramie pokazano, że gdy wiersz w tabeli źródłowej zawierającej dane pracownika zostanie zaktualizowany, wygeneruje nowy zestaw wierszy w kanale informacyjnym CDC zawierającym tylko zmiany. Każdy wiersz kanału informacyjnego CDC zazwyczaj zawiera dodatkowe metadane, w tym operację, taką jak UPDATE i kolumnę, która może służyć do określania kolejności każdego wiersza w kanale informacyjnym CDC, dzięki czemu można obsługiwać aktualizacje poza kolejnością. Na przykład kolumna w poniższym diagramie sequenceNum określa kolejność wierszy w strumieniu danych CDC.
Przetwarzanie zestawienia danych zmian: zachowaj tylko najnowsze dane, a nie zachowaj historycznych wersji danych
Przetwarzanie zmienionego źródła danych jest nazywane powolnym zmienianiem wymiarów (SCD). Podczas przetwarzania kanału CDC musisz dokonać wyboru:
- Czy przechowujesz tylko najnowsze dane (czyli zastępowanie istniejących danych)? Jest to nazywane typem SCD 1.
- Czy też zachowasz historię zmian danych? Jest to nazywane typem SCD 2.
Przetwarzanie typu 1 protokołu SCD obejmuje zastępowanie starych danych nowymi danymi za każdym razem, gdy nastąpi zmiana. Oznacza to, że nie jest przechowywana żadna historia zmian. Dostępna jest tylko najnowsza wersja danych. Jest to proste podejście i jest często używane, gdy historia zmian nie jest ważna, takich jak poprawianie błędów lub aktualizowanie pól niekrytycznych, takich jak adresy e-mail klientów.
Przetwarzanie typu 2 protokołu SCD utrzymuje historyczny rekord zmian danych przez utworzenie dodatkowych rekordów w celu przechwycenia różnych wersji danych w czasie. Każda wersja danych jest oznaczona znacznikiem czasu lub oznakowana metadanymi, które umożliwiają użytkownikom śledzenie, kiedy nastąpiła zmiana. Jest to przydatne, gdy ważne jest śledzenie ewolucji danych, takich jak śledzenie zmian adresów klientów w czasie na potrzeby analizy.
Przykłady przetwarzania typu SCD 1 i typu SCD 2 za pomocą potoków deklaratywnych Lakeflow i Spark
W przykładach w tej sekcji pokazano, jak używać typu SCD 1 i typu 2.
Krok 1. Przygotowywanie przykładowych danych
W tym przykładzie wygenerujesz przykładowe źródło danych CDC. Najpierw utwórz notes i wklej do niego następujący kod. Zaktualizuj zmienne na początku bloku kodu do katalogu i schematu, w którym masz uprawnienia do tworzenia tabel i widoków.
Ten kod tworzy nową tabelę delty zawierającą kilka rekordów zmian. Schemat jest następujący:
-
id- Liczba całkowita, unikatowy identyfikator tego pracownika -
name- Łańcuch znaków, nazwa pracownika -
role- Ciąg znaków, rola pracownika -
country- Ciąg, kod kraju, w którym pracownik pracuje -
operation- Zmień typ (na przykład ,INSERT,UPDATElubDELETE) -
sequenceNum- Liczba całkowita, która identyfikuje logiczną kolejność zdarzeń CDC w danych źródłowych. Lakeflow Spark Deklaratywne Potoki używają tego sekwencjonowania do obsługi zdarzeń zmiany, które przychodzą w niewłaściwej kolejności.
# update these to the catalog and schema where you have permissions
# to create tables and views.
catalog = "mycatalog"
schema = "myschema"
employees_cdf_table = "employees_cdf"
def write_employees_cdf_to_delta():
data = [
(1, "Alex", "chef", "FR", "INSERT", 1),
(2, "Jessica", "owner", "US", "INSERT", 2),
(3, "Mikhail", "security", "UK", "INSERT", 3),
(4, "Gary", "cleaner", "UK", "INSERT", 4),
(5, "Chris", "owner", "NL", "INSERT", 6),
# out of order update, this should be dropped from SCD Type 1
(5, "Chris", "manager", "NL", "UPDATE", 5),
(6, "Pat", "mechanic", "NL", "DELETE", 8),
(6, "Pat", "mechanic", "NL", "INSERT", 7)
]
columns = ["id", "name", "role", "country", "operation", "sequenceNum"]
df = spark.createDataFrame(data, columns)
df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.{employees_cdf_table}")
write_employees_cdf_to_delta()
Możesz wyświetlić podgląd tych danych przy użyciu następującego polecenia SQL:
SELECT *
FROM mycatalog.myschema.employees_cdf
Krok 2. Użyj typu SCD 1, aby zachować tylko najnowsze dane
Zalecamy używanie interfejsu AUTO CDC API w potokach deklaratywnych Lakeflow Spark do przetwarzania strumienia danych zmian w tabeli SCD Typu 1.
- Utwórz nowy notes.
- Wklej do niego następujący kod.
- Utwórz potok i nawiąż z nim połączenie.
Funkcja employees_cdf odczytuje tabelę, którą właśnie utworzyliśmy powyżej, jako strumień, ponieważ create_auto_cdc_flow interfejs API, którego użyjesz do przetwarzania danych w celu przechwytywania zmian oczekuje strumienia zmian jako wejścia. Owijasz go dekoratorem @dp.temporary_view, ponieważ nie chcesz przekształcać tego strumienia w tabelę.
Następnie użyj polecenia dp.create_target_table, aby utworzyć tabelę strumieniową zawierającą wynik przetwarzania tego strumienia danych zmiany.
Na koniec używasz dp.create_auto_cdc_flow do przetwarzania strumienia danych o zmianach. Przyjrzyjmy się każdemu argumentowi:
-
target— docelowa tabela przesyłania strumieniowego, która została wcześniej zdefiniowana. -
source— Widok strumienia rekordów zmian zdefiniowanych wcześniej. -
keys- Identyfikuje unikatowe wiersze w strumieniu zmian. Ponieważ używaszidjako unikatowego identyfikatora, wystarczy podaćidjako jedyną kolumnę identyfikującą. -
sequence_by- Nazwa kolumny, która określa kolejność logiczną zdarzeń CDC w danych źródłowych. Ta sekwencjonowanie jest potrzebne do obsługi zdarzeń zmiany, które docierają w niewłaściwej kolejności. PodajsequenceNumjako kolumnę sekwencjonowania. -
apply_as_deletes— Ponieważ przykładowe dane zawierają operacje usuwania, należy użyćapply_as_deletespolecenia , aby wskazać, kiedy zdarzenie CDC powinno być traktowane jakoDELETEa nie upsert. -
except_column_list— Zawiera listę kolumn, które nie mają być uwzględniane w tabeli docelowej. W tym przykładzie użyjesz tego argumentu do wykluczeniasequenceNumioperation. -
stored_as_scd_type— wskazuje typ SCD, którego chcesz użyć.
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr, lit, when
from pyspark.sql.types import StringType, ArrayType
catalog = "mycatalog"
schema = "myschema"
employees_cdf_table = "employees_cdf"
employees_table_current = "employees_current"
employees_table_historical = "employees_historical"
@dp.temporary_view
def employees_cdf():
return spark.readStream.format("delta").table(f"{catalog}.{schema}.{employees_cdf_table}")
dp.create_target_table(f"{catalog}.{schema}.{employees_table_current}")
dp.create_auto_cdc_flow(
target=f"{catalog}.{schema}.{employees_table_current}",
source=employees_cdf_table,
keys=["id"],
sequence_by=col("sequenceNum"),
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
Uruchom ten potok, klikając Uruchom.
Następnie uruchom następujące zapytanie w edytorze SQL, aby sprawdzić, czy rekordy zmian zostały prawidłowo przetworzone:
SELECT *
FROM mycatalog.myschema.employees_current
Uwaga / Notatka
Aktualizacja nietrzymana w kolejce dla pracownika Chrisa została pominięta poprawnie, ponieważ jego rola jest nadal ustawiona na Właściciel zamiast Menedżer.
Krok 3. Przechowywanie danych historycznych przy użyciu typu SCD 2
W tym przykładzie utworzysz drugą tabelę docelową o nazwie employees_historical, która zawiera pełną historię zmian w rekordach pracowników.
Dodaj ten kod do potoku przetwarzania danych. Jedyną różnicą jest to, że stored_as_scd_type ustawiono wartość 2 zamiast 1.
dp.create_target_table(f"{catalog}.{schema}.{employees_table_historical}")
dp.create_auto_cdc_flow(
target=f"{catalog}.{schema}.{employees_table_historical}",
source=employees_cdf_table,
keys=["id"],
sequence_by=col("sequenceNum"),
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 2
)
Uruchom ten potok, klikając Uruchom.
Następnie uruchom następujące zapytanie w edytorze SQL, aby sprawdzić, czy rekordy zmian zostały prawidłowo przetworzone:
SELECT *
FROM mycatalog.myschema.employees_historical
Zobaczysz wszystkie zmiany pracowników, w tym tych pracowników, którzy zostali usunięci, takich jak Pat.
Krok 4. Czyszczenie zasobów
Po zakończeniu wyczyść zasoby, wykonując następujące kroki:
Usuń potok danych
Uwaga / Notatka
Usunięcie potoku powoduje automatyczne usunięcie tabel
employeesiemployees_historical.- Kliknij pozycję Zadania i potoki, a następnie znajdź nazwę potoku do usunięcia.
- Kliknij
W tym samym wierszu co nazwa potoku kliknij Usuń.
Usuń notatnik.
Usuń tabelę zawierającą zestawienie danych zmian:
- Kliknij pozycję Nowe > zapytanie.
- Wklej i uruchom następujący kod SQL, dostosowując katalog i schemat odpowiednio:
DROP TABLE mycatalog.myschema.employees_cdf
Wady użycia MERGE INTO i foreachBatch w przechwytywaniu danych zmian
Usługa Databricks udostępnia polecenie SQL MERGE INTO, którego można używać za pomocą interfejsu API foreachBatch do uaktualniania wierszy w tabeli Delta. W tej sekcji opisano, jak ta technika może być używana w prostych przypadkach użycia, ale ta metoda staje się coraz bardziej złożona i krucha w przypadku zastosowania do rzeczywistych scenariuszy.
W tym przykładzie użyjesz tego samego przykładowego zestawienia danych zmian używanego w poprzednich przykładach.
Naiwna implementacja z MERGE INTO i foreachBatch
Utwórz notes i skopiuj do niego następujący kod.
catalogW razie potrzeby zmień zmienne , schemaiemployees_table. Zmienne catalog i schema powinny być ustawione na lokalizacje w katalogu Unity, gdzie można tworzyć tabele.
Po uruchomieniu notebooku wykonuje on następujące czynności:
- Tworzy tabelę docelową w elemencie
create_table. W przeciwieństwie docreate_auto_cdc_flowprogramu , który automatycznie obsługuje ten krok, należy określić schemat. - Odczytuje zestawienie danych zmian jako strumień. Każda mikropartia jest przetwarzana przy użyciu
upsertToDeltametody, która uruchamiaMERGE INTOpolecenie.
catalog = "jobs"
schema = "myschema"
employees_cdf_table = "employees_cdf"
employees_table = "employees_merge"
def upsertToDelta(microBatchDF, batchId):
microBatchDF.createOrReplaceTempView("updates")
microBatchDF.sparkSession.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
def create_table():
spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{employees_table}")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{employees_table}
(id INT, name STRING, age INT, country STRING)
""")
create_table()
cdcData = spark.readStream.table(f"{catalog}.{schema}.{employees_cdf_table}")
cdcData.writeStream \
.foreachBatch(upsertToDelta) \
.outputMode("append") \
.start()
Aby wyświetlić wyniki, uruchom następujące zapytanie SQL:
SELECT *
FROM mycatalog.myschema.employees_merge
Niestety wyniki są niepoprawne, jak pokazano poniżej:
Wiele aktualizacji tych samych kluczy w tej samej mikropartii
Pierwszym problemem jest to, że kod nie obsługuje wielu aktualizacji tego samego klucza w tym samym mikrobajtach. Możesz użyć INSERT do wstawienia pracownika Chrisa, a następnie zaktualizować jego rolę z Właściciela na Menedżera. Powinno to spowodować jeden wiersz, ale zamiast tego istnieją dwa wiersze.
Która zmiana wygrywa, gdy jest wiele aktualizacji tego samego klucza w mikropartii?
Logika staje się bardziej złożona. Poniższy przykład kodu pobiera najnowszy wiersz według sequenceNum i scala tylko te dane z tabelą docelową w następujący sposób:
- Grupy według klucza podstawowego,
id. - Pobiera wszystkie kolumny dla wiersza, który ma najwyższą wartość
sequenceNumw zestawie dla tego klucza. - Eksploduje wiersz z powrotem.
Zaktualizuj metodę upsertToDelta , jak pokazano poniżej, a następnie uruchom kod:
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
Podczas wykonywania zapytania względem tabeli docelowej zobaczysz, że pracownik o nazwie Chris ma prawidłową rolę, ale nadal istnieją inne problemy do rozwiązania, ponieważ nadal usunięto rekordy wyświetlane w tabeli docelowej.
Aktualizacje poza kolejnością w mikropartiach
W tej sekcji omówiono problem aktualizacji poza kolejnością w mikropartiach. Na poniższym diagramie przedstawiono problem: co zrobić, jeśli wiersz Chris ma operację UPDATE w pierwszej mikropartii, a następnie INSERT w kolejnej mikropartii? Kod nie obsługuje tego poprawnie.
Która zmiana wygrywa w przypadku aktualizacji poza kolejnością tego samego klucza w wielu mikrobajtach?
Aby rozwiązać ten problem, rozwiń kod, aby przechowywać wersję w każdym wierszu w następujący sposób:
-
sequenceNumZapisz czas ostatniej aktualizacji wiersza. - Dla każdego nowego wiersza sprawdź, czy znacznik czasu nowego wiersza jest większy od przechowywanego, a następnie zastosuj następującą logikę:
- Jeśli wartość jest większa, użyj nowych danych z obiektu docelowego.
- W przeciwnym razie zachowaj dane w źródle.
Najpierw zaktualizuj metodę createTable do przechowywania sequenceNum , ponieważ użyjesz jej do obsługi wersji każdego wiersza:
def create_table():
spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{employees_table}")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{employees_table}
(id INT, name STRING, age INT, country STRING, sequenceNum INT)
""")
Następnie zaktualizuj polecenie upsertToDelta , aby obsługiwać wersje wierszy. Klauzula UPDATE SETMERGE INTO musi obsługiwać każdą kolumnę oddzielnie.
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED THEN UPDATE SET
name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
WHEN NOT MATCHED THEN INSERT *
""")
Obsługa usuwania
Niestety kod nadal ma problem. Nie obsługuje DELETE operacji, co świadczy o tym, że pracownik Pat jest nadal w tabeli docelowej.
Załóżmy, że usunięcia docierają do tego samego mikrobajta. Aby je obsłużyć, zaktualizuj metodę upsertToDelta ponownie, aby usunąć wiersz, gdy rekord zmiany danych wskazuje usunięcie, jak pokazano poniżej:
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED AND s.operation = 'DELETE' THEN DELETE
WHEN MATCHED THEN UPDATE SET
name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
WHEN NOT MATCHED THEN INSERT *
""")
Zarządzanie aktualizacjami przychodzącymi poza kolejność po operacjach usuwania
Niestety, powyższy kod nadal nie jest do końca poprawny, ponieważ nie obsługuje przypadków, gdy po DELETE następuje UPDATE nie w kolejności w mikropartiach.
Algorytm w celu obsługi tego przypadku musi pamiętać o usunięciach, aby móc skutecznie obsługiwać kolejne aktualizacje poza kolejnością. Aby to zrobić:
- Zamiast natychmiast usuwać wiersze, wykonaj miękkie usunięcie, dodając znacznik czasu lub
sequenceNum. Wiersze usunięte nietrwale są nagrobowane. - Przekieruj wszystkich użytkowników do widoku, który filtruje grobowce.
- Skompiluj zadanie oczyszczania, które usuwa grobowce w czasie.
Użyj następującego kodu:
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED AND s.operation = 'DELETE' THEN UPDATE SET DELETED_AT=now()
WHEN MATCHED THEN UPDATE SET
name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
WHEN NOT MATCHED THEN INSERT *
""")
Użytkownicy nie mogą bezpośrednio używać tabeli docelowej, więc utwórz widok, który może wykonywać zapytania:
CREATE VIEW employees_v AS
SELECT * FROM employees_merge
WHERE DELETED_AT = NULL
Na koniec utwórz zadanie oczyszczania, które okresowo usuwa oznaczone do usunięcia wiersze.
DELETE FROM employees_merge
WHERE DELETED_AT < now() - INTERVAL 1 DAY