Partager via


Exécuter une mise à jour de flux

Cet article explique les mises à jour de pipeline et fournit des détails sur la façon de déclencher une mise à jour.

Qu’est-ce qu’une mise à jour de pipeline ?

Une fois que vous avez créé un pipeline et que vous êtes prêt à l’exécuter, vous démarrez une mise à jour. Une mise à jour de pipeline effectue les opérations suivantes :

  • démarre un cluster avec la bonne configuration ;
  • Découvre toutes les tables et vues définies et vérifie toutes les erreurs d’analyse telles que les noms de colonnes non valides, les dépendances manquantes et les erreurs de syntaxe.
  • crée ou met à jour les tables et vues avec les données disponibles les plus récentes.

À l’aide d’une exécution sèche, vous pouvez rechercher des problèmes dans le code source d’un pipeline sans attendre que les tables soient créées ou mises à jour. Cette fonctionnalité est utile lors du développement ou du test de pipelines, car elle vous permet de rechercher et de corriger rapidement des erreurs dans votre pipeline, telles que des noms de table ou de colonnes incorrects.

Comment les mises à jour de pipeline sont-elles déclenchées ?

Utilisez l’une des options suivantes pour démarrer les mises à jour du pipeline :

Déclencheur de mise à jour Détails
Manuel Vous pouvez déclencher manuellement des mises à jour de pipeline à partir de l’éditeur de pipelines Lakeflow ou de la liste des pipelines. Consultez Déclencher manuellement une mise à jour de pipeline.
Planifié Vous pouvez planifier des mises à jour pour les pipelines en utilisant des travaux. Consultez Tâche de pipeline pour les travaux.
Programmatic Vous pouvez déclencher des mises à jour par programmation à l'aide d'outils tiers, d'API et de CLI. Consultez Exécuter des pipelines dans un workflow et API REST de pipeline.

Déclencher manuellement une mise à jour du pipeline

Utilisez l’une des options suivantes pour déclencher manuellement une mise à jour de pipeline :

  • Exécutez le pipeline complet, ou un sous-ensemble du pipeline (un fichier source unique ou une table unique), à partir de l’éditeur de pipelines Lakeflow. Pour plus d’informations, consultez Exécuter le code du pipeline.
  • Exécutez le pipeline complet à partir de la liste Travaux et pipelines . Cliquez sur l’icône Lecture. Dans la même ligne que le pipeline de la liste.
  • Dans la page de surveillance du pipeline, cliquez sur le bouton Icône de démarrage LDP.

Note

Le comportement par défaut des mises à jour de pipeline déclenchées manuellement consiste à actualiser tous les jeux de données définis dans le pipeline.

Sémantique d’actualisation des pipelines

Le tableau suivant décrit le comportement d’actualisation par défaut, d’actualisation complète et de réinitialisation des points de contrôle pour les vues matérialisées et les tables de diffusion en continu :

Type de mise à jour Vue matérialisée Table de diffusion en continu
Actualiser (par défaut) Met à jour les résultats pour refléter les résultats actuels de la requête de définition. Examine les coûts et effectue une actualisation incrémentielle s’il est plus économique. Traite de nouveaux enregistrements par le biais de la logique définie dans les tables et flux de streaming.
Actualisation complète Met à jour les résultats pour refléter les résultats actuels de la requête de définition. Efface les données des tables de diffusion en continu, efface les informations d’état (points de contrôle) des flux et retraite tous les enregistrements de la source de données.
Réinitialiser les points de contrôle de flux de streaming Non applicable aux vues matérialisées. Efface les informations d’état (points de contrôle) des flux, mais ne efface pas les données des tables de diffusion en continu et retraite tous les enregistrements de la source de données.

Par défaut, toutes les vues matérialisées et les tables de streaming dans un pipeline sont actualisées avec chaque mise à jour. Vous pouvez éventuellement omettre les tables des mises à jour à l’aide des fonctionnalités suivantes :

Ces deux fonctionnalités prennent en charge la sémantique d’actualisation par défaut ou l’actualisation complète. Vous pouvez éventuellement utiliser la boîte de dialogue Sélectionner des tables pour actualiser pour exclure des tables supplémentaires lors de l’exécution d’une actualisation pour les tables ayant échoué.

Pour les tables de diffusion en continu, vous pouvez choisir d’effacer les points de contrôle de diffusion en continu pour les flux sélectionnés et non les données des tables de diffusion en continu associées. Pour effacer les points de contrôle des flux sélectionnés, utilisez l’API REST Databricks pour démarrer une actualisation. Consultez Démarrer une mise à jour de pipeline pour effacer les points de contrôle des flux de streaming sélectifs.

Dois-je utiliser une actualisation complète ?

Databricks recommande d’exécuter des actualisations complètes uniquement si nécessaire. Une actualisation complète retraite toujours tous les enregistrements des sources de données spécifiées via la logique qui définit le jeu de données. Le temps et les ressources pour effectuer une actualisation complète sont corrélées à la taille des données sources.

Les vues matérialisées retournent les mêmes résultats que l’actualisation par défaut ou complète est utilisée. L’utilisation d’une actualisation complète avec des tables en flux réinitialise toutes les informations de traitement des états et de point de contrôle et peut entraîner la perte d’enregistrements si les données sources ne sont plus disponibles.

Databricks recommande uniquement l’actualisation complète lorsque les sources de données d’entrée contiennent les données nécessaires pour recréer l’état souhaité de la table ou de la vue. Tenez compte des scénarios suivants dans lesquels les données de source d’entrée ne sont plus disponibles et le résultat de l’exécution d’une actualisation complète :

Source de données Raison pour laquelle les données d’entrée sont absentes Résultat de l’actualisation complète
Kafka Seuil de rétention court Les enregistrements qui ne sont plus présents dans la source Kafka sont supprimés de la table cible.
Fichiers dans le stockage d’objets Politique de cycle de vie Les fichiers de données qui ne sont plus présents dans le répertoire source sont supprimés de la table cible.
Enregistrements dans une table Supprimé pour la conformité Seuls les enregistrements présents dans la table source sont traités.

Pour empêcher l’exécution complète des actualisations sur une table ou une vue, définissez la propriété de table pipelines.reset.allowed sur false. Consultez les propriétés des tables de pipeline. Vous pouvez également utiliser un flux d’ajout pour ajouter des données à une table de streaming existante sans nécessiter d’actualisation complète.

Démarrer une mise à jour de pipeline pour les tables sélectionnées

Vous pouvez éventuellement retraiter des données pour les tables sélectionnées uniquement dans votre pipeline. Par exemple, lors du développement, vous ne changez qu’une seule table et souhaitez réduire la durée des tests, ou une mise à jour de pipeline échoue et vous voulez actualiser uniquement les tables ayant échoué.

L’éditeur de pipelines Lakeflow propose des options pour retraiter un fichier source, des tables sélectionnées ou une seule table. Pour plus d’informations, consultez Exécuter le code du pipeline.

Démarrer une mise à jour de pipeline pour les tables défaillantes

Si une mise à jour de pipeline échoue en raison d’erreurs dans une ou plusieurs tables du graphique du pipeline, vous pouvez démarrer une mise à jour uniquement des tables en échec et de toutes les dépendances en aval.

Note

Les tables exclues ne sont pas actualisées, même si elles dépendent d’une table en échec.

Pour mettre à jour les tables ayant échoué, dans la page de surveillance du pipeline, cliquez sur Actualiser les tables ayant échoué.

Pour mettre à jour uniquement les tables échouées sélectionnées à partir de la page de surveillance du pipeline :

  1. Cliquez sur Bouton vers le bas à côté du bouton Actualiser les tables en échec et cliquez sur Sélectionner les tables à actualiser. La boîte de dialogue Sélectionner les tables à actualiser s’affiche.

  2. Pour sélectionner les tables à actualiser, cliquez sur chaque table. Les tables sélectionnées sont mises en évidence et étiquetées. Pour supprimer une table de la mise à jour, cliquez à nouveau sur la table.

  3. Cliquez sur Actualiser la sélection.

    Note

    Le bouton Actualiser la sélection affiche le nombre de tables sélectionnées entre parenthèses.

Pour retraiter les données déjà ingérées pour les tables sélectionnées, cliquez sur la Flèche bleue vers le bas à côté du bouton Actualiser la sélection puis cliquez sur Actualisation complète de la sélection.

Démarrer une mise à jour de pipeline pour effacer les points de contrôle des flux de streaming sélectifs

Vous pouvez éventuellement retraiter des données pour les flux de streaming sélectionnés dans votre pipeline sans effacer les données déjà ingérées.

Note

Les flux qui ne sont pas sélectionnés sont exécutés à l’aide d’une mise à jour REFRESH. Vous pouvez également spécifier full_refresh_selection ou refresh_selection actualiser sélectivement d’autres tables.

Pour démarrer une mise à jour pour actualiser les points de contrôle de streaming sélectionnés, utilisez la demande de mises à jour dans l’API REST Pipelines déclaratifs Lakeflow Spark. L’exemple suivant utilise la curl commande pour appeler la updates requête pour démarrer une mise à jour de pipeline :

curl -X POST \
-H "Authorization: Bearer <your-token>" \
-H "Content-Type: application/json" \
-d '{
"reset_checkpoint_selection": [<streaming flow1>, <streaming flow 2>...]
}' \
https://<your-databricks-instance>/api/2.0/pipelines/<your-pipeline-id>/updates

Vérifiez un pipeline pour des erreurs sans attendre la mise à jour des tables

Important

La fonctionnalité de pipeline Dry run est en préversion publique.

Pour vérifier si le code source d’un pipeline est valide sans exécuter une mise à jour complète, utilisez une exécution sèche. Une exécution sèche résout les définitions des jeux de données et des flux définis dans le pipeline, mais ne matérialise ni ne publie aucun jeu de données. Les erreurs détectées pendant l’exécution sèche, telles que des noms de table ou de colonnes incorrects, sont signalées dans l’interface utilisateur.

Pour démarrer une simulation, cliquez sur Blue Down Caret dans la page de détails du pipeline en regard de Démarrer puis cliquez sur Simulation.

Une fois la simulation terminée, toutes les erreurs s’affichent dans la zone des événements dans le panneau inférieur. Le fait de cliquer sur la barre d’événements affiche les problèmes détectés dans le volet inférieur. En outre, le journal des événements affiche les événements liés uniquement à l’exécution sèche, et aucune métrique n’est affichée dans le DAG. Si des erreurs sont détectées, les détails sont disponibles dans le journal des événements.

Vous pouvez voir les résultats uniquement pour la course sèche la plus récente. Si la simulation était la mise à jour la plus récemment exécutée, vous pouvez voir les résultats en la sélectionnant dans l’historique des mises à jour. Si une autre mise à jour est exécutée après l’exécution sèche, les résultats ne sont plus disponibles dans l’interface utilisateur.

Mode de développement

Les pipelines s'exécutent à partir de l'Éditeur de Pipelines Lakeflow, avec le mode de développement activé. Les pipelines programmés sont par défaut exécutés avec le mode de développement désactivé. Si vous souhaitez tester l’exécution du pipeline en production, vous pouvez choisir de manière interactive d’utiliser le mode de développement en choisissant Exécuter avec différents paramètres dans la liste déroulante de l’éditeur.

Note

Les pipelines créés avec l’éditeur de notebook hérité par défaut utilisent le mode de développement. Vous pouvez vérifier ou modifier le paramètre en choisissant Paramètres dans la page de surveillance du pipeline. La page d’analyse est disponible à partir du bouton Travaux & Pipelines sur le côté gauche de votre espace de travail. Vous pouvez également accéder directement à la page d’analyse à partir de l’éditeur de pipeline en cliquant sur les résultats d’exécution dans le navigateur des ressources du pipeline.

Lorsque vous exécutez votre pipeline en mode de développement, le système Pipelines déclaratifs Spark Lakeflow effectue les opérations suivantes :

  • Réutilise un cluster pour éviter les surcharges liées aux redémarrages. Par défaut, les clusters s’exécutent pendant deux heures lorsque le mode de développement est activé. Vous pouvez le modifier avec le paramètre pipelines.clusterShutdown.delay dans Configurer le calcul classique pour les pipelines.
  • désactive les nouvelles tentatives de pipeline pour vous permettre de détecter et de corriger immédiatement les erreurs.

Avec le mode de développement désactivé, le système effectue les opérations suivantes :

  • redémarre le cluster en cas d’erreurs récupérables spécifiques, notamment les fuites de mémoire et les informations d’identification obsolètes ;
  • Réessayez l’exécution en cas d’erreurs spécifiques, telles qu’un échec de démarrage d’un cluster.

Note

L'activation et la désactivation du mode de développement contrôlent uniquement le comportement d’exécution du cluster et du pipeline. Les emplacements de stockage et les schémas cibles dans le catalogue pour la publication des tables doivent être configurés dans le cadre des paramètres de pipeline, et ne sont pas affectés lors du basculement d’un mode à l’autre.