Freigeben über


Veraltete beliebige zustandsbehaftete Operatoren

Hinweis

Databricks empfiehlt, transformWithState zum Erstellen benutzerdefinierter zustandsbehafteter Anwendungen zu verwenden. Siehe Erstellen einer benutzerdefinierten zustandsbehafteten Anwendung.

Dieser Artikel enthält Informationen zu Features, die mapGroupsWithState und flatMapGroupsWithState unterstützen. Weitere Informationen zu diesen Operatoren finden Sie unter "Link".

Angeben des Anfangszustands für mapGroupsWithState

Sie können einen benutzerdefinierten Anfangszustand für die Verarbeitung des strukturierten Streamings mithilfe von flatMapGroupsWithState oder mapGroupsWithState angeben. Auf diese Weise können Sie vermeiden, Daten erneut zu verarbeiten, wenn Sie einen zustandsbehafteten Stream ohne gültigen Prüfpunkt starten.

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U])

Beispielanwendungsfall, der einen Anfangszustand für den flatMapGroupsWithState Operator angibt:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  Iterator((key, count.toString))
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Beispielanwendungsfall, der einen Anfangszustand für den mapGroupsWithState Operator angibt:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  (key, count.toString)
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Testen der mapGroupsWithState Updatefunktion

Die TestGroupState API ermöglicht es Ihnen, die Zustandsaktualisierungsfunktion zu testen, die für Dataset.groupByKey(...).mapGroupsWithState(...) und Dataset.groupByKey(...).flatMapGroupsWithState(...) verwendet wird.

Die Statusaktualisierungsfunktion verwendet den vorherigen Zustand als Eingabe mithilfe eines Typobjekts GroupState. Weitere Informationen finden Sie in der Referenzdokumentation zu Apache Spark GroupState. Beispiel:

import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

test("flatMapGroupsWithState's state update function") {
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  assert(!prevState.hasUpdated)

  updateState(userId, actions, prevState)

  assert(prevState.hasUpdated)
}