Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
W tym artykule opisano sposób przenoszenia tabel przesyłania strumieniowego i zmaterializowanych widoków między potokami. Po przeniesieniu przepływu, potok, do którego został przeniesiony przepływ, aktualizuje tabelę, a nie oryginalny potok. Jest to przydatne w wielu scenariuszach, w tym:
- Podziel duży rurociąg na mniejsze.
- Połącz wiele rurek w jedną większą.
- Zmień częstotliwość odświeżania niektórych tabel w pipeline.
- Przenieś tabele z potoku, który używa starszego trybu publikowania do domyślnego trybu publikowania. Aby uzyskać szczegółowe informacje na temat starszego trybu publikowania, zobacz Starszy tryb publikowania dla potoków. Aby zobaczyć, jak można migrować tryb publikowania dla całego potoku jednocześnie, zobacz sekcję Włączanie domyślnego trybu publikowania w potoku.
- Przenoszenie tabel między potokami w różnych obszarach roboczych.
Requirements
Poniżej przedstawiono wymagania dotyczące przenoszenia tabeli między potokami.
Podczas uruchamiania
ALTER ...polecenia należy użyć środowiska Databricks Runtime 16.3 lub nowszego, a środowisko Databricks Runtime 17.2 umożliwia przenoszenie tabeli między obszarami roboczymi.Zarówno potoki źródłowe, jak i docelowe muszą spełniać następujące warunki:
- Należące do konta użytkownika usługi Azure Databricks lub jednostki usługi uruchamiającego operację
- W środowiskach roboczych współużytkujących metasklep. Aby sprawdzić magazyn metadanych, zobacz
current_metastorefunkcja.
Potok docelowy musi używać domyślnego trybu publikowania. Dzięki temu można publikować tabele w wielu katalogach i schematach.
Alternatywnie, oba wersje muszą używać starszego trybu publikowania i mieć tę samą wartość katalogu oraz docelową w ustawieniach. Aby uzyskać informacje na temat starszego trybu publikowania, zobacz LIVE schema (starsza wersja).
Uwaga / Notatka
Ta funkcja nie obsługuje przenoszenia potoku przy użyciu domyślnego trybu publikowania do innego potoku przy użyciu starszego trybu publikowania.
Przenieś tabelę między potokami
Poniższe instrukcje opisują sposób przenoszenia tabeli strumieniowej lub widoku materializowanego z jednego potoku do drugiego.
Zatrzymaj potok źródłowy, jeśli jest uruchomiony. Poczekaj na całkowite zatrzymanie.
Usuń definicję tabeli z kodu potoku źródłowego i zapisz ją w innym miejscu na potrzeby przyszłego odwołania.
Uwzględnij wszelkie pomocnicze zapytania lub kod, który jest wymagany do prawidłowego działania potoku.
W notesie lub edytorze SQL uruchom następujące polecenie SQL, aby ponownie przypisać tabelę z potoku źródłowego do potoku docelowego:
ALTER [MATERIALIZED VIEW | STREAMING TABLE | TABLE] <table-name> SET TBLPROPERTIES("pipelines.pipelineId"="<destination-pipeline-id>");Należy pamiętać, że polecenie SQL musi być uruchamiane z obszaru roboczego potoku źródłowego.
Polecenie używa
ALTER MATERIALIZED VIEWdla zarządzanych zmaterializowanych widoków katalogu Unity orazALTER STREAMING TABLEdla tabel przesyłania strumieniowego. Aby wykonać tę samą akcję w tabeli magazynu metadanych Hive, użyj poleceniaALTER TABLE.Jeśli na przykład chcesz przenieść tabelę przesyłania strumieniowego o nazwie
salesdo potoku o identyfikatorzeabcd1234-ef56-ab78-cd90-1234efab5678, uruchom następujące polecenie:ALTER STREAMING TABLE sales SET TBLPROPERTIES("pipelines.pipelineId"="abcd1234-ef56-ab78-cd90-1234efab5678");Uwaga / Notatka
Musi
pipelineIdbyć prawidłowym identyfikatorem potoku. Wartośćnulljest niedozwolona.Dodaj definicję tabeli do kodu potoku docelowego.
Uwaga / Notatka
Jeśli wykaz lub schemat docelowy różnią się między źródłem i miejscem docelowym, kopiowanie zapytania może nie działać. Częściowo kwalifikowane tabele w definicji mogą być rozpoznawane inaczej. Podczas przechodzenia na pełne kwalifikowanie nazw tabel może być konieczne zaktualizowanie definicji.
Uwaga / Notatka
Usuń lub oznacz jako komentarz wszelkie przepływy dodawane jednorazowo (w Pythonie zapytania z append_flow(once=True), w SQL zapytania z użyciemINSERT INTO ONCE) z kodu potoku docelowego. Aby uzyskać więcej informacji, zobacz Ograniczenia.
Przeniesienie zostało ukończone. Teraz możesz uruchomić zarówno potoki źródłowe, jak i docelowe. Potok docelowy aktualizuje tabelę.
Rozwiązywanie problemów
W poniższej tabeli opisano błędy, które mogą wystąpić podczas przenoszenia tabeli między pipeline'ami.
| Error | Description |
|---|---|
DESTINATION_PIPELINE_NOT_IN_DIRECT_PUBLISHING_MODE |
Potok źródłowy jest w domyślnym trybie publikowania, a miejsce docelowe używa trybu schematu LIVE (starsza wersja). Nie jest to obsługiwane. Jeśli źródło używa domyślnego trybu publikowania, miejsce docelowe również musi. |
PIPELINE_TYPE_NOT_WORKSPACE_PIPELINE_TYPE |
Obsługiwane jest tylko przenoszenie tabel między potokami. Przenoszenie tabel przesyłania strumieniowego i zmaterializowanych widoków utworzonych za pomocą usługi Databricks SQL nie jest obsługiwane. |
DESTINATION_PIPELINE_NOT_FOUND |
Element pipelines.pipelineId musi być prawidłowym potokiem. Parametr pipelineId nie może mieć wartości null. |
| Nie można zaktualizować tabeli w miejscu docelowym po przeniesieniu. | Aby szybko rozwiązać ten problem, przenieś tabelę z powrotem do potoku źródłowego, stosując te same instrukcje. |
PIPELINE_PERMISSION_DENIED_NOT_OWNER |
Zarówno przepływy źródłowe, jak i docelowe muszą być własnością użytkownika wykonującego operację transferu. |
TABLE_ALREADY_EXISTS |
Tabela wymieniona w komunikacie o błędzie już istnieje. Może się tak zdarzyć, jeśli tabela zapasowa potoku już istnieje. W tym przypadku DROP tabela wymieniona w błędzie. |
Przykład z wieloma tabelami w pipeline
Potoki mogą zawierać więcej niż jedną tabelę. Można nadal przenosić jedną tabelę naraz między potokami. W tym scenariuszu istnieją trzy tabele (table_a, table_b, table_c), które odczytują się nawzajem sekwencyjnie w źródłowym potoku przetwarzania. Chcemy przenieść jedną tabelę, table_b, do innego potoku.
Początkowy kod potoku źródłowego:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table
def table_a():
return spark.read.table("source_table")
# Table to be moved to new pipeline:
@dp.table
def table_b():
return (
spark.read.table("table_a")
.select(col("column1"), col("column2"))
)
@dp.table
def table_c():
return (
spark.read.table("table_b")
.groupBy(col("column1"))
.agg(sum("column2").alias("sum_column2"))
)
Przesuwamy table_b do innego potoku, kopiujemy i usuwamy definicję tabeli ze źródła oraz aktualizujemy table_b identyfikator potoku.
Najpierw wstrzymaj wszystkie harmonogramy i poczekaj na ukończenie aktualizacji zarówno w potokach źródłowych, jak i docelowych. Następnie zmodyfikuj potok źródłowy, aby usunąć kod dla przenoszonej tabeli. Przykładowy kod dla zaktualizowanego potoku źródłowego staje się:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table
def table_a():
return spark.read.table("source_table")
# Removed, to be in new pipeline:
# @dp.table
# def table_b():
# return (
# spark.read.table("table_a")
# .select(col("column1"), col("column2"))
# )
@dp.table
def table_c():
return (
spark.read.table("table_b")
.groupBy(col("column1"))
.agg(sum("column2").alias("sum_column2"))
)
Przejdź do edytora SQL, aby uruchomić ALTER pipelineId polecenie .
ALTER MATERIALIZED VIEW table_b
SET TBLPROPERTIES("pipelines.pipelineId"="<new-pipeline-id>");
Następnie przejdź do potoku docelowego i dodaj definicję table_b. Jeśli domyślny katalog i schemat są takie same w ustawieniach potoku, nie są wymagane żadne zmiany kodu.
Docelowy kod potoku:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table(name="table_b")
def table_b():
return (
spark.read.table("table_a")
.select(col("column1"), col("column2"))
)
Jeśli domyślny wykaz i schemat różnią się od tych określonych w ustawieniach potoku, należy dodać w pełni kwalifikowaną nazwę, korzystając z wykazu i schematu potoku.
Na przykład docelowy kod pipeline może wyglądać tak:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table(name="source_catalog.source_schema.table_b")
def table_b():
return (
spark.read.table("source_catalog.source_schema.table_a")
.select(col("column1"), col("column2"))
)
Uruchom (lub ponownie włącz harmonogramy) dla potoków źródłowych i docelowych.
Rurociągi są teraz rozłączne. Zapytanie dla table_c odczytuje z table_b (teraz w potoku docelowym) i table_b odczytuje z table_a (w potoku źródłowym). Jeśli wykonywanie wyzwalane w potoku table_b źródłowym nie zostanie zaktualizowane, ponieważ nie jest już zarządzane przez potok źródłowy. Potok źródłowy traktuje table_b jako tabelę zewnętrzną dla potoku. Jest to porównywalne do definiowania zmaterializowanego widoku z tabeli Delta w Unity Catalog, który nie jest zarządzany przez potok.
Ograniczenia
Poniżej przedstawiono ograniczenia dotyczące przenoszenia tabel między potokami.
- Zmaterializowane widoki i tabele przesyłania strumieniowego utworzone za pomocą usługi Databricks SQL nie są obsługiwane.
- Przepływy dołączane raz — append_flow(once=True) w Pythonie oraz INSERT INTO ONCE w SQL — nie są obsługiwane. Ich stan uruchomienia nie jest zachowywany. Mogą być uruchamiane ponownie w potoku docelowym. Usuń lub wykomentuj przepływy dodawane jednokrotnie z potoku docelowego, aby uniknąć ich ponownego uruchamiania.
- Prywatne tabele lub widoki nie są obsługiwane.
- Potoki źródłowe i docelowe muszą być potokami. Przepływy danych o wartości null nie są obsługiwane.
- Potoki źródłowe i docelowe muszą znajdować się w tym samym obszarze roboczym lub w różnych obszarach roboczych współużytkujących ten sam magazyn metadanych.
- Zarówno potoki źródłowe, jak i docelowe muszą być własnością użytkownika uruchamiającego operację przenoszenia.
- Jeśli potok źródłowy używa domyślnego trybu publikowania, potok docelowy musi również używać domyślnego trybu publikowania. Nie można przenieść tabeli z potoku, który używa domyślnego trybu publikowania, do potoku korzystającego ze schematu LIVE (starsza wersja). Zobacz LIVE schema (legacy).
- Jeśli źródłowe i docelowe rurociągi używają schematu LIVE (starsza wersja), muszą mieć te same wartości
catalogitargetw ustawieniach.