Azure Databricks는 스트리밍 탭의 Spark UI를 통해 구조적 스트리밍 애플리케이션에 대한 기본 제공 모니터링을 제공합니다.
Spark UI에서 구조적 스트리밍 쿼리 구분
.queryName(<query-name>) 코드에 writeStream을 추가하여 Spark UI의 스트림에 속하는 메트릭을 쉽게 구분하여 스트림에 고유한 쿼리 이름을 제공합니다.
외부 서비스에 구조적 스트리밍 메트릭 푸시
Apache Spark의 스트리밍 쿼리 수신기 인터페이스를 사용하여 경고 또는 대시보드 사용 사례를 위해 스트리밍 메트릭을 외부 서비스로 푸시할 수 있습니다. Databricks Runtime 11.3 LTS 이상에서는 Python 및 Scala에서 StreamingQueryListener 사용할 수 있습니다.
Important
Unity 카탈로그 사용 컴퓨팅 액세스 모드를 사용하는 워크로드에는 다음과 같은 제한 사항이 적용됩니다.
-
StreamingQueryListener자격 증명을 사용하거나 전용 액세스 모드로 컴퓨팅 시 Unity 카탈로그에서 관리하는 개체와 상호 작용하려면 Databricks Runtime 15.1 이상이 필요합니다. -
StreamingQueryListener표준 액세스 모드(이전의 공유 액세스 모드)로 구성된 Scala 워크로드의 경우 Databricks Runtime 16.1 이상이 필요합니다.
Note
수신기를 사용하여 대기 시간을 처리하면 쿼리 처리 속도에 큰 영향을 줄 수 있습니다. 이러한 수신기에서 처리 논리를 제한하고 효율성을 위해 Kafka와 같은 빠른 응답 시스템에 쓰기를 선택하는 것이 좋습니다.
쿼리에 원본에서 사용할 수 있는 데이터가 없고 새 데이터를 onQueryIdle 기다리는 경우 메시지가 스트리밍 쿼리 수신기에 전달됩니다.
onQueryProgress 메시지는 스트리밍 쿼리 일괄 처리의 끝에만 전달됩니다. 쿼리가 오랜 시간 동안 데이터를 처리 중인 경우, onQueryIdle 이벤트와 onQueryProgress 이벤트 모두 전송되지 않는 경우가 있으나, 쿼리는 여전히 정상적이며 데이터 처리를 계속합니다.
다음 코드는 수신기를 구현하기 위한 구문의 기본 예제를 제공합니다.
Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when the query is idle and waiting for new data to process.
*/
def onQueryIdle(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Python
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryIdle(self, event):
"""
Called when the query is idle and waiting for new data to process.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
구조적 스트리밍에서 관찰 가능한 메트릭 정의
관찰 가능한 메트릭은 쿼리(DataFrame)에서 정의할 수 있는 임의의 집계 함수로 명명됩니다. DataFrame의 실행이 완료 지점에 도달하는 즉시(즉, 일괄 처리 쿼리를 완료하거나 스트리밍 Epoch에 도달) 마지막 완료 지점 이후 처리된 데이터에 대한 메트릭을 포함하는 명명된 이벤트가 내보내집니다.
수신기를 Spark 세션에 연결하여 이러한 메트릭을 관찰할 수 있습니다. 수신기는 실행 모드에 따라 달라집니다.
일괄 처리 모드:
QueryExecutionListener을(를) 사용합니다.QueryExecutionListener는 쿼리가 완료되면 호출됩니다.QueryExecution.observedMetrics맵을 사용하여 메트릭에 액세스합니다.스트리밍 또는 마이크로배치: .를 사용합니다
StreamingQueryListener.StreamingQueryListener는 스트리밍 쿼리가 Epoch를 완료할 때 호출됩니다.StreamingQueryProgress.observedMetrics맵을 사용하여 메트릭에 액세스합니다. Azure Databricks는 스트리밍에 대한continuous트리거 모드를 지원하지 않습니다.
다음은 그 예입니다.
Scala
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
Python
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
Unity 카탈로그, Delta Lake 및 구조적 스트리밍 메트릭 테이블 식별자 매핑
구조적 스트리밍 메트릭은 스트리밍 쿼리의 reservoirId 원본으로 사용되는 델타 테이블의 고유 ID에 대해 여러 위치에서 필드를 사용합니다.
reservoirId 필드는 델타 트랜잭션 로그의 Delta 테이블에 저장된 고유 식별자를 매핑합니다. 이 ID는 Unity 카탈로그에서 할당하고 카탈로그 탐색기에 표시되는 값에 매핑tableId.
다음 구문을 사용하여 델타 테이블에 대한 테이블 식별자를 검토합니다. Unity 카탈로그 관리 테이블, Unity 카탈로그 외부 테이블 및 모든 Hive 메타스토어 델타 테이블에 대해 작동합니다.
DESCRIBE DETAIL <table-name>
결과에 표시되는 id 필드는 스트리밍 메트릭의 reservoirId에 매핑되는 식별자입니다.
StreamingQueryListener 개체 메트릭
| Fields | Description |
|---|---|
id |
다시 시작할 때 지속되는 고유한 쿼리 ID입니다. |
runId |
시작/다시 시작할 때마다 고유한 쿼리 ID입니다. StreamingQuery.runId()를 참조하세요. |
name |
사용자가 지정한 쿼리 이름입니다. 이름이 지정되지 않은 경우 이름은 null입니다. |
timestamp |
마이크로배치 실행을 위한 타임스탬프입니다. |
batchId |
처리 중인 데이터의 현재 일괄 처리에 대한 고유 ID입니다. 실패 후 재시도하는 경우 지정된 일괄 처리 ID가 두 번 이상 실행될 수 있습니다. 마찬가지로 처리할 데이터가 없으면 일괄 처리 ID가 증가하지 않습니다. |
batchDuration |
일괄 처리 작업의 처리 기간(밀리초)입니다. |
numInputRows |
트리거에서 처리된 레코드의 집계(모든 원본에서) 수입니다. |
inputRowsPerSecond |
모든 원본에서 도착하는 데이터의 전체 속도입니다. |
processedRowsPerSecond |
Spark가 데이터를 처리하는 집계(모든 원본에서) 속도입니다. |
StreamingQueryListener 또한 고객 메트릭 및 원본 진행률 세부 정보를 검사할 수 있는 개체가 포함된 다음 필드를 정의합니다.
| Fields | Description |
|---|---|
durationMs |
유형: ju.Map[String, JLong].
durationMs 개체를 참조하세요. |
eventTime |
유형: ju.Map[String, String].
eventTime 개체를 참조하세요. |
stateOperators |
유형: Array[StateOperatorProgress].
stateOperators 개체를 참조하세요. |
sources |
유형: Array[SourceProgress].
소스 개체를 참조하세요. |
sink |
유형: SinkProgress.
싱크 개체를 참조하세요. |
observedMetrics |
유형: ju.Map[String, Row]. DataFrame/쿼리(예: df.observe)에 정의할 수 있는 명명된 임의 집계 함수입니다. |
durationMs 오브젝트
개체 유형: ju.Map[String, JLong]
마이크로배치 실행 프로세스의 다양한 단계를 완료하는 데 걸리는 시간에 대한 정보입니다.
| Fields | Description |
|---|---|
durationMs.addBatch |
마이크로배치를 실행하는 데 걸린 시간입니다. 그러면 Spark가 마이크로배치를 계획하는 데 걸리는 시간이 제외됩니다. |
durationMs.getBatch |
원본에서 오프셋에 대한 메타데이터를 검색하는 데 걸리는 시간입니다. |
durationMs.latestOffset |
마이크로배치에서 소비된 최신 오프셋입니다. 이 진행률 개체는 원본에서 최신 오프셋을 검색하는 데 걸린 시간을 나타냅니다. |
durationMs.queryPlanning |
실행 계획을 생성하는 데 걸린 시간입니다. |
durationMs.triggerExecution |
마이크로배치를 계획하고 실행하는 데 걸리는 시간입니다. |
durationMs.walCommit |
사용 가능한 새 오프셋을 커밋하는 데 걸린 시간입니다. |
durationMs.commitBatch |
addBatch 동안 싱크에 기록된 데이터를 커밋하는 데 걸린 시간입니다. 커밋을 지원하는 싱크에만 존재합니다. |
durationMs.commitOffsets |
일괄 처리를 커밋 로그에 커밋하는 데 걸린 시간입니다. |
eventTime 개체
개체 유형: ju.Map[String, String]
마이크로배치에서 처리되는 데이터 내에서 표시되는 이벤트 시간 값에 대한 정보입니다. 이 데이터는 워터마크가 구조적 스트리밍 작업에서 정의된 상태 저장 집계를 처리하기 위해 상태를 트림하는 방법을 결정하는 데 사용됩니다.
| Fields | Description |
|---|---|
eventTime.avg |
해당 트리거에 표시되는 평균 이벤트 시간입니다. |
eventTime.max |
해당 트리거에 표시되는 최대 이벤트 시간입니다. |
eventTime.min |
해당 트리거에 표시되는 최소 이벤트 시간입니다. |
eventTime.watermark |
해당 트리거에 사용되는 워터마크의 값입니다. |
상태연산자 개체
개체 형식: Array[StateOperatorProgress] 개체에는 stateOperators 구조적 스트리밍 작업에 정의된 상태 저장 작업 및 해당 작업에서 생성된 집계에 대한 정보가 포함됩니다.
스트림 상태 연산자에 대한 자세한 내용은 상태 저장 스트리밍이란?을 참조하세요.
| Fields | Description |
|---|---|
stateOperators.operatorName |
메트릭이 관련되는 상태 저장 연산자의 이름(예: symmetricHashJoin, dedupe또는 stateStoreSave. |
stateOperators.numRowsTotal |
상태 저장 연산자 또는 집계의 결과로 발생한 상태의 총 행 수입니다. |
stateOperators.numRowsUpdated |
상태 저장 연산자 또는 집계의 결과로 상태에서 업데이트된 총 행 수입니다. |
stateOperators.allUpdatesTimeMs |
이 메트릭은 현재 Spark에서 측정할 수 없으며 향후 업데이트에서 제거될 예정입니다. |
stateOperators.numRowsRemoved |
상태 저장 연산자 또는 집계의 결과로 상태에서 제거된 총 행 수입니다. |
stateOperators.allRemovalsTimeMs |
이 메트릭은 현재 Spark에서 측정할 수 없으며 향후 업데이트에서 제거될 예정입니다. |
stateOperators.commitTimeMs |
모든 업데이트를 커밋하고(배치 및 제거) 새 버전을 반환하는 데 걸린 시간입니다. |
stateOperators.memoryUsedBytes |
상태 저장소에서 사용하는 메모리입니다. |
stateOperators.numRowsDroppedByWatermark |
상태 저장 집계에 포함하기에는 너무 늦었다고 간주되는 행의 수입니다. 스트리밍 집계만 해당: 집계 후 삭제된 행의 수입니다(원시 입력 행이 아님). 이 숫자는 정확하지는 않지만 지연된 데이터가 손실되고 있다는 것을 보여줍니다. |
stateOperators.numShufflePartitions |
이 상태 저장 연산자의 셔플 파티션 수입니다. |
stateOperators.numStateStoreInstances |
연산자가 초기화하고 유지 관리한 실제 상태 저장소 인스턴스입니다. 많은 상태 저장 연산자의 경우 파티션 수와 동일합니다. 그러나 스트림 스트림 조인은 파티션당 4개의 상태 저장소 인스턴스를 초기화합니다. |
stateOperators.customMetrics |
자세한 내용은 이 항목의 stateOperators.customMetrics 를 참조하세요. |
StateOperatorProgress.customMetrics 개체
개체 유형: ju.Map[String, JLong]
StateOperatorProgress 에는 해당 메트릭을 수집할 때 사용하는 기능과 관련된 메트릭이 포함된 필드 customMetrics가 있습니다.
| Feature | Description |
|---|---|
| RocksDB 상태 저장소 | RocksDB 상태 저장소에 대한 메트릭입니다. |
| HDFS 상태 저장소 | HDFS 상태 저장소에 대한 메트릭입니다. |
| 스트림 중복 제거 | 행 중복 제거에 대한 메트릭입니다. |
| 스트림 데이터 집계 | 행 집계에 대한 메트릭입니다. |
| 스트림 조인 연산자 | 스트림 조인 연산자에 대한 메트릭입니다. |
transformWithState |
연산자에 대한 transformWithState 메트릭입니다. |
RocksDB 상태 저장소 사용자 지정 메트릭
Structured Streaming 작업의 상태 저장 값을 유지 관리하는 RocksDB의 성능 및 작업과 관련된 메트릭을 캡처하는 정보입니다. 자세한 내용은 Azure Databricks에서 RocksDB 상태 저장소 구성을 참조하세요.
| Fields | Description |
|---|---|
customMetrics.rocksdbBytesCopied |
RocksDB 파일 관리자가 추적한 대로 복사한 바이트 수입니다. |
customMetrics.rocksdbCommitCheckpointLatency |
네이티브 RocksDB의 스냅샷을 만들고 그것을 로컬 디렉터리에 쓰는 데 걸리는 시간(밀리초)입니다. |
customMetrics.rocksdbCompactLatency |
검사점 커밋 중 압축(선택 사항)의 시간(밀리초)입니다. |
customMetrics.rocksdbCommitCompactLatency |
커밋 중 압축 시간(밀리초)입니다. |
customMetrics.rocksdbCommitFileSyncLatencyMs |
네이티브 RocksDB 스냅샷을 외부 스토리지(검사점 위치)에 동기화하는 시간(밀리초)입니다. |
customMetrics.rocksdbCommitFlushLatency |
RocksDB 메모리 내 변경 내용을 로컬 디스크로 플러시하는 시간(밀리초)입니다. |
customMetrics.rocksdbCommitPauseLatency |
압축과 같이 검사점 커밋의 일부로 백그라운드 작업자 스레드를 중지하는 시간(밀리초)입니다. |
customMetrics.rocksdbCommitWriteBatchLatency |
네이티브 RocksDB에 메모리 내 구조(WriteBatch)에서 스테이징된 쓰기를 적용하는 데 걸리는 시간은 밀리초 단위입니다. |
customMetrics.rocksdbFilesCopied |
RocksDB 파일 관리자가 추적한 대로 복사한 파일 수입니다. |
customMetrics.rocksdbFilesReused |
RocksDB 파일 관리자가 추적한 대로 다시 사용하는 파일 수입니다. |
customMetrics.rocksdbGetCount |
호출 수 get (쓰기 준비에 사용되는 메모리 내 일괄 처리에서 비롯된 gets를 WriteBatch 포함하지 않음). |
customMetrics.rocksdbGetLatency |
기본 네이티브 RocksDB::Get 호출의 평균 시간(나노초)입니다. |
customMetrics.rocksdbReadBlockCacheHitCount |
RocksDB의 블록 캐시에서 캐시 적중 횟수입니다. |
customMetrics.rocksdbReadBlockCacheMissCount |
RocksDB에서 블록 캐시 누락의 수입니다. |
customMetrics.rocksdbSstFileSize |
RocksDB 인스턴스에 있는 모든 SST(정적 정렬 테이블) 파일의 크기입니다. |
customMetrics.rocksdbTotalBytesRead |
get 작업에 의해 읽혀진 비압축 바이트 수입니다. |
customMetrics.rocksdbTotalBytesWritten |
put 연산에 의해 기록된 압축되지 않은 총 바이트 수입니다. |
customMetrics.rocksdbTotalBytesReadThroughIterator |
반복기를 사용하여 읽은 압축되지 않은 데이터의 총 바이트 수입니다. 일부 상태 저장 작업(예: 시간 제한 처리 FlatMapGroupsWithState 및 워터마크)에는 반복기를 통해 Azure Databricks로 데이터를 읽어야 합니다. |
customMetrics.rocksdbTotalBytesReadByCompaction |
압축 프로세스가 디스크에서 읽는 바이트 수입니다. |
customMetrics.rocksdbTotalBytesWrittenByCompaction |
압축 프로세스가 디스크에 쓰는 총 바이트 수입니다. |
customMetrics.rocksdbTotalCompactionLatencyMs |
백그라운드 압축 및 커밋 중에 시작된 선택적 압축을 포함하여 RocksDB 압축의 시간(밀리초)입니다. |
customMetrics.rocksdbTotalFlushLatencyMs |
백그라운드 플러시를 포함한 총 플러시 시간입니다. 플러시 작업은 MemTable가 가득 차면 스토리지 장치로 데이터가 플러시되는 과정입니다.
MemTables 데이터가 RocksDB에 저장되는 첫 번째 수준입니다. |
customMetrics.rocksdbZipFileBytesUncompressed |
파일 관리자가 보고한 압축되지 않은 zip 파일의 크기(바이트)입니다. 파일 관리자는 물리적 SST 파일 디스크 공간 사용률 및 삭제를 관리합니다. |
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> |
검사점 위치에 저장된 RocksDB 스냅샷의 최신 버전입니다. 값 "-1"은 스냅샷이 저장되지 않음을 나타냅니다. 스냅샷은 각 상태 저장소 인스턴스에 특정하므로 이 메트릭은 특정 파티션 ID 및 상태 저장소 이름에 적용됩니다. |
customMetrics.rocksdbPutLatency |
총 호출 대기 시간입니다. |
customMetrics.rocksdbPutCount |
풋 옵션 콜 수입니다. |
customMetrics.rocksdbWriterStallLatencyMs |
작성기에서 압축 또는 플러시가 완료되기를 기다리는 시간입니다. |
customMetrics.rocksdbTotalBytesWrittenByFlush |
플러시로 기록된 총 바이트 |
customMetrics.rocksdbPinnedBlocksMemoryUsage |
고정된 블록의 메모리 사용량 |
customMetrics.rocksdbNumInternalColFamiliesKeys |
내부 컬럼 패밀리의 내부 키 수 |
customMetrics.rocksdbNumExternalColumnFamilies |
외부 열 패밀리의 수 |
customMetrics.rocksdbNumInternalColumnFamilies |
내부 열 그룹 수 |
HDFS 상태 저장소 사용자 지정 메트릭
HDFS 상태 저장소 공급자 동작 및 작업에 대해 수집된 정보입니다.
| Fields | Description |
|---|---|
customMetrics.stateOnCurrentVersionSizeBytes |
현재 버전에서만 예상되는 상태 크기입니다. |
customMetrics.loadedMapCacheHitCount |
공급자에 캐시된 상태에 대한 캐시 적중 횟수입니다. |
customMetrics.loadedMapCacheMissCount |
공급자에 캐시된 상태의 캐시 누락 횟수입니다. |
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> |
특정 상태 저장소 인스턴스에 대해 마지막으로 업로드된 스냅샷 버전입니다. |
중복 제거 사용자 지정 메트릭
중복 제거 동작 및 작업에 대해 수집된 정보입니다.
| Fields | Description |
|---|---|
customMetrics.numDroppedDuplicateRows |
제거된 중복 행 수입니다. |
customMetrics.numRowsReadDuringEviction |
상태 제거 중에 읽은 상태 행의 수입니다. |
집계 사용자 지정 메트릭
집계 동작 및 작업에 대해 수집된 정보입니다.
| Fields | Description |
|---|---|
customMetrics.numRowsReadDuringEviction |
상태 제거 중에 읽은 상태 행의 수입니다. |
스트림 조인 사용자 지정 메트릭
스트림 조인 동작 및 작업에 대해 수집된 정보입니다.
| Fields | Description |
|---|---|
customMetrics.skippedNullValueCount |
null이 spark.sql.streaming.stateStore.skipNullsForStreamStreamJoins.enabled로 설정된 경우 건너뛴 true 값의 수입니다. |
transformWithState 사용자 지정 메트릭
transformWithState의 동작 및 운영에 대해 수집된 정보입니다. 자세한 내용은 transformWithState사용자 지정 상태 저장 애플리케이션 빌드를 참조하세요.
| Fields | Description |
|---|---|
customMetrics.initialStateProcessingTimeMs |
모든 초기 상태를 처리하는 데 걸린 시간(밀리초)입니다. |
customMetrics.numValueStateVars |
값 상태 변수의 수입니다.
transformWithStateInPandas에도 참석하였습니다. |
customMetrics.numListStateVars |
목록 상태 변수의 수입니다.
transformWithStateInPandas에도 참석하였습니다. |
customMetrics.numMapStateVars |
지도 상태 변수의 수입니다.
transformWithStateInPandas에도 참석하였습니다. |
customMetrics.numDeletedStateVars |
삭제된 상태 변수의 수입니다.
transformWithStateInPandas에도 참석하였습니다. |
customMetrics.timerProcessingTimeMs |
모든 타이머를 처리하는 데 걸린 시간(밀리초) |
customMetrics.numRegisteredTimers |
등록된 타이머 수입니다.
transformWithStateInPandas에도 참석하였습니다. |
customMetrics.numDeletedTimers |
삭제된 타이머 수입니다.
transformWithStateInPandas에도 참석하였습니다. |
customMetrics.numExpiredTimers |
만료된 타이머 수입니다.
transformWithStateInPandas에도 참석하였습니다. |
customMetrics.numValueStateWithTTLVars |
TTL을 사용하는 값 상태 변수의 수입니다.
transformWithStateInPandas에도 참석하였습니다. |
customMetrics.numListStateWithTTLVars |
TTL을 사용하는 목록 상태 변수의 수입니다.
transformWithStateInPandas에도 참석하였습니다. |
customMetrics.numMapStateWithTTLVars |
TTL을 사용하는 지도 상태 변수의 수입니다.
transformWithStateInPandas에도 참석하였습니다. |
customMetrics.numValuesRemovedDueToTTLExpiry |
TTL 만료로 인해 제거된 값의 수입니다.
transformWithStateInPandas에도 참석하였습니다. |
customMetrics.numValuesIncrementallyRemovedDueToTTLExpiry |
TTL 만료로 인해 증분 방식으로 제거된 값의 수입니다. |
소스 개체
개체 유형: Array[SourceProgress]
개체에는 sources 스트리밍 데이터 원본에 대한 정보 및 메트릭이 포함됩니다.
| Fields | Description |
|---|---|
description |
스트리밍 데이터 원본 테이블에 대한 자세한 설명입니다. |
startOffset |
스트리밍 작업이 시작된 데이터 원본 테이블 내의 시작 오프셋 번호입니다. |
endOffset |
마이크로배치에서 처리되었던 마지막 오프셋입니다. |
latestOffset |
마이크로배치에서 처리된 최신 오프셋입니다. |
numInputRows |
이 원본에서 처리된 입력 행의 수입니다. |
inputRowsPerSecond |
이 원본에서 처리하기 위해 데이터가 도착하는 속도(초)입니다. |
processedRowsPerSecond |
Spark가 이 원본의 데이터를 처리하는 속도입니다. |
metrics |
유형: ju.Map[String, String]. 특정 데이터 원본에 대한 사용자 지정 메트릭을 포함합니다. |
Azure Databricks는 다음 원본 개체 구현을 제공합니다.
Note
양식 sources.<startOffset / endOffset / latestOffset>.* (또는 일부 변형)에 정의된 필드의 경우 표시된 자식 필드를 포함하는 3개의 가능한 필드 중 하나로 해석합니다.
sources.startOffset.<child-field>sources.endOffset.<child-field>sources.latestOffset.<child-field>
Delta Lake 소스 개체
델타 테이블 스트리밍 데이터 원본에 사용되는 사용자 지정 메트릭에 대한 정의입니다.
| Fields | Description |
|---|---|
sources.description |
스트리밍 쿼리가 읽는 원본에 대한 설명입니다. 예: “DeltaSource[table]” |
sources.<startOffset / endOffset>.sourceVersion |
이 오프셋이 인코딩된 serialization의 버전입니다. |
sources.<startOffset / endOffset>.reservoirId |
읽고 있는 테이블의 ID입니다. 쿼리를 다시 시작할 때 잘못된 구성을 검색하는 데 사용됩니다. Map Unity 카탈로그, Delta Lake 및 구조적 스트리밍 메트릭 테이블 식별자를 참조하세요. |
sources.<startOffset / endOffset>.reservoirVersion |
현재 처리 중인 테이블의 버전입니다. |
sources.<startOffset / endOffset>.index |
이 버전의 시퀀스에 있는 인덱스 AddFiles입니다. 이는 큰 커밋을 여러 일괄 처리로 분리하는 데 사용됩니다. 이 인덱스는 modificationTimestamp와 path으로 정렬하여 만들어집니다. |
sources.<startOffset / endOffset>.isStartingVersion |
초기 데이터가 처리된 후 발생한 변경 내용을 처리하는 대신 현재 오프셋이 새 스트리밍 쿼리의 시작을 표시하는지 여부를 식별합니다. 새 쿼리를 시작할 때 시작 시 테이블에 있는 모든 데이터가 먼저 처리된 다음 도착하는 모든 새 데이터가 처리됩니다. |
sources.<startOffset / endOffset / latestOffset>.eventTimeMillis |
이벤트 시간 순서 지정을 위해 기록된 이벤트 시간입니다. 처리 보류 중인 초기 스냅샷 데이터의 이벤트 시간입니다. 이벤트 시간 순서를 사용하여 초기 스냅샷을 처리할 때 사용됩니다. |
sources.latestOffset |
마이크로배치 쿼리에서 처리된 가장 최근의 오프셋입니다. |
sources.numInputRows |
이 원본에서 처리된 입력 행의 수입니다. |
sources.inputRowsPerSecond |
이 원본에서 처리하기 위해 데이터가 도착하는 속도입니다. |
sources.processedRowsPerSecond |
Spark가 이 원본의 데이터를 처리하는 속도입니다. |
sources.metrics.numBytesOutstanding |
미해결 파일(RocksDB에서 추적한 파일)의 결합된 크기입니다. 이는 스트리밍 원본으로 델타 및 자동 로더에 대한 백로그 메트릭입니다. |
sources.metrics.numFilesOutstanding |
처리할 미해결 파일 수입니다. 이는 스트리밍 원본으로 델타 및 자동 로더에 대한 백로그 메트릭입니다. |
Apache Kafka 소스 개체
Apache Kafka 스트리밍 데이터 원본에 사용되는 사용자 지정 메트릭에 대한 정의입니다.
| Fields | Description |
|---|---|
sources.description |
읽는 정확한 Kafka 토픽을 지정하는 Kafka 원본에 대한 자세한 설명입니다. 예: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]” |
sources.startOffset |
스트리밍 작업이 시작된 Kafka 토픽 내의 시작 오프셋 번호입니다. |
sources.endOffset |
마이크로배치에서 처리되었던 마지막 오프셋입니다. 이는 진행 중인 마이크로배치 실행에 대해 latestOffset와 같을 수 있습니다. |
sources.latestOffset |
마이크로배치가 파악한 최신 오프셋입니다. 마이크로배칭 프로세스는 속도 제한이 있을 때 모든 오프셋을 처리하지 않을 수 있으며, 이로 인해 endOffset와 latestOffset이 서로 다르게 됩니다. |
sources.numInputRows |
이 원본에서 처리된 입력 행의 수입니다. |
sources.inputRowsPerSecond |
이 원본에서 처리하기 위해 데이터가 도착하는 속도입니다. |
sources.processedRowsPerSecond |
Spark가 이 원본의 데이터를 처리하는 속도입니다. |
sources.metrics.avgOffsetsBehindLatest |
모든 구독된 항목 중 스트리밍 쿼리가 사용 가능한 최신 오프셋보다 뒤쳐져 있는 평균 오프셋 수입니다. |
sources.metrics.estimatedTotalBytesBehindLatest |
구독된 토픽에서 쿼리 프로세스가 사용하지 않은 예상 바이트 수입니다. |
sources.metrics.maxOffsetsBehindLatest |
스트리밍 쿼리가 구독된 모든 항목 중에서 사용 가능한 최신 오프셋 뒤에 있는 최대 오프셋 수입니다. |
sources.metrics.minOffsetsBehindLatest |
스트리밍 쿼리가 구독된 모든 토픽에서 사용 가능한 최신 오프셋에 뒤처진 최소 오프셋 수입니다. |
자동 로더 원본 메트릭
자동 로더 스트리밍 데이터 원본에 사용되는 사용자 지정 메트릭에 대한 정의입니다.
| Fields | Description |
|---|---|
sources.<startOffset / endOffset / latestOffset>.seqNum |
파일이 검색된 순서대로 처리되는 파일 시퀀스의 현재 위치입니다. |
sources.<startOffset / endOffset / latestOffset>.sourceVersion |
cloudFiles 원본의 구현 버전입니다. |
sources.<startOffset / endOffset / latestOffset>.lastBackfillStartTimeMs |
가장 최근 백필 작업의 시작 시간입니다. |
sources.<startOffset / endOffset / latestOffset>.lastBackfillFinishTimeMs |
가장 최근 백필 작업의 종료 시간입니다. |
sources.<startOffset / endOffset / latestOffset>.lastInputPath |
스트림을 다시 시작하기 전에 스트림의 마지막 사용자가 제공한 입력 경로입니다. |
sources.metrics.numFilesOutstanding |
백로그의 파일 수 |
sources.metrics.numBytesOutstanding |
백로그에 있는 파일의 크기(바이트) |
sources.metrics.approximateQueueSize |
메시지 큐의 대략적인 크기입니다. cloudFiles.useNotifications 옵션을 사용하도록 설정한 경우에만 해당합니다. |
sources.numInputRows |
이 원본에서 처리된 입력 행의 수입니다. 원본 형식 binaryFile 의 numInputRows 경우 파일 수와 같습니다. |
PubSub 소스 지표
PubSub 스트리밍 데이터 원본에 사용되는 사용자 지정 메트릭에 대한 정의입니다. PubSub 스트리밍 원본 모니터링에 대한 자세한 내용은 스트리밍 메트릭 모니터링을 참조하세요.
| Fields | Description |
|---|---|
sources.<startOffset / endOffset / latestOffset>.sourceVersion |
이 오프셋이 인코딩되는 구현 버전입니다. |
sources.<startOffset / endOffset / latestOffset>.seqNum |
처리 중인 지속형 시퀀스 번호입니다. |
sources.<startOffset / endOffset / latestOffset>.fetchEpoch |
처리 중인 가장 큰 fetch epoch입니다. |
sources.metrics.numRecordsReadyToProcess |
현재 백로그에서 처리할 수 있는 레코드 수입니다. |
sources.metrics.sizeOfRecordsReadyToProcess |
현재 백로그에서 처리되지 않은 데이터의 총 크기(바이트)입니다. |
sources.metrics.numDuplicatesSinceStreamStart |
시작된 이후 스트림에서 처리된 중복 레코드의 총 수입니다. |
펄서 소스 지표
펄서 스트리밍 데이터 원본에 사용되는 사용자 지정 메트릭에 대한 정의입니다.
| Fields | Description |
|---|---|
sources.metrics.numInputRows |
현재 마이크로 일괄 처리에서 처리된 행 수입니다. |
sources.metrics.numInputBytes |
현재 마이크로 일괄 처리에서 처리된 총 바이트 수입니다. |
싱크 객체
개체 유형: SinkProgress
| Fields | Description |
|---|---|
sink.description |
사용 중인 특정 싱크 구현을 자세히 설명하는 싱크에 대한 설명입니다. |
sink.numOutputRows |
출력 행의 수입니다. 싱크 유형에 따라 값에 대한 동작이나 제한이 다를 수 있습니다. 지원되는 특정 형식 참조 |
sink.metrics |
ju.Map[String, String] 싱크 메트릭에 대한. |
현재 Azure Databricks는 다음 두 가지 특정 sink 개체 구현을 제공합니다.
| 싱크 형식 | Details |
|---|---|
| 델타 테이블 | 델타 싱크 개체를 참조하세요. |
| Apache Kafka 토픽 | Kafka 싱크 개체를 참조하십시오. |
sink.metrics 필드는 개체의 sink 두 변형에 대해 동일하게 동작합니다.
Delta Lake 데이터 처리 개체
| Fields | Description |
|---|---|
sink.description |
사용되는 특정 델타 싱크 구현을 자세히 설명하는 델타 싱크에 대한 설명입니다. 예: “DeltaSink[table]” |
sink.numOutputRows |
행 수는 항상 -1 Spark가 Delta Lake 싱크에 대한 분류인 DSv1 싱크의 출력 행을 유추할 수 없기 때문입니다. |
Apache Kafka Sink 오브젝트
| Fields | Description |
|---|---|
sink.description |
스트리밍 쿼리가 작성 중인 Kafka 싱크에 대한 설명으로, 사용 중인 특정 Kafka 싱크 구현을 자세히 설명합니다. 예: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100” |
sink.numOutputRows |
마이크로배치 작업에 따라 출력 테이블이나 데이터 싱크에 기록된 행 수입니다. 경우에 따라 이 값은 "-1"일 수 있으며 일반적으로 "알 수 없음"으로 해석될 수 있습니다. |
Examples
Kafka-to-Kafka StreamingQueryListener 이벤트 예제
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_default" : 1370,
"SnapshotLastUploaded.partition_1_default" : 1370,
"SnapshotLastUploaded.partition_2_default" : 1362,
"SnapshotLastUploaded.partition_3_default" : 1370,
"SnapshotLastUploaded.partition_4_default" : 1356,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_default" : 1360,
"SnapshotLastUploaded.partition_1_default" : 1360,
"SnapshotLastUploaded.partition_2_default" : 1352,
"SnapshotLastUploaded.partition_3_default" : 1360,
"SnapshotLastUploaded.partition_4_default" : 1346,
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_left-keyToNumValues" : 1310,
"SnapshotLastUploaded.partition_1_left-keyWithIndexToValue" : 1318,
"SnapshotLastUploaded.partition_2_left-keyToNumValues" : 1305,
"SnapshotLastUploaded.partition_2_right-keyWithIndexToValue" : 1306,
"SnapshotLastUploaded.partition_4_left-keyWithIndexToValue" : 1310,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
예제 Delta Lake-To-Delta Lake StreamingQueryListener 이벤트
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
Kinesis와 Delta Lake StreamingQueryListener 이벤트 예제
{
"id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
"runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
"name" : null,
"timestamp" : "2024-05-14T02:09:20.846Z",
"batchId" : 0,
"batchDuration" : 59322,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"durationMs" : {
"addBatch" : 5397,
"commitBatch" : 4429,
"commitOffsets" : 211,
"getBatch" : 5,
"latestOffset" : 21998,
"queryPlanning" : 12128,
"triggerExecution" : 59313,
"walCommit" : 220
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
"startOffset" : null,
"endOffset" : [ {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000000"
},
"firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
"lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
}, {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000001"
},
"firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
"lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
} ],
"latestOffset" : null,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"metrics" : {
"avgMsBehindLatest" : "0.0",
"maxMsBehindLatest" : "0",
"minMsBehindLatest" : "0",
"mode" : "efo",
"numClosedShards" : "0",
"numProcessedBytes" : "30",
"numProcessedRecords" : "18",
"numRegisteredConsumers" : "1",
"numStreams" : "1",
"numTotalShards" : "2",
"totalPrefetchedBytes" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
"numOutputRows" : -1
}
}
Kafka+Delta Lake-To-Delta Lake StreamingQueryListener 이벤트 예제
{
"id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"name" : null,
"timestamp" : "2024-05-15T21:57:50.782Z",
"batchId" : 0,
"batchDuration" : 3601,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 5.55401277422938,
"durationMs" : {
"addBatch" : 1544,
"commitBatch" : 686,
"commitOffsets" : 27,
"getBatch" : 12,
"latestOffset" : 577,
"queryPlanning" : 105,
"triggerExecution" : 3600,
"walCommit" : 34
},
"stateOperators" : [ {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 20,
"numRowsUpdated" : 20,
"allUpdatesTimeMs" : 473,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 277,
"memoryUsedBytes" : 13120,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 5,
"numStateStoreInstances" : 20,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 5280
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic-1]]",
"startOffset" : null,
"endOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"latestOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"startOffset" : null,
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
"reservoirVersion" : 1,
"index" : -1,
"isStartingVersion" : false
},
"latestOffset" : null,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
"numOutputRows" : -1
}
}
Delta Lake StreamingQueryListener 이벤트의 예제 속도 소스
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}