Udostępnij przez


Stwórz proces katalogu Unity, klonując proces metasklepu Hive

W tym artykule opisano żądanie clone a pipeline w interfejsie API REST usługi Databricks oraz sposób jego użycia do kopiowania istniejącego potoku danych publikowanego w magazynie metadanych Hive do nowego potoku danych publikowanego w Unity Catalog. Po wywołaniu clone a pipeline żądania:

  • Kopiuje kod źródłowy i konfigurację z istniejącego potoku do nowego, stosując wszelkie określone nadpisania konfiguracji.
  • Aktualizuje definicje zmaterializowanego widoku i tabeli strumieniowej oraz odwołania z wymaganymi zmianami, aby te obiekty były zarządzane przez Unity Catalog.
  • Uruchamia aktualizację potoku, aby przeprowadzić migrację istniejących danych i metadanych, takich jak punkty kontrolne, dotyczących wszystkich tabel strumieniowych w tym potoku. Dzięki temu te tabele przesyłania strumieniowego mogą wznowić przetwarzanie w tym samym punkcie co oryginalny potok.

Po zakończeniu operacji klonowania zarówno oryginalne, jak i nowe potoki mogą być uruchamiane niezależnie.

Ten artykuł zawiera przykłady bezpośredniego wywoływania żądania API oraz wykonania skryptu w języku Python w notesie usługi Databricks.

Zanim rozpoczniesz

Przed sklonowaniem pipeline wymagane są następujące elementy:

  • Aby sklonować potok metadanych Hive, tabele i widoki zdefiniowane w tym potoku muszą zostać opublikowane w schemacie docelowym. Aby dowiedzieć się, jak dodać schemat docelowy do potoku, zobacz Konfigurowanie potoku do publikowania w magazynie metadanych Hive.

  • Odwołania do zarządzanych tabel lub widoków magazynu metadanych Hive w potoku do klonowania muszą być w pełni kwalifikowane przy użyciu wykazu (hive_metastore), schematu i nazwy tabeli. Na przykład w poniższym kodzie tworzącym customers zestaw danych, argument nazwy tabeli musi zostać zaktualizowany do elementu hive_metastore.sales.customers:

    @dp.table
    def customers():
      return spark.read.table("sales.customers").where(...)
    
  • Nie edytuj kodu źródłowego potoku metadanych Hive, gdy trwa operacja klonowania, w tym notesów skonfigurowanych jako część potoku oraz wszystkich modułów przechowywanych w folderach Git lub plikach obszaru roboczego.

  • Źródłowy potok magazynu metadanych Hive nie może być uruchomiony przed rozpoczęciem operacji klonowania. Jeśli aktualizacja jest uruchomiona, zatrzymaj ją lub zaczekaj na jej ukończenie.

Poniżej przedstawiono inne ważne zagadnienia przed sklonowaniem potoku:

  • Jeśli tabele w potoku magazynu metadanych Hive określ lokalizację magazynu przy użyciu argumentu path w języku Python lub LOCATION w języku SQL, przekaż "pipelines.migration.ignoreExplicitPath": "true" konfigurację do żądania klonowania. Ustawienie tej konfiguracji jest zawarte w poniższych instrukcjach.
  • Jeśli potok metadanych Hive zawiera źródło Auto Loader, które określa wartość cloudFiles.schemaLocation opcji, a potok metadanych Hive pozostanie operacyjny po utworzeniu klonu Unity Catalog, należy ustawić opcję mergeSchema na true zarówno w potoku magazynu metadanych Hive, jak i w sklonowanym potoku Unity Catalog. Dodanie tej opcji do potoku magazynu metadanych Hive przed sklonowaniem spowoduje skopiowanie opcji do nowego potoku.

Klonowanie potoku za pomocą Databricks REST API

W poniższym przykładzie polecenie curl jest używane do wywołania żądania clone a pipeline w interfejsie API REST usługi Databricks.

curl -X POST \
     --header "Authorization: Bearer <personal-access-token>"  \
     <databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
     --data @clone-pipeline.json

Replace:

  • <personal-access-token> z osobistym tokenem dostępu usługi Databricks.
  • <databricks-instance> z nazwą instancji obszaru roboczego usługi Azure Databricks, na przykład adb-1234567890123456.7.azuredatabricks.net
  • <pipeline-id> z unikatowym identyfikatorem potoku magazynu metadanych Hive do sklonowania. Identyfikator potoku można znaleźć w interfejsie użytkownika potoków.

clone-pipeline.json:

{
  "catalog": "<target-catalog-name>",
  "target": "<target-schema-name>",
  "name": "<new-pipeline-name>"
  "clone_mode": "MIGRATE_TO_UC",
  "configuration": {
    "pipelines.migration.ignoreExplicitPath": "true"
  }
}

Replace:

  • <target-catalog-name> z nazwą katalogu w Unity Catalog, do którego powinien zostać opublikowany nowy pipeline. Musi to być istniejący wykaz.
  • <target-schema-name> z nazwą schematu w katalogu Unity Catalog, do którego powinien zostać opublikowany nowy potok, o ile różni się od bieżącej nazwy schematu. Ten parametr jest opcjonalny i jeśli nie zostanie określony, używana jest istniejąca nazwa schematu.
  • <new-pipeline-name> z opcjonalną nazwą nowego ciągu technologicznego. Jeśli nie zostanie określony, nowy potok zostanie nazwany przy użyciu nazwy potoku źródłowego z dołączonym [UC] .

clone_mode określa tryb do użycia dla operacji klonowania. MIGRATE_TO_UC jest jedyną obsługiwaną opcją.

Użyj pola configuration, aby ustawić konfiguracje w nowym pipeline. Wartości ustawione tutaj zastępują konfiguracje w oryginalnym procesie danych.

Odpowiedź z żądania interfejsu clone API REST to ID nowego potoku katalogu Unity.

Sklonuj potok w notesie usługi Databricks

Poniższy przykład wywołuje create a pipeline żądanie ze skryptu języka Python. Możesz użyć notatnika usługi Databricks, aby uruchomić ten skrypt.

  1. Utwórz nowy notes dla skryptu. Zobacz Tworzenie notesu.
  2. Skopiuj następujący skrypt języka Python do pierwszej komórki notesu.
  3. Zaktualizuj wartości symboli zastępczych w skrypcie, zastępując następujące wartości:
    • <databricks-instance> z nazwą instancji obszaru roboczego usługi Azure Databricks, na przykład adb-1234567890123456.7.azuredatabricks.net
    • <pipeline-id> z unikatowym identyfikatorem potoku magazynu metadanych Hive do sklonowania. Identyfikator potoku można znaleźć w interfejsie użytkownika potoków.
    • <target-catalog-name> z nazwą katalogu w Unity Catalog, do którego powinien zostać opublikowany nowy pipeline. Musi to być istniejący wykaz.
    • <target-schema-name> z nazwą schematu w katalogu Unity Catalog, do którego powinien zostać opublikowany nowy potok, o ile różni się od bieżącej nazwy schematu. Ten parametr jest opcjonalny i jeśli nie zostanie określony, używana jest istniejąca nazwa schematu.
    • <new-pipeline-name> z opcjonalną nazwą nowego ciągu technologicznego. Jeśli nie zostanie określony, nowy potok zostanie nazwany przy użyciu nazwy potoku źródłowego z dołączonym [UC] .
  4. Uruchom skrypt. Zobacz Uruchamianie notesów usługi Databricks.
import requests

# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"

# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"

# This is the only supported clone mode
CLONE_MODE = "MIGRATE_TO_UC"

# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}

def get_token():
    ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    return getattr(ctx, "apiToken")().get()

def check_source_pipeline_exists():
    data = requests.get(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
        headers={"Authorization": f"Bearer {get_token()}"},
    )

    assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"

def request_pipeline_clone():
    payload = {
      "catalog": TARGET_CATALOG,
      "clone_mode": CLONE_MODE,
    }
    if TARGET_SCHEMA != "":
      payload["target"] = TARGET_SCHEMA
    if CLONED_PIPELINE_NAME != "":
      payload["name"] = CLONED_PIPELINE_NAME
    if OVERRIDE_CONFIGS:
      payload["configuration"] = OVERRIDE_CONFIGS

    data = requests.post(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
        headers={"Authorization": f"Bearer {get_token()}"},
        json=payload,
    )
    response = data.json()
    return response

check_source_pipeline_exists()
request_pipeline_clone()

Ograniczenia

Poniżej przedstawiono ograniczenia żądania API Lakeflow Spark Deklaratywnych Potoków clone a pipeline:

  • Tylko klonowanie z potoku skonfigurowanego do używania magazynu metadanych Hive do potoku Unity Catalog jest obsługiwane.
  • Klon można utworzyć tylko w tym samym obszarze roboczym usługi Azure Databricks co potok, z którego klonujesz.
  • Potok, który klonujesz, może obejmować tylko następujące źródła strumieniowe:
  • Jeśli potok Hive metastore, który klonujesz, używa trybu powiadamiania o plikach Auto Loader, Databricks zaleca nie uruchamiać potoku Hive metastore po sklonowaniu. Dzieje się tak dlatego, że uruchomienie procesu potoku metastore Hive powoduje usunięcie niektórych zdarzeń powiadomień o plikach z klonu Katalogu Unity. Jeśli źródłowy potok magazynu metadanych Hive działa po zakończeniu operacji klonowania, możesz wypełnić brakujące pliki, korzystając z Auto Loadera z opcją cloudFiles.backfillInterval. Aby dowiedzieć się więcej o trybie powiadomień plików w Auto Loader, zobacz Konfigurowanie strumieni w trybie powiadomień plików w Auto Loader. Aby dowiedzieć się więcej na temat wypełniania plików za pomocą Auto Loader, zapoznaj się z Wyzwalanie zwykłych wypełnień przy użyciu opcji cloudFiles.backfillInterval i Common Auto Loader options.
  • Zadania konserwacji potoku są automatycznie wstrzymywane dla obu potoków podczas klonowania.
  • Poniższe zasady dotyczą zapytań o podróże w czasie dotyczące tabel w sklonowanym potoku Katalogu Unity.
    • Jeśli wersja tabeli została pierwotnie zapisana w zarządzanym obiekcie Hive Metastore, zapytania dotyczące podróży w czasie z użyciem klauzuli timestamp_expression nie są zdefiniowane podczas wykonywania zapytań względem sklonowanego obiektu Unity Catalog.
    • Jednak jeśli wersja tabeli została zapisana w sklonowanym obiekcie Unity Catalog, zapytania dotyczące podróży w czasie przy użyciu klauzuli timestamp_expression działają poprawnie.
    • Zapytania podróży w czasie z wykorzystaniem klauzuli version działają poprawnie podczas wykonywania zapytań względem sklonowanego obiektu Unity Catalog, nawet jeśli wersja była pierwotnie zapisana w obiekcie zarządzanym przez metastore Hive.
  • Aby uzyskać informacje o innych ograniczeniach dotyczących używania deklaratywnych potoków Lakeflow Spark z Unity Catalog, zobacz Ograniczenia potoku Unity Catalog.
  • Aby uzyskać informacje o ograniczeniach Unity Catalog, zobacz Ograniczenia Unity Catalog.