Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
O Azure Databricks fornece monitoramento interno para aplicativos de Streaming Estruturado por meio da interface do usuário do Spark na guia Streaming .
Distinguir consultas de streaming estruturado na interface do usuário do Spark
Forneça aos seus streams um nome de consulta exclusivo adicionando .queryName(<query-name>) ao seu writeStream código para distinguir facilmente quais métricas pertencem a qual fluxo na interface do usuário do Spark.
Envie métricas de Streaming estruturado para serviços externos
As métricas de streaming podem ser enviadas por push para serviços externos para alertas ou painéis de casos de uso usando a interface Streaming Query Listener do Apache Spark. No Databricks Runtime 11.3 LTS e superior, StreamingQueryListener está disponível em Python e Scala.
Important
As limitações a seguir se aplicam a cargas de trabalho que usam modos de acesso de computação habilitados para Unity Catalog:
-
StreamingQueryListenerrequer o Databricks Runtime 15.1 ou superior para usar credenciais ou interagir com objetos gerenciados pelo Unity Catalog na computação com modo de acesso dedicado. -
StreamingQueryListenerrequer o Databricks Runtime 16.1 ou superior para cargas de trabalho Scala configuradas com o modo de acesso padrão (anteriormente modo de acesso compartilhado).
Note
A latência de processamento com ouvintes pode afetar significativamente as velocidades de processamento de consultas. É aconselhável limitar a lógica de processamento nesses ouvintes e optar por escrever em sistemas de resposta rápida como Kafka para eficiência.
Se a consulta não tiver dados disponíveis na fonte e estiver aguardando novos dados, uma onQueryIdle mensagem será entregue ao ouvinte de consulta de streaming. Uma onQueryProgress mensagem só é entregue no final do lote de processamento da consulta de streaming. Se a consulta estiver a processar dados por um longo tempo, é possível que nem os eventos onQueryIdle nem os eventos onQueryProgress sejam enviados, mas a consulta ainda está saudável e continua a processar dados.
O código a seguir fornece exemplos básicos da sintaxe para implementar um ouvinte:
Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when the query is idle and waiting for new data to process.
*/
def onQueryIdle(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Python
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryIdle(self, event):
"""
Called when the query is idle and waiting for new data to process.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
Definição de métricas observáveis no Streaming Estruturado
As métricas observáveis são chamadas de funções agregadas arbitrárias que podem ser definidas em uma consulta (DataFrame). Assim que a execução de um DataFrame atinge um ponto de conclusão (ou seja, conclui uma consulta em lote ou atinge uma época de streaming), é emitido um evento nomeado que contém as métricas para os dados processados desde o último ponto de conclusão.
Você pode observar essas métricas anexando um ouvinte à sessão do Spark. O ouvinte depende do modo de execução:
Modo de lote: Use
QueryExecutionListener.QueryExecutionListeneré chamado quando a consulta é concluída. Acesse as métricas usando oQueryExecution.observedMetricsmapa.Streaming, ou microlote: Use
StreamingQueryListener.StreamingQueryListeneré chamado quando a consulta de streaming completa uma época. Acesse as métricas usando oStreamingQueryProgress.observedMetricsmapa. O Azure Databricks não oferece suporte ao modo de gatilhocontinuouspara streaming.
Por exemplo:
Scala
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
Python
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
Mapear identificadores de tabela de métricas Unity Catalog, Delta Lake e Structured Streaming
As métricas de Streaming estruturado usam o reservoirId campo em vários locais para a identidade exclusiva de uma tabela Delta usada como fonte para uma consulta de streaming.
O campo reservoirId mapeia o identificador exclusivo armazenado pela tabela Delta no log de transações Delta. Essa ID não é mapeada para o tableId valor atribuído pelo Catálogo Unity e exibido no Gerenciador de Catálogos.
Use a sintaxe a seguir para revisar o identificador de tabela para uma tabela Delta. Isso funciona para tabelas geridas do Unity Catalog, tabelas externas do Unity Catalog e todas as tabelas Delta do metastore Hive.
DESCRIBE DETAIL <table-name>
O campo id exibido nos resultados é o identificador que corresponde ao reservoirId nas métricas de streaming.
Métricas de objeto StreamingQueryListener
| Fields | Description |
|---|---|
id |
Um ID de consulta exclusivo que persiste nas reinicializações. |
runId |
Um ID de consulta exclusivo para cada início/reinício. Ver StreamingQuery.runId(). |
name |
O nome da consulta especificado pelo utilizador. Nome é nulo se nenhum nome for especificado. |
timestamp |
A marca temporal para a execução do microlote. |
batchId |
Um ID exclusivo para o lote atual de dados que está sendo processado. No caso de novas tentativas após uma falha, um determinado ID de lote pode ser executado mais de uma vez. Da mesma forma, quando não há dados a serem processados, o ID do lote não é incrementado. |
batchDuration |
A duração de processamento de uma operação em lote, em milissegundos. |
numInputRows |
O número agregado (em todas as fontes) de registros processados em um gatilho. |
inputRowsPerSecond |
A taxa global (em todas as fontes) de recepção de dados. |
processedRowsPerSecond |
A taxa agregada (em todas as fontes) na qual o Spark está processando dados. |
StreamingQueryListener Também define os seguintes campos que contêm objetos que podem ser examinados para métricas do cliente e detalhes do progresso da fonte:
| Fields | Description |
|---|---|
durationMs |
Tipo: ju.Map[String, JLong]. Consulte durationMs object. |
eventTime |
Tipo: ju.Map[String, String]. Consulte o objeto eventTime. |
stateOperators |
Tipo: Array[StateOperatorProgress]. Consulte o objeto stateOperators. |
sources |
Tipo: Array[SourceProgress]. Consulte o objeto sources. |
sink |
Tipo: SinkProgress. Consulte o objeto do coletor. |
observedMetrics |
Tipo: ju.Map[String, Row]. Funções de agregação arbitrárias nomeadas que podem ser definidas em um DataFrame/query (como df.observe). |
durationMs objeto
Tipo de objeto: ju.Map[String, JLong]
Informações sobre o tempo necessário para concluir várias etapas do processo de execução do microlote.
| Fields | Description |
|---|---|
durationMs.addBatch |
O tempo necessário para executar o microlote. Isso exclui o tempo que o Spark leva para planejar o microlote. |
durationMs.getBatch |
O tempo necessário para recuperar os metadados sobre os deslocamentos da fonte. |
durationMs.latestOffset |
O último offset consumido para o microlote. Este objeto de progresso refere-se ao tempo necessário para recuperar o offset mais recente das fontes. |
durationMs.queryPlanning |
O tempo necessário para gerar o plano de execução. |
durationMs.triggerExecution |
O tempo necessário para planear e executar o microlote. |
durationMs.walCommit |
Tempo necessário para confirmar os novos offsets disponíveis. |
durationMs.commitBatch |
O tempo necessário para confirmar os dados gravados no destino durante addBatch. Presente apenas para pias que suportam commit. |
durationMs.commitOffsets |
O tempo necessário para confirmar o lote no log de confirmação. |
objeto eventTime
Tipo de objeto: ju.Map[String, String]
Informações sobre o valor de tempo do evento visto nos dados que estão sendo processados no microlote. Esses dados são usados pela marca temporal para determinar como ajustar o estado para processar agregações com estado preservado definidas na tarefa de Streaming Estruturado.
| Fields | Description |
|---|---|
eventTime.avg |
O tempo médio de evento visto nesse gatilho. |
eventTime.max |
O tempo máximo de evento visto nesse gatilho. |
eventTime.min |
O tempo mínimo do evento observado nesse gatilho. |
eventTime.watermark |
O valor da marca d'água usada nesse gatilho. |
objeto stateOperators
Tipo de objeto: Array[StateOperatorProgress] O stateOperators objeto contém informações sobre as operações com estado definidas no job de Streaming Estruturado e as agregações produzidas a partir delas.
Para obter mais detalhes sobre operadores de estado de fluxo, consulte O que é streaming com monitoração de estado?.
| Fields | Description |
|---|---|
stateOperators.operatorName |
O nome do operador stateful ao qual as métricas se relacionam, como symmetricHashJoin, dedupeou stateStoreSave. |
stateOperators.numRowsTotal |
O número total de linhas no estado como resultado de um operador ou agregação com monitoração de estado. |
stateOperators.numRowsUpdated |
O número total de linhas atualizadas no estado como resultado de um operador ou agregação orientada a estado. |
stateOperators.allUpdatesTimeMs |
Atualmente, essa métrica não é mensurável pelo Spark e está planejada para ser removida em atualizações futuras. |
stateOperators.numRowsRemoved |
O número total de linhas removidas do estado como resultado de um operador ou agregação com monitoração de estado. |
stateOperators.allRemovalsTimeMs |
Atualmente, essa métrica não é mensurável pelo Spark e está planejada para ser removida em atualizações futuras. |
stateOperators.commitTimeMs |
O tempo necessário para confirmar todas as atualizações (inserções e remoções) e retornar uma nova versão. |
stateOperators.memoryUsedBytes |
Memória usada pelo armazenamento de estado. |
stateOperators.numRowsDroppedByWatermark |
O número de linhas que são consideradas tardias demais para serem incluídas em uma agregação com estado. Agregações de streaming apenas: o número de linhas descartadas pós-agregação (não linhas de entrada brutas). Este número não é preciso, mas fornece uma indicação de que dados em atraso estão a ser descartados. |
stateOperators.numShufflePartitions |
O número de partições aleatórias para este operador stateful. |
stateOperators.numStateStoreInstances |
A instância de armazenamento de estado real que o operador inicializou e manteve. Para muitos operadores com estado, isso é o mesmo que o número de partições. No entanto, as junções de fluxo para fluxo inicializam quatro instâncias de armazenamento de estado por partição. |
stateOperators.customMetrics |
Consulte stateOperators.customMetrics neste tópico para obter mais detalhes. |
Objeto de métrica personalizada de StateOperatorProgress
Tipo de objeto: ju.Map[String, JLong]
StateOperatorProgress tem um campo, customMetrics, que contém as métricas específicas para o recurso que você está usando ao coletar essas métricas.
| Feature | Description |
|---|---|
| Armazenamento de estado do RocksDB | Métricas para armazenamento de estado RocksDB. |
| Armazenamento de estado HDFS | Métricas para armazenamento de estado HDFS. |
| Deduplicação de fluxos de dados | Métricas para desduplicação de linhas. |
| Agregação de cursos de água | Métricas para agregação de linhas. |
| Operador de junção de fluxo | Métricas para operador de junção de fluxos. |
transformWithState |
Métricas para transformWithState operador. |
Métricas personalizadas do armazenamento de estado do RocksDB
Informações coletadas do RocksDB capturando métricas sobre seu desempenho e operações em relação aos valores de estado que mantém para o trabalho de Streaming Estruturado. Para obter mais informações, consulte Configurar o armazenamento de estado do RocksDB no Azure Databricks.
| Fields | Description |
|---|---|
customMetrics.rocksdbBytesCopied |
O número de bytes copiados conforme rastreados pelo Gerenciador de arquivos RocksDB. |
customMetrics.rocksdbCommitCheckpointLatency |
O tempo em milissegundos necessário para tirar um instantâneo do RocksDB nativo e gravá-lo num diretório local. |
customMetrics.rocksdbCompactLatency |
O tempo em milissegundos de compactação (opcional) durante a confirmação do ponto de verificação. |
customMetrics.rocksdbCommitCompactLatency |
O tempo de compactação durante o commit, em milissegundos. |
customMetrics.rocksdbCommitFileSyncLatencyMs |
O tempo, em milissegundos, para sincronizar o snapshot nativo do RocksDB com o armazenamento externo (local do ponto de verificação). |
customMetrics.rocksdbCommitFlushLatency |
O tempo em milissegundos para descarregar as alterações na memória do RocksDB para o disco local. |
customMetrics.rocksdbCommitPauseLatency |
O tempo em milissegundos necessário para a paragem dos processos de trabalho em segundo plano como parte do commit do ponto de verificação, como para compactação. |
customMetrics.rocksdbCommitWriteBatchLatency |
O tempo em milissegundos ao aplicar as gravações por etapas na estrutura de memória (WriteBatch) no RocksDB nativo. |
customMetrics.rocksdbFilesCopied |
O número de ficheiros copiados, de acordo com o registado pelo Gestor de Ficheiros do RocksDB. |
customMetrics.rocksdbFilesReused |
O número de arquivos reutilizados conforme rastreado pelo RocksDB File Manager. |
customMetrics.rocksdbGetCount |
O número de get chamadas (não inclui gets de WriteBatch - lote na memória usado para armazenamento temporário de gravações). |
customMetrics.rocksdbGetLatency |
O tempo médio de uma chamada nativa subjacente RocksDB::Get em nanossegundos. |
customMetrics.rocksdbReadBlockCacheHitCount |
A contagem de acessos ao cache de bloco no RocksDB. |
customMetrics.rocksdbReadBlockCacheMissCount |
A contagem do cache de bloco falha no RocksDB. |
customMetrics.rocksdbSstFileSize |
O tamanho de todos os arquivos SST (Static Sorted Table) na instância do RocksDB. |
customMetrics.rocksdbTotalBytesRead |
O número de bytes não compactados lidos por get operações. |
customMetrics.rocksdbTotalBytesWritten |
O número total de bytes não compactados gravados por put operações. |
customMetrics.rocksdbTotalBytesReadThroughIterator |
O número total de bytes de dados não compactados lidos usando um iterador. Algumas operações com estado (por exemplo, processamento de timeout e marca d'água em FlatMapGroupsWithState) exigem a leitura de dados para o Azure Databricks por meio de um iterador. |
customMetrics.rocksdbTotalBytesReadByCompaction |
O número de bytes que o processo de compactação lê do disco. |
customMetrics.rocksdbTotalBytesWrittenByCompaction |
O número total de bytes que o processo de compactação grava no disco. |
customMetrics.rocksdbTotalCompactionLatencyMs |
O tempo em milissegundos para compactações do RocksDB, incluindo compactações em segundo plano e a compactação opcional iniciada durante a confirmação. |
customMetrics.rocksdbTotalFlushLatencyMs |
O tempo total de descarga, incluindo lavagem de fundo. As operações de descarga são processos pelos quais o MemTable é descarregado para o armazenamento quando está cheio.
MemTables são o primeiro nível onde os dados são armazenados no RocksDB. |
customMetrics.rocksdbZipFileBytesUncompressed |
O tamanho em bytes dos arquivos zip não compactados, conforme relatado pelo Gerenciador de arquivos. O Gerenciador de arquivos gerencia a utilização e a exclusão do espaço em disco do arquivo SST físico. |
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> |
A versão mais recente do snapshot RocksDB salvo no local do ponto de verificação. Um valor de "-1" indica que nunca foi salvo nenhum instantâneo. Como os instantâneos são específicos para cada instância de armazenamento de estado, essa métrica se aplica a um ID de partição e nome de armazenamento de estado específicos. |
customMetrics.rocksdbPutLatency |
A latência total da chamada colocada. |
customMetrics.rocksdbPutCount |
O número de chamadas colocadas. |
customMetrics.rocksdbWriterStallLatencyMs |
O gravador aguarda o tempo de compactação ou descarga para terminar. |
customMetrics.rocksdbTotalBytesWrittenByFlush |
O total de bytes gravados por flush |
customMetrics.rocksdbPinnedBlocksMemoryUsage |
O uso de memória para blocos fixos |
customMetrics.rocksdbNumInternalColFamiliesKeys |
O número de chaves internas para famílias de colunas internas |
customMetrics.rocksdbNumExternalColumnFamilies |
O número de famílias de colunas externas |
customMetrics.rocksdbNumInternalColumnFamilies |
O número de famílias de colunas internas |
Métricas personalizadas do armazenamento de estado do HDFS
Informações coletadas sobre comportamentos e operações do provedor de armazenamento de estado do HDFS.
| Fields | Description |
|---|---|
customMetrics.stateOnCurrentVersionSizeBytes |
O tamanho estimado do estado somente na versão atual. |
customMetrics.loadedMapCacheHitCount |
A contagem de cache atingida em estados armazenados em cache no provedor. |
customMetrics.loadedMapCacheMissCount |
A contagem de cache perdida em estados armazenados em cache no provedor. |
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> |
A última versão carregada do snapshot para uma instância de armazenamento de estado específica. |
Métricas personalizadas de deduplicação
Informações coletadas sobre comportamentos e operações de desduplicação.
| Fields | Description |
|---|---|
customMetrics.numDroppedDuplicateRows |
O número de linhas duplicadas foi reduzido. |
customMetrics.numRowsReadDuringEviction |
O número de linhas estaduais lidas durante o despejo estadual. |
Métricas personalizadas de agregação
Informações coletadas sobre comportamentos e operações de agregação.
| Fields | Description |
|---|---|
customMetrics.numRowsReadDuringEviction |
O número de linhas estaduais lidas durante o despejo estadual. |
Métricas personalizadas de junção de fluxo
Informações coletadas sobre comportamentos e operações de junção de fluxo.
| Fields | Description |
|---|---|
customMetrics.skippedNullValueCount |
O número de valores ignorados null , quando spark.sql.streaming.stateStore.skipNullsForStreamStreamJoins.enabled é definido como true. |
métricas personalizadas transformWithState
Informações coletadas sobre transformWithState comportamentos e operações (TWS). Para obter mais detalhes sobre transformWithState, consulte Criar uma aplicação com estado interno personalizado.
| Fields | Description |
|---|---|
customMetrics.initialStateProcessingTimeMs |
Número de milissegundos necessários para processar todo o estado inicial. |
customMetrics.numValueStateVars |
Número de variáveis de estado de valor. Também presente para transformWithStateInPandas. |
customMetrics.numListStateVars |
Número de variáveis de estado da lista. Também presente para transformWithStateInPandas. |
customMetrics.numMapStateVars |
Número de variáveis de estado do mapa. Também presente para transformWithStateInPandas. |
customMetrics.numDeletedStateVars |
Número de variáveis de estado excluídas. Também presente para transformWithStateInPandas. |
customMetrics.timerProcessingTimeMs |
Número de milissegundos necessários para processar todos os temporizadores |
customMetrics.numRegisteredTimers |
Número de temporizadores registados. Também presente para transformWithStateInPandas. |
customMetrics.numDeletedTimers |
Número de temporizadores eliminados. Também presente para transformWithStateInPandas. |
customMetrics.numExpiredTimers |
Número de temporizadores expirados. Também presente para transformWithStateInPandas. |
customMetrics.numValueStateWithTTLVars |
Número de variáveis de estado de valor com TTL. Também presente para transformWithStateInPandas. |
customMetrics.numListStateWithTTLVars |
Número de variáveis de estado da lista com TTL. Também presente para transformWithStateInPandas. |
customMetrics.numMapStateWithTTLVars |
Número de variáveis de estado do mapa com TTL. Também presente para transformWithStateInPandas. |
customMetrics.numValuesRemovedDueToTTLExpiry |
Número de valores removidos devido à expiração do TTL. Também presente para transformWithStateInPandas. |
customMetrics.numValuesIncrementallyRemovedDueToTTLExpiry |
Número de valores removidos incrementalmente devido à expiração do TTL. |
objeto de fontes
Tipo de objeto: Array[SourceProgress]
O sources objeto contém informações e métricas para fontes de dados de streaming.
| Fields | Description |
|---|---|
description |
Uma descrição detalhada da tabela da fonte de dados de streaming. |
startOffset |
A posição inicial na tabela da fonte de dados onde o trabalho de streaming foi iniciado. |
endOffset |
O último offset processado pelo microlote. |
latestOffset |
O último offset processado pelo microlote. |
numInputRows |
O número de linhas de entrada processadas a partir dessa fonte. |
inputRowsPerSecond |
A taxa a que os dados estão a chegar para processamento desta fonte, em segundos. |
processedRowsPerSecond |
A taxa na qual o Spark está processando dados dessa fonte. |
metrics |
Tipo: ju.Map[String, String]. Contém métricas personalizadas para uma fonte de dados específica. |
O Azure Databricks fornece a seguinte implementação de objeto de códigos-fonte:
Note
Para campos definidos no formulário sources.<startOffset / endOffset / latestOffset>.* (ou alguma variação), interprete-o como um dos (até estes) 3 campos possíveis, todos contendo o campo filho indicado:
sources.startOffset.<child-field>sources.endOffset.<child-field>sources.latestOffset.<child-field>
Objeto fonte do Delta Lake
Definições para métricas personalizadas usadas para fontes de dados de streaming da tabela Delta.
| Fields | Description |
|---|---|
sources.description |
A descrição da fonte a partir da qual a consulta de streaming está sendo lida. Por exemplo: “DeltaSource[table]”. |
sources.<startOffset / endOffset>.sourceVersion |
A versão da serialização com a qual esse deslocamento é codificado. |
sources.<startOffset / endOffset>.reservoirId |
O ID da tabela que está a ser lida. Isso é usado para detetar erros de configuração ao reiniciar uma consulta. Consulte Map Unity Catalog, Delta Lake e os identificadores de tabelas de métricas de Streaming Estruturado. |
sources.<startOffset / endOffset>.reservoirVersion |
A versão da tabela que está a ser processada no momento. |
sources.<startOffset / endOffset>.index |
O índice na sequência de AddFiles nesta versão. Isso é usado para dividir grandes commits em vários lotes. Esse índice é criado classificando em modificationTimestamp e path. |
sources.<startOffset / endOffset>.isStartingVersion |
Identifica se o deslocamento atual indica o início de uma nova consulta de transmissão, em vez do processamento de alterações que ocorreram após o processamento inicial dos dados. Ao iniciar uma nova consulta, todos os dados presentes na tabela no início são processados primeiro e, em seguida, todos os novos dados que chegam. |
sources.<startOffset / endOffset / latestOffset>.eventTimeMillis |
Hora do evento registada para ordenação cronológica. A hora do evento dos dados de instantâneo iniciais que estão pendentes para serem processados. Usado ao processar um instantâneo inicial em ordem cronológica dos eventos. |
sources.latestOffset |
O último deslocamento processado pela consulta de microbatch. |
sources.numInputRows |
O número de linhas de entrada processadas a partir dessa fonte. |
sources.inputRowsPerSecond |
O ritmo a que os dados estão a chegar para processamento a partir desta fonte. |
sources.processedRowsPerSecond |
A taxa na qual o Spark está processando dados dessa fonte. |
sources.metrics.numBytesOutstanding |
O tamanho combinado dos arquivos pendentes (arquivos rastreados pelo RocksDB). Esta é a métrica de backlog para Delta e Auto Loader como a fonte de streaming. |
sources.metrics.numFilesOutstanding |
O número de processos pendentes a processar. Esta é a métrica de backlog para Delta e Auto Loader como a fonte de streaming. |
Objeto de códigos-fonte Apache Kafka
Definições para métricas personalizadas usadas para fontes de dados de streaming do Apache Kafka.
| Fields | Description |
|---|---|
sources.description |
Uma descrição detalhada da fonte Kafka, especificando o tópico exato de Kafka a partir do qual se está a ler. Por exemplo: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”. |
sources.startOffset |
O número de deslocamento inicial dentro do tópico Kafka no qual o trabalho de streaming começou. |
sources.endOffset |
O último offset processado pelo microlote. Isto pode corresponder a latestOffset numa execução de microlote em andamento. |
sources.latestOffset |
O último deslocamento calculado pelo microlote. O processo de microlote pode não processar todos os deslocamentos quando há regulação, o que resulta em endOffset e latestOffset serem diferentes. |
sources.numInputRows |
O número de linhas de entrada processadas a partir dessa fonte. |
sources.inputRowsPerSecond |
O ritmo a que os dados estão a chegar para processamento a partir desta fonte. |
sources.processedRowsPerSecond |
A taxa na qual o Spark está processando dados dessa fonte. |
sources.metrics.avgOffsetsBehindLatest |
O número médio de offsets que a consulta de streaming está atrás do último offset disponível entre todos os tópicos subscritos. |
sources.metrics.estimatedTotalBytesBehindLatest |
O número estimado de bytes que o processo de consulta não consumiu dos tópicos inscritos. |
sources.metrics.maxOffsetsBehindLatest |
O número máximo de compensações que a consulta de streaming está por trás do último deslocamento disponível entre todos os tópicos inscritos. |
sources.metrics.minOffsetsBehindLatest |
O número mínimo de offsets que a consulta de streaming está atrás do offset disponível mais recente entre todos os tópicos subscritos. |
Métricas de origem do Auto Loader
Definições para métricas personalizadas usadas para fontes de dados de streaming do Auto Loader.
| Fields | Description |
|---|---|
sources.<startOffset / endOffset / latestOffset>.seqNum |
A posição atual na sequência de arquivos que estão sendo processados na ordem em que os arquivos foram descobertos. |
sources.<startOffset / endOffset / latestOffset>.sourceVersion |
A versão de implementação da fonte cloudFiles. |
sources.<startOffset / endOffset / latestOffset>.lastBackfillStartTimeMs |
A hora de início da operação de enchimento mais recente. |
sources.<startOffset / endOffset / latestOffset>.lastBackfillFinishTimeMs |
A hora de término da operação de enchimento mais recente. |
sources.<startOffset / endOffset / latestOffset>.lastInputPath |
O último caminho de entrada fornecido pelo usuário do fluxo antes que o fluxo fosse reiniciado. |
sources.metrics.numFilesOutstanding |
O número de arquivos na lista de pendências |
sources.metrics.numBytesOutstanding |
O tamanho (bytes) dos arquivos na lista de pendências |
sources.metrics.approximateQueueSize |
O tamanho aproximado da fila de mensagens. Somente quando a opção cloudFiles.useNotifications estiver ativada. |
sources.numInputRows |
O número de linhas de entrada processadas a partir dessa fonte. Para o binaryFile formato de origem, numInputRows é igual ao número de arquivos. |
Métricas de fontes do PubSub
Definições para métricas personalizadas usadas para fontes de dados de streaming PubSub. Para obter mais detalhes sobre o monitoramento de fontes de streaming do PubSub, consulte Monitoramento de métricas de streaming.
| Fields | Description |
|---|---|
sources.<startOffset / endOffset / latestOffset>.sourceVersion |
A versão de implementação com a qual esse deslocamento é codificado. |
sources.<startOffset / endOffset / latestOffset>.seqNum |
O número de sequência persistente que está a ser processado. |
sources.<startOffset / endOffset / latestOffset>.fetchEpoch |
A maior época de extração a ser processada. |
sources.metrics.numRecordsReadyToProcess |
O número de registros disponíveis para processamento na lista de pendências atual. |
sources.metrics.sizeOfRecordsReadyToProcess |
O tamanho total, em bytes, de dados não processados na lista de pendências atual. |
sources.metrics.numDuplicatesSinceStreamStart |
A contagem total de registros duplicados processados pelo fluxo desde que ele começou. |
Métricas de fontes de pulsar
Definições para métricas personalizadas usadas para fontes de dados de streaming do Pulsar.
| Fields | Description |
|---|---|
sources.metrics.numInputRows |
O número de linhas processadas no microbatch atual. |
sources.metrics.numInputBytes |
O número total de bytes processados no microlote atual. |
objeto de escoamento
Tipo de objeto: SinkProgress
| Fields | Description |
|---|---|
sink.description |
A descrição do sumidouro, detalhando a implementação específica do sumidouro usada. |
sink.numOutputRows |
O número de linhas de saída. Diferentes tipos de coletor podem ter comportamentos ou restrições diferentes para os valores. Veja os tipos específicos suportados |
sink.metrics |
ju.Map[String, String] de métricas de sumidouro. |
Atualmente, o Azure Databricks fornece duas implementações de objeto específicas sink :
| Tipo de lava-louça | Details |
|---|---|
| Tabela delta | Consulte Objeto receptáculo Delta. |
| Tópico Apache Kafka | Veja o objeto da pia de Kafka. |
O sink.metrics campo se comporta da mesma forma para ambas as variantes do sink objeto.
Objeto do sumidouro do Lago Delta
| Fields | Description |
|---|---|
sink.description |
A descrição do sumidouro Delta, detalhando a implementação específica do sumidouro Delta que está sendo usada. Por exemplo: “DeltaSink[table]”. |
sink.numOutputRows |
O número de linhas é sempre -1 porque o Spark não pode inferir linhas de saída para coletores DSv1, que é a classificação para o dissipador Delta Lake. |
Objeto de destino Apache Kafka
| Fields | Description |
|---|---|
sink.description |
A descrição do coletor Kafka no qual a consulta de streaming está escrevendo, detalhando a implementação específica do coletor Kafka que está sendo usada. Por exemplo: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”. |
sink.numOutputRows |
O número de linhas que foram gravadas na tabela de saída ou no coletor como parte do microlote. Para algumas situações, este valor pode ser "-1" e geralmente pode ser interpretado como "desconhecido". |
Examples
Exemplo de evento Kafka-to-Kafka StreamingQueryListener
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_default" : 1370,
"SnapshotLastUploaded.partition_1_default" : 1370,
"SnapshotLastUploaded.partition_2_default" : 1362,
"SnapshotLastUploaded.partition_3_default" : 1370,
"SnapshotLastUploaded.partition_4_default" : 1356,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_default" : 1360,
"SnapshotLastUploaded.partition_1_default" : 1360,
"SnapshotLastUploaded.partition_2_default" : 1352,
"SnapshotLastUploaded.partition_3_default" : 1360,
"SnapshotLastUploaded.partition_4_default" : 1346,
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_left-keyToNumValues" : 1310,
"SnapshotLastUploaded.partition_1_left-keyWithIndexToValue" : 1318,
"SnapshotLastUploaded.partition_2_left-keyToNumValues" : 1305,
"SnapshotLastUploaded.partition_2_right-keyWithIndexToValue" : 1306,
"SnapshotLastUploaded.partition_4_left-keyWithIndexToValue" : 1310,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
Exemplo de evento Delta Lake-to-Delta Lake StreamingQueryListener
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
Exemplo de evento Kinesis-to-Delta Lake StreamingQueryListener
{
"id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
"runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
"name" : null,
"timestamp" : "2024-05-14T02:09:20.846Z",
"batchId" : 0,
"batchDuration" : 59322,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"durationMs" : {
"addBatch" : 5397,
"commitBatch" : 4429,
"commitOffsets" : 211,
"getBatch" : 5,
"latestOffset" : 21998,
"queryPlanning" : 12128,
"triggerExecution" : 59313,
"walCommit" : 220
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
"startOffset" : null,
"endOffset" : [ {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000000"
},
"firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
"lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
}, {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000001"
},
"firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
"lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
} ],
"latestOffset" : null,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"metrics" : {
"avgMsBehindLatest" : "0.0",
"maxMsBehindLatest" : "0",
"minMsBehindLatest" : "0",
"mode" : "efo",
"numClosedShards" : "0",
"numProcessedBytes" : "30",
"numProcessedRecords" : "18",
"numRegisteredConsumers" : "1",
"numStreams" : "1",
"numTotalShards" : "2",
"totalPrefetchedBytes" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
"numOutputRows" : -1
}
}
Exemplo de evento Kafka+Delta Lake-to-Delta Lake StreamingQueryListener
{
"id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"name" : null,
"timestamp" : "2024-05-15T21:57:50.782Z",
"batchId" : 0,
"batchDuration" : 3601,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 5.55401277422938,
"durationMs" : {
"addBatch" : 1544,
"commitBatch" : 686,
"commitOffsets" : 27,
"getBatch" : 12,
"latestOffset" : 577,
"queryPlanning" : 105,
"triggerExecution" : 3600,
"walCommit" : 34
},
"stateOperators" : [ {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 20,
"numRowsUpdated" : 20,
"allUpdatesTimeMs" : 473,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 277,
"memoryUsedBytes" : 13120,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 5,
"numStateStoreInstances" : 20,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 5280
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic-1]]",
"startOffset" : null,
"endOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"latestOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"startOffset" : null,
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
"reservoirVersion" : 1,
"index" : -1,
"isStartingVersion" : false
},
"latestOffset" : null,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
"numOutputRows" : -1
}
}
Exemplo de fonte de taxa para o evento Delta Lake StreamingQueryListener
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}