Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Puede usar operaciones de DataFrame o funciones de valores de tabla SQL para consultar datos de estado y metadatos de Structured Streaming. Utiliza estas funciones para observar la información de estado en las consultas con estado en Structured Streaming, lo que puede ser útil para la supervisión y depuración.
Debe tener acceso de lectura a la ruta de acceso del punto de control para una consulta de streaming para consultar los datos de estado o los metadatos. Las funciones descritas en este artículo proporcionan acceso de solo lectura a los datos de estado y a los metadatos. Solo puede usar la semántica de lectura por lotes para consultar la información de estado.
Nota:
No se puede consultar la información de estado de las canalizaciones declarativas de Spark de Lakeflow, de las tablas de streaming o de las vistas materializadas. No se puede consultar la información de estado mediante proceso sin servidor o proceso configurado con el modo de acceso estándar.
Requisitos
- Use una de las siguientes configuraciones de proceso:
- Databricks Runtime 16.3 y versiones posteriores en instancias de cálculo configuradas con el modo de acceso estándar.
- Databricks Runtime 14.3 LTS y versiones posteriores en entornos de cómputo configurados con modo de acceso dedicado o sin aislamiento.
- Acceso de lectura a la ruta del punto de control usada por la consulta de streaming.
Lectura del almacén de estado de Structured Streaming
Puede leer la información del almacén de estado de las consultas de Structured Streaming ejecutadas en cualquier Databricks Runtime compatible. Use la sintaxis siguiente:
Pitón
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore('/checkpoint/path')
Parámetros de la API de lector de estado
La API de lector de estado admite las siguientes configuraciones opcionales:
| Opción | Tipo | Valor predeterminado | Descripción |
|---|---|---|---|
batchId |
Largo | identificador de lote más reciente | Representa el lote de destino del que se va a leer. Especifica esta opción para consultar información de estado para un estado anterior de la consulta. El lote debe confirmarse, pero no se debe limpiar todavía. |
operatorId |
Largo | 0 | Representa el operador de destino del que se va a leer. Esta opción se usa cuando la consulta usa varios operadores con estado. |
storeName |
Cadena | "DEFAULT" | Representa el nombre del almacén de estado de destino del que se va a leer. Esta opción se usa cuando el operador con estado usa varias instancias de almacén de estado.
storeName o joinSide debe especificarse para una combinación de vapor de flujo, pero no para ambas. |
joinSide |
Cadena (“izquierda” o “derecha”) | Representa el lado de destino del que se va a leer. Esta opción se usa cuando los usuarios quieren leer el estado de una unión de flujo a flujo. | |
stateVarName |
Cadena | Ninguno | Nombre de la variable de estado que se va a leer como parte de esta consulta. El nombre de la variable de estado es el nombre único que se asigna a cada variable dentro de la init función de un StatefulProcessor utilizado por el transformWithState operador . Esta opción es una opción necesaria si se usa el transformWithState operador . Esta opción solo se aplica al transformWithState operador y se omite para otros operadores. Disponible en Databricks Runtime 16.2 y versiones posteriores. |
readRegisteredTimers |
Booleano | falso | Establézcalo en true para leer los temporizadores registrados usados en el operador transformWithState. Esta opción solo se aplica al transformWithState operador y se omite para otros operadores. Disponible en Databricks Runtime 16.2 y versiones posteriores. |
flattenCollectionTypes |
Booleano | cierto | Si true, aplana los registros devueltos para las variables de estado de mapa y lista. Si false, los registros se devuelven usando Spark SQL Array o Map. Esta opción solo se aplica al transformWithState operador y se omite para otros operadores. Disponible en Databricks Runtime 16.2 y versiones posteriores. |
Los datos devueltos tienen el siguiente esquema:
| Columna | Tipo | Descripción |
|---|---|---|
key |
Estructura (tipo adicional derivado de la clave de estado) | Clave de un registro de operador con estado en el punto de control de estado. |
value |
Estructura (tipo adicional derivado del valor de estado) | El valor de un registro de operador con estado en el punto de control de estado. |
partition_id |
Entero | La partición del punto de control de estado que contiene el registro de operador con estado. |
Consulte read_statestore función con valores de tabla.
Lectura de metadatos de estado de Structured Streaming
Importante
Debe ejecutar consultas de streaming en Databricks Runtime 14.2 o superior para registrar los metadatos de estado. Los archivos de metadatos de estado no interrumpen la compatibilidad con versiones anteriores. Si decide ejecutar una consulta de streaming en Databricks Runtime 14.1 o inferior, se omiten los archivos de metadatos de estado existentes y no se escriben nuevos archivos de metadatos de estado.
Puede leer la información de metadatos de estado de las consultas de Structured Streaming que se ejecutan en Databricks Runtime 14.2 o superior. Use la sintaxis siguiente:
Pitón
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
Los datos devueltos tienen el siguiente esquema:
| Columna | Tipo | Descripción |
|---|---|---|
operatorId |
Entero | El identificador entero del operador de streaming con estado. |
operatorName |
Entero | Nombre del operador de streaming con estado. |
stateStoreName |
Cadena | Nombre del almacén de estado del operador. |
numPartitions |
Entero | Número de particiones del almacén de estado. |
minBatchId |
Largo | Identificador de lote mínimo disponible para el estado de consulta. |
maxBatchId |
Largo | Identificador de lote máximo disponible para el estado de la consulta. |
Nota:
Los valores de identificador de lote proporcionados por minBatchId y maxBatchId reflejan el estado en el momento en que se escribió el punto de control. Los lotes antiguos se limpian automáticamente con la ejecución de microproceso, por lo que no se garantiza que el valor proporcionado aquí siga estando disponible.
Consulte read_state_metadata función con valores de tabla.
Ejemplo: Consultar un lado de una unión de flujos
Use la siguiente sintaxis para consultar el lado izquierdo de una unión de flujo-flujo:
Pitón
left_df = (spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
joinSide => 'left'
);
Ejemplo: Consulta del almacén de estado para la secuencia con varios operadores con estado
En este ejemplo se usa el lector de metadatos de estado para recopilar detalles de metadatos de una consulta de streaming con varios operadores con estado y, a continuación, se usan los resultados de los metadatos como opciones para el lector de estado.
El lector de metadatos de estado toma la ruta de acceso del punto de control como la única opción, como en el ejemplo de sintaxis siguiente:
Pitón
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
En la tabla siguiente se representa una salida de ejemplo de los metadatos del almacén de estado:
| operatorId | NombreDelOperador | stateStoreName | numPartitions | minBatchId | maxBatchId |
|---|---|---|---|---|---|
| 0 | stateStoreSave | Predeterminado. | 200 | 0 | 13 |
| 1 | dedupeWithinWatermark | Predeterminado. | 200 | 0 | 13 |
Para obtener los resultados del dedupeWithinWatermark operador, consulte el lector de estado con la operatorId opción , como en el ejemplo siguiente:
Pitón
left_df = (spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
operatorId => 1
);