Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Cet article décrit la clone a pipeline demande dans l’API REST Databricks et comment vous pouvez l’utiliser pour copier un pipeline existant qui publie dans le metastore Hive dans un nouveau pipeline qui publie dans le catalogue Unity. Lorsque vous appelez la requête clone a pipeline, elle :
- Copie le code source et la configuration du pipeline existant vers un nouveau pipeline, en appliquant les remplacements de configuration que vous avez spécifiés.
- Met à jour les définitions et références de vue matérialisée et de table de streaming avec les modifications requises pour que ces objets soient gérés par Unity Catalog.
- Démarre une mise à jour du pipeline pour migrer les données et métadonnées existantes, telles que les points de contrôle, pour toutes les tables de streaming du pipeline. Cela permet à ces tables de streaming de reprendre le traitement au même point que le pipeline d’origine.
Une fois l’opération de clonage terminée, le pipeline d’origine et le nouveau pipeline peuvent s’exécuter indépendamment.
Cet article inclut des exemples d’appel de la requête d’API directement et via un script Python à partir d’un notebook Databricks.
Avant de commencer
Les éléments suivants sont requis avant de cloner un pipeline :
Pour cloner un pipeline de metastore Hive, les tables et vues définies dans le pipeline doivent publier les tables dans un schéma cible. Pour savoir comment ajouter un schéma cible à un pipeline, consultez Configurer un pipeline pour publier sur le metastore Hive.
Les références aux tables ou vues gérées par le metastore Hive dans le pipeline à cloner doivent être entièrement qualifiées avec le catalogue (
hive_metastore), le schéma et le nom de la table. Par exemple, dans le code suivant créant uncustomersjeu de données, l’argument de nom de table doit être mis à jour surhive_metastore.sales.customers:@dp.table def customers(): return spark.read.table("sales.customers").where(...)Ne modifiez pas le code source du pipeline de métastore Hive d'origine pendant qu'une opération de clonage est en cours, y compris les notebooks configurés dans le cadre du pipeline et tous les modules stockés dans des dossiers Git ou dans des fichiers de l'espace de travail.
Le pipeline de metastore Hive source ne doit pas s’exécuter lorsque vous démarrez l’opération de clonage. Si une mise à jour est en cours d’exécution, arrêtez-la ou attendez qu’elle se termine.
Voici d’autres considérations importantes avant de cloner un pipeline :
- Si les tables du pipeline du metastore Hive spécifient un emplacement de stockage à l’aide de l’argument
pathdans Python ouLOCATIONdans SQL, transmettez la"pipelines.migration.ignoreExplicitPath": "true"configuration à la requête clone. La définition de cette configuration est incluse dans les instructions ci-dessous. - Si le pipeline du metastore Hive inclut une source Auto Loader qui spécifie une valeur pour l’option
cloudFiles.schemaLocation, et que le pipeline du metastore Hive reste opérationnel après avoir créé le clone du catalogue Unity, vous devez définir l’optionmergeSchemasurtruedans les deux sur le pipeline du metastore Hive et sur le pipeline du catalogue Unity cloné. L’ajout de cette option au pipeline du metastore Hive avant le clonage copiera l’option dans le nouveau pipeline.
Cloner un pipeline avec l’API REST Databricks
L’exemple suivant utilise la curl commande pour appeler la clone a pipeline requête dans l’API REST Databricks :
curl -X POST \
--header "Authorization: Bearer <personal-access-token>" \
<databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
--data @clone-pipeline.json
Remplacez :
-
<personal-access-token>avec un jeton d’accès personnel Databricks. -
<databricks-instance>avec le nom de l’instance de l’espace de travail Azure Databricks, par exempleadb-1234567890123456.7.azuredatabricks.net -
<pipeline-id>avec l’identificateur unique du pipeline du metastore Hive à cloner. Vous trouverez l’ID de pipeline dans l’interface utilisateur des pipelines.
clone-pipeline.json:
{
"catalog": "<target-catalog-name>",
"target": "<target-schema-name>",
"name": "<new-pipeline-name>"
"clone_mode": "MIGRATE_TO_UC",
"configuration": {
"pipelines.migration.ignoreExplicitPath": "true"
}
}
Remplacez :
-
<target-catalog-name>avec le nom d’un catalogue dans Unity Catalog où le nouveau pipeline publiera. Il doit s’agir d’un catalogue existant. -
<target-schema-name>avec le nom d'un schéma dans le catalogue Unity vers lequel le nouveau pipeline doit publier, s'il est différent du nom du schéma actuel. Ce paramètre est facultatif et, s’il n’est pas spécifié, le nom de schéma existant est utilisé. -
<new-pipeline-name>avec un nom facultatif pour la nouvelle chaîne de traitement. Si le nouveau pipeline n’est pas spécifié, il est nommé en ajoutant[UC]au nom du pipeline source.
clone_mode spécifie le mode à utiliser pour l’opération clone.
MIGRATE_TO_UC est la seule option prise en charge.
Utilisez le configuration champ pour spécifier des configurations sur le nouveau pipeline. Les valeurs définies ici remplacent les configurations dans le pipeline d’origine.
La réponse de la demande d’API clone REST est l’ID de pipeline du nouveau pipeline Unity Catalog.
Cloner un pipeline à partir d’un notebook Databricks
L’exemple suivant appelle la create a pipeline requête à partir d’un script Python. Vous pouvez utiliser un notebook Databricks pour exécuter ce script :
- Créez un bloc-notes pour le script. Consultez Création d’un notebook.
- Copiez le script Python suivant dans la première cellule du notebook.
- Mettez à jour les valeurs d’espace réservé dans le script en remplaçant :
-
<databricks-instance>avec le nom de l’instance de l’espace de travail Azure Databricks, par exempleadb-1234567890123456.7.azuredatabricks.net -
<pipeline-id>avec l’identificateur unique du pipeline du metastore Hive à cloner. Vous trouverez l’ID de pipeline dans l’interface utilisateur des pipelines. -
<target-catalog-name>avec le nom d’un catalogue dans Unity Catalog où le nouveau pipeline publiera. Il doit s’agir d’un catalogue existant. -
<target-schema-name>avec le nom d'un schéma dans le catalogue Unity vers lequel le nouveau pipeline doit publier, s'il est différent du nom du schéma actuel. Ce paramètre est facultatif et, s’il n’est pas spécifié, le nom de schéma existant est utilisé. -
<new-pipeline-name>avec un nom facultatif pour la nouvelle chaîne de traitement. Si le nouveau pipeline n’est pas spécifié, il est nommé en ajoutant[UC]au nom du pipeline source.
-
- Exécutez le script. Consultez Run Databricks notebooks.
import requests
# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"
# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"
# This is the only supported clone mode
CLONE_MODE = "MIGRATE_TO_UC"
# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}
def get_token():
ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
return getattr(ctx, "apiToken")().get()
def check_source_pipeline_exists():
data = requests.get(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
headers={"Authorization": f"Bearer {get_token()}"},
)
assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"
def request_pipeline_clone():
payload = {
"catalog": TARGET_CATALOG,
"clone_mode": CLONE_MODE,
}
if TARGET_SCHEMA != "":
payload["target"] = TARGET_SCHEMA
if CLONED_PIPELINE_NAME != "":
payload["name"] = CLONED_PIPELINE_NAME
if OVERRIDE_CONFIGS:
payload["configuration"] = OVERRIDE_CONFIGS
data = requests.post(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
headers={"Authorization": f"Bearer {get_token()}"},
json=payload,
)
response = data.json()
return response
check_source_pipeline_exists()
request_pipeline_clone()
Limites
Voici les limitations de la requête API des Pipelines déclaratifs clone a pipeline Spark Lakeflow :
- Seul le clonage à partir d’un pipeline configuré pour utiliser le metastore Hive vers un pipeline de catalogue Unity est pris en charge.
- Vous pouvez créer un clone uniquement dans le même espace de travail Azure Databricks que le pipeline à partir duquel vous clonez.
- Le pipeline que vous clonez ne peut inclure que les sources de diffusion en continu suivantes :
- Sources delta
- Chargeur automatique, y compris toutes les sources de données prises en charge par le chargeur automatique. Consultez Charger des fichiers à partir du stockage d’objets cloud.
- Apache Kafka avec Structured Streaming. Toutefois, la source Kafka ne peut pas être configurée pour utiliser l’option
kafka.group.id. Consultez Traitement par flux avec Apache Kafka et Azure Databricks. - Amazon Kinesis avec Structured Streaming. Toutefois, la source Kinesis ne peut pas être configurée pour définir
consumerModesurefo.
- Si le pipeline de metastore Hive que vous clonez utilise le mode de notification de fichier chargeur automatique, Databricks recommande de ne pas exécuter le pipeline du metastore Hive après le clonage. Cela est dû au fait que l’exécution du pipeline du metastore Hive entraîne la suppression de certains événements de notification de fichier du clone Unity Catalog. Si le pipeline du metastore Hive source s’exécute après que l’opération de clonage est terminée, vous pouvez restaurer les fichiers manquants à l’aide d’Auto Loader avec l’option
cloudFiles.backfillInterval. Pour en savoir plus sur le mode de notification de fichier chargeur automatique, consultez Configurer des flux de chargeur automatique en mode de notification de fichier. Pour en savoir plus sur le remplissage de fichiers avec le chargeur automatique, consultez Déclencher des remplissages réguliers à l’aide de cloudFiles.backfillInterval et des options courantes du chargeur automatique. - Les tâches de maintenance de pipeline sont automatiquement suspendues pour les deux pipelines pendant que le clonage est en cours.
- L’exemple suivant s’applique aux requêtes de déplacement temporel sur des tables dans le pipeline de catalogue Unity cloné :
- Si une version de table a été initialement écrite dans un objet managé de metastore Hive, les requêtes de voyage dans le temps à l’aide d’une
timestamp_expressionclause ne sont pas définies lors de l’interrogation de l’objet Unity Catalog cloné. - Toutefois, si la version de la table a été écrite dans l’objet catalog Unity cloné, les requêtes de déplacement du temps à l’aide d’une
timestamp_expressionclause fonctionnent correctement. - Les requêtes de voyage dans le temps utilisant une clause
versionfonctionnent correctement lors de l’interrogation d’un objet cloné du Unity Catalog, même lorsque la version a été initialement écrite dans l’objet géré par le metastore Hive.
- Si une version de table a été initialement écrite dans un objet managé de metastore Hive, les requêtes de voyage dans le temps à l’aide d’une
- Pour d’autres limitations lors de l’utilisation de pipelines déclaratifs Spark Lakeflow avec le catalogue Unity, consultez les limitations du pipeline catalogue Unity.
- Pour connaître les limitations du catalogue Unity, consultez les limitations du catalogue Unity.