Partilhar via


Configurar o armazenamento de estado do RocksDB no Azure Databricks

Você pode habilitar o gerenciamento de estado baseado em RocksDB definindo a seguinte configuração no SparkSession antes de iniciar a consulta de streaming.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

Você pode habilitar o RocksDB em Lakeflow Spark Declarative Pipelines. Consulte Otimizar a configuração do pipeline para processamento com monitoração de estado.

Ativar ponto de verificação do changelog

No Databricks Runtime 13.3 LTS e superior, você pode habilitar o ponto de verificação do changelog para reduzir a duração do ponto de verificação e a latência de ponta a ponta para cargas de trabalho do Structured Streaming. A Databricks recomenda ativar o ponto de verificação para o registo de alterações em todas as consultas de Transmissão Estruturada com monitorização de estado.

Tradicionalmente, o RocksDB State Store tira snapshots e envia arquivos de dados durante o checkpoint. Para evitar esse custo, o ponto de verificação do changelog grava apenas os registros que foram alterados desde o último ponto de verificação para armazenamento durável."

O ponto de verificação do changelog está desativado por padrão. Você pode habilitar o ponto de verificação do changelog no nível SparkSession usando a seguinte sintaxe:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

Você pode habilitar o ponto de verificação do changelog em um fluxo existente e manter as informações de estado armazenadas no ponto de verificação.

Importante

As consultas que ativaram o ponto de verificação do changelog só podem ser executadas no Databricks Runtime 13.3 LTS e em versões posteriores. Você pode desabilitar o ponto de verificação do changelog para reverter para o comportamento de ponto de verificação herdado, mas deve continuar a executar essas consultas no Databricks Runtime 13.3 LTS ou superior. Você deve reiniciar o trabalho para que essas alterações ocorram.

Métricas do armazenamento de estado do RocksDB

Cada operador de estado recolhe métricas relacionadas com as operações de gestão de estado realizadas na sua instância de RocksDB para observar o armazenamento de estado e, eventualmente, ajudar a diagnosticar a lentidão da tarefa.

No Databricks Runtime 16.4 LTS e superior, as métricas para uma instância de armazenamento de estado específica são rotuladas com seu ID de partição e nome de armazenamento, garantindo que permaneçam separadas. Todas as outras métricas são reportadas como a soma agregada de cada operador de estado em todas as tarefas nas quais está operando.

Essas métricas são parte do mapa customMetrics dentro dos campos stateOperators em StreamingQueryProgress. A seguir está um exemplo de StreamingQueryProgress no formulário JSON (obtido usando StreamingQueryProgress.json()).

{
  "id": "6774075e-8869-454b-ad51-513be86cfd43",
  "runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId": 7,
  "stateOperators": [
    {
      "numRowsTotal": 20000000,
      "numRowsUpdated": 20000000,
      "memoryUsedBytes": 31005397,
      "numRowsDroppedByWatermark": 0,
      "customMetrics": {
        "SnapshotLastUploaded.partition_0_default": 7,
        "SnapshotLastUploaded.partition_1_default": 7,
        "SnapshotLastUploaded.partition_2_default": 6,
        "SnapshotLastUploaded.partition_3_default": 6,
        "SnapshotLastUploaded.partition_4_default": -1,
        "rocksdbBytesCopied": 141037747,
        "rocksdbCommitCheckpointLatency": 2,
        "rocksdbCommitCompactLatency": 22061,
        "rocksdbCommitFileSyncLatencyMs": 1710,
        "rocksdbCommitFlushLatency": 19032,
        "rocksdbCommitPauseLatency": 0,
        "rocksdbCommitWriteBatchLatency": 56155,
        "rocksdbFilesCopied": 2,
        "rocksdbFilesReused": 0,
        "rocksdbGetCount": 40000000,
        "rocksdbGetLatency": 21834,
        "rocksdbPutCount": 1,
        "rocksdbPutLatency": 56155599000,
        "rocksdbReadBlockCacheHitCount": 1988,
        "rocksdbReadBlockCacheMissCount": 40341617,
        "rocksdbSstFileSize": 141037747,
        "rocksdbTotalBytesReadByCompaction": 336853375,
        "rocksdbTotalBytesReadByGet": 680000000,
        "rocksdbTotalBytesReadThroughIterator": 0,
        "rocksdbTotalBytesWrittenByCompaction": 141037747,
        "rocksdbTotalBytesWrittenByPut": 740000012,
        "rocksdbTotalCompactionLatencyMs": 21949695000,
        "rocksdbWriterStallLatencyMs": 0,
        "rocksdbZipFileBytesUncompressed": 7038
      }
    }
  ],
  "sources": [{}],
  "sink": {}
}

As descrições detalhadas das métricas são as seguintes:

Nome da métrica Descrição
LatênciaDeCommitDoBatchDeEscritaDoRocksDB Tempo (em milissegundos) necessário para aplicar as gravações em etapas na estrutura em memória (WriteBatch) ao RocksDB nativo.
rocksdbCommitFlushLatency Tempo (em milis) necessário para liberar as alterações na memória do RocksDB para o disco local.
rocksdbCommitCompactLatency Tempo (em milissegundos) necessário para compactação durante o commit do checkpoint (opcional).
rocksdbCommitPauseLatency O tempo (em milissegundos) necessário para parar os threads de trabalho em segundo plano (para compactação, etc.) como parte do compromisso do ponto de verificação.
rocksdbCommitCheckpointLatência Tempo necessário (em milissegundos) para capturar um instantâneo do RocksDB nativo e gravá-lo no diretório local.
rocksdbCommitFileSyncLatencyMs Tempo (em milis) necessário para sincronizar os arquivos nativos relacionados ao snapshot do RocksDB com um armazenamento externo (local do ponto de verificação).
rocksdbGetLatency Tempo médio por chamada nativa RocksDB::Get subjacente (em nanossegundos).
rocksdbPutCount Tempo médio por chamada nativa RocksDB::Put subjacente (em nanossegundos).
rocksdbGetCount Número de chamadas nativas RocksDB::Get (não inclui Gets de WriteBatch - lote temporário de memória usado para preparar gravações).
rocksdbPutCount Número de chamadas nativas RocksDB::Put (não inclui Puts WriteBatch - no lote de memória usado para gravações de preparação).
rocksdbTotalBytesReadByGet (Total de bytes lidos por get no RocksDB) Número de bytes não compactados lidos através de chamadas nativas RocksDB::Get.
BytesTotaisEscritosPorPutNoRocksDB Número de bytes não compactados escritos através de chamadas nativas RocksDB::Put.
rocksdbReadBlockCacheHitCount Número de vezes que o cache de bloco nativo do RocksDB é usado para evitar a leitura de dados do disco local.
ContagemDeFaltasDaCacheDeLeituraRocksdb Número de vezes que o cache de bloco nativo do RocksDB falhou e requereu a leitura de dados do disco local.
BytesTotaisLidosPorCompactaçãoRocksDB Número de bytes lidos do disco local pelo processo de compactação nativo do RocksDB.
bytesTotaisEscritosPorCompactaçãoDoRocksdb Número de bytes gravados no disco local pelo processo de compactação nativo do RocksDB.
rocksdbTotalCompactionLatencyMs Tempo (em milissegundos) necessário para as compactações do RocksDB, tanto as executadas em segundo plano quanto as opcionais iniciadas durante a confirmação.
rocksdbWriterStallLatencyMs O tempo (em milissegundos) que o gravador esteve parado devido a uma compactação em segundo plano ou ao esvaziamento dos memtables para o disco.
bytesTotaisLidosAtravésDoIteradorRocksDB Algumas das operações com estado (como o processamento de tempo limite em flatMapGroupsWithState ou a marcação d'água em agregações em janela) exigem a leitura de todos os dados no banco de dados através de um iterador. O tamanho total dos dados não compactados lidos usando o iterador.