Compartir a través de


Operadores heredados con estado arbitrario

Nota:

Databricks recomienda usar transformWithState para compilar aplicaciones con estado personalizadas. Consulte Compilación de una aplicación con estado personalizada.

En este artículo se proporciona información sobre las características que admiten mapGroupsWithStatey flatMapGroupsWithState. Para obtener más información sobre estos operadores, consulte el vínculo.

Especificar el estado inicial de mapGroupsWithState

Puede especificar un estado inicial definido por el usuario para el procesamiento con estado de Structured Streaming mediante flatMapGroupsWithStateo mapGroupsWithState. Esto le permite evitar volver a procesar los datos al iniciar una secuencia con estado sin un punto de control válido.

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U])

Caso de uso de ejemplo que especifica un estado inicial para el flatMapGroupsWithState operador:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  Iterator((key, count.toString))
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Caso de uso de ejemplo que especifica un estado inicial para el mapGroupsWithState operador:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  (key, count.toString)
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Prueba de la mapGroupsWithState función de actualización

La TestGroupState API le permite probar la función de actualización de estado usada para Dataset.groupByKey(...).mapGroupsWithState(...) y Dataset.groupByKey(...).flatMapGroupsWithState(...).

La función de actualización de estado toma el estado anterior como entrada mediante un objeto de tipo GroupState. Consulte la documentación de referencia sobre GroupState de Apache Spark. Por ejemplo:

import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

test("flatMapGroupsWithState's state update function") {
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  assert(!prevState.hasUpdated)

  updateState(userId, actions, prevState)

  assert(prevState.hasUpdated)
}