Compartir a través de


Creación de una canalización de Catálogo de Unity mediante la clonación de una canalización de metastore de Hive

En este artículo se describe la clone a pipeline solicitud en la API REST de Databricks y cómo se puede usar para copiar una canalización existente que se publica en la tienda de metadatos de Hive en una nueva canalización que se publica en el catálogo de Unity. Cuando llamas a clone a pipeline, sucede lo siguiente:

  • Copia el código fuente y la configuración desde la canalización existente a una nueva, aplicando cualquier sobrescritura de configuración que haya especificado.
  • Actualiza las definiciones de vistas materializadas y tablas de transmisión, así como las referencias, con las modificaciones necesarias para que esos objetos sean administrados por Unity Catalog.
  • Inicia una actualización de canalización para migrar los datos y metadatos existentes, como los puntos de control, para las tablas de streaming de la canalización. Esto permite que esas tablas de streaming reanuden el procesamiento en el mismo punto que la canalización original.

Una vez completada la operación de clonación, tanto las canalizaciones originales como las nuevas se pueden ejecutar de forma independiente.

En este artículo se incluyen ejemplos de llamada a la solicitud de API directamente y a través de un script de Python desde un cuaderno de Databricks.

Antes de empezar

Se requieren lo siguiente antes de clonar una canalización:

  • Para clonar una canalización de metastore de Hive, las tablas y vistas definidas en la canalización deben publicar tablas en un esquema de destino. Para obtener información sobre cómo agregar un esquema de destino a una canalización, consulte Configuración de una canalización para publicar en metastore de Hive.

  • Las referencias a las tablas o vistas administradas de Hive metastore en la canalización para clonar deben estar completamente cualificadas con el catálogo (hive_metastore), el esquema y el nombre de la tabla. Por ejemplo, en el código siguiente que crea un customers conjunto de datos, el argumento de nombre de tabla debe actualizarse a hive_metastore.sales.customers:

    @dp.table
    def customers():
      return spark.read.table("sales.customers").where(...)
    
  • No edite el código fuente de la canalización de metastore de Hive de origen mientras una operación de clonación está en curso, incluidos los cuadernos configurados como parte de la canalización y los módulos almacenados en carpetas de Git o archivos del área de trabajo.

  • La canalización del metastore de Hive de origen no debe estar en ejecución cuando inicies la operación de clonación. Si se está ejecutando una actualización, deténgala o espere a que se complete.

A continuación se muestran otras consideraciones importantes antes de clonar una canalización:

  • Si las tablas de la canalización de metastore de Hive especifican una ubicación de almacenamiento mediante el path argumento en Python o LOCATION en SQL, pase la configuración "pipelines.migration.ignoreExplicitPath": "true" a la solicitud de clonación. La configuración de esta configuración se incluye en las instrucciones siguientes.
  • Si la canalización de metastore de Hive incluye un origen de Auto Loader que especifica un valor para la opción cloudFiles.schemaLocation, y la canalización de metastore de Hive permanecerá operativa después de crear el clon del catálogo de Unity, debe establecer la opción mergeSchema a true tanto en la canalización de metastore de Hive como en la canalización del catálogo de Unity clonada. Agregar esta opción a la canalización del metastore de Hive antes de clonar copiará la opción a la nueva canalización.

Clonación de una canalización con la API REST de Databricks

En el ejemplo siguiente se usa el curl comando para llamar a la clone a pipeline solicitud en la API REST de Databricks:

curl -X POST \
     --header "Authorization: Bearer <personal-access-token>"  \
     <databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
     --data @clone-pipeline.json

Reemplazar:

clone-pipeline.json:

{
  "catalog": "<target-catalog-name>",
  "target": "<target-schema-name>",
  "name": "<new-pipeline-name>"
  "clone_mode": "MIGRATE_TO_UC",
  "configuration": {
    "pipelines.migration.ignoreExplicitPath": "true"
  }
}

Reemplazar:

  • <target-catalog-name> con el nombre de un catálogo en Unity Catalog donde se debe publicar la nueva canalización. Debe ser un catálogo existente.
  • <target-schema-name> con el nombre de un esquema en el Catálogo de Unity al que debería publicarse la nueva canalización si es distinto al nombre de esquema actual. Este parámetro es opcional y, si no se especifica, se usa el nombre de esquema existente.
  • <new-pipeline-name> con un nombre opcional para la nueva canalización. Si no se especifica, la nueva canalización se denomina mediante el nombre de canalización de origen con [UC] anexado.

clone_mode especifica el modo que se va a usar para la operación de clonación. MIGRATE_TO_UC es la única opción admitida.

Use el configuration campo para especificar configuraciones en la nueva canalización. Los valores establecidos aquí invalidan las configuraciones de la canalización original.

La respuesta de la solicitud de API REST clone es el identificador de la canalización del nuevo catálogo de Unity.

Clonación de una canalización desde un cuaderno de Databricks

En el siguiente ejemplo se llama a la solicitud create a pipeline desde un script en Python. Puede usar un cuaderno de Databricks para ejecutar este script:

  1. Cree un cuaderno para el script. Consulte Creación de un cuaderno.
  2. Copie el siguiente script de Python en la primera celda del cuaderno.
  3. Actualice los valores de marcador de posición en el script reemplazándolos por:
    • <databricks-instance> con el nombre de la instancia del área de trabajo de Azure Databricks, por ejemplo adb-1234567890123456.7.azuredatabricks.net
    • <pipeline-id> con el identificador único de la canalización de metastore de Hive que se va a clonar. Puede encontrar el identificador de canalización en la interfaz de usuario de canalizaciones.
    • <target-catalog-name> con el nombre de un catálogo en Unity Catalog donde se debe publicar la nueva canalización. Debe ser un catálogo existente.
    • <target-schema-name> con el nombre de un esquema en el Catálogo de Unity al que debería publicarse la nueva canalización si es distinto al nombre de esquema actual. Este parámetro es opcional y, si no se especifica, se usa el nombre de esquema existente.
    • <new-pipeline-name> con un nombre opcional para la nueva canalización. Si no se especifica, la nueva canalización se denomina mediante el nombre de canalización de origen con [UC] anexado.
  4. Ejecute el script. Consulte Ejecución de cuadernos de Databricks.
import requests

# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"

# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"

# This is the only supported clone mode
CLONE_MODE = "MIGRATE_TO_UC"

# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}

def get_token():
    ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    return getattr(ctx, "apiToken")().get()

def check_source_pipeline_exists():
    data = requests.get(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
        headers={"Authorization": f"Bearer {get_token()}"},
    )

    assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"

def request_pipeline_clone():
    payload = {
      "catalog": TARGET_CATALOG,
      "clone_mode": CLONE_MODE,
    }
    if TARGET_SCHEMA != "":
      payload["target"] = TARGET_SCHEMA
    if CLONED_PIPELINE_NAME != "":
      payload["name"] = CLONED_PIPELINE_NAME
    if OVERRIDE_CONFIGS:
      payload["configuration"] = OVERRIDE_CONFIGS

    data = requests.post(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
        headers={"Authorization": f"Bearer {get_token()}"},
        json=payload,
    )
    response = data.json()
    return response

check_source_pipeline_exists()
request_pipeline_clone()

Limitaciones

A continuación se muestran las limitaciones de la solicitud de API de canalizaciones declarativas clone a pipeline de Spark de Lakeflow:

  • Solo se admite la clonación desde una canalización configurada para usar el metastore de Hive a una canalización de Catálogo de Unity.
  • Puede crear un clon solo en el mismo área de trabajo de Azure Databricks que la canalización desde la que va a clonar.
  • La canalización que va a clonar solo puede incluir los siguientes orígenes de streaming:
  • Si la canalización de metastore de Hive que va a clonar utiliza el modo de notificación de archivos de Auto Loader, Databricks recomienda no ejecutar la canalización de metastore de Hive después de la clonación. Esto se debe a que la ejecución de la canalización de metastore de Hive produce la eliminación de algunos eventos de notificación de archivos del clon del catálogo de Unity. Si la canalización de metastore de Hive de origen se ejecuta una vez completada la operación de clonación, puede rellenar los archivos que faltan mediante Auto Loader con la opción cloudFiles.backfillInterval. Para obtener información sobre el modo de notificación de archivos del cargador automático, consulte Configuración de flujos de cargador automático en modo de notificación de archivos. Para obtener información sobre los archivos de reposición con autocargador, consulte Desencadenamiento de rerrellenamientos normales mediante cloudFiles.backfillInterval y opciones comunes del cargador automático.
  • Las tareas de mantenimiento de canalización se pausan automáticamente para ambas canalizaciones mientras la clonación está en curso.
  • Lo siguiente se aplica a las consultas de desplazamiento en el tiempo en las tablas de la canalización clonada del catálogo de Unity:
    • Si una versión de tabla se escribió originalmente en un objeto administrado de metastore de Hive, las consultas de viaje en el tiempo que usan una cláusula timestamp_expression no están definidas al consultar el objeto Unity Catalog clonado.
    • Sin embargo, si la versión de la tabla se escribió en el objeto Unity Catalog clonado, las consultas de retroceso en el tiempo que usan una timestamp_expression cláusula funcionan correctamente.
    • Las consultas temporales que utilizan una version sentencia funcionan correctamente cuando se consulta un objeto del Catálogo de Unity clonado, incluso cuando la versión originalmente se escribió en el objeto administrado del metastore de Hive.
  • Para conocer otras limitaciones al usar canalizaciones declarativas de Spark de Lakeflow con el catálogo de Unity, consulte Limitaciones de canalización del catálogo de Unity.
  • Para conocer las limitaciones del catálogo de Unity, consulte Limitaciones del catálogo de Unity.