Partilhar via


Monitorizar as consultas de Transmissão em Fluxo Estruturada no Azure Databricks

O Azure Databricks fornece monitoramento interno para aplicativos de Streaming Estruturado por meio da interface do usuário do Spark na guia Streaming .

Distinguir consultas de streaming estruturado na interface do usuário do Spark

Forneça aos seus streams um nome de consulta exclusivo adicionando .queryName(<query-name>) ao seu writeStream código para distinguir facilmente quais métricas pertencem a qual fluxo na interface do usuário do Spark.

Envie métricas de Streaming estruturado para serviços externos

As métricas de streaming podem ser enviadas por push para serviços externos para alertas ou painéis de casos de uso usando a interface Streaming Query Listener do Apache Spark. No Databricks Runtime 11.3 LTS e superior, StreamingQueryListener está disponível em Python e Scala.

Important

As limitações a seguir se aplicam a cargas de trabalho que usam modos de acesso de computação habilitados para Unity Catalog:

  • StreamingQueryListener requer o Databricks Runtime 15.1 ou superior para usar credenciais ou interagir com objetos gerenciados pelo Unity Catalog na computação com modo de acesso dedicado.
  • StreamingQueryListener requer o Databricks Runtime 16.1 ou superior para cargas de trabalho Scala configuradas com o modo de acesso padrão (anteriormente modo de acesso compartilhado).

Note

A latência de processamento com ouvintes pode afetar significativamente as velocidades de processamento de consultas. É aconselhável limitar a lógica de processamento nesses ouvintes e optar por escrever em sistemas de resposta rápida como Kafka para eficiência.

Se a consulta não tiver dados disponíveis na fonte e estiver aguardando novos dados, uma onQueryIdle mensagem será entregue ao ouvinte de consulta de streaming. Uma onQueryProgress mensagem só é entregue no final do lote de processamento da consulta de streaming. Se a consulta estiver a processar dados por um longo tempo, é possível que nem os eventos onQueryIdle nem os eventos onQueryProgress sejam enviados, mas a consulta ainda está saudável e continua a processar dados.

O código a seguir fornece exemplos básicos da sintaxe para implementar um ouvinte:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

Definição de métricas observáveis no Streaming Estruturado

As métricas observáveis são chamadas de funções agregadas arbitrárias que podem ser definidas em uma consulta (DataFrame). Assim que a execução de um DataFrame atinge um ponto de conclusão (ou seja, conclui uma consulta em lote ou atinge uma época de streaming), é emitido um evento nomeado que contém as métricas para os dados processados desde o último ponto de conclusão.

Você pode observar essas métricas anexando um ouvinte à sessão do Spark. O ouvinte depende do modo de execução:

  • Modo de lote: Use QueryExecutionListener.

    QueryExecutionListener é chamado quando a consulta é concluída. Acesse as métricas usando o QueryExecution.observedMetrics mapa.

  • Streaming, ou microlote: Use StreamingQueryListener.

    StreamingQueryListener é chamado quando a consulta de streaming completa uma época. Acesse as métricas usando o StreamingQueryProgress.observedMetrics mapa. O Azure Databricks não oferece suporte ao modo de gatilho continuous para streaming.

Por exemplo:

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

Mapear identificadores de tabela de métricas Unity Catalog, Delta Lake e Structured Streaming

As métricas de Streaming estruturado usam o reservoirId campo em vários locais para a identidade exclusiva de uma tabela Delta usada como fonte para uma consulta de streaming.

O campo reservoirId mapeia o identificador exclusivo armazenado pela tabela Delta no log de transações Delta. Essa ID não é mapeada para o tableId valor atribuído pelo Catálogo Unity e exibido no Gerenciador de Catálogos.

Use a sintaxe a seguir para revisar o identificador de tabela para uma tabela Delta. Isso funciona para tabelas geridas do Unity Catalog, tabelas externas do Unity Catalog e todas as tabelas Delta do metastore Hive.

DESCRIBE DETAIL <table-name>

O campo id exibido nos resultados é o identificador que corresponde ao reservoirId nas métricas de streaming.

Métricas de objeto StreamingQueryListener

Fields Description
id Um ID de consulta exclusivo que persiste nas reinicializações.
runId Um ID de consulta exclusivo para cada início/reinício. Ver StreamingQuery.runId().
name O nome da consulta especificado pelo utilizador. Nome é nulo se nenhum nome for especificado.
timestamp A marca temporal para a execução do microlote.
batchId Um ID exclusivo para o lote atual de dados que está sendo processado. No caso de novas tentativas após uma falha, um determinado ID de lote pode ser executado mais de uma vez. Da mesma forma, quando não há dados a serem processados, o ID do lote não é incrementado.
batchDuration A duração de processamento de uma operação em lote, em milissegundos.
numInputRows O número agregado (em todas as fontes) de registros processados em um gatilho.
inputRowsPerSecond A taxa global (em todas as fontes) de recepção de dados.
processedRowsPerSecond A taxa agregada (em todas as fontes) na qual o Spark está processando dados.

StreamingQueryListener Também define os seguintes campos que contêm objetos que podem ser examinados para métricas do cliente e detalhes do progresso da fonte:

Fields Description
durationMs Tipo: ju.Map[String, JLong]. Consulte durationMs object.
eventTime Tipo: ju.Map[String, String]. Consulte o objeto eventTime.
stateOperators Tipo: Array[StateOperatorProgress]. Consulte o objeto stateOperators.
sources Tipo: Array[SourceProgress]. Consulte o objeto sources.
sink Tipo: SinkProgress. Consulte o objeto do coletor.
observedMetrics Tipo: ju.Map[String, Row]. Funções de agregação arbitrárias nomeadas que podem ser definidas em um DataFrame/query (como df.observe).

durationMs objeto

Tipo de objeto: ju.Map[String, JLong]

Informações sobre o tempo necessário para concluir várias etapas do processo de execução do microlote.

Fields Description
durationMs.addBatch O tempo necessário para executar o microlote. Isso exclui o tempo que o Spark leva para planejar o microlote.
durationMs.getBatch O tempo necessário para recuperar os metadados sobre os deslocamentos da fonte.
durationMs.latestOffset O último offset consumido para o microlote. Este objeto de progresso refere-se ao tempo necessário para recuperar o offset mais recente das fontes.
durationMs.queryPlanning O tempo necessário para gerar o plano de execução.
durationMs.triggerExecution O tempo necessário para planear e executar o microlote.
durationMs.walCommit Tempo necessário para confirmar os novos offsets disponíveis.
durationMs.commitBatch O tempo necessário para confirmar os dados gravados no destino durante addBatch. Presente apenas para pias que suportam commit.
durationMs.commitOffsets O tempo necessário para confirmar o lote no log de confirmação.

objeto eventTime

Tipo de objeto: ju.Map[String, String]

Informações sobre o valor de tempo do evento visto nos dados que estão sendo processados no microlote. Esses dados são usados pela marca temporal para determinar como ajustar o estado para processar agregações com estado preservado definidas na tarefa de Streaming Estruturado.

Fields Description
eventTime.avg O tempo médio de evento visto nesse gatilho.
eventTime.max O tempo máximo de evento visto nesse gatilho.
eventTime.min O tempo mínimo do evento observado nesse gatilho.
eventTime.watermark O valor da marca d'água usada nesse gatilho.

objeto stateOperators

Tipo de objeto: Array[StateOperatorProgress] O stateOperators objeto contém informações sobre as operações com estado definidas no job de Streaming Estruturado e as agregações produzidas a partir delas.

Para obter mais detalhes sobre operadores de estado de fluxo, consulte O que é streaming com monitoração de estado?.

Fields Description
stateOperators.operatorName O nome do operador stateful ao qual as métricas se relacionam, como symmetricHashJoin, dedupeou stateStoreSave.
stateOperators.numRowsTotal O número total de linhas no estado como resultado de um operador ou agregação com monitoração de estado.
stateOperators.numRowsUpdated O número total de linhas atualizadas no estado como resultado de um operador ou agregação orientada a estado.
stateOperators.allUpdatesTimeMs Atualmente, essa métrica não é mensurável pelo Spark e está planejada para ser removida em atualizações futuras.
stateOperators.numRowsRemoved O número total de linhas removidas do estado como resultado de um operador ou agregação com monitoração de estado.
stateOperators.allRemovalsTimeMs Atualmente, essa métrica não é mensurável pelo Spark e está planejada para ser removida em atualizações futuras.
stateOperators.commitTimeMs O tempo necessário para confirmar todas as atualizações (inserções e remoções) e retornar uma nova versão.
stateOperators.memoryUsedBytes Memória usada pelo armazenamento de estado.
stateOperators.numRowsDroppedByWatermark O número de linhas que são consideradas tardias demais para serem incluídas em uma agregação com estado. Agregações de streaming apenas: o número de linhas descartadas pós-agregação (não linhas de entrada brutas). Este número não é preciso, mas fornece uma indicação de que dados em atraso estão a ser descartados.
stateOperators.numShufflePartitions O número de partições aleatórias para este operador stateful.
stateOperators.numStateStoreInstances A instância de armazenamento de estado real que o operador inicializou e manteve. Para muitos operadores com estado, isso é o mesmo que o número de partições. No entanto, as junções de fluxo para fluxo inicializam quatro instâncias de armazenamento de estado por partição.
stateOperators.customMetrics Consulte stateOperators.customMetrics neste tópico para obter mais detalhes.

Objeto de métrica personalizada de StateOperatorProgress

Tipo de objeto: ju.Map[String, JLong]

StateOperatorProgress tem um campo, customMetrics, que contém as métricas específicas para o recurso que você está usando ao coletar essas métricas.

Feature Description
Armazenamento de estado do RocksDB Métricas para armazenamento de estado RocksDB.
Armazenamento de estado HDFS Métricas para armazenamento de estado HDFS.
Deduplicação de fluxos de dados Métricas para desduplicação de linhas.
Agregação de cursos de água Métricas para agregação de linhas.
Operador de junção de fluxo Métricas para operador de junção de fluxos.
transformWithState Métricas para transformWithState operador.

Métricas personalizadas do armazenamento de estado do RocksDB

Informações coletadas do RocksDB capturando métricas sobre seu desempenho e operações em relação aos valores de estado que mantém para o trabalho de Streaming Estruturado. Para obter mais informações, consulte Configurar o armazenamento de estado do RocksDB no Azure Databricks.

Fields Description
customMetrics.rocksdbBytesCopied O número de bytes copiados conforme rastreados pelo Gerenciador de arquivos RocksDB.
customMetrics.rocksdbCommitCheckpointLatency O tempo em milissegundos necessário para tirar um instantâneo do RocksDB nativo e gravá-lo num diretório local.
customMetrics.rocksdbCompactLatency O tempo em milissegundos de compactação (opcional) durante a confirmação do ponto de verificação.
customMetrics.rocksdbCommitCompactLatency O tempo de compactação durante o commit, em milissegundos.
customMetrics.rocksdbCommitFileSyncLatencyMs O tempo, em milissegundos, para sincronizar o snapshot nativo do RocksDB com o armazenamento externo (local do ponto de verificação).
customMetrics.rocksdbCommitFlushLatency O tempo em milissegundos para descarregar as alterações na memória do RocksDB para o disco local.
customMetrics.rocksdbCommitPauseLatency O tempo em milissegundos necessário para a paragem dos processos de trabalho em segundo plano como parte do commit do ponto de verificação, como para compactação.
customMetrics.rocksdbCommitWriteBatchLatency O tempo em milissegundos ao aplicar as gravações por etapas na estrutura de memória (WriteBatch) no RocksDB nativo.
customMetrics.rocksdbFilesCopied O número de ficheiros copiados, de acordo com o registado pelo Gestor de Ficheiros do RocksDB.
customMetrics.rocksdbFilesReused O número de arquivos reutilizados conforme rastreado pelo RocksDB File Manager.
customMetrics.rocksdbGetCount O número de get chamadas (não inclui gets de WriteBatch - lote na memória usado para armazenamento temporário de gravações).
customMetrics.rocksdbGetLatency O tempo médio de uma chamada nativa subjacente RocksDB::Get em nanossegundos.
customMetrics.rocksdbReadBlockCacheHitCount A contagem de acessos ao cache de bloco no RocksDB.
customMetrics.rocksdbReadBlockCacheMissCount A contagem do cache de bloco falha no RocksDB.
customMetrics.rocksdbSstFileSize O tamanho de todos os arquivos SST (Static Sorted Table) na instância do RocksDB.
customMetrics.rocksdbTotalBytesRead O número de bytes não compactados lidos por get operações.
customMetrics.rocksdbTotalBytesWritten O número total de bytes não compactados gravados por put operações.
customMetrics.rocksdbTotalBytesReadThroughIterator O número total de bytes de dados não compactados lidos usando um iterador. Algumas operações com estado (por exemplo, processamento de timeout e marca d'água em FlatMapGroupsWithState) exigem a leitura de dados para o Azure Databricks por meio de um iterador.
customMetrics.rocksdbTotalBytesReadByCompaction O número de bytes que o processo de compactação lê do disco.
customMetrics.rocksdbTotalBytesWrittenByCompaction O número total de bytes que o processo de compactação grava no disco.
customMetrics.rocksdbTotalCompactionLatencyMs O tempo em milissegundos para compactações do RocksDB, incluindo compactações em segundo plano e a compactação opcional iniciada durante a confirmação.
customMetrics.rocksdbTotalFlushLatencyMs O tempo total de descarga, incluindo lavagem de fundo. As operações de descarga são processos pelos quais o MemTable é descarregado para o armazenamento quando está cheio. MemTables são o primeiro nível onde os dados são armazenados no RocksDB.
customMetrics.rocksdbZipFileBytesUncompressed O tamanho em bytes dos arquivos zip não compactados, conforme relatado pelo Gerenciador de arquivos. O Gerenciador de arquivos gerencia a utilização e a exclusão do espaço em disco do arquivo SST físico.
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> A versão mais recente do snapshot RocksDB salvo no local do ponto de verificação. Um valor de "-1" indica que nunca foi salvo nenhum instantâneo. Como os instantâneos são específicos para cada instância de armazenamento de estado, essa métrica se aplica a um ID de partição e nome de armazenamento de estado específicos.
customMetrics.rocksdbPutLatency A latência total da chamada colocada.
customMetrics.rocksdbPutCount O número de chamadas colocadas.
customMetrics.rocksdbWriterStallLatencyMs O gravador aguarda o tempo de compactação ou descarga para terminar.
customMetrics.rocksdbTotalBytesWrittenByFlush O total de bytes gravados por flush
customMetrics.rocksdbPinnedBlocksMemoryUsage O uso de memória para blocos fixos
customMetrics.rocksdbNumInternalColFamiliesKeys O número de chaves internas para famílias de colunas internas
customMetrics.rocksdbNumExternalColumnFamilies O número de famílias de colunas externas
customMetrics.rocksdbNumInternalColumnFamilies O número de famílias de colunas internas

Métricas personalizadas do armazenamento de estado do HDFS

Informações coletadas sobre comportamentos e operações do provedor de armazenamento de estado do HDFS.

Fields Description
customMetrics.stateOnCurrentVersionSizeBytes O tamanho estimado do estado somente na versão atual.
customMetrics.loadedMapCacheHitCount A contagem de cache atingida em estados armazenados em cache no provedor.
customMetrics.loadedMapCacheMissCount A contagem de cache perdida em estados armazenados em cache no provedor.
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> A última versão carregada do snapshot para uma instância de armazenamento de estado específica.

Métricas personalizadas de deduplicação

Informações coletadas sobre comportamentos e operações de desduplicação.

Fields Description
customMetrics.numDroppedDuplicateRows O número de linhas duplicadas foi reduzido.
customMetrics.numRowsReadDuringEviction O número de linhas estaduais lidas durante o despejo estadual.

Métricas personalizadas de agregação

Informações coletadas sobre comportamentos e operações de agregação.

Fields Description
customMetrics.numRowsReadDuringEviction O número de linhas estaduais lidas durante o despejo estadual.

Métricas personalizadas de junção de fluxo

Informações coletadas sobre comportamentos e operações de junção de fluxo.

Fields Description
customMetrics.skippedNullValueCount O número de valores ignorados null , quando spark.sql.streaming.stateStore.skipNullsForStreamStreamJoins.enabled é definido como true.

métricas personalizadas transformWithState

Informações coletadas sobre transformWithState comportamentos e operações (TWS). Para obter mais detalhes sobre transformWithState, consulte Criar uma aplicação com estado interno personalizado.

Fields Description
customMetrics.initialStateProcessingTimeMs Número de milissegundos necessários para processar todo o estado inicial.
customMetrics.numValueStateVars Número de variáveis de estado de valor. Também presente para transformWithStateInPandas.
customMetrics.numListStateVars Número de variáveis de estado da lista. Também presente para transformWithStateInPandas.
customMetrics.numMapStateVars Número de variáveis de estado do mapa. Também presente para transformWithStateInPandas.
customMetrics.numDeletedStateVars Número de variáveis de estado excluídas. Também presente para transformWithStateInPandas.
customMetrics.timerProcessingTimeMs Número de milissegundos necessários para processar todos os temporizadores
customMetrics.numRegisteredTimers Número de temporizadores registados. Também presente para transformWithStateInPandas.
customMetrics.numDeletedTimers Número de temporizadores eliminados. Também presente para transformWithStateInPandas.
customMetrics.numExpiredTimers Número de temporizadores expirados. Também presente para transformWithStateInPandas.
customMetrics.numValueStateWithTTLVars Número de variáveis de estado de valor com TTL. Também presente para transformWithStateInPandas.
customMetrics.numListStateWithTTLVars Número de variáveis de estado da lista com TTL. Também presente para transformWithStateInPandas.
customMetrics.numMapStateWithTTLVars Número de variáveis de estado do mapa com TTL. Também presente para transformWithStateInPandas.
customMetrics.numValuesRemovedDueToTTLExpiry Número de valores removidos devido à expiração do TTL. Também presente para transformWithStateInPandas.
customMetrics.numValuesIncrementallyRemovedDueToTTLExpiry Número de valores removidos incrementalmente devido à expiração do TTL.

objeto de fontes

Tipo de objeto: Array[SourceProgress]

O sources objeto contém informações e métricas para fontes de dados de streaming.

Fields Description
description Uma descrição detalhada da tabela da fonte de dados de streaming.
startOffset A posição inicial na tabela da fonte de dados onde o trabalho de streaming foi iniciado.
endOffset O último offset processado pelo microlote.
latestOffset O último offset processado pelo microlote.
numInputRows O número de linhas de entrada processadas a partir dessa fonte.
inputRowsPerSecond A taxa a que os dados estão a chegar para processamento desta fonte, em segundos.
processedRowsPerSecond A taxa na qual o Spark está processando dados dessa fonte.
metrics Tipo: ju.Map[String, String]. Contém métricas personalizadas para uma fonte de dados específica.

O Azure Databricks fornece a seguinte implementação de objeto de códigos-fonte:

Note

Para campos definidos no formulário sources.<startOffset / endOffset / latestOffset>.* (ou alguma variação), interprete-o como um dos (até estes) 3 campos possíveis, todos contendo o campo filho indicado:

  • sources.startOffset.<child-field>
  • sources.endOffset.<child-field>
  • sources.latestOffset.<child-field>

Objeto fonte do Delta Lake

Definições para métricas personalizadas usadas para fontes de dados de streaming da tabela Delta.

Fields Description
sources.description A descrição da fonte a partir da qual a consulta de streaming está sendo lida. Por exemplo: “DeltaSource[table]”.
sources.<startOffset / endOffset>.sourceVersion A versão da serialização com a qual esse deslocamento é codificado.
sources.<startOffset / endOffset>.reservoirId O ID da tabela que está a ser lida. Isso é usado para detetar erros de configuração ao reiniciar uma consulta. Consulte Map Unity Catalog, Delta Lake e os identificadores de tabelas de métricas de Streaming Estruturado.
sources.<startOffset / endOffset>.reservoirVersion A versão da tabela que está a ser processada no momento.
sources.<startOffset / endOffset>.index O índice na sequência de AddFiles nesta versão. Isso é usado para dividir grandes commits em vários lotes. Esse índice é criado classificando em modificationTimestamp e path.
sources.<startOffset / endOffset>.isStartingVersion Identifica se o deslocamento atual indica o início de uma nova consulta de transmissão, em vez do processamento de alterações que ocorreram após o processamento inicial dos dados. Ao iniciar uma nova consulta, todos os dados presentes na tabela no início são processados primeiro e, em seguida, todos os novos dados que chegam.
sources.<startOffset / endOffset / latestOffset>.eventTimeMillis Hora do evento registada para ordenação cronológica. A hora do evento dos dados de instantâneo iniciais que estão pendentes para serem processados. Usado ao processar um instantâneo inicial em ordem cronológica dos eventos.
sources.latestOffset O último deslocamento processado pela consulta de microbatch.
sources.numInputRows O número de linhas de entrada processadas a partir dessa fonte.
sources.inputRowsPerSecond O ritmo a que os dados estão a chegar para processamento a partir desta fonte.
sources.processedRowsPerSecond A taxa na qual o Spark está processando dados dessa fonte.
sources.metrics.numBytesOutstanding O tamanho combinado dos arquivos pendentes (arquivos rastreados pelo RocksDB). Esta é a métrica de backlog para Delta e Auto Loader como a fonte de streaming.
sources.metrics.numFilesOutstanding O número de processos pendentes a processar. Esta é a métrica de backlog para Delta e Auto Loader como a fonte de streaming.

Objeto de códigos-fonte Apache Kafka

Definições para métricas personalizadas usadas para fontes de dados de streaming do Apache Kafka.

Fields Description
sources.description Uma descrição detalhada da fonte Kafka, especificando o tópico exato de Kafka a partir do qual se está a ler. Por exemplo: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.
sources.startOffset O número de deslocamento inicial dentro do tópico Kafka no qual o trabalho de streaming começou.
sources.endOffset O último offset processado pelo microlote. Isto pode corresponder a latestOffset numa execução de microlote em andamento.
sources.latestOffset O último deslocamento calculado pelo microlote. O processo de microlote pode não processar todos os deslocamentos quando há regulação, o que resulta em endOffset e latestOffset serem diferentes.
sources.numInputRows O número de linhas de entrada processadas a partir dessa fonte.
sources.inputRowsPerSecond O ritmo a que os dados estão a chegar para processamento a partir desta fonte.
sources.processedRowsPerSecond A taxa na qual o Spark está processando dados dessa fonte.
sources.metrics.avgOffsetsBehindLatest O número médio de offsets que a consulta de streaming está atrás do último offset disponível entre todos os tópicos subscritos.
sources.metrics.estimatedTotalBytesBehindLatest O número estimado de bytes que o processo de consulta não consumiu dos tópicos inscritos.
sources.metrics.maxOffsetsBehindLatest O número máximo de compensações que a consulta de streaming está por trás do último deslocamento disponível entre todos os tópicos inscritos.
sources.metrics.minOffsetsBehindLatest O número mínimo de offsets que a consulta de streaming está atrás do offset disponível mais recente entre todos os tópicos subscritos.

Métricas de origem do Auto Loader

Definições para métricas personalizadas usadas para fontes de dados de streaming do Auto Loader.

Fields Description
sources.<startOffset / endOffset / latestOffset>.seqNum A posição atual na sequência de arquivos que estão sendo processados na ordem em que os arquivos foram descobertos.
sources.<startOffset / endOffset / latestOffset>.sourceVersion A versão de implementação da fonte cloudFiles.
sources.<startOffset / endOffset / latestOffset>.lastBackfillStartTimeMs A hora de início da operação de enchimento mais recente.
sources.<startOffset / endOffset / latestOffset>.lastBackfillFinishTimeMs A hora de término da operação de enchimento mais recente.
sources.<startOffset / endOffset / latestOffset>.lastInputPath O último caminho de entrada fornecido pelo usuário do fluxo antes que o fluxo fosse reiniciado.
sources.metrics.numFilesOutstanding O número de arquivos na lista de pendências
sources.metrics.numBytesOutstanding O tamanho (bytes) dos arquivos na lista de pendências
sources.metrics.approximateQueueSize O tamanho aproximado da fila de mensagens. Somente quando a opção cloudFiles.useNotifications estiver ativada.
sources.numInputRows O número de linhas de entrada processadas a partir dessa fonte. Para o binaryFile formato de origem, numInputRows é igual ao número de arquivos.

Métricas de fontes do PubSub

Definições para métricas personalizadas usadas para fontes de dados de streaming PubSub. Para obter mais detalhes sobre o monitoramento de fontes de streaming do PubSub, consulte Monitoramento de métricas de streaming.

Fields Description
sources.<startOffset / endOffset / latestOffset>.sourceVersion A versão de implementação com a qual esse deslocamento é codificado.
sources.<startOffset / endOffset / latestOffset>.seqNum O número de sequência persistente que está a ser processado.
sources.<startOffset / endOffset / latestOffset>.fetchEpoch A maior época de extração a ser processada.
sources.metrics.numRecordsReadyToProcess O número de registros disponíveis para processamento na lista de pendências atual.
sources.metrics.sizeOfRecordsReadyToProcess O tamanho total, em bytes, de dados não processados na lista de pendências atual.
sources.metrics.numDuplicatesSinceStreamStart A contagem total de registros duplicados processados pelo fluxo desde que ele começou.

Métricas de fontes de pulsar

Definições para métricas personalizadas usadas para fontes de dados de streaming do Pulsar.

Fields Description
sources.metrics.numInputRows O número de linhas processadas no microbatch atual.
sources.metrics.numInputBytes O número total de bytes processados no microlote atual.

objeto de escoamento

Tipo de objeto: SinkProgress

Fields Description
sink.description A descrição do sumidouro, detalhando a implementação específica do sumidouro usada.
sink.numOutputRows O número de linhas de saída. Diferentes tipos de coletor podem ter comportamentos ou restrições diferentes para os valores. Veja os tipos específicos suportados
sink.metrics ju.Map[String, String] de métricas de sumidouro.

Atualmente, o Azure Databricks fornece duas implementações de objeto específicas sink :

Tipo de lava-louça Details
Tabela delta Consulte Objeto receptáculo Delta.
Tópico Apache Kafka Veja o objeto da pia de Kafka.

O sink.metrics campo se comporta da mesma forma para ambas as variantes do sink objeto.

Objeto do sumidouro do Lago Delta

Fields Description
sink.description A descrição do sumidouro Delta, detalhando a implementação específica do sumidouro Delta que está sendo usada. Por exemplo: “DeltaSink[table]”.
sink.numOutputRows O número de linhas é sempre -1 porque o Spark não pode inferir linhas de saída para coletores DSv1, que é a classificação para o dissipador Delta Lake.

Objeto de destino Apache Kafka

Fields Description
sink.description A descrição do coletor Kafka no qual a consulta de streaming está escrevendo, detalhando a implementação específica do coletor Kafka que está sendo usada. Por exemplo: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.
sink.numOutputRows O número de linhas que foram gravadas na tabela de saída ou no coletor como parte do microlote. Para algumas situações, este valor pode ser "-1" e geralmente pode ser interpretado como "desconhecido".

Examples

Exemplo de evento Kafka-to-Kafka StreamingQueryListener

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "SnapshotLastUploaded.partition_0_default" : 1370,
      "SnapshotLastUploaded.partition_1_default" : 1370,
      "SnapshotLastUploaded.partition_2_default" : 1362,
      "SnapshotLastUploaded.partition_3_default" : 1370,
      "SnapshotLastUploaded.partition_4_default" : 1356,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "SnapshotLastUploaded.partition_0_default" : 1360,
      "SnapshotLastUploaded.partition_1_default" : 1360,
      "SnapshotLastUploaded.partition_2_default" : 1352,
      "SnapshotLastUploaded.partition_3_default" : 1360,
      "SnapshotLastUploaded.partition_4_default" : 1346,
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "SnapshotLastUploaded.partition_0_left-keyToNumValues" : 1310,
      "SnapshotLastUploaded.partition_1_left-keyWithIndexToValue" : 1318,
      "SnapshotLastUploaded.partition_2_left-keyToNumValues" : 1305,
      "SnapshotLastUploaded.partition_2_right-keyWithIndexToValue" : 1306,
      "SnapshotLastUploaded.partition_4_left-keyWithIndexToValue" : 1310,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Exemplo de evento Delta Lake-to-Delta Lake StreamingQueryListener

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Exemplo de evento Kinesis-to-Delta Lake StreamingQueryListener

{
  "id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
  "runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
  "name" : null,
  "timestamp" : "2024-05-14T02:09:20.846Z",
  "batchId" : 0,
  "batchDuration" : 59322,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.33714304979602844,
  "durationMs" : {
    "addBatch" : 5397,
    "commitBatch" : 4429,
    "commitOffsets" : 211,
    "getBatch" : 5,
    "latestOffset" : 21998,
    "queryPlanning" : 12128,
    "triggerExecution" : 59313,
    "walCommit" : 220
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
    "startOffset" : null,
    "endOffset" : [ {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
      "lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
    }, {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
      "lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
    } ],
    "latestOffset" : null,
    "numInputRows" : 20,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.33714304979602844,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "efo",
      "numClosedShards" : "0",
      "numProcessedBytes" : "30",
      "numProcessedRecords" : "18",
      "numRegisteredConsumers" : "1",
      "numStreams" : "1",
      "numTotalShards" : "2",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
    "numOutputRows" : -1
  }
}

Exemplo de evento Kafka+Delta Lake-to-Delta Lake StreamingQueryListener

{
 "id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
 "runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
 "name" : null,
 "timestamp" : "2024-05-15T21:57:50.782Z",
 "batchId" : 0,
 "batchDuration" : 3601,
 "numInputRows" : 20,
 "inputRowsPerSecond" : 0.0,
 "processedRowsPerSecond" : 5.55401277422938,
 "durationMs" : {
  "addBatch" : 1544,
  "commitBatch" : 686,
  "commitOffsets" : 27,
  "getBatch" : 12,
  "latestOffset" : 577,
  "queryPlanning" : 105,
  "triggerExecution" : 3600,
  "walCommit" : 34
 },
 "stateOperators" : [ {
  "operatorName" : "symmetricHashJoin",
  "numRowsTotal" : 20,
  "numRowsUpdated" : 20,
  "allUpdatesTimeMs" : 473,
  "numRowsRemoved" : 0,
  "allRemovalsTimeMs" : 0,
  "commitTimeMs" : 277,
  "memoryUsedBytes" : 13120,
  "numRowsDroppedByWatermark" : 0,
  "numShufflePartitions" : 5,
  "numStateStoreInstances" : 20,
  "customMetrics" : {
   "loadedMapCacheHitCount" : 0,
   "loadedMapCacheMissCount" : 0,
   "stateOnCurrentVersionSizeBytes" : 5280
  }
 } ],
 "sources" : [ {
  "description" : "KafkaV2[Subscribe[topic-1]]",
  "startOffset" : null,
  "endOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "latestOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "avgOffsetsBehindLatest" : "0.0",
   "estimatedTotalBytesBehindLatest" : "0.0",
   "maxOffsetsBehindLatest" : "0",
   "minOffsetsBehindLatest" : "0"
  }
 }, {
  "description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
  "startOffset" : null,
  "endOffset" : {
   "sourceVersion" : 1,
   "reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
   "reservoirVersion" : 1,
   "index" : -1,
   "isStartingVersion" : false
  },
  "latestOffset" : null,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "numBytesOutstanding" : "0",
   "numFilesOutstanding" : "0"
  }
 } ],
 "sink" : {
  "description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
  "numOutputRows" : -1
 }
}

Exemplo de fonte de taxa para o evento Delta Lake StreamingQueryListener

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}