Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Las consultas de Structured Streaming con estado requieren que se realicen actualizaciones incrementales de la información de estado intermedio. En cambio, las consultas de Structured Streaming sin estado solo realizan un seguimiento de las filas que se han procesado desde el origen al receptor.
Las operaciones con estado incluyen agregación de streaming, streaming de dropDuplicates, combinaciones de secuencias y aplicaciones con estado personalizadas.
La información de estado intermedia necesaria para las consultas de streaming estructurado con estado puede provocar problemas inesperados de latencia y producción si no está configurado correctamente.
En Databricks Runtime 13.3 LTS y versiones posteriores, puede habilitar los puntos de control del registro de cambios con RocksDB para reducir la duración del punto de control y la latencia de un extremo a otro para cargas de trabajo de flujo estructurado. Databricks recomienda habilitar los puntos de control del registro de cambios para todas las consultas con estado de Structured Streaming. Consulte Habilitar puntos de control del registro de cambios.
Optimización de las consultas de Structured Streaming con estado
Administrar la información de estado intermedio de las consultas de Structured Streaming con estado puede ayudarle a evitar problemas inesperados de latencia y producción.
Procedimientos recomendados para Databricks:
- Use las instancias optimizadas para proceso como roles de trabajo.
- Establezca el número de particiones aleatorias en 1-2 veces el número de núcleos del clúster.
- Establezca la configuración de
spark.sql.streaming.noDataMicroBatches.enabledenfalseen SparkSession. Esto evita que el motor de microlotes de streaming procese microlotes que no contienen datos. Tenga en cuenta también que establecer esta configuración enfalsepodría dar lugar a operaciones con estado que usan marcas de agua o tiempos de espera de procesamiento para no obtener la salida de datos hasta que lleguen nuevos datos en lugar de inmediatamente.
Databricks recomienda usar RocksDB con puntos de comprobación del registro de cambios para administrar el estado de las secuencias con estado. Consulte Configuración de almacenes de estados de RocksDB en Azure Databricks.
Nota:
El esquema de administración de estado no se puede cambiar de un reinicio de consulta al siguiente. Si se ha iniciado una consulta con la administración predeterminada, debe reiniciarla desde cero con una nueva ubicación de punto de control para cambiar el almacén de estado.
Trabajar con múltiples operadores con estado en Structured Streaming
En Databricks Runtime 13.3 LTS y versiones posteriores, Azure Databricks ofrece soporte avanzado para operadores con estado en cargas de trabajo de Structured Streaming. Ahora puede encadenar varios operadores con estado, lo que significa que puede alimentar la salida de una operación, como una agregación con ventanas, a otra operación con estado, como una combinación.
En Databricks Runtime 16.2 y versiones posteriores, puede usar transformWithState en cargas de trabajo con varios operadores con estado. Consulte Compilación de una aplicación con estado personalizada.
Los siguientes ejemplos muestran varios patrones que puede utilizar.
Importante
Existen las siguientes limitaciones cuando se trabaja con múltiples operadores con estado:
- Operadores con estado personalizados heredados (
FlatMapGroupWithStateyapplyInPandasWithStateno se admiten). - Solo se admite el modo de anexar salida.
Agregación de ventanas de tiempo encadenadas
Pitón
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
Scala
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window($"window", "1 hour"),
$"word"
).count()
Agregación de ventana de tiempo en dos flujos diferentes seguida de una unión de intervalos de tiempo flujo-flujo
Pitón
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
Scala
val clicksWindow = clicksWithWatermark
.groupBy(window("clickTime", "1 hour"))
.count()
val impressionsWindow = impressionsWithWatermark
.groupBy(window("impressionTime", "1 hour"))
.count()
clicksWindow.join(impressionsWindow, "window", "inner")
Combinación de intervalos de tiempo flujo-flujo seguida de agregación de ventanas de tiempo
Pitón
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()
Scala
val joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()
Reequilibrio de estado para el flujo estructurado
El reequilibrio de estado está habilitado de forma predeterminada para todos los trabajos de streaming en las canalizaciones declarativas de Lakeflow Spark. En Databricks Runtime 11.3 LTS y versiones posteriores, puede establecer la siguiente opción de configuración en la configuración del clúster de Spark para habilitar el reequilibrio de estado:
spark.sql.streaming.statefulOperator.stateRebalancing.enabled true
El reequilibrio de estado beneficia a las canalizaciones de Structured Streaming con estado que se someten a eventos de cambio de tamaño del clúster. Las operaciones de streaming sin estado no se benefician, independientemente de cambiar los tamaños del clúster.
Nota:
El escalado automático de proceso tiene limitaciones al reducir verticalmente el tamaño del clúster para cargas de trabajo de Structured Streaming. Databricks recomienda usar canalizaciones declarativas de Spark de Lakeflow con escalado automático mejorado para cargas de trabajo de streaming. Consulte Optimización del uso del clúster de canalizaciones declarativas de Spark de Lakeflow con escalado automático.
Los eventos de cambio de tamaño del clúster desencadenan el reequilibrio de estado. Los microprocesos pueden tener una mayor latencia durante los eventos de reequilibrio a medida que el estado se carga desde el almacenamiento en la nube a los nuevos ejecutores.