Compartilhar via


O que é streaming com estado?

Uma consulta de Streaming Estruturado com estado requer atualizações incrementais para informações de estado intermediário, enquanto uma consulta de Streaming Estruturado sem estado rastreia apenas informações sobre quais linhas foram processadas da origem para o coletor.

As operações com estado incluem agregação de streaming, streaming dropDuplicates, junções de fluxo a fluxo e aplicações com estado personalizadas.

As informações de estado intermediário necessárias para consultas de Streaming Estruturado com estado podem levar a problemas inesperados de latência e produção se configurados incorretamente.

No Databricks Runtime 13.3 LTS e superior, você pode habilitar o ponto de verificação do changelog com RocksDB para reduzir a duração do ponto de verificação e a latência de ponta a ponta para cargas de trabalho de streaming estruturado. O Databricks recomenda habilitar o ponto de verificação do changelog para todas as consultas com estado de Streaming Estruturado. Confira Habilitar o ponto de verificação do log de alterações.

Otimizar consultas de Streaming Estruturado com estado

Gerenciar as informações de estado intermediário de consultas de Streaming Estruturado com estado pode ajudar a evitar problemas inesperados de latência e produção.

O Databricks recomenda:

  • Use instâncias otimizadas para computação como trabalhos.
  • Defina o número de partições embaralhadas como 1 a 2 vezes o número de núcleos no cluster.
  • Definir a configuração spark.sql.streaming.noDataMicroBatches.enabled como false na SparkSession. Isso impede que o mecanismo de micro lote de streaming processe micro lotes que não contêm dados. Observe também que definir essa configuração false pode resultar em operações com estado que usam marcas d'água ou tempos limite de processamento para não obter a saída de dados até que novos dados cheguem em vez de imediatamente.

O Databricks recomenda usar o RocksDB com o ponto de verificação do changelog para gerenciar o estado dos fluxos com estado. Confira Configurar o repositório de estado do RocksDB no Azure Databricks.

Observação

O esquema de gerenciamento de estado não pode ser alterado entre as reinicializações da consulta. Se uma consulta tiver sido iniciada com o gerenciamento padrão, você deverá reiniciá-la do zero com um novo local de ponto de verificação para alterar o repositório de estado.

Trabalhar com vários operadores com estado no Fluxo Estruturado

No Databricks Runtime 13.3 LTS e superior, o Azure Databricks oferece suporte avançado para operadores com estado em cargas de trabalho de streaming estruturado. Agora você pode encadear múltiplos operadores com estado, o que significa que é possível encaminhar a saída de uma operação, como uma agregação em janela, para outra operação com estado, como uma junção.

No Databricks Runtime 16.2 e superior, você pode usar transformWithState em cargas de trabalho com vários operadores com estado. Consulte Criar um aplicativo com estado personalizado.

Os exemplos a seguir demonstram vários padrões que você pode usar.

Importante

As seguintes limitações existem ao trabalhar com vários operadores com estado:

  • Operadores personalizados herdados com estado (FlatMapGroupWithState e applyInPandasWithState) não são suportados.
  • Há suporte apenas para o modo de saída de acréscimo.

Agregação de janela de tempo encadeada

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala (linguagem de programação)

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

Agregação de janela de tempo em dois fluxos diferentes seguidos de junção de fluxo a fluxo de janela

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala (linguagem de programação)

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

Junção do intervalo de tempo de fluxo a fluxo seguido pela agregação da janela de tempo

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala (linguagem de programação)

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

Rebalanceamento de estado para Streaming Estruturado

O rebalanceamento de estado é habilitado por padrão para todas as cargas de trabalho de streaming em Lakeflow Spark Declarative Pipelines. No Databricks Runtime 11.3 LTS e superior, você pode definir a seguinte opção de configuração na configuração do cluster Spark para habilitar o reequilíbrio de estado:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

O rebalanceamento de estado beneficia pipelines de Streaming Estruturado com estado que passam por eventos de redimensionamento de cluster. As operações de streaming sem estado não se beneficiam, independentemente da alteração do tamanhos do cluster.

Observação

O dimensionamento automático de computação tem limitação ao reduzir o tamanho do cluster para cargas de Fluxo Estruturado. O Databricks recomenda usar os pipelines declarativos do Lakeflow Spark com dimensionamento automático aprimorado para cargas de trabalho de streaming. Consulte Otimizar a utilização de cluster de Pipelines Declarativos do Lakeflow Spark com dimensionamento automático.

Os eventos de redimensionamento do cluster acionam o reequilíbrio de estado. Os microlotes podem ter maior latência durante o reequilíbrio de eventos à medida que o estado é carregado do armazenamento em nuvem para os novos executores.