Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Você pode habilitar o gerenciamento de estado baseado no RockDB definindo a configuração a seguir na SparkSession antes de iniciar a consulta de streaming.
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
Você pode habilitar o RocksDB no Lakeflow Spark Declarative Pipelines. Consulte Otimizar a configuração do pipeline para processamento com estado.
Habilitar o ponto de verificação do log de alterações
No Databricks Runtime 13.3 LTS e superior, você pode habilitar o ponto de verificação do log de alterações para reduzir a duração do ponto de verificação e a latência de ponta a ponta para cargas de trabalho de Streaming Estruturado. A Databricks recomenda habilitar o ponto de verificação do log de alterações para todas as consultas com estado de Streaming Estruturado.
Tradicionalmente, o RocksDB State Store tira instantâneos e faz upload de arquivos de dados durante o ponto de verificação. Para evitar esse custo, o ponto de verificação do log de alterações grava apenas os registros que foram alterados desde o último ponto de verificação em um armazenamento durável.”
O ponto de verificação do registro de alterações está desabilitado por padrão. É possível utilizar a seguinte sintaxe para habilitar o ponto de verificação do log de alterações no nível do SparkSession:
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
Você pode habilitar o ponto de verificação do log de alterações em um stream existente e manter as informações de estado armazenadas no ponto de verificação.
Importante
As consultas com ponto de verificação do log de alterações habilitado só podem ser executadas no Databricks Runtime 13.3 LTS e superior. Você pode desabilitar o ponto de verificação do log de alterações para reverter para o comportamento de ponto de verificação herdado, mas deve continuar a executar essas consultas no Databricks Runtime 13.3 LTS ou superior. Você deve reiniciar o trabalho para que essas alterações ocorram.
Métricas de armazenamento de estado RocksDB
Cada operador de estado coleta métricas relacionadas às operações de gerenciamento de estado realizadas na sua instância RocksDB para observar o repositório de estado e, potencialmente, ajudar na depuração de lentidão do trabalho.
No Databricks Runtime 16.4 LTS e posteriores, as métricas de uma instância específica do repositório de estado são rotuladas com a ID de partição e o nome do repositório, garantindo que permaneçam separadas. Todas as outras métricas são relatadas como a soma agregada para cada operador de estado em todas as tarefas em que o operador de estado está em execução.
Essas métricas fazem parte do mapa customMetrics dentro dos campos stateOperators em StreamingQueryProgress. Veja a seguir um exemplo de StreamingQueryProgress no formato JSON (obtido usando StreamingQueryProgress.json()).
{
"id": "6774075e-8869-454b-ad51-513be86cfd43",
"runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId": 7,
"stateOperators": [
{
"numRowsTotal": 20000000,
"numRowsUpdated": 20000000,
"memoryUsedBytes": 31005397,
"numRowsDroppedByWatermark": 0,
"customMetrics": {
"SnapshotLastUploaded.partition_0_default": 7,
"SnapshotLastUploaded.partition_1_default": 7,
"SnapshotLastUploaded.partition_2_default": 6,
"SnapshotLastUploaded.partition_3_default": 6,
"SnapshotLastUploaded.partition_4_default": -1,
"rocksdbBytesCopied": 141037747,
"rocksdbCommitCheckpointLatency": 2,
"rocksdbCommitCompactLatency": 22061,
"rocksdbCommitFileSyncLatencyMs": 1710,
"rocksdbCommitFlushLatency": 19032,
"rocksdbCommitPauseLatency": 0,
"rocksdbCommitWriteBatchLatency": 56155,
"rocksdbFilesCopied": 2,
"rocksdbFilesReused": 0,
"rocksdbGetCount": 40000000,
"rocksdbGetLatency": 21834,
"rocksdbPutCount": 1,
"rocksdbPutLatency": 56155599000,
"rocksdbReadBlockCacheHitCount": 1988,
"rocksdbReadBlockCacheMissCount": 40341617,
"rocksdbSstFileSize": 141037747,
"rocksdbTotalBytesReadByCompaction": 336853375,
"rocksdbTotalBytesReadByGet": 680000000,
"rocksdbTotalBytesReadThroughIterator": 0,
"rocksdbTotalBytesWrittenByCompaction": 141037747,
"rocksdbTotalBytesWrittenByPut": 740000012,
"rocksdbTotalCompactionLatencyMs": 21949695000,
"rocksdbWriterStallLatencyMs": 0,
"rocksdbZipFileBytesUncompressed": 7038
}
}
],
"sources": [{}],
"sink": {}
}
Descrições detalhadas das métricas são as seguinte:
| Nome da métrica | Descrição |
|---|---|
| rocksdbCommitWriteBatchLatency | Tempo (mili-segundos) necessário para aplicar as gravações em fases na estrutura na memória (WriteBatch) ao RocksDB nativo. |
| rocksdbCommitFlushLatency | Tempo (mili-segundos) necessário para liberar as alterações na memória do RocksDB para o disco local. |
| rocksdbCommitCompactLatency | O tempo (mili-segundos) levou para compactação (opcional) durante a confirmação do ponto de verificação. |
| rocksdbCommitPauseLatency | Tempo (em milissegundos) necessário para interromper os threads de trabalho em segundo plano (para compactação etc.) como parte da confirmação de ponto de verificação. |
| rocksdbCommitCheckpointLatency | Tempo (mili-segundos) levou para tirar um instantâneo do RocksDB nativo e gravar em um diretório local. |
| rocksdbCommitFileSyncLatencyMs | Tempo (mili-segundos) necessário para sincronizar os arquivos nativos relacionados ao instantâneo do RocksDB para um armazenamento externo (local do ponto de verificação). |
| rocksdbGetLatency | O tempo médio (em nanos) levou de acordo com a chamada RocksDB::Get nativa subjacente. |
| rocksdbPutCount | O tempo médio (em nanos) levou de acordo com a chamada RocksDB::Put nativa subjacente. |
| rocksdbGetCount | Número de chamadas nativas RocksDB::Get (não inclui Gets no WriteBatch – lote em memória usado para preparar gravações). |
| rocksdbPutCount | Número de chamadas nativas RocksDB::Put (excluindo chamadas para Puts WriteBatch - lote na memória usado para gravações temporárias). |
| rocksdbTotal de Bytes Lidos por Get | Número de bytes descompactados lidos por chamadas RocksDB::Get nativas. |
| BytesTotaisEscritosPorPutrocksdb | Número de bytes descompactados gravados por chamadas RocksDB::Put nativas. |
| rocksdbReadBlockCacheHitCount | Número de vezes que o cache de blocos do RocksDB nativo é usado para evitar a leitura de dados do disco local. |
| rocksdbContagemDeFalhasNaCacheDeLeituraDeBloco | Número de vezes que o cache de blocos do RocksDB nativo perdeu e exigiu a leitura de dados do disco local. |
| rocksdbTotalBytesReadByCompaction | Número de bytes lidos do disco local pelo processo de compactação do RocksDB nativo. |
| rocksdbTotalDeBytesEscritosPorCompactação | Número de bytes gravados no disco local pelo processo de compactação do RocksDB nativo. |
| rocksdbTotalCompactionLatencyMs | O tempo (mili-segundos) levou para as compactações do RocksDB (tanto em segundo plano quanto para a compactação opcional iniciada durante a confirmação). |
| rocksdbWriterStallLatencyMs | Tempo (mili-segundos) o autor foi paralisado devido a uma compactação em segundo plano ou liberação das memtables para o disco. |
| TotalBytesLidosAtravésDeIteradorRocksdb | Algumas operações com estado (como processamento de tempo limite em flatMapGroupsWithState ou marca d'água em agregações com janela) exigem a leitura de todos os dados no BD por meio do iterador. O tamanho total dos dados descompactados lidos usando o iterador. |