Note
Databricks recommends using transformWithState to build custom stateful applications. See Build a custom stateful application.
This article describes features supported by mapGroupsWithState and flatMapGroupsWithState. For more details on these operators, see the link.
Specify initial state for mapGroupsWithState
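You can specify a user-defined initial state for Structured Streaming stateful processing with flatMapGroupsWithState or mapGroupsWithState. This lets you avoid reprocessing data when you start a stateful stream without a valid checkpoint.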
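Why an initial state saves reprocessing can be shown with a small plain-Scala model (no Spark involved): per-key counts live in a Map, and each micro-batch adds its events to them. Seeding the Map with counts already derived from earlier data gives the same result as replaying that history from scratch. InitialStateModel and its methods are hypothetical names for this illustration only, not part of any Spark API.

```scala
// Conceptual model only (not Spark): per-key running counts kept in an immutable Map.
object InitialStateModel {
  type State = Map[String, Long]

  // Applies one micro-batch: each key's count becomes its previous count plus its new events.
  def applyBatch(state: State, batch: Seq[String]): State =
    batch.groupBy(identity).foldLeft(state) { case (acc, (key, events)) =>
      acc.updated(key, acc.getOrElse(key, 0L) + events.size)
    }

  // Starts from empty state and replays every batch, as a query with no checkpoint
  // and no initial state would have to do to rebuild its counts.
  def replayAll(batches: Seq[Seq[String]]): State =
    batches.foldLeft(Map.empty[String, Long])(applyBatch)

  // Starts from a supplied initial state and applies only the new batches.
  def fromInitial(initial: State, newBatches: Seq[Seq[String]]): State =
    newBatches.foldLeft(initial)(applyBatch)
}
```

Here the history yields apple = 1, orange = 2, mango = 5 (the same counts used as initial state in the examples), and seeding with those counts then processing only new data matches a full replay.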
Both operators are methods of KeyValueGroupedDataset[K, V] and take the initial state as a KeyValueGroupedDataset[K, S] with the same key type K:

```scala
def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U]
```
Example that specifies an initial state for the flatMapGroupsWithState operator:
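```scala
import org.apache.spark.sql.{Dataset, KeyValueGroupedDataset}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
import spark.implicits._ // assumes an active SparkSession named spark

// Assumed state type: a running count per key.
case class RunningCount(count: Long)

// Adds the number of new values for a key to its previous count (0 if the key has no state yet).
val fruitCountFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + values.size
  state.update(RunningCount(count))
  Iterator((key, count.toString))
}

// Initial per-key counts, applied when the query starts without a checkpoint.
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", RunningCount(1)),
  ("orange", RunningCount(2)),
  ("mango", RunningCount(5))
).toDS()
val fruitCountInitial: KeyValueGroupedDataset[String, RunningCount] =
  fruitCountInitialDS.groupByKey(x => x._1).mapValues(_._2)

// fruitStream: a streaming Dataset[String] of fruit names, defined elsewhere.
fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
```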
Example that specifies an initial state for the mapGroupsWithState operator:
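```scala
import org.apache.spark.sql.{Dataset, KeyValueGroupedDataset}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import spark.implicits._ // assumes an active SparkSession named spark

// Assumed state type: a running count per key.
case class RunningCount(count: Long)

// Same logic as above, but mapGroupsWithState returns exactly one output row per key.
val fruitCountFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + values.size
  state.update(RunningCount(count))
  (key, count.toString)
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", RunningCount(1)),
  ("orange", RunningCount(2)),
  ("mango", RunningCount(5))
).toDS()
val fruitCountInitial: KeyValueGroupedDataset[String, RunningCount] =
  fruitCountInitialDS.groupByKey(x => x._1).mapValues(_._2)

// fruitStream: a streaming Dataset[String] of fruit names, defined elsewhere.
fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
```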
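To see the initial state take effect end to end, the following sketch runs the mapGroupsWithState fruit-count logic against a local SparkSession. It assumes Spark 3.2 or later and uses MemoryStream, an internal testing source from org.apache.spark.sql.execution.streaming (Spark 3.x signature with an implicit SQLContext), in place of fruitStream; the results land in an in-memory sink table. InitialStateDemo and the table name fruit_counts are hypothetical.

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

// Assumed state type, matching the fruit-count examples.
case class RunningCount(count: Long)

object InitialStateDemo {
  // Runs one micro-batch through mapGroupsWithState and returns key -> emitted count.
  def run(): Map[String, String] = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .config("spark.sql.shuffle.partitions", "2") // keep the local run small
      .getOrCreate()
    import spark.implicits._
    implicit val sqlCtx: SQLContext = spark.sqlContext // MemoryStream needs an implicit SQLContext

    val fruitCountFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
      val count = state.getOption.map(_.count).getOrElse(0L) + values.size
      state.update(RunningCount(count))
      (key, count.toString)
    }

    // Initial state: apple starts at 1, orange at 2.
    val fruitCountInitial = Seq(("apple", RunningCount(1)), ("orange", RunningCount(2)))
      .toDS()
      .groupByKey(x => x._1)
      .mapValues(_._2)

    // MemoryStream (a Spark testing source) stands in for fruitStream.
    val fruitInput = MemoryStream[String]
    val query = fruitInput.toDS()
      .groupByKey(x => x)
      .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
      .writeStream
      .outputMode("update") // mapGroupsWithState requires update mode in streaming queries
      .format("memory")
      .queryName("fruit_counts")
      .start()

    fruitInput.addData("apple", "banana", "apple")
    query.processAllAvailable() // block until the added data has been processed

    val result = spark.table("fruit_counts").as[(String, String)].collect().toMap
    query.stop()
    result
  }
}
```

With apple seeded at 1 and two new apple events, the emitted count for apple is 3; banana has no initial state, so it starts from 0 and is emitted as 1.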
Test the mapGroupsWithState update function
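The TestGroupState API lets you test the state update functions used in Dataset.groupByKey(...).mapGroupsWithState(...) and Dataset.groupByKey(...).flatMapGroupsWithState(...).
The state update function receives the previous state as an object of type GroupState. TestGroupState.create builds such an object with the state value, timeout configuration, processing time, and watermark you choose, so you can call the function directly and then inspect the state. See the Apache Spark GroupState reference documentation. For example: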
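```scala
import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

// Runs inside a test suite; UserStatus, UserAction, and updateState are application-defined.
test("flatMapGroupsWithState's state update function") {
  // A state with no previous value, configured for event-time timeouts.
  val prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  // The state has not been updated before the function runs.
  assert(!prevState.isUpdated)

  updateState(userId, actions, prevState)

  // The function is expected to have called update on the state.
  assert(prevState.isUpdated)
}
```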
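Applied to the fruit-count function from the examples above, the same approach checks the count arithmetic without starting a streaming query or a SparkSession. This sketch assumes Spark 3.2 or later on the classpath and uses TestGroupState.isUpdated and GroupState.get; FruitCountFuncCheck and stateWith are hypothetical names.

```scala
import org.apache.spark.api.java.Optional
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, TestGroupState}

// Assumed state type, matching the fruit-count examples.
case class RunningCount(count: Long)

object FruitCountFuncCheck {
  // Same update logic as the mapGroupsWithState fruit-count example.
  val fruitCountFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
    val count = state.getOption.map(_.count).getOrElse(0L) + values.size
    state.update(RunningCount(count))
    (key, count.toString)
  }

  // Hypothetical helper: a no-timeout test state, optionally seeded with a previous value.
  def stateWith(previous: Option[RunningCount]): TestGroupState[RunningCount] =
    TestGroupState.create[RunningCount](
      optionalState = previous.fold(Optional.empty[RunningCount])(s => Optional.of(s)),
      timeoutConf = GroupStateTimeout.NoTimeout,
      batchProcessingTimeMs = 1L,
      eventTimeWatermarkMs = Optional.empty[Long],
      hasTimedOut = false)
}
```

For example, a state seeded with RunningCount(2) plus two new "orange" values should produce ("orange", "4") and leave the state updated to RunningCount(4).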