Partager via


Lire les informations d'état de Structured Streaming

Vous pouvez utiliser des opérations DataFrame ou des fonctions SQL de valeur de table pour interroger les données et les métadonnées d’état de Structured Streaming. Utilisez ces fonctions pour observer les informations d’état des requêtes stateful dans Structured Streaming, ce qui est utile pour la surveillance et le débogage.

Vous devez disposer d’un accès en lecture au chemin de point de contrôle d’une requête de diffusion en continu pour interroger les métadonnées ou les données d’état. Les fonctions décrites dans cet article fournissent un accès en lecture seule aux métadonnées et aux données d’état. Vous pouvez uniquement utiliser la sémantique de lecture par lots pour interroger les informations d’état.

Remarque

Vous ne pouvez pas interroger les informations sur l'état des pipelines déclaratifs Lakeflow Spark, des tables de streaming ou des vues matérialisées. Vous ne pouvez pas interroger les informations d'état en utilisant l'informatique sans serveur ou avec des paramètres calculés selon le mode d'accès standard.

Spécifications

  • Utilisez l’une des configurations de calcul suivantes :
    • Databricks Runtime 16.3 et versions ultérieures sur le calcul configuré avec le mode d’accès standard.
    • Databricks Runtime 14.3 LTS et au-delà sur une infrastructure informatique configurée avec un mode d’accès dédié ou sans isolation.
  • Accès en lecture au chemin de point de contrôle utilisé par la requête de diffusion en continu.

Lire le magasin d’état Structured Streaming

Vous pouvez lire les informations de magasin d’état pour les requêtes Structured Streaming exécutées dans n’importe quel Databricks Runtime pris en charge. Utilisez la syntaxe suivante :

Python

df = (spark.read
  .format("statestore")
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore('/checkpoint/path')

Paramètres de l’API Lecteur d’état

L’API lecteur d’état prend en charge les configurations facultatives suivantes :

Choix Catégorie Valeur par défaut Descriptif
batchId Long ID du dernier lot Représente le lot cible à partir duquel effectuer la lecture. Spécifiez cette option pour interroger les informations d’état pour un état antérieur de la requête. Le lot doit être validé, mais pas encore nettoyé.
operatorId Long 0 Représente l’opérateur cible à partir duquel effectuer la lecture. Cette option est utilisée quand la requête utilise plusieurs opérateurs avec état.
storeName Chaîne « DEFAULT » Représente le nom du magasin d’état cible à lire. Cette option est utilisée lorsque l’opérateur avec état utilise plusieurs instances de stockage d’états. storeName ou joinSide doivent être spécifiés pour une jointure flux-vapeur, mais pas les deux.
joinSide Chaîne (« gauche » ou « droite ») Représente le côté cible à partir duquel effectuer la lecture. Cette option est utilisée lorsque les utilisateurs souhaitent lire l’état à partir d’une jointure de flux à flux.
stateVarName Chaîne Aucun Nom de la variable d’état à lire dans le cadre de cette requête. Le nom de la variable d'état est le nom unique donné à chaque variable dans la fonction init d'un StatefulProcessor utilisé par l'opérateur transformWithState. Cette option est une option obligatoire si l’opérateur transformWithState est utilisé. Cette option s’applique uniquement à l’opérateur transformWithState et est ignorée pour d’autres opérateurs. Disponible dans Databricks Runtime 16.2 et versions ultérieures.
readRegisteredTimers Booléen faux Réglé à true pour lire les minuteurs inscrits utilisés dans l’opérateur transformWithState. Cette option s’applique uniquement à l’opérateur transformWithState et est ignorée pour d’autres opérateurs. Disponible dans Databricks Runtime 16.2 et versions ultérieures.
flattenCollectionTypes Booléen vrai Si true, aplatit les enregistrements retournés pour les variables d’état de carte et de liste. Si false, les enregistrements sont retournés en utilisant un SQL Spark Array ou Map. Cette option s’applique uniquement à l’opérateur transformWithState et est ignorée pour d’autres opérateurs. Disponible dans Databricks Runtime 16.2 et versions ultérieures.

Les données retournées ont le schéma suivant :

Colonne Catégorie Descriptif
key Struct (autre type dérivé de la clé d’état) Clé d’un enregistrement d’opérateur avec état dans le point de contrôle d’état.
value Struct (autre type dérivé de la valeur d’état) Valeur d’un enregistrement d’opérateur avec état dans le point de contrôle d’état.
partition_id Entier Partition du point de contrôle d’état qui contient l’enregistrement d’opérateur avec état.

Consultez read_statestoreTVF.

Lire les métadonnées d’état Structured Streaming

Important

Vous devez exécuter des requêtes de diffusion en continu sur Databricks Runtime 14.2 ou version ultérieure pour enregistrer les métadonnées d’état. Les fichiers de métadonnées d’état ne cassent pas la compatibilité descendante. Si vous choisissez d’exécuter une requête de diffusion en continu sur Databricks Runtime 14.1 ou version antérieure, les fichiers de métadonnées d’état existants sont ignorés et aucun nouveau fichier de métadonnées d’état n’est écrit.

Vous pouvez lire les informations de métadonnées d’état pour les requêtes Structured Streaming exécutées dans Databricks Runtime 14.2 ou version ultérieure. Utilisez la syntaxe suivante :

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

Les données retournées ont le schéma suivant :

Colonne Catégorie Descriptif
operatorId Entier ID entier de l’opérateur de diffusion en continu avec état.
operatorName Entier Nom de l’opérateur de diffusion en continu avec état.
stateStoreName Chaîne Nom de l'entrepôt d'état de l'opérateur.
numPartitions Entier Nombre de partitions du magasin d’état.
minBatchId Long ID de lot minimal disponible pour l’état d’interrogation.
maxBatchId Long L’ID de lot maximal disponible pour interroger l'état.

Remarque

Les valeurs d’ID de lot fournies par minBatchId et maxBatchId reflètent l’état au moment où le point de contrôle a été écrit. Les anciens lots sont nettoyés automatiquement avec l’exécution de micro-lots. Par conséquent, la valeur fournie ici n’est pas garantie d’être toujours disponible.

Consultez read_state_metadataTVF.

Exemple : interroger un côté d’une jointure de flux de flux

Utilisez la syntaxe suivante pour interroger le côté gauche d’une jointure de flux de flux :

Python

left_df = (spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    joinSide => 'left'
);

Exemple : magasin d’états de requête pour le flux avec plusieurs opérateurs avec état

Cet exemple utilise le lecteur de métadonnées d’état pour collecter les détails des métadonnées d’une requête de diffusion en continu avec plusieurs opérateurs avec état, puis utilise les résultats des métadonnées comme options pour le lecteur d’état.

Le lecteur de métadonnées d’état prend le chemin de point de contrôle comme seule option, comme dans l’exemple de syntaxe suivant :

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

Le tableau suivant représente un exemple de sortie des métadonnées du magasin d’états :

operatorId nom de l'opérateur stateStoreName nombreDePartitions minBatchId maxBatchId
0 stateStoreSave par défaut 200 0 13
1 dedupeWithinWatermark par défaut 200 0 13

Pour obtenir les résultats de l’opérateur dedupeWithinWatermark , interrogez le lecteur d’état avec l’option operatorId , comme dans l’exemple suivant :

Python

left_df = (spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    operatorId => 1
);