비고
Databricks는 사용자 지정 상태 저장 애플리케이션을 빌드하는 데 사용하는 transformWithState 것이 좋습니다.
사용자 지정 상태 저장 애플리케이션빌드를 참조하세요.
이 문서에는 mapGroupsWithState 및 flatMapGroupsWithState을(를) 지원하는 기능에 대한 정보가 포함되어 있습니다. 이러한 연산자에 대한 자세한 내용은 링크를 참조하세요.
에 대한 초기 상태 지정 mapGroupsWithState
또는 flatMapGroupsWithState를 사용하여 mapGroupsWithState구조적 스트리밍 상태 저장 처리에 대한 사용자 정의 초기 상태를 지정할 수 있습니다. 이렇게 하면 유효한 검사점 없이 상태 저장 스트림을 시작할 때 데이터를 다시 처리하지 않도록 할 수 있습니다.
def mapGroupsWithState[S: Encoder, U: Encoder](
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]
def flatMapGroupsWithState[S: Encoder, U: Encoder](
outputMode: OutputMode,
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => Iterator[U])
연산자에 대한 초기 상태를 flatMapGroupsWithState 지정하는 예제 사용 사례:
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
Iterator((key, count.toString))
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
연산자에 대한 초기 상태를 mapGroupsWithState 지정하는 예제 사용 사례:
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
(key, count.toString)
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
mapGroupsWithState 업데이트 함수 테스트
TestGroupState API를 사용하면 Dataset.groupByKey(...).mapGroupsWithState(...)와 Dataset.groupByKey(...).flatMapGroupsWithState(...)에 사용되는 상태 업데이트 함수를 테스트할 수 있습니다.
상태 업데이트 함수는 형식 GroupState의 개체를 사용하여 이전 상태를 입력으로 사용합니다. Apache Spark GroupState 참조 설명서를 참조하세요. 다음은 그 예입니다.
import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional
test("flatMapGroupsWithState's state update function") {
var prevState = TestGroupState.create[UserStatus](
optionalState = Optional.empty[UserStatus],
timeoutConf = GroupStateTimeout.EventTimeTimeout,
batchProcessingTimeMs = 1L,
eventTimeWatermarkMs = Optional.of(1L),
hasTimedOut = false)
val userId: String = ...
val actions: Iterator[UserAction] = ...
assert(!prevState.hasUpdated)
updateState(userId, actions, prevState)
assert(prevState.hasUpdated)
}