다음을 통해 공유


사용자 지정 상태 저장 애플리케이션 구축하기

중요합니다

이 기능은 Databricks Runtime 16.2 이상에서 공개 미리보기 상태에 있습니다.

사용자 지정 상태 저장 연산자를 사용하여 스트리밍 애플리케이션을 빌드하여 대기 시간이 짧고 임의 상태 저장 논리를 사용하는 거의 실시간 솔루션을 구현할 수 있습니다. 사용자 지정 상태 저장 연산자는 기존 구조적 스트리밍 처리를 통해 사용할 수 없는 새로운 운영 사용 사례 및 패턴을 잠금 해제합니다.

비고

Databricks는 집계, 중복 제거 및 스트리밍 조인과 같은 지원되는 상태 저장 작업에 기본 제공 구조적 스트리밍 기능을 사용하는 것이 좋습니다. 상태 저장 스트리밍이란?을 참조하세요.

Databricks는 임의 상태 변환에 레거시 연산자보다 transformWithState 사용하는 것이 좋습니다. 레거시 flatMapGroupsWithStatemapGroupsWithState 연산자 설명서는 레거시 임의 상태 저장 연산자 참조하세요.

요구 사항

transformWithState 연산자와 관련 API 및 클래스에는 다음과 같은 요구 사항이 있습니다.

  • Databricks Runtime 16.2 이상에서 사용할 수 있습니다.
  • Databricks Runtime 16.3 이상의 Python()과 Databricks Runtime 17.3 이상의 Scala(transformWithStateInPandastransformWithState)에 대해 표준 액세스 모드가 지원되는 경우를 제외하고 컴퓨팅은 전용 또는 격리되지 않은 액세스 모드를 사용해야 합니다.
  • RocksDB 상태 저장소 공급자를 사용해야 합니다. Databricks는 컴퓨팅 구성의 일부로 RocksDB를 사용하도록 설정하는 것이 좋습니다.

비고

현재 세션에 대해 RocksDB 상태 저장소 공급자를 사용하도록 설정하려면 다음을 실행합니다.

spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

transformWithState란?

transformWithState 연산자는 구조적 스트리밍 쿼리에 사용자 지정 상태 저장 프로세서를 적용합니다. transformWithState사용하려면 사용자 지정 상태 저장 프로세서를 구현해야 합니다. 구조적 스트리밍에는 Python, Scala 또는 Java를 사용하여 상태 저장 프로세서를 빌드하기 위한 API가 포함됩니다.

transformWithState 사용하여 구조적 스트리밍으로 증분 방식으로 처리되는 레코드의 그룹화 키에 사용자 지정 논리를 적용합니다. 다음은 고급 디자인에 대해 설명합니다.

  • 하나 이상의 상태 변수를 정의합니다.
  • 상태 정보는 각 그룹화 키에 대해 유지되며 사용자 정의 논리에 따라 각 상태 변수에 액세스할 수 있습니다.
  • 처리된 각 마이크로 배치마다 키의 모든 레코드가 반복자로 제공됩니다.
  • 기본 제공 핸들을 사용하여 타이머 및 사용자 정의 조건에 따라 레코드를 내보내는 시기와 방법을 제어합니다.
  • 상태 값은 개별 TTL(Time to Live) 정의를 지원하므로 상태 만료 및 상태 크기를 유연하게 관리할 수 있습니다.

transformWithState 상태 저장소에서 스키마 진화를 지원하므로 기록 상태 정보를 손실하거나 레코드를 다시 처리할 필요 없이 프로덕션 애플리케이션을 반복하고 업데이트하여 개발 및 유지 관리의 용이성을 제공합니다. 상태 저장소 에서스키마 진화를 참조하세요.

중요합니다

PySpark는 transformWithStateInPandas대신 연산자 transformWithState 사용합니다. Azure Databricks 설명서는 transformWithState 사용하여 Python 및 Scala 구현 모두에 대한 기능을 설명합니다.

transformWithState 및 관련 API의 Scala 및 Python 구현은 언어에 따라 다르지만 동일한 기능을 제공합니다. 기본 프로그래밍 언어에 대한 언어별 예제 및 API 설명서를 참조하세요.

기본 제공 처리 기능

핸들러 를 구현하고 기본 제공 핸들 를 사용하여을 통해 사용자 지정 상태 저장 애플리케이션의 핵심 논리를 구현합니다.

  • 핸들은 상태 값 및 타이머와 상호 작용하고, 들어오는 레코드를 처리하고, 레코드를 내보내는 메서드를 제공합니다.
  • 처리기는 사용자 지정 이벤트 기반 논리를 정의합니다.

각 상태 형식에 대한 핸들은 기본 데이터 구조를 기반으로 구현되지만 각각에는 레코드를 가져오고, 배치하고, 업데이트하고, 삭제하는 기능이 포함되어 있습니다.

처리기는 다음 의미 체계를 사용하여 입력 레코드 또는 타이머에서 관찰된 이벤트를 기반으로 구현됩니다.

  • handleInputRows 메서드를 사용하여 데이터를 처리하는 방법을 제어하고, 상태를 업데이트하고, 그룹화 키에 대해 처리되는 레코드의 각 마이크로 일괄 처리에 대해 레코드를 내보내는 처리기를 정의합니다. 입력 행 처리을 참조하세요.
  • handleExpiredTimer 메서드를 사용하여 시간 기반 임계값을 사용하여 그룹화 키에 대한 추가 레코드가 처리되는지 여부에 관계없이 논리를 실행하는 처리기를 정의합니다. 프로그램 시간 제한 이벤트을 참조하세요.

다음 표에서는 이러한 처리기에서 지원하는 기능 동작을 비교합니다.

행동 handleInputRows handleExpiredTimer
상태 값 가져오기, 배치, 업데이트 또는 지우기
타이머 만들기 또는 삭제
레코드 내보내기
현재 마이크로 배치의 레코드를 순회합니다. 아니오
시간 경과를 바탕으로 트리거 논리 아니오

필요에 따라 handleInputRowshandleExpiredTimer 결합하여 복잡한 논리를 구현할 수 있습니다.

예를 들어 handleInputRows 사용하여 각 마이크로 일괄 처리에 대한 상태 값을 업데이트하고 나중에 10초 타이머를 설정하는 애플리케이션을 구현할 수 있습니다. 추가 레코드가 처리되지 않는 경우 handleExpiredTimer 사용하여 상태 저장소에서 현재 값을 내보낼 수 있습니다. 그룹화 키에 대해 새 레코드가 처리되는 경우 기존 타이머를 지우고 새 타이머를 설정할 수 있습니다.

사용자 지정 상태 유형

단일 상태 저장 연산자에서 여러 상태 개체를 구현할 수 있습니다. 각 상태 개체에 지정하는 이름은 상태 저장소에 유지되며 상태 저장소 판독기를 사용하여 액세스할 수 있습니다. 상태 개체가 StructType사용하는 경우 스키마를 전달하는 동안 구조체의 각 필드에 대한 이름을 제공합니다. 이러한 이름은 상태 저장소를 읽을 때도 표시됩니다. 구조적 스트리밍 상태 정보 읽기를 참조하세요.

기본 제공 클래스 및 연산자가 제공하는 기능은 유연성과 확장성을 제공하기 위한 것이며, 구현 선택은 애플리케이션이 실행해야 하는 전체 논리를 통해 알려야 합니다. 예를 들어, 필드 ValueStateuser_id에 의해 그룹화된 session_id을 사용하거나, MapState의 키인 user_id에 대해 session_id로 그룹화된 MapState을 사용하여 거의 동일한 논리를 구현할 수 있습니다. 이 경우, 논리가 여러 MapState에 걸쳐 조건을 평가해야 한다면 session_id를 선호하는 구현으로 사용할 수 있습니다.

다음 섹션에서는 transformWithState지원되는 상태 유형에 대해 설명합니다.

ValueState

각 그룹화 키에 대해 연결된 값이 있습니다.

값 상태에는 구조체 또는 튜플과 같은 복합 형식이 포함될 수 있습니다. ValueState업데이트할 때 전체 값을 대체하는 논리를 구현합니다. 값 상태에 대한 TTL은 값이 업데이트될 때 다시 설정되지만 저장된 ValueState업데이트하지 않고 ValueState 일치하는 원본 키가 처리되면 다시 설정되지 않습니다.

ListState

각 그룹화 키에 대해 연결된 목록이 있습니다.

목록 상태는 각각 복합 형식을 포함할 수 있는 값의 컬렉션입니다. 목록의 각 값에는 자체 TTL이 있습니다. 목록에 항목을 추가할 때 개별 항목을 추가하거나, 여러 항목을 한 번에 추가하거나, put을(를) 사용하여 전체 목록을 덮어쓸 수 있습니다. put 작업만 TTL을 다시 설정하기 위한 업데이트로 간주됩니다.

지도 상태

각 그룹화 키에는 연결된 맵이 있습니다. 맵은 Python 받아쓰기와 동일한 Apache Spark 기능입니다.

중요합니다

그룹화 키는 구조적 스트리밍 쿼리의 GROUP BY 절에 지정된 필드를 설명합니다. 맵 상태에는 그룹화 키에 대한 임의의 수의 키-값 쌍이 포함됩니다.

예를 들어, 당신이 user_id으로 그룹화하고 각 session_id에 대한 맵을 정의하려는 경우, 당신의 그룹화 키는 user_id이며, 맵의 키는 session_id입니다.

맵 상태는 각각 복합 형식을 포함할 수 있는 값에 매핑하는 고유 키의 컬렉션입니다. 맵의 각 키-값 쌍에는 자체 TTL이 있습니다. 특정 키의 값을 업데이트하거나 키와 해당 값을 제거할 수 있습니다. 해당 키를 사용하여 개별 값을 반환하거나, 모든 키를 나열하거나, 모든 값을 나열하거나, 반복기를 반환하여 맵에서 전체 키-값 쌍 집합을 사용할 수 있습니다.

사용자 지정 상태 변수 초기화

StatefulProcessor초기화할 때 사용자 지정 논리에서 상태 개체와 상호 작용할 수 있는 각 상태 개체에 대한 지역 변수를 만듭니다. 상태 변수는 init 클래스에서 기본 제공 StatefulProcessor 메서드를 재정의하여 정의되고 초기화됩니다.

getValueState초기화하는 동안 getListState, getMapStateStatefulProcessor 메서드를 사용하여 임의의 양의 상태 개체를 정의합니다.

각 상태 개체에는 다음이 있어야 합니다.

  • 고유한 이름
  • 지정된 스키마
    • Python에서 스키마는 명시적으로 지정됩니다.
    • Scala에서 상태 스키마를 지정하는 Encoder 전달합니다.

선택적 TTL(Time-to-Live) 기간을 밀리초 단위로 제공할 수도 있습니다. 지도 상태를 구현하는 경우 지도 키와 값에 대해 별도의 스키마 정의를 제공해야 합니다.

비고

상태 정보를 쿼리, 업데이트 및 내보내는 방법에 대한 논리는 별도로 처리됩니다. 상태 변수를 사용하십시오.을 참조하세요.

상태 저장 애플리케이션 예제

다음은 지원되는 각 형식에 대한 예제 상태 변수를 포함하여 transformWithState사용자 지정 상태 저장 프로세서를 정의하고 사용하기 위한 기본 구문을 보여 줍니다. 자세한 예제는 상태 저장 애플리케이션예제를 참조하세요.

비고

Python은 상태 값과의 모든 상호 작용에 튜플을 사용합니다. 즉, Python 코드는 putupdate 같은 작업을 사용할 때 튜플을 사용하여 값을 전달해야 하며 get사용할 때 튜플을 처리해야 합니다.

예를 들어 값 상태에 대한 스키마가 단일 정수일 경우 다음과 같은 코드를 구현합니다.

current_value_tuple = value_state.get() # Returns the value state as a tuple
current_value = current_value_tuple[0]  # Extracts the first item in the tuple
new_value = current_value + 1           # Calculate a new value
value_state.update((new_value,))        # Pass the new value formatted as a tuple

ListState 항목 또는 MapState값에도 마찬가지입니다.

파이썬

import pandas as pd
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator

spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

output_schema = StructType(
    [
        StructField("id", StringType(), True),
        StructField("countAsString", StringType(), True),
    ]
)

class SimpleCounterProcessor(StatefulProcessor):
  def init(self, handle: StatefulProcessorHandle) -> None:
    value_state_schema = StructType([StructField("count", IntegerType(), True)])
    list_state_schema = StructType([StructField("count", IntegerType(), True)])
    self.value_state = handle.getValueState(stateName="valueState", schema=value_state_schema)
    self.list_state = handle.getListState(stateName="listState", schema=list_state_schema)
    # Schema can also be defined using strings and SQL DDL syntax
    self.map_state = handle.getMapState(stateName="mapState", userKeySchema="name string", valueSchema="count int")

  def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
    count = 0
    for pdf in rows:
      list_state_rows = [(120,), (20,)] # A list of tuples
      self.list_state.put(list_state_rows)
      self.list_state.appendValue((111,))
      self.list_state.appendList(list_state_rows)
      pdf_count = pdf.count()
      count += pdf_count.get("value")
    self.value_state.update((count,)) # Count is passed as a tuple
    iter = self.list_state.get()
    list_state_value = next(iter1)[0]
    value = count
    user_key = ("user_key",)
    if self.map_state.exists():
      if self.map_state.containsKey(user_key):
        value += self.map_state.getValue(user_key)[0]
    self.map_state.updateValue(user_key, (value,)) # Value is a tuple
    yield pd.DataFrame({"id": key, "countAsString": str(count)})

q = (df.groupBy("key")
  .transformWithStateInPandas(
    statefulProcessor=SimpleCounterProcessor(),
    outputStructType=output_schema,
    outputMode="Update",
    timeMode="None",
  )
  .writeStream...
)

스칼라

import org.apache.spark.sql.streaming._
import org.apache.spark.sql.{Dataset, Encoder, Encoders , DataFrame}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

class SimpleCounterProcessor extends StatefulProcessor[String, (String, String), (String, String)] {
  @transient private var countState: ValueState[Int] = _
  @transient private var listState: ListState[Int] = _
  @transient private var mapState: MapState[String, Int] = _

  override def init(
      outputMode: OutputMode,
      timeMode: TimeMode): Unit = {
    countState = getHandle.getValueState[Int]("countState",
      Encoders.scalaLong, TTLConfig.NONE)
    listState = getHandle.getListState[Int]("listState",
      Encoders.scalaInt, TTLConfig.NONE)
    mapState = getHandle.getMapState[String, Int]("mapState",
      Encoders.STRING, Encoders.scalaInt, TTLConfig.NONE)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[(String, String)],
      timerValues: TimerValues): Iterator[(String, String)] = {
    var count = countState.getOption().getOrElse(0)
    for (row <- inputRows) {
      val listData = Array(120, 20)
      listState.put(listData)
      listState.appendValue(count)
      listState.appendList(listData)
      count += 1
    }
    val iter = listState.get()
    var listStateValue = 0
    if (iter.hasNext) {
      listStateValue = iter.next()
    }
    countState.update(count)
    var value = count
    val userKey = "userKey"
    if (mapState.exists()) {
      if (mapState.containsKey(userKey)) {
        value += mapState.getValue(userKey)
      }
    }
    mapState.updateValue(userKey, value)
    Iterator((key, count.toString))
  }
}

val q = spark
        .readStream
        .format("delta")
        .load("$srcDeltaTableDir")
        .as[(String, String)]
        .groupByKey(x => x._1)
        .transformWithState(
            new SimpleCounterProcessor(),
            TimeMode.None(),
            OutputMode.Update(),
        )
        .writeStream...

StatefulProcessorHandle (상태 저장 프로세서 핸들)

PySpark에는 사용자 정의 Python 코드가 상태 정보와 상호 작용하는 방법을 제어하는 함수에 대한 액세스를 제공하는 StatefulProcessorHandle 클래스가 포함되어 있습니다. StatefulProcessorHandle초기화할 때 항상 handle 가져와서 StatefulProcessor 변수에 전달해야 합니다.

handle 변수는 Python 클래스의 지역 변수를 상태 변수와 연결합니다.

비고

Scala는 getHandle 메서드를 사용합니다.

초기 상태 지정

필요에 따라 첫 번째 마이크로 일괄 처리와 함께 사용할 초기 상태를 제공할 수 있습니다. 기존 워크플로를 새 사용자 지정 애플리케이션으로 마이그레이션하거나, 상태 저장 연산자를 업그레이드하여 스키마 또는 논리를 변경하거나, 자동으로 복구할 수 없고 수동 개입이 필요한 오류를 복구할 때 유용할 수 있습니다.

비고

상태 저장소 판독기를 사용하여 기존 검사점에서 상태 정보를 쿼리합니다. 구조적 스트리밍 상태 정보 읽기를 참조하세요.

기존 Delta 테이블을 상태 저장 애플리케이션으로 변환하는 경우 spark.read.table("table_name") 사용하여 테이블을 읽고 결과 DataFrame을 전달합니다. 필요에 따라 필드를 선택하거나 수정하여 새로운 상태 저장 애플리케이션에 맞출 수 있습니다.

입력 행과 동일한 그룹화 키 스키마가 있는 DataFrame을 사용하여 초기 상태를 제공합니다.

비고

Python은 handleInitialState 사용하여 StatefulProcessor정의하는 동안 초기 상태를 지정합니다. Scala는 고유 클래스 StatefulProcessorWithInitialState사용합니다.

상태 변수 사용

지원되는 상태 개체는 상태를 가져오거나, 기존 상태 정보를 업데이트하거나, 현재 상태를 지우는 메서드를 제공합니다. 지원되는 각 상태 형식에는 구현된 데이터 구조에 해당하는 메서드의 고유한 구현이 있습니다.

관찰된 각 그룹화 키에는 전용 상태 정보가 있습니다.

  • 레코드는 사용자가 구현하는 논리와 지정한 출력 스키마를 사용하여 내보내집니다. 내보내기 기록을 참조하세요.
  • statestore 판독기를 사용하여 상태 저장소의 값에 액세스할 수 있습니다. 이 판독기는 일괄 처리 기능을 사용하며 짧은 대기 시간 워크로드를 위한 것이 아닙니다. 구조적 스트리밍 상태 정보 읽기를 참조하세요.
  • handleInputRows 사용하여 지정된 논리는 키와 관련된 레코드가 마이크로 일괄 처리에 있는 경우에만 실행됩니다. 입력 행 처리을 참조하세요.
  • handleExpiredTimer을(를) 사용하여 실행될 레코드를 관찰하는 데 의존하지 않는 시간 기반 논리를 구현하십시오. 프로그램 시간 제한 이벤트을 참조하세요.

비고

상태 개체는 다음과 같은 의미를 갖는 키를 그룹화하여 격리됩니다.

  • 상태 값은 다른 그룹화 키와 연결된 레코드의 영향을 받을 수 없습니다.
  • 그룹화 키 간에 값을 비교하거나 상태를 업데이트하는 데 의존하는 논리를 구현할 없습니다.

그룹화 키 내의 값을 비교할 수 있습니다. MapState 사용하여 사용자 지정 논리에서 사용할 수 있는 두 번째 키를 사용하여 논리를 구현합니다. 예를 들어 user_id 그룹화하고 IP 주소를 사용하여 MapState 키 지정하면 동시 사용자 세션을 추적하는 논리를 구현할 수 있습니다.

상태 관리를 위한 고급 고려사항

상태 변수에 쓰면 RocksDB에 대한 쓰기가 트리거됩니다. 최적화된 성능을 위해 Databricks는 지정된 키에 대해 반복기의 모든 값을 처리하고 가능한 한 단일 쓰기로 업데이트를 커밋하는 것이 좋습니다.

비고

상태 업데이트는 내결함성을 가지고 있습니다. 마이크로 일괄 처리가 처리를 완료하기 전에 태스크가 충돌하는 경우 마지막으로 성공한 마이크로 일괄 처리의 값이 다시 시도에 사용됩니다.

상태 값에는 기본 제공 기본값이 없습니다. 논리에 기존 상태 정보를 읽어야 하는 경우 논리를 구현하는 동안 exists 메서드를 사용합니다.

비고

MapState 변수에는 개별 키를 확인하거나 null 상태에 대한 논리를 구현하기 위한 모든 키를 나열하는 추가 기능이 있습니다.

레코드 내보내기

사용자 정의 논리는 transformWithState 레코드를 내보내는 방법을 제어합니다. 레코드는 그룹화 키별로 내보내집니다.

사용자 지정 상태 저장 애플리케이션은 레코드를 내보내는 방법을 결정할 때 상태 정보가 사용되는 방식을 가정하지 않으며 지정된 조건에 대해 반환된 레코드 수는 없음, 하나 또는 여러 개일 수 있습니다.

handleInputRows 또는 handleExpiredTimer사용하여 레코드를 내보내는 논리를 구현합니다. 입력 행을(를) 처리하고 프로그램 예정 이벤트을(를) 참조하세요.

비고

여러 상태 값을 구현하고 레코드를 내보내기 위한 여러 조건을 정의할 수 있지만 내보낸 모든 레코드는 동일한 스키마를 사용해야 합니다.

파이썬

Python에서는 outputStructType호출하는 동안 transformWithStateInPandas 키워드를 사용하여 출력 스키마를 정의합니다.

pandas DataFrame 객체를 사용하여 레코드를 내보내고 yield을(를) 사용합니다.

필요에 따라 빈 DataFrame을 yield 수 있습니다. update 출력 모드와 결합하면 빈 DataFrame을 내보내면 그룹화 키의 값이 null로 업데이트됩니다.

스칼라

Scala에서는 Iterator 개체를 사용하여 레코드를 내보낸다. 출력의 스키마는 내보낸 레코드에서 파생됩니다.

필요에 따라 빈 Iterator내보낼 수 있습니다. update 출력 모드와 결합하면 빈 Iterator 내보내면 그룹화 키의 값이 null로 업데이트됩니다.

입력 행 처리

handleInputRows 메서드를 사용하여 스트리밍 쿼리에서 관찰된 레코드가 상태 값과 상호 작용하고 업데이트하는 방법에 대한 논리를 정의합니다. handleInputRows 메서드를 사용하여 정의하는 처리기는 구조적 스트리밍 쿼리를 통해 모든 레코드가 처리될 때마다 실행됩니다.

transformWithState사용하여 구현된 대부분의 상태 저장 애플리케이션의 경우 핵심 논리는 handleInputRows사용하여 정의됩니다.

처리된 각 마이크로 일괄 업데이트에 대해 지정된 그룹화 키에 대한 마이크로 일괄 처리의 모든 레코드는 반복기를 사용하여 사용할 수 있습니다. 사용자 정의 논리는 현재 마이크로배치의 모든 레코드 및 상태 저장소의 값과 상호 작용할 수 있습니다.

프로그램 시간 제한 이벤트

타이머를 사용하여 지정된 조건의 경과된 시간에 따라 사용자 지정 논리를 구현할 수 있습니다.

handleExpiredTimer 메서드를 구현하여 타이머를 사용합니다.

그룹화 키 내에서 타이머는 타임스탬프로 고유하게 식별됩니다.

타이머가 만료되면 결과는 애플리케이션에 구현된 논리에 의해 결정됩니다. 일반적인 패턴은 다음과 같습니다.

  • 상태 변수에 저장된 정보 내보내기
  • 저장된 상태 정보를 제거합니다.
  • 새 타이머 만들기

연결된 키에 대한 레코드가 마이크로 배치에서 처리되지 않더라도 만료된 타이머는 작동합니다.

시간 모델 지정

StatefulProcessortransformWithState에 전달할 때, 반드시 시간 모델을 지정해야 합니다. 지원되는 옵션은 다음과 같습니다.

  • ProcessingTime
  • EventTime
  • NoTime 또는 TimeMode.None()

NoTime 지정하면 프로세서에 타이머가 지원되지 않습니다.

기본 제공 타이머 값

Databricks는 사용자 지정 상태 저장 애플리케이션에서 시스템 클록을 호출하지 않는 것이 좋습니다. 이로 인해 작업 실패 시 신뢰할 수 없는 재시도가 발생할 수 있습니다. 처리 시간 또는 워터마크에 액세스해야 하는 경우 TimerValues 클래스의 메서드를 사용합니다.

TimerValues 설명
getCurrentProcessingTimeInMs epoch 이후 현재 일괄 처리에 대한 처리 시간의 타임스탬프를 밀리초 단위로 반환합니다.
getCurrentWatermarkInMs epoch 이후 현재 배치의 워터마크 타임스탬프를 밀리초 단위로 반환합니다.

비고

처리 시간은 Apache Spark에서 마이크로 일괄 처리가 처리되는 시간을 설명합니다. Kafka와 같은 많은 스트리밍 원본에는 시스템 처리 시간도 포함됩니다.

스트리밍 쿼리의 워터마크는 종종 스트리밍 원본의 이벤트 시간 또는 처리 시간에 대해 정의됩니다. 데이터 처리 임계값을 제어하기 위해 워터마크 적용하기를 참조하세요.

워터마크와 창은 모두 transformWithState와 함께 사용할 수 있습니다. TTL, 타이머 및 MapState 또는 ListState 기능을 활용하여 사용자 지정 상태 저장 애플리케이션에서 유사한 기능을 구현할 수 있습니다.

상태 TTL(Time To Live)은 무엇인가요?

transformWithState에 의해 사용되는 상태 값은 각각 선택적인 TTL(Time to Live) 설정을 지원합니다. TTL이 만료되면 값이 상태 저장소에서 제거됩니다. TTL은 상태 저장소의 값과만 상호 작용합니다. 즉, 상태 정보를 제거하는 논리를 구현할 수 있지만 TTL이 상태 값을 제거하므로 논리를 직접 트리거할 수는 없습니다.

중요합니다

TTL을 구현하지 않는 경우 무한 상태 증가를 방지하기 위해 다른 논리를 사용하여 상태 제거를 처리해야 합니다.

TTL은 각 상태 값에 적용되며 각 상태 유형에 대해 서로 다른 규칙이 적용됩니다.

  • 상태 변수는 그룹화 키로 범위가 지정됩니다.
  • ValueState 개체의 경우 그룹화 키당 단일 값만 저장됩니다. TTL은 이 값에 적용됩니다.
  • ListState 개체의 경우 목록에 많은 값이 포함될 수 있습니다. TTL은 목록의 각 값에 독립적으로 적용됩니다.
  • MapState 개체의 경우 각 맵 키에는 연결된 상태 값이 있습니다. TTL은 맵의 각 키-값 쌍에 독립적으로 적용됩니다.

모든 상태 유형에 대해 상태 정보가 업데이트되면 TTL이 다시 설정됩니다.

비고

TTL은 ListState개별 값으로 범위가 지정되지만 목록에서 값을 업데이트하는 유일한 방법은 put 메서드를 사용하여 ListState 변수의 전체 내용을 덮어쓰는 것입니다.

타이머와 TTL의 차이점은 무엇인가요?

상태 변수에 대한 타이머와 TTL(Time to Live) 사이에는 약간의 겹침이 있지만 타이머는 TTL보다 더 광범위한 기능 집합을 제공합니다.

TTL은 사용자가 지정한 기간 동안 업데이트되지 않은 상태 정보를 제거합니다. 이렇게 하면 사용자가 확인되지 않은 상태 증가를 방지하고 부실 상태 항목을 제거할 수 있습니다. 맵과 목록은 각 값에 대해 TTL을 구현하므로 TTL을 설정하여 최근에 업데이트된 상태 값만 고려하는 함수를 구현할 수 있습니다.

타이머를 사용하면 레코드 내보내기를 포함하여 상태 제거 이외의 사용자 지정 논리를 정의할 수 있습니다. 필요에 따라 타이머를 사용하여 지정된 상태 값에 대한 상태 정보를 지우고, 타이머를 기반으로 값을 내보내거나 다른 조건부 논리를 트리거할 수 있습니다.