다음을 통해 공유


레거시 임의 상태 저장 연산자

비고

Databricks는 사용자 지정 상태 저장 애플리케이션을 빌드하는 데 사용하는 transformWithState 것이 좋습니다. 사용자 지정 상태 저장 애플리케이션빌드를 참조하세요.

이 문서에는 mapGroupsWithStateflatMapGroupsWithState을(를) 지원하는 기능에 대한 정보가 포함되어 있습니다. 이러한 연산자에 대한 자세한 내용은 링크를 참조하세요.

에 대한 초기 상태 지정 mapGroupsWithState

또는 flatMapGroupsWithState를 사용하여 mapGroupsWithState구조적 스트리밍 상태 저장 처리에 대한 사용자 정의 초기 상태를 지정할 수 있습니다. 이렇게 하면 유효한 검사점 없이 상태 저장 스트림을 시작할 때 데이터를 다시 처리하지 않도록 할 수 있습니다.

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U])

연산자에 대한 초기 상태를 flatMapGroupsWithState 지정하는 예제 사용 사례:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  Iterator((key, count.toString))
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

연산자에 대한 초기 상태를 mapGroupsWithState 지정하는 예제 사용 사례:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  (key, count.toString)
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

mapGroupsWithState 업데이트 함수 테스트

TestGroupState API를 사용하면 Dataset.groupByKey(...).mapGroupsWithState(...)Dataset.groupByKey(...).flatMapGroupsWithState(...)에 사용되는 상태 업데이트 함수를 테스트할 수 있습니다.

상태 업데이트 함수는 형식 GroupState의 개체를 사용하여 이전 상태를 입력으로 사용합니다. Apache Spark GroupState 참조 설명서를 참조하세요. 다음은 그 예입니다.

import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

test("flatMapGroupsWithState's state update function") {
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  assert(!prevState.hasUpdated)

  updateState(userId, actions, prevState)

  assert(prevState.hasUpdated)
}