Compartilhar via


Configurar o repositório de estado do RocksDB no Azure Databricks

Você pode habilitar o gerenciamento de estado baseado no RockDB definindo a configuração a seguir na SparkSession antes de iniciar a consulta de streaming.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

Você pode habilitar o RocksDB no Lakeflow Spark Declarative Pipelines. Consulte Otimizar a configuração do pipeline para processamento com estado.

Habilitar o ponto de verificação do log de alterações

No Databricks Runtime 13.3 LTS e superior, você pode habilitar o ponto de verificação do log de alterações para reduzir a duração do ponto de verificação e a latência de ponta a ponta para cargas de trabalho de Streaming Estruturado. A Databricks recomenda habilitar o ponto de verificação do log de alterações para todas as consultas com estado de Streaming Estruturado.

Tradicionalmente, o RocksDB State Store tira instantâneos e faz upload de arquivos de dados durante o ponto de verificação. Para evitar esse custo, o ponto de verificação do log de alterações grava apenas os registros que foram alterados desde o último ponto de verificação em um armazenamento durável.”

O ponto de verificação do registro de alterações está desabilitado por padrão. É possível utilizar a seguinte sintaxe para habilitar o ponto de verificação do log de alterações no nível do SparkSession:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

Você pode habilitar o ponto de verificação do log de alterações em um stream existente e manter as informações de estado armazenadas no ponto de verificação.

Importante

As consultas com ponto de verificação do log de alterações habilitado só podem ser executadas no Databricks Runtime 13.3 LTS e superior. Você pode desabilitar o ponto de verificação do log de alterações para reverter para o comportamento de ponto de verificação herdado, mas deve continuar a executar essas consultas no Databricks Runtime 13.3 LTS ou superior. Você deve reiniciar o trabalho para que essas alterações ocorram.

Métricas de armazenamento de estado RocksDB

Cada operador de estado coleta métricas relacionadas às operações de gerenciamento de estado realizadas na sua instância RocksDB para observar o repositório de estado e, potencialmente, ajudar na depuração de lentidão do trabalho.

No Databricks Runtime 16.4 LTS e posteriores, as métricas de uma instância específica do repositório de estado são rotuladas com a ID de partição e o nome do repositório, garantindo que permaneçam separadas. Todas as outras métricas são relatadas como a soma agregada para cada operador de estado em todas as tarefas em que o operador de estado está em execução.

Essas métricas fazem parte do mapa customMetrics dentro dos campos stateOperators em StreamingQueryProgress. Veja a seguir um exemplo de StreamingQueryProgress no formato JSON (obtido usando StreamingQueryProgress.json()).

{
  "id": "6774075e-8869-454b-ad51-513be86cfd43",
  "runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId": 7,
  "stateOperators": [
    {
      "numRowsTotal": 20000000,
      "numRowsUpdated": 20000000,
      "memoryUsedBytes": 31005397,
      "numRowsDroppedByWatermark": 0,
      "customMetrics": {
        "SnapshotLastUploaded.partition_0_default": 7,
        "SnapshotLastUploaded.partition_1_default": 7,
        "SnapshotLastUploaded.partition_2_default": 6,
        "SnapshotLastUploaded.partition_3_default": 6,
        "SnapshotLastUploaded.partition_4_default": -1,
        "rocksdbBytesCopied": 141037747,
        "rocksdbCommitCheckpointLatency": 2,
        "rocksdbCommitCompactLatency": 22061,
        "rocksdbCommitFileSyncLatencyMs": 1710,
        "rocksdbCommitFlushLatency": 19032,
        "rocksdbCommitPauseLatency": 0,
        "rocksdbCommitWriteBatchLatency": 56155,
        "rocksdbFilesCopied": 2,
        "rocksdbFilesReused": 0,
        "rocksdbGetCount": 40000000,
        "rocksdbGetLatency": 21834,
        "rocksdbPutCount": 1,
        "rocksdbPutLatency": 56155599000,
        "rocksdbReadBlockCacheHitCount": 1988,
        "rocksdbReadBlockCacheMissCount": 40341617,
        "rocksdbSstFileSize": 141037747,
        "rocksdbTotalBytesReadByCompaction": 336853375,
        "rocksdbTotalBytesReadByGet": 680000000,
        "rocksdbTotalBytesReadThroughIterator": 0,
        "rocksdbTotalBytesWrittenByCompaction": 141037747,
        "rocksdbTotalBytesWrittenByPut": 740000012,
        "rocksdbTotalCompactionLatencyMs": 21949695000,
        "rocksdbWriterStallLatencyMs": 0,
        "rocksdbZipFileBytesUncompressed": 7038
      }
    }
  ],
  "sources": [{}],
  "sink": {}
}

Descrições detalhadas das métricas são as seguinte:

Nome da métrica Descrição
rocksdbCommitWriteBatchLatency Tempo (mili-segundos) necessário para aplicar as gravações em fases na estrutura na memória (WriteBatch) ao RocksDB nativo.
rocksdbCommitFlushLatency Tempo (mili-segundos) necessário para liberar as alterações na memória do RocksDB para o disco local.
rocksdbCommitCompactLatency O tempo (mili-segundos) levou para compactação (opcional) durante a confirmação do ponto de verificação.
rocksdbCommitPauseLatency Tempo (em milissegundos) necessário para interromper os threads de trabalho em segundo plano (para compactação etc.) como parte da confirmação de ponto de verificação.
rocksdbCommitCheckpointLatency Tempo (mili-segundos) levou para tirar um instantâneo do RocksDB nativo e gravar em um diretório local.
rocksdbCommitFileSyncLatencyMs Tempo (mili-segundos) necessário para sincronizar os arquivos nativos relacionados ao instantâneo do RocksDB para um armazenamento externo (local do ponto de verificação).
rocksdbGetLatency O tempo médio (em nanos) levou de acordo com a chamada RocksDB::Get nativa subjacente.
rocksdbPutCount O tempo médio (em nanos) levou de acordo com a chamada RocksDB::Put nativa subjacente.
rocksdbGetCount Número de chamadas nativas RocksDB::Get (não inclui Gets no WriteBatch – lote em memória usado para preparar gravações).
rocksdbPutCount Número de chamadas nativas RocksDB::Put (excluindo chamadas para Puts WriteBatch - lote na memória usado para gravações temporárias).
rocksdbTotal de Bytes Lidos por Get Número de bytes descompactados lidos por chamadas RocksDB::Get nativas.
BytesTotaisEscritosPorPutrocksdb Número de bytes descompactados gravados por chamadas RocksDB::Put nativas.
rocksdbReadBlockCacheHitCount Número de vezes que o cache de blocos do RocksDB nativo é usado para evitar a leitura de dados do disco local.
rocksdbContagemDeFalhasNaCacheDeLeituraDeBloco Número de vezes que o cache de blocos do RocksDB nativo perdeu e exigiu a leitura de dados do disco local.
rocksdbTotalBytesReadByCompaction Número de bytes lidos do disco local pelo processo de compactação do RocksDB nativo.
rocksdbTotalDeBytesEscritosPorCompactação Número de bytes gravados no disco local pelo processo de compactação do RocksDB nativo.
rocksdbTotalCompactionLatencyMs O tempo (mili-segundos) levou para as compactações do RocksDB (tanto em segundo plano quanto para a compactação opcional iniciada durante a confirmação).
rocksdbWriterStallLatencyMs Tempo (mili-segundos) o autor foi paralisado devido a uma compactação em segundo plano ou liberação das memtables para o disco.
TotalBytesLidosAtravésDeIteradorRocksdb Algumas operações com estado (como processamento de tempo limite em flatMapGroupsWithState ou marca d'água em agregações com janela) exigem a leitura de todos os dados no BD por meio do iterador. O tamanho total dos dados descompactados lidos usando o iterador.