Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Vous pouvez utiliser des opérations DataFrame ou des fonctions SQL de valeur de table pour interroger les données et les métadonnées d’état de Structured Streaming. Utilisez ces fonctions pour observer les informations d’état des requêtes stateful dans Structured Streaming, ce qui est utile pour la surveillance et le débogage.
Vous devez disposer d’un accès en lecture au chemin de point de contrôle d’une requête de diffusion en continu pour interroger les métadonnées ou les données d’état. Les fonctions décrites dans cet article fournissent un accès en lecture seule aux métadonnées et aux données d’état. Vous pouvez uniquement utiliser la sémantique de lecture par lots pour interroger les informations d’état.
Remarque
Vous ne pouvez pas interroger les informations sur l'état des pipelines déclaratifs Lakeflow Spark, des tables de streaming ou des vues matérialisées. Vous ne pouvez pas interroger les informations d'état en utilisant l'informatique sans serveur ou avec des paramètres calculés selon le mode d'accès standard.
Spécifications
- Utilisez l’une des configurations de calcul suivantes :
- Databricks Runtime 16.3 et versions ultérieures sur le calcul configuré avec le mode d’accès standard.
- Databricks Runtime 14.3 LTS et au-delà sur une infrastructure informatique configurée avec un mode d’accès dédié ou sans isolation.
- Accès en lecture au chemin de point de contrôle utilisé par la requête de diffusion en continu.
Lire le magasin d’état Structured Streaming
Vous pouvez lire les informations de magasin d’état pour les requêtes Structured Streaming exécutées dans n’importe quel Databricks Runtime pris en charge. Utilisez la syntaxe suivante :
Python
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore('/checkpoint/path')
Paramètres de l’API Lecteur d’état
L’API lecteur d’état prend en charge les configurations facultatives suivantes :
| Choix | Catégorie | Valeur par défaut | Descriptif |
|---|---|---|---|
batchId |
Long | ID du dernier lot | Représente le lot cible à partir duquel effectuer la lecture. Spécifiez cette option pour interroger les informations d’état pour un état antérieur de la requête. Le lot doit être validé, mais pas encore nettoyé. |
operatorId |
Long | 0 | Représente l’opérateur cible à partir duquel effectuer la lecture. Cette option est utilisée quand la requête utilise plusieurs opérateurs avec état. |
storeName |
Chaîne | « DEFAULT » | Représente le nom du magasin d’état cible à lire. Cette option est utilisée lorsque l’opérateur avec état utilise plusieurs instances de stockage d’états.
storeName ou joinSide doivent être spécifiés pour une jointure flux-vapeur, mais pas les deux. |
joinSide |
Chaîne (« gauche » ou « droite ») | Représente le côté cible à partir duquel effectuer la lecture. Cette option est utilisée lorsque les utilisateurs souhaitent lire l’état à partir d’une jointure de flux à flux. | |
stateVarName |
Chaîne | Aucun | Nom de la variable d’état à lire dans le cadre de cette requête. Le nom de la variable d'état est le nom unique donné à chaque variable dans la fonction init d'un StatefulProcessor utilisé par l'opérateur transformWithState. Cette option est une option obligatoire si l’opérateur transformWithState est utilisé. Cette option s’applique uniquement à l’opérateur transformWithState et est ignorée pour d’autres opérateurs. Disponible dans Databricks Runtime 16.2 et versions ultérieures. |
readRegisteredTimers |
Booléen | faux | Réglé à true pour lire les minuteurs inscrits utilisés dans l’opérateur transformWithState. Cette option s’applique uniquement à l’opérateur transformWithState et est ignorée pour d’autres opérateurs. Disponible dans Databricks Runtime 16.2 et versions ultérieures. |
flattenCollectionTypes |
Booléen | vrai | Si true, aplatit les enregistrements retournés pour les variables d’état de carte et de liste. Si false, les enregistrements sont retournés en utilisant un SQL Spark Array ou Map. Cette option s’applique uniquement à l’opérateur transformWithState et est ignorée pour d’autres opérateurs. Disponible dans Databricks Runtime 16.2 et versions ultérieures. |
Les données retournées ont le schéma suivant :
| Colonne | Catégorie | Descriptif |
|---|---|---|
key |
Struct (autre type dérivé de la clé d’état) | Clé d’un enregistrement d’opérateur avec état dans le point de contrôle d’état. |
value |
Struct (autre type dérivé de la valeur d’état) | Valeur d’un enregistrement d’opérateur avec état dans le point de contrôle d’état. |
partition_id |
Entier | Partition du point de contrôle d’état qui contient l’enregistrement d’opérateur avec état. |
Consultez read_statestoreTVF.
Lire les métadonnées d’état Structured Streaming
Important
Vous devez exécuter des requêtes de diffusion en continu sur Databricks Runtime 14.2 ou version ultérieure pour enregistrer les métadonnées d’état. Les fichiers de métadonnées d’état ne cassent pas la compatibilité descendante. Si vous choisissez d’exécuter une requête de diffusion en continu sur Databricks Runtime 14.1 ou version antérieure, les fichiers de métadonnées d’état existants sont ignorés et aucun nouveau fichier de métadonnées d’état n’est écrit.
Vous pouvez lire les informations de métadonnées d’état pour les requêtes Structured Streaming exécutées dans Databricks Runtime 14.2 ou version ultérieure. Utilisez la syntaxe suivante :
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
Les données retournées ont le schéma suivant :
| Colonne | Catégorie | Descriptif |
|---|---|---|
operatorId |
Entier | ID entier de l’opérateur de diffusion en continu avec état. |
operatorName |
Entier | Nom de l’opérateur de diffusion en continu avec état. |
stateStoreName |
Chaîne | Nom de l'entrepôt d'état de l'opérateur. |
numPartitions |
Entier | Nombre de partitions du magasin d’état. |
minBatchId |
Long | ID de lot minimal disponible pour l’état d’interrogation. |
maxBatchId |
Long | L’ID de lot maximal disponible pour interroger l'état. |
Remarque
Les valeurs d’ID de lot fournies par minBatchId et maxBatchId reflètent l’état au moment où le point de contrôle a été écrit. Les anciens lots sont nettoyés automatiquement avec l’exécution de micro-lots. Par conséquent, la valeur fournie ici n’est pas garantie d’être toujours disponible.
Consultez read_state_metadataTVF.
Exemple : interroger un côté d’une jointure de flux de flux
Utilisez la syntaxe suivante pour interroger le côté gauche d’une jointure de flux de flux :
Python
left_df = (spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
joinSide => 'left'
);
Exemple : magasin d’états de requête pour le flux avec plusieurs opérateurs avec état
Cet exemple utilise le lecteur de métadonnées d’état pour collecter les détails des métadonnées d’une requête de diffusion en continu avec plusieurs opérateurs avec état, puis utilise les résultats des métadonnées comme options pour le lecteur d’état.
Le lecteur de métadonnées d’état prend le chemin de point de contrôle comme seule option, comme dans l’exemple de syntaxe suivant :
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
Le tableau suivant représente un exemple de sortie des métadonnées du magasin d’états :
| operatorId | nom de l'opérateur | stateStoreName | nombreDePartitions | minBatchId | maxBatchId |
|---|---|---|---|---|---|
| 0 | stateStoreSave | par défaut | 200 | 0 | 13 |
| 1 | dedupeWithinWatermark | par défaut | 200 | 0 | 13 |
Pour obtenir les résultats de l’opérateur dedupeWithinWatermark , interrogez le lecteur d’état avec l’option operatorId , comme dans l’exemple suivant :
Python
left_df = (spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
operatorId => 1
);