다음을 통해 공유


구조적 스트리밍 상태 정보 읽기

DataFrame 작업 또는 SQL 테이블 값 함수를 사용하여 구조적 스트리밍 상태 데이터 및 메타데이터를 쿼리할 수 있습니다. 이러한 함수를 사용하여 구조적 스트리밍 상태 저장 쿼리에 대한 상태 정보를 관찰합니다. 이는 모니터링 및 디버깅에 유용할 수 있습니다.

상태 데이터 또는 메타데이터를 쿼리하려면 스트리밍 쿼리의 검사점 경로에 대한 읽기 권한이 있어야 합니다. 이 문서에 설명된 함수는 상태 데이터 및 메타데이터에 대한 읽기 전용 액세스를 제공합니다. 배치 읽기 의미 체계만 사용하여 상태 정보를 쿼리할 수 있습니다.

참고

Lakeflow Spark 선언적 파이프라인, 스트리밍 테이블 또는 구체화된 뷰에 대한 상태 정보를 쿼리할 수 없습니다. 표준 액세스 모드로 구성된 서버리스 컴퓨팅 또는 컴퓨팅을 사용하여 상태 정보를 쿼리할 수 없습니다.

요구 사항

  • 다음 컴퓨팅 구성 중 하나를 사용합니다.
    • 표준 액세스 모드로 구성된 컴퓨팅에서 Databricks Runtime 16.3 이상 사용.
    • Databricks Runtime 14.3 LTS 이상이 전용 또는 비격리 액세스 모드로 구성된 컴퓨팅에서.
  • 스트리밍 쿼리에서 사용하는 검사점 경로에 대한 읽기 권한입니다.

구조적 스트리밍 상태 저장소 읽기

지원되는 모든 Databricks Runtime에서 실행되는 구조적 스트리밍 쿼리에 대한 상태 저장소 정보를 읽을 수 있습니다. 다음 구문을 사용합니다.

파이썬

df = (spark.read
  .format("statestore")
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore('/checkpoint/path')

상태 판독기 API 매개 변수

상태 판독기 API는 다음과 같은 선택적 구성을 지원합니다.

옵션 타입 기본값 설명
batchId 최신 배치 ID 읽을 대상 배치를 나타냅니다. 쿼리의 이전 상태에 대한 상태 정보를 쿼리하려면 이 옵션을 지정합니다. 배치는 커밋되어야 하지만 아직 정리되지 않았습니다.
operatorId 0 읽을 대상 연산자를 나타냅니다. 이 옵션은 쿼리에서 여러 상태 저장 연산자를 사용하는 경우에 사용됩니다.
storeName 문자열 기본값 읽을 대상 상태 저장소 이름을 나타냅니다. 이 옵션은 상태 저장 연산자가 여러 상태 저장소 인스턴스를 사용하는 경우에 사용됩니다. storeName 또는 joinSide를 스트림 스팀 조인에 대해 지정해야 하지만 둘 다 지정하지는 않습니다.
joinSide 문자열('왼쪽' 또는 '오른쪽') 읽을 대상 쪽을 나타냅니다. 이 옵션은 사용자가 스트림 스트림 조인에서 상태를 읽으려는 경우에 사용됩니다.
stateVarName 문자열 없음 이 쿼리의 일부로 읽을 상태 변수 이름입니다. 상태 변수 이름은 initStatefulProcessor 함수 내에 있는 각 변수에 transformWithState 연산자가 사용하는 고유한 이름입니다. 연산자를 사용하는 경우 transformWithState 이 옵션은 필수 옵션입니다. 이 옵션은 연산자에 transformWithState 만 적용되며 다른 연산자에 대해서는 무시됩니다. Databricks Runtime 16.2 이상에서 사용할 수 있습니다.
readRegisteredTimers 불리언 (Boolean) 거짓 true 연산자 내에서 사용되는 등록된 타이머를 읽을 수 있도록 transformWithState로 설정합니다. 이 옵션은 연산자에 transformWithState 만 적용되며 다른 연산자에 대해서는 무시됩니다. Databricks Runtime 16.2 이상에서 사용할 수 있습니다.
flattenCollectionTypes 불리언 (Boolean) 맞다 이면 true맵 및 목록 상태 변수에 대해 반환된 레코드를 평면화합니다. 이면 falseSpark SQL Array 또는 Map.를 사용하여 레코드가 반환됩니다. 이 옵션은 연산자에 transformWithState 만 적용되며 다른 연산자에 대해서는 무시됩니다. Databricks Runtime 16.2 이상에서 사용할 수 있습니다.

반환된 데이터는 다음 스키마를 보유합니다.

타입 설명
key 구조체(상태 키에서 파생된 추가 형식) 상태 검사점에서 상태 저장 연산자 레코드의 키입니다.
value 구조체(상태 값에서 파생된 추가 형식) 상태 검사점에서 상태 저장 연산자 레코드의 값입니다.
partition_id 정수 상태 저장 연산자 레코드를 포함하는 상태 검사점의 파티션입니다.

테이블 값 함수 read_statestore을 참조하세요.

구조적 스트리밍 상태 메타데이터 읽기

중요한

상태 메타데이터를 기록하려면 Databricks Runtime 14.2 이상에서 스트리밍 쿼리를 실행해야 합니다. 상태 메타데이터 파일은 이전 버전과의 호환성을 손상시키지 않습니다. Databricks Runtime 14.1 이하에서 스트리밍 쿼리를 실행하도록 선택하면 기존 상태 메타데이터 파일이 무시되고 새 상태 메타데이터 파일이 기록되지 않습니다.

Databricks Runtime 14.2 이상에서 실행되는 구조적 스트리밍 쿼리에 대한 상태 메타데이터 정보를 읽을 수 있습니다. 다음 구문을 사용합니다.

파이썬

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

반환된 데이터는 다음 스키마를 보유합니다.

타입 설명
operatorId 정수 스트리밍 상태 저장 연산자의 ID는 정수입니다.
operatorName 정수 상태 저장 스트리밍 연산자의 이름입니다.
stateStoreName 문자열 연산자 상태 저장소의 이름입니다.
numPartitions 정수 상태 저장소의 파티션 수입니다.
minBatchId 쿼리 상태에 사용할 수 있는 최소 배치 ID입니다.
maxBatchId 쿼리 상태에 사용할 수 있는 최대 배치 ID입니다.

참고

검사점이 작성될 당시의 상태를 minBatchIdmaxBatchId가 제공하는 배치 ID 값들이 반영합니다. 이전 배치는 마이크로 일괄 처리 실행으로 자동으로 정리되므로 여기에 제공된 값은 계속 사용할 수 있다고 보장되지 않습니다.

테이블 값 함수 read_state_metadata을 참조하세요.

예: 스트림-스트림 조인의 한쪽 쿼리

다음 구문을 사용하여 스트림 스트림 조인의 왼쪽을 쿼리합니다.

파이썬

left_df = (spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    joinSide => 'left'
);

예: 여러 상태 저장 연산자가 있는 스트림에 대한 쿼리 상태 저장소

이 예제에서는 상태 메타데이터 판독기를 사용하여 여러 상태 저장 연산자를 사용하여 스트리밍 쿼리의 메타데이터 세부 정보를 수집한 다음, 상태 판독기의 옵션으로 메타데이터 결과를 사용합니다.

상태 메타데이터 판독기는 다음 구문 예제와 같이 검사점 경로를 유일한 옵션으로 사용합니다.

파이썬

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

다음 표는 상태 저장소 메타데이터의 예제 출력을 나타냅니다.

operatorId 운영자이름 상태저장소이름 numPartitions minBatchId maxBatchId
0 상태 저장소 저장 기본값 200 0 13
1 워터마크 내 중복 제거 기본값 200 0 13

연산자에 dedupeWithinWatermark 대한 결과를 얻으려면 다음 예제와 같이 옵션을 사용하여 operatorId 상태 판독기를 쿼리합니다.

파이썬

left_df = (spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    operatorId => 1
);