Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Cette page explique comment récupérer un pipeline dans les pipelines déclaratifs Spark Lakeflow lorsqu’un point de contrôle de streaming devient non valide ou endommagé.
Qu’est-ce qu’un point de contrôle de streaming ?
Dans Apache Spark Structured Streaming, un point de contrôle est un mécanisme utilisé pour conserver l’état d’une requête de diffusion en continu. Cet état inclut :
- Informations de progression : les décalages de la source ont été traités.
-
État intermédiaire : données qui doivent être conservées entre les micro-lots pour les opérations avec état (par exemple, agrégations,
mapGroupsWithState). - Métadonnées : informations sur l’exécution de la requête de diffusion en continu.
Les points de contrôle sont essentiels pour garantir la tolérance de panne et la cohérence des données dans les applications de diffusion en continu :
- Tolérance de panne : en cas d’échec d’une application de diffusion en continu (par exemple, en raison d’une défaillance de nœud, d’un plantage de l’application), le point de contrôle permet à l’application de redémarrer à partir du dernier état de point de contrôle réussi au lieu de retraiter toutes les données à partir du début. Cela empêche la perte de données et garantit un traitement incrémentiel.
- Traitement exactement une fois : pour de nombreuses sources de streaming, points de contrôle, conjointement avec les récepteurs idempotents, activez exactement une fois le traitement garantit que chaque enregistrement est traité exactement une fois, même en cas de défaillances, empêchant les doublons ou les omissions.
- Gestion de l’état : pour les transformations avec état, les points de contrôle conservent l’état interne de ces opérations, ce qui permet à la requête de diffusion en continu de continuer à traiter correctement les nouvelles données en fonction de l’état historique cumulé.
Points de contrôle de pipeline
Les pipelines s’appuient sur Structured Streaming et simplifient la gestion des points de contrôle sous-jacents, ce qui offre une approche déclarative. Lorsque vous définissez une table de diffusion en continu dans votre pipeline, il existe un état de point de contrôle pour chaque flux écrit dans la table de diffusion en continu. Ces emplacements de point de contrôle sont internes au pipeline et ne sont pas accessibles aux utilisateurs.
En règle générale, vous n’avez pas besoin de gérer ou de comprendre les points de contrôle sous-jacents pour les tables de diffusion en continu, sauf dans les cas suivants :
- Rembobinage et relecture : si vous souhaitez retraiter les données à partir d’un point spécifique dans le temps tout en conservant l’état actuel de la table, vous devez réinitialiser le point de contrôle de la table de diffusion en continu.
-
Récupération à partir d’une défaillance ou d’une altération du point de contrôle : si une requête qui écrit dans la table de diffusion en continu a échoué en raison d’erreurs liées au point de contrôle, elle provoque une défaillance difficile et la requête ne peut pas progresser. Il existe trois approches que vous pouvez utiliser pour récupérer à partir de cette classe de défaillance :
- Actualisation complète de la table : cette opération réinitialise la table et efface les données existantes.
- Actualisation complète de la table avec sauvegarde et remplissage : vous effectuez une sauvegarde de la table avant d’effectuer une actualisation complète de la table et de remplir les anciennes données, mais cela est très coûteux et doit être le dernier recours.
- Réinitialiser le point de contrôle et continuer de manière incrémentielle : si vous ne pouvez pas vous permettre de perdre des données existantes, vous devez effectuer une réinitialisation sélective des points de contrôle pour les flux de streaming affectés.
Exemple : Échec du pipeline en raison d’une modification du code
Envisagez un scénario dans lequel vous disposez d’un pipeline qui traite un flux de données modifiées, ainsi que l’instantané initial de la table à partir d’un système de stockage cloud, tel qu’Amazon S3, et écrit dans une table de diffusion en continu SCD-1.
Le pipeline a deux flux de streaming :
-
customers_incremental_flow: lit de façon incrémentielle le flux CDC de lacustomertable source, filtre les enregistrements dupliqués et les upsert dans la table cible. -
customers_snapshot_flow: lecture unique de l’instantané initial de lacustomerstable source et upsert les enregistrements dans la table cible.
@dp.temporary_view(name="customers_incremental_view")
def query():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.includeExistingFiles", "true")
.load(customers_incremental_path)
.dropDuplicates(["customer_id"])
)
@dp.temporary_view(name="customers_snapshot_view")
def full_orders_snapshot():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(customers_snapshot_path)
.select("*")
)
dp.create_streaming_table("customers")
dp.create_auto_cdc_flow(
flow_name = "customers_incremental_flow",
target = "customers",
source = "customers_incremental_view",
keys = ["customer_id"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
dp.create_auto_cdc_flow(
flow_name = "customers_snapshot_flow",
target = "customers",
source = "customers_snapshot_view",
keys = ["customer_id"],
sequence_by = lit(0),
stored_as_scd_type = 1,
once = True
)
Après avoir déployé ce pipeline, il s’exécute correctement et commence à traiter le flux de données de modification et l’instantané initial.
Plus tard, vous réalisez que la logique de déduplication dans la customers_incremental_view requête est redondante et provoque un goulot d’étranglement des performances. Vous supprimez les fonctionnalités pour améliorer les dropDuplicates() performances :
@dp.temporary_view(name="customers_raw_view")
def query():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.includeExistingFiles", "true")
.load()
# .dropDuplicates()
)
Après avoir supprimé l’API dropDuplicates() et redéployé le pipeline, la mise à jour échoue avec l’erreur suivante :
Streaming stateful operator name does not match with the operator in state metadata.
This is likely to happen when a user adds/removes/changes stateful operators of existing streaming query.
Stateful operators in the metadata: [(OperatorId: 0 -> OperatorName: dedupe)];
Stateful operators in current batch: []. SQLSTATE: 42K03 SQLSTATE: XXKST
Cette erreur indique que la modification n’est pas autorisée en raison d’une incompatibilité entre l’état du point de contrôle et la définition de requête actuelle, ce qui empêche le pipeline de progresser.
Les défaillances liées au point de contrôle peuvent se produire pour différentes raisons au-delà de la suppression de l’API dropDuplicates . Les scénarios courants sont les suivants :
- Ajout ou suppression d’opérateurs avec état (par exemple, introduction ou suppression
dropDuplicates()ou agrégations) dans une requête de streaming existante. - Ajout, suppression ou combinaison de sources de diffusion en continu dans une requête précédemment point de contrôle (par exemple, unioning d’une requête de diffusion en continu existante avec une nouvelle requête ou ajout/suppression de sources d’une opération union existante).
- Modification du schéma d’état des opérations de streaming avec état (par exemple, modification des colonnes utilisées pour la déduplication ou l’agrégation).
Pour obtenir la liste complète des modifications prises en charge et non prises en charge, reportez-vous au Guide spark structured streaming et aux types de modifications dans les requêtes Structured Streaming.
Options de récupération
Il existe trois stratégies de récupération, en fonction des exigences de durabilité des données et des contraintes de ressources :
| Méthodes | Complexité | Coûts | Perte de données potentielle | Duplication potentielle des données | Nécessite un instantané initial | Réinitialisation complète de la table |
|---|---|---|---|---|---|---|
| Actualisation complète de la table | Low | Moyen | Oui (si aucun instantané initial n’est disponible ou si des fichiers bruts ont été supprimés à la source.) | Non (Pour appliquer la table cible des modifications.) | Oui | Oui |
| Actualisation complète de la table avec sauvegarde et remplissage | Moyen | High | Non | Non (Pour les récepteurs idempotents. Par exemple, capture de données modifiées automatique.) | Non | Non |
| Réinitialiser le point de contrôle de table | Medium-High (moyenne pour les sources d’ajout uniquement qui fournissent des décalages immuables.) | Low | Non (nécessite une attention particulière.) | Non (Pour les écrivains idempotents. Par exemple, capture de données modifiées automatiquement sur la table cible uniquement.) | Non | Non |
Medium-High complexité dépend du type de source de streaming et de la complexité de la requête.
Recommendations
- Utilisez une actualisation complète de la table si vous ne souhaitez pas gérer la complexité d’une réinitialisation de point de contrôle et que vous pouvez recompiler l’ensemble de la table. Cela vous donnera également la possibilité d’apporter des modifications de code.
- Utilisez l’actualisation complète de la table avec la sauvegarde et le remplissage si vous ne souhaitez pas gérer la complexité de la réinitialisation de point de contrôle, et vous êtes d’accord avec le coût supplémentaire de la sauvegarde et du remplissage des données historiques.
- Utilisez le point de contrôle de la table de réinitialisation si vous devez conserver les données existantes dans la table et continuer à traiter de nouvelles données de manière incrémentielle. Toutefois, cette approche nécessite une gestion minutieuse de la réinitialisation de point de contrôle pour vérifier que les données existantes de la table ne sont pas perdues et que le pipeline peut continuer à traiter de nouvelles données.
Réinitialiser le point de contrôle et continuer de manière incrémentielle
Pour réinitialiser le point de contrôle et poursuivre le traitement incrémentiel, procédez comme suit :
Arrêtez le pipeline : vérifiez que le pipeline n’a pas de mises à jour actives en cours d’exécution.
Déterminez la position de départ du nouveau point de contrôle : identifiez le dernier décalage ou horodatage réussi à partir duquel vous souhaitez poursuivre le traitement. Il s’agit généralement du dernier décalage traité correctement avant l’échec.
Dans l’exemple ci-dessus, car vous lisez les fichiers JSON à l’aide du chargeur automatique, vous pouvez utiliser l’option
modifiedAfterpour spécifier la position de départ du nouveau point de contrôle. Cette option vous permet de définir un horodatage lorsque le chargeur automatique doit commencer à traiter de nouveaux fichiers.Pour les sources Kafka, vous pouvez utiliser l’option
startingOffsetspermettant de spécifier les décalages à partir desquels la requête de diffusion en continu doit commencer à traiter de nouvelles données.Pour les sources Delta Lake, vous pouvez utiliser l’option
startingVersionpermettant de spécifier la version à partir de laquelle la requête de streaming doit commencer à traiter de nouvelles données.Apportez des modifications de code : vous pouvez modifier la requête de diffusion en continu pour supprimer l’API
dropDuplicates()ou apporter d’autres modifications. En outre, vérifiez que vous avez ajouté l’optionmodifiedAfterau chemin de lecture du chargeur automatique.@dp.temporary_view(name="customers_incremental_view") def query(): return ( spark.readStream.format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", "true") .option("cloudFiles.includeExistingFiles", "true") .option("modifiedAfter", "2025-04-09T06:15:00") .load(customers_incremental_path) # .dropDuplicates(["customer_id"]) )Note
Fournir un horodatage incorrect
modifiedAfterpeut entraîner une perte de données ou une duplication. Vérifiez que l’horodatage est correctement défini pour éviter de traiter les anciennes données ou les nouvelles données manquantes.Si votre requête a une jointure de flux de flux ou une union de flux de flux de données, vous devez appliquer la stratégie ci-dessus pour toutes les sources de diffusion en continu participantes. Par exemple:
cdc_1 = spark.readStream.format("cloudFiles")... cdc_2 = spark.readStream.format("cloudFiles")... cdc_source = cdc_1..union(cdc_2)Identifiez les noms de flux associés à la table de diffusion en continu pour laquelle vous souhaitez réinitialiser le point de contrôle. Dans l’exemple, il s’agit
customers_incremental_flowde . Vous pouvez trouver le nom du flux dans le code du pipeline ou en vérifiant l’interface utilisateur du pipeline ou les journaux d’événements du pipeline.Réinitialisez le point de contrôle : créez un notebook Python et attachez-le à un cluster Azure Databricks.
Vous aurez besoin des informations suivantes pour pouvoir réinitialiser le point de contrôle :
- URL de l’espace de travail Azure Databricks
- Identifiant de pipeline
- Nom(s) du flux pour lequel vous réinitialise le point de contrôle
import requests import json # Define your Databricks instance and pipeline ID databricks_instance = "<DATABRICKS_URL>" token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() pipeline_id = "<YOUR_PIPELINE_ID>" flows_to_reset = ["<YOUR_FLOW_NAME>"] # Set up the API endpoint endpoint = f"{databricks_instance}/api/2.0/pipelines/{pipeline_id}/updates" # Set up the request headers headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json" } # Define the payload payload = { "reset_checkpoint_selection": flows_to_reset } # Make the POST request response = requests.post(endpoint, headers=headers, data=json.dumps(payload)) # Check the response if response.status_code == 200: print("Pipeline update started successfully.") else: print(f"Error: {response.status_code}, {response.text}")Exécutez le pipeline : le pipeline commence à traiter de nouvelles données à partir de la position de départ spécifiée avec un nouveau point de contrôle, en préservant les données de table existantes tout en continuant le traitement incrémentiel.
Meilleures pratiques
- Évitez d’utiliser des fonctionnalités de préversion privée en production.
- Testez vos modifications avant d’apporter des modifications dans votre environnement de production.
- Créez un pipeline de test, idéalement dans un environnement inférieur. Si ce n’est pas possible, essayez d’utiliser un autre catalogue et schéma pour votre test.
- Reproduire l’erreur.
- Appliquez les modifications.
- Validez les résultats et prenez une décision en cours/no-go.
- Déployez les modifications apportées à vos pipelines de production.