Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Note
Databricks recommande d’utiliser transformWithState pour générer des applications avec état personnalisées. Consultez Générer une application avec état personnalisé.
Cet article contient des informations sur les fonctionnalités qui prennent en charge mapGroupsWithState, et flatMapGroupsWithState. Pour plus d’informations sur ces opérateurs, consultez le lien.
Spécifier l’état initial pour mapGroupsWithState
Vous pouvez spécifier un état initial défini par l'utilisateur pour le traitement stateful de Structured Streaming à l'aide de flatMapGroupsWithState ou de mapGroupsWithState. Cela vous permet d’éviter de traiter à nouveau les données lors du démarrage d’un flux avec état sans point de contrôle valide.
def mapGroupsWithState[S: Encoder, U: Encoder](
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]
def flatMapGroupsWithState[S: Encoder, U: Encoder](
outputMode: OutputMode,
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => Iterator[U])
Exemple de cas d’usage qui spécifie un état initial à l’opérateur flatMapGroupsWithState :
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
Iterator((key, count.toString))
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
Exemple de cas d’usage qui spécifie un état initial à l’opérateur mapGroupsWithState :
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
(key, count.toString)
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
Tester la mapGroupsWithState fonction de mise à jour
L’API TestGroupState vous permet de tester la fonction de mise à jour d’état utilisée pour Dataset.groupByKey(...).mapGroupsWithState(...) et Dataset.groupByKey(...).flatMapGroupsWithState(...).
La fonction de mise à jour d’état prend l’état précédent comme entrée à l’aide d’un objet de type GroupState. Consultez la documentation de référence d’Apache Spark GroupState. Par exemple:
import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional
test("flatMapGroupsWithState's state update function") {
var prevState = TestGroupState.create[UserStatus](
optionalState = Optional.empty[UserStatus],
timeoutConf = GroupStateTimeout.EventTimeTimeout,
batchProcessingTimeMs = 1L,
eventTimeWatermarkMs = Optional.of(1L),
hasTimedOut = false)
val userId: String = ...
val actions: Iterator[UserAction] = ...
assert(!prevState.hasUpdated)
updateState(userId, actions, prevState)
assert(prevState.hasUpdated)
}