Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Vous pouvez exécuter un pipeline dans le cadre d'un flux de traitement des données avec Lakeflow Jobs, Apache Airflow ou Azure Data Factory.
Jobs
Vous pouvez orchestrer plusieurs tâches dans un travail Databricks pour implémenter un workflow de traitement des données. Pour inclure un pipeline dans un travail, utilisez la tâche pipeline lorsque vous créez un travail. Consultez Tâche de pipeline pour les travaux.
Apache Airflow
Apache Airflow est une solution open source pour la gestion et la planification des flux de travail de données. Airflow représente les flux de travail sous la forme de graphes orientés acycliques (DAGs) d'opérations. Vous définissez un flux de travail dans un fichier Python et Airflow gère la planification et l’exécution. Pour plus d’informations sur l’installation et l’utilisation d’Airflow avec Azure Databricks, consultez Orchestrer des travaux Lakeflow avec Apache Airflow.
Pour exécuter un pipeline dans le cadre d’un flux de travail Airflow, utilisez DatabricksSubmitRunOperator.
Spécifications
Les éléments suivants sont nécessaires pour utiliser la prise en charge Airflow pour les pipelines déclaratifs Lakeflow Spark :
- Airflow version 2.1.0 ou ultérieure.
- Package du fournisseur Databricks version 2.1.0 ou ultérieure.
Example
L’exemple suivant crée un DAG Airflow qui déclenche une mise à jour pour le pipeline avec l’identificateur 8279d543-063c-4d63-9926-dae38e35ce8b:
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow'
}
with DAG('ldp',
start_date=days_ago(2),
schedule_interval="@once",
default_args=default_args
) as dag:
opr_run_now=DatabricksSubmitRunOperator(
task_id='run_now',
databricks_conn_id='CONNECTION_ID',
pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
)
Remplacez CONNECTION_ID par l’identificateur pour une connexion Airflow à votre espace de travail.
Enregistrez cet exemple dans le airflow/dags répertoire et utilisez l’interface utilisateur Airflow pour afficher et déclencher le DAG. Utilisez l’interface utilisateur du pipeline pour afficher les détails de la mise à jour du pipeline.
Azure Data Factory.
Note
Lakeflow Spark Declarative Pipelines et Azure Data Factory incluent chacune des options permettant de configurer le nombre de nouvelles tentatives en cas de défaillance. Si les valeurs de nouvelle tentative sont configurées sur votre pipeline et sur l’activité Azure Data Factory qui appelle le pipeline, le nombre de nouvelles tentatives est la valeur de nouvelle tentative Azure Data Factory multipliée par la valeur de nouvelle tentative du pipeline.
Par exemple, si une mise à jour de pipeline échoue, les Pipelines Déclaratifs Spark Lakeflow réessayent la mise à jour jusqu’à cinq fois par défaut. Si la nouvelle tentative d’Azure Data Factory est définie sur trois et que votre pipeline utilise la valeur par défaut de cinq nouvelles tentatives, votre pipeline défaillant peut être retenté jusqu’à quinze fois. Pour éviter des tentatives excessives lorsque les mises à jour du pipeline échouent, Databricks recommande de limiter le nombre de nouvelles tentatives lors de la configuration du pipeline ou de l’activité Azure Data Factory qui appelle le pipeline.
Pour modifier la configuration de nouvelle tentative pour votre pipeline, utilisez le pipelines.numUpdateRetryAttempts paramètre lors de la configuration du pipeline.
Azure Data Factory est un service ETL basé sur le cloud qui vous permet d’orchestrer les flux de travail d’intégration et de transformation des données. Azure Data Factory prend directement en charge l'exécution de tâches Azure Databricks dans un flux de travail, notamment les notebooks, les tâches JAR et les scripts Python. Vous pouvez également inclure un pipeline dans un flux de travail en appelant l’API REST du pipeline à partir d’une activité web Azure Data Factory. Par exemple, pour déclencher une mise à jour de pipeline à partir d’Azure Data Factory :
Créez une fabrique de données ou ouvrez une fabrique de données existante.
Une fois la création terminée, ouvrez la page de votre fabrique de données et cliquez sur la vignette Ouvrir Azure Data Factory Studio . L’interface utilisateur Azure Data Factory s’affiche.
Créez un pipeline Azure Data Factory en sélectionnant Pipeline dans le menu déroulant Nouveau dans l’interface utilisateur d’Azure Data Factory Studio.
Dans la boîte à outils Activités , développez Général et faites glisser l’activité Web vers le canevas de pipeline. Cliquez sur l’onglet Paramètres et entrez les valeurs suivantes :
Note
En guise de bonne pratique de sécurité, lorsque vous vous authentifiez avec des outils, des systèmes, des scripts et des applications automatisés, Databricks vous recommande d’utiliser des jetons d’accès personnels appartenant aux principaux de service au lieu des utilisateurs de l’espace de travail. Pour créer des jetons pour les principaux de service, consultez Gérer les jetons d’un principal de service.
URL :
https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates.Remplacez
<get-workspace-instance>.Remplacez
<pipeline-id>par l’identificateur de pipeline.Méthode : sélectionnez POST dans le menu déroulant.
En-têtes : cliquez sur + Nouveau. Dans la zone de texte Nom, entrez
Authorization. Dans la zone de texte Valeur , entrezBearer <personal-access-token>.Remplacez
<personal-access-token>par un jeton d’accès personnel Azure Databricks.Corps : pour passer des paramètres de requête supplémentaires, entrez un document JSON contenant les paramètres. Par exemple, pour démarrer une mise à jour et retraiter toutes les données du pipeline :
{"full_refresh": "true"}. S’il n’existe aucun paramètre de requête supplémentaire, entrez des accolades vides ({}).
Pour tester l’activité web, cliquez sur Déboguer dans la barre d’outils du pipeline dans l’interface utilisateur de Data Factory. La sortie et l’état de l’exécution, y compris les erreurs, sont affichés sous l’onglet Sortie du pipeline Azure Data Factory. Utilisez l’interface utilisateur des pipelines pour afficher les détails de la mise à jour du pipeline.
Conseil / Astuce
Une exigence courante de flux de travail consiste à démarrer une tâche après l’achèvement d’une tâche précédente. Étant donné que la demande de pipeline updates est asynchrone , la requête retourne après avoir démarré la mise à jour, mais avant la fin de la mise à jour, les tâches de votre pipeline Azure Data Factory avec une dépendance sur la mise à jour du pipeline doivent attendre la fin de la mise à jour. Une option permettant d’attendre la fin de la mise à jour consiste à ajouter une activité Until après l’activité Web qui déclenche la mise à jour des pipelines déclaratifs Spark Lakeflow. Dans l’activité Until :
- Ajoutez une activité d’attente pour attendre un nombre configuré de secondes pour la fin de la mise à jour.
- Ajoutez une activité web après l’activité Wait qui utilise la demande de détails de mise à jour du pipeline pour obtenir l’état de la mise à jour. Le
statechamp de la réponse retourne l’état actuel de la mise à jour, y compris s’il est terminé. - Utilisez la valeur du
statechamp pour définir la condition de fin de l’activité Until. Vous pouvez également utiliser une activité Set Variable pour ajouter une variable de pipeline en fonction de lastatevaleur et utiliser cette variable pour la condition de fin.