Compartilhar via


Operadores herdados de estado arbitrário

Observação

O Databricks recomenda usar transformWithState para criar aplicativos com estado personalizados. Consulte Criar um aplicativo com estado personalizado.

Este artigo tem informações sobre recursos que dão suporte mapGroupsWithStatee flatMapGroupsWithState. Para obter mais detalhes sobre esses operadores, consulte link.

Especificar o estado inicial para mapGroupsWithState

Você pode especificar um estado inicial definido pelo usuário para processamento com estado de Streaming Estruturado usando flatMapGroupsWithStateou mapGroupsWithState. Isso permite que você evite reprocessar dados ao iniciar um fluxo com estado sem um ponto de verificação válido.

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U])

Caso de uso de exemplo que especifica um estado inicial para o flatMapGroupsWithState operador:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  Iterator((key, count.toString))
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Caso de uso de exemplo que especifica um estado inicial para o mapGroupsWithState operador:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  (key, count.toString)
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Testar a mapGroupsWithState função de atualização

A TestGroupState API permite que você teste a função de atualização de estado usada para Dataset.groupByKey(...).mapGroupsWithState(...) e Dataset.groupByKey(...).flatMapGroupsWithState(...).

A função de atualização de estado usa o estado anterior como entrada usando um objeto do tipo GroupState. Consulte a documentação de referência do Apache Spark GroupState. Por exemplo:

import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

test("flatMapGroupsWithState's state update function") {
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  assert(!prevState.hasUpdated)

  updateState(userId, actions, prevState)

  assert(prevState.hasUpdated)
}