스트리밍 쿼리를 시작하기 전에 SparkSession에서 다음 구성을 설정하여 RocksDB 기반 상태 관리를 사용하도록 설정할 수 있습니다.
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
Lakeflow Spark 선언적 파이프라인에서 RocksDB를 사용하도록 설정할 수 있습니다. 상태 저장 처리의 파이프라인 구성 최적화 ()를 참조하세요.
변경 로그 검사점 사용
Databricks Runtime 13.3 LTS 이상에서 구조적 스트리밍 워크로드에 대한 검사점 기간 및 엔드 투 엔드 대기 시간을 낮추도록 변경 로그 검사점을 사용하도록 설정할 수 있습니다. Databricks는 모든 구조적 스트리밍 상태 저장 쿼리에 대해 changelog 검사포인트를 사용하도록 설정할 것을 권합니다.
일반적으로 RocksDB 상태 저장소는 검사점이 있는 동안 데이터 파일을 스냅샷하고 업로드합니다. 이 비용을 방지하기 위해 변경 로그 검사점은 마지막 검사점 이후 변경된 레코드만 지속성 스토리지에 기록합니다."
변경 로그 검사점은 기본적으로 사용하지 않도록 설정됩니다. 다음 구문을 사용하여 SparkSession 수준에서 변경 로그 검사점을 사용하도록 설정할 수 있습니다.
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
기존 스트림에서 변경 로그 검사점을 사용하도록 설정하고 검사점에 저장된 상태 정보를 유지할 수 있습니다.
중요함
변경 로그 검사점을 사용하도록 설정한 쿼리는 Databricks Runtime 13.3 LTS 이상에서만 실행할 수 있습니다. 변경 로그 검사점을 사용하지 않도록 설정하여 레거시 검사점 동작으로 되돌릴 수 있지만 Databricks Runtime 13.3 LTS 이상에서 이러한 쿼리를 계속 실행해야 합니다. 이러한 변경이 수행되려면 작업을 다시 시작해야 합니다.
RocksDB 상태 저장소 메트릭
각 상태 연산자는 해당 RocksDB 인스턴스에서 수행된 상태 관리 작업과 관련된 메트릭을 수집하여 상태 저장소를 관찰하고 작업 속도 저하 문제의 디버그를 지원할 수 있습니다.
Databricks Runtime 16.4 LTS 이상에서 특정 상태 저장소 인스턴스에 대한 메트릭은 파티션 ID 및 저장소 이름으로 레이블이 지정되어 별도로 유지됩니다. 다른 모든 메트릭은 상태 연산자가 실행 중인 모든 작업에서 각 상태 연산자의 집계 합계로 보고됩니다.
이러한 메트릭은 customMetrics의 stateOperators 필드 내 StreamingQueryProgress 맵의 일부입니다. 다음은 JSON 형식의 StreamingQueryProgress 예제입니다(StreamingQueryProgress.json()을 사용하여 가져옴).
{
"id": "6774075e-8869-454b-ad51-513be86cfd43",
"runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId": 7,
"stateOperators": [
{
"numRowsTotal": 20000000,
"numRowsUpdated": 20000000,
"memoryUsedBytes": 31005397,
"numRowsDroppedByWatermark": 0,
"customMetrics": {
"SnapshotLastUploaded.partition_0_default": 7,
"SnapshotLastUploaded.partition_1_default": 7,
"SnapshotLastUploaded.partition_2_default": 6,
"SnapshotLastUploaded.partition_3_default": 6,
"SnapshotLastUploaded.partition_4_default": -1,
"rocksdbBytesCopied": 141037747,
"rocksdbCommitCheckpointLatency": 2,
"rocksdbCommitCompactLatency": 22061,
"rocksdbCommitFileSyncLatencyMs": 1710,
"rocksdbCommitFlushLatency": 19032,
"rocksdbCommitPauseLatency": 0,
"rocksdbCommitWriteBatchLatency": 56155,
"rocksdbFilesCopied": 2,
"rocksdbFilesReused": 0,
"rocksdbGetCount": 40000000,
"rocksdbGetLatency": 21834,
"rocksdbPutCount": 1,
"rocksdbPutLatency": 56155599000,
"rocksdbReadBlockCacheHitCount": 1988,
"rocksdbReadBlockCacheMissCount": 40341617,
"rocksdbSstFileSize": 141037747,
"rocksdbTotalBytesReadByCompaction": 336853375,
"rocksdbTotalBytesReadByGet": 680000000,
"rocksdbTotalBytesReadThroughIterator": 0,
"rocksdbTotalBytesWrittenByCompaction": 141037747,
"rocksdbTotalBytesWrittenByPut": 740000012,
"rocksdbTotalCompactionLatencyMs": 21949695000,
"rocksdbWriterStallLatencyMs": 0,
"rocksdbZipFileBytesUncompressed": 7038
}
}
],
"sources": [{}],
"sink": {}
}
메트릭에 대한 자세한 설명은 다음과 같습니다.
| 지표 이름 | 설명 |
|---|---|
| rocksdbCommitWriteBatchLatency (록스디비 커밋 쓰기 배치 대기 시간) | 메모리 내 데이터 구조(WriteBatch)에 단계적 쓰기를 네이티브 RocksDB에 적용하는 데 걸린 시간(밀리초)입니다. |
| rocksdb 커밋 플러시 지연 시간 (latency) | RocksDB 메모리 내 변경 내용을 로컬 디스크로 플러시하는 데 걸린 시간(밀리초)입니다. |
| 락스디비커밋콤팩트지연시간 | 검사점 커밋 중 압축(선택 사항)에 걸린 시간(밀리초)입니다. |
| rocksdbCommitPauseLatency (커밋 일시 중지 지연 시간) | 검사점 커밋의 일부로 압축 등을 위해 백그라운드 작업자 스레드를 중지하는 데 걸린 시간(밀리초)입니다. |
| rocksdb 커밋 검사 지연 시간 | 네이티브 RocksDB 스냅샷을 만들고 로컬 디렉터리에 쓰는 데 걸린 시간(밀리초)입니다. |
| rocksdb 커밋 파일 동기화 지연 시간(ms) | 네이티브 RocksDB 스냅샷 관련 파일을 외부 스토리지(검사점 위치)에 동기화하는 데 걸린 시간(밀리초)입니다. |
| rocksdbGetLatency 호출 시간 지연 | 기본 네이티브 RocksDB::Get 호출당 평균 소요 시간(나노초)입니다. |
| rocksdbPutCount (록스디비 푸트 카운트) | 기본 네이티브 RocksDB::Put 호출당 평균 소요 시간(나노초)입니다. |
| rocksdb에서 카운트를 가져오기 | 네이티브 RocksDB::Get 호출 수(WriteBatch에서 포함되지 Gets 않음 - 준비 쓰기에 사용되는 메모리 일괄 처리). |
| rocksdbPutCount (록스디비 푸트 카운트) | 네이티브 RocksDB::Put 호출 수(WriteBatch에는 포함되지 않음 - 스테이징 쓰기에 사용되는 메모리 내 일괄 처리). |
| rocksdb총바이트읽기Get에의한 | 네이티브 RocksDB::Get 호출을 통해 읽은, 압축되지 않은 바이트 수입니다. |
| rocksdb의 Put에 의해 기록된 총 바이트 수 | 네이티브 RocksDB::Put 호출을 통해 쓴, 압축되지 않은 바이트 수입니다. |
| rocksdbReadBlockCacheHitCount | 로컬 디스크에서 데이터를 읽지 않도록 네이티브 RocksDB 블록 캐시가 사용된 횟수입니다. |
| rocksdb읽기블록캐시미스카운트 | 네이티브 RocksDB 블록 캐시가 누락되어 로컬 디스크에서 데이터를 읽어야 했던 횟수입니다. |
| 컴패션에 의해 읽은 RocksDB 총 바이트 수 | 네이티브 RocksDB 압축 프로세스가 로컬 디스크에서 읽은 바이트 수입니다. |
| rocksdb_압축에_의해_기록된_총_바이트수 | 네이티브 RocksDB 압축 프로세스가 로컬 디스크에 쓴 바이트 수입니다. |
| rocksdbTotalCompactionLatencyMs | RocksDB 압축(커밋 중에 시작된 선택적 압축과 백그라운드 압축 포함)에 걸린 시간(밀리초)입니다. |
| rocksdbWriterStallLatencyMs (레이터 정지 지연 시간 밀리초) | 백그라운드 압축 또는 memtable을 디스크로 플러시하는 작업으로 인해 기록기가 중단된 시간(밀리초)입니다. |
| rocksdbTotalBytesReadThroughIterator (록스디비 총 바이트 읽기 수를 이터레이터를 통해) | 일부 상태가 있는 작업(예: flatMapGroupsWithState에서의 시간 제한 처리 또는 윈도우 기반 집계에서의 워터마크 처리)은 반복기를 통해 DB의 전체 데이터를 읽어야 합니다. 반복기를 사용하여 읽은, 압축되지 않은 데이터의 총 크기입니다. |