Freigeben über


Konfigurieren des RocksDB-Statusspeichers auf Azure Databricks

Sie können die RocksDB-basierte Zustandsverwaltung aktivieren, indem Sie die folgende Konfiguration in der SparkSession-Instanz festlegen, bevor Sie die Streamingabfrage starten.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

Sie können RocksDB auf Lakeflow Spark Declarative Pipelines aktivieren. Siehe Optimieren der Pipelinekonfiguration für die zustandsbehaftete Verarbeitung.

Änderungsprotokoll-Checkpoints aktivieren

In Databricks Runtime 13.3 LTS und höher können Sie Changelog-Prüfpunkte aktivieren, um die Prüfpunktdauer und End-to-End-Latenz für Strukturierte Streaming-Workloads zu verringern. Databricks empfiehlt, Änderungsprotokollprüfpunkte für alle zustandsbehafteten Abfragen von strukturiertem Streaming zu aktivieren.

Normalerweise werden Momentaufnahmen des RocksDB-Zustandsspeichers während des Prüfpunkts erstellt und Datendateien hochgeladen. Um diese Kosten zu vermeiden, schreiben Änderungsprotokollprüfpunkte nur Datensätze, die sich seit dem letzten Prüfpunkt geändert haben, in dauerhaften Speicher."

Änderungsprotokollprüfpunkte sind standardmäßig deaktiviert. Sie können Änderungsprotokollprüfpunkte in der SparkSession-Ebene mit der folgenden Syntax aktivieren:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

Sie können die Änderungsprotokollprüfpunkte für einen vorhandenen Stream aktivieren und die im Prüfpunkt gespeicherten Zustandsinformationen verwalten.

Wichtig

Abfragen mit aktivierten Prüfpunkten im Änderungsprotokoll, können nur unter Databricks Runtime 13.3 LTS und höher ausgeführt werden. Sie können Prüfpunkte im Änderungsprotokoll deaktivieren, um das Legacyverhalten von Prüfpunkten wiederherzustellen. Sie müssen diese Abfragen jedoch trotzdem in Databricks Runtime 13.3 LTS oder höher ausführen. Sie müssen den Auftrag neu starten, damit diese Änderungen vorgenommen werden.

Metriken des RocksDB-Zustandsspeichers

Jeder Zustandsoperator erfasst Metriken im Zusammenhang mit den Zustandsverwaltungsvorgängen, die auf seiner RocksDB-Instanz ausgeführt werden, um den Zustandsspeicher zu beobachten und möglicherweise bei der Behebung einer langsamen Auftragsausführung zu helfen.

In Databricks Runtime 16.4 LTS und höher werden die Metriken für eine bestimmte Zustandsspeicherinstanz mit ihrer Partitions-ID und dem Speichernamen bezeichnet, um sicherzustellen, dass sie getrennt bleiben. Alle anderen Metriken werden als Aggregatsumme für jeden Statusoperator über alle Aufgaben hinweg gemeldet, bei denen der Statusoperator aktiv ist.

Diese Metriken sind Teil der customMetrics-Abbildung in den stateOperators-Feldern in StreamingQueryProgress. Nachfolgend sehen Sie ein Beispiel für StreamingQueryProgress im JSON-Format (mit StreamingQueryProgress.json() abgerufen).

{
  "id": "6774075e-8869-454b-ad51-513be86cfd43",
  "runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId": 7,
  "stateOperators": [
    {
      "numRowsTotal": 20000000,
      "numRowsUpdated": 20000000,
      "memoryUsedBytes": 31005397,
      "numRowsDroppedByWatermark": 0,
      "customMetrics": {
        "SnapshotLastUploaded.partition_0_default": 7,
        "SnapshotLastUploaded.partition_1_default": 7,
        "SnapshotLastUploaded.partition_2_default": 6,
        "SnapshotLastUploaded.partition_3_default": 6,
        "SnapshotLastUploaded.partition_4_default": -1,
        "rocksdbBytesCopied": 141037747,
        "rocksdbCommitCheckpointLatency": 2,
        "rocksdbCommitCompactLatency": 22061,
        "rocksdbCommitFileSyncLatencyMs": 1710,
        "rocksdbCommitFlushLatency": 19032,
        "rocksdbCommitPauseLatency": 0,
        "rocksdbCommitWriteBatchLatency": 56155,
        "rocksdbFilesCopied": 2,
        "rocksdbFilesReused": 0,
        "rocksdbGetCount": 40000000,
        "rocksdbGetLatency": 21834,
        "rocksdbPutCount": 1,
        "rocksdbPutLatency": 56155599000,
        "rocksdbReadBlockCacheHitCount": 1988,
        "rocksdbReadBlockCacheMissCount": 40341617,
        "rocksdbSstFileSize": 141037747,
        "rocksdbTotalBytesReadByCompaction": 336853375,
        "rocksdbTotalBytesReadByGet": 680000000,
        "rocksdbTotalBytesReadThroughIterator": 0,
        "rocksdbTotalBytesWrittenByCompaction": 141037747,
        "rocksdbTotalBytesWrittenByPut": 740000012,
        "rocksdbTotalCompactionLatencyMs": 21949695000,
        "rocksdbWriterStallLatencyMs": 0,
        "rocksdbZipFileBytesUncompressed": 7038
      }
    }
  ],
  "sources": [{}],
  "sink": {}
}

Ausführliche Beschreibungen der Metriken:

Metrikname BESCHREIBUNG
rocksdbCommitWriteBatchLatenz Zeit (in Millisekunden) zum Anwenden der Stagingschreibvorgänge in der In-Memory-Struktur (WriteBatch) auf die native RocksDB-Instanz.
rocksdbCommitFlush-Verzögerung Zeit (in Millisekunden), die benötigt wird, um die Änderungen im Arbeitsspeicher von RocksDB auf den lokalen Datenträger zu übertragen.
rocksdbCommitCompactLatency Zeit (in Millisekunden) für die Komprimierung (optional) während des Prüfpunktcommits.
rocksdbCommitPauseLatency Zeit (in Millisekunden) zum Beenden der Arbeitsthreads im Hintergrund (für Komprimierung usw.) als Teil des Prüfpunkt-Commits.
RocksDB Commit-Checkpoint-Latenz Zeit (in Millisekunden) für das Erstellen einer Momentaufnahme der nativen RocksDB-Instanz und das Schreiben in ein lokales Verzeichnis.
rocksdbÜbertragungDateisynchronisationLatenzMs Zeit (in Millisekunden) für die Synchronisierung der Dateien für die Momentaufnahme der nativen RocksDB-Instanz mit einem externen Speicher (Prüfpunktspeicherort).
rocksdbGetLatency Durchschnittliche Zeit (in Nanosekunden) pro zugrunde liegendem nativen RocksDB::Get-Aufruf.
rocksdbPutCount Durchschnittliche Zeit (in Nanosekunden) pro zugrunde liegendem nativen RocksDB::Put-Aufruf.
rocksdbGetCount Anzahl der nativen RocksDB::Get Aufrufe (schließt Gets aus WriteBatch - im Speicher genutztem Batch für das Staging von Schreibvorgängen nicht ein).
rocksdbPutCount Anzahl der systemeigenen RocksDB::Put-Anrufe (schließt Puts zu "WriteBatch" nicht ein – eine Speicherbatch, die für Staging-Schreibvorgänge verwendet wird).
rocksdbGesamtanzahlDerGelesenenBytesDurchGet Anzahl unkomprimierter Bytes, die durch native RocksDB::Get-Aufrufe gelesen wurden.
rocksdbGesamteBytesGeschriebenDurchPut Anzahl unkomprimierter Bytes, die durch native RocksDB::Put-Aufrufe geschrieben wurden.
rocksdbLeseBlockCacheTrefferAnzahl Anzahl, wie oft der native RocksDB-Blockcache verwendet wird, um das Lesen von Daten vom lokalen Datenträger zu vermeiden.
rocksdbLeseBlockCacheFehlzähler Anzahl, wie oft der native RocksDB-Blockcache nicht funktioniert hat und Daten vom lokalen Datenträger gelesen werden mussten.
GesamtanzahlBytesGelesenDurchKompaktierungRocksDB Anzahl der Bytes, die durch den nativen RocksDB-Komprimierungsprozess vom lokalen Datenträger gelesen wurden.
rocksdbTotalBytesWrittenByCompaction Anzahl der Bytes, die durch den nativen RocksDB-Komprimierungsprozess auf den lokalen Datenträger geschrieben wurden.
rocksdbGesamtkompaktierungslatenzMs Zeit (in Millisekunden) für RocksDB-Komprimierungen (sowohl im Hintergrund als auch optionale Komprimierung, die während des Commits initiiert wurde).
rocksdbWriterStallLatencyMs Zeit (in Millisekunden), über die der Writer aufgrund einer Komprimierung im Hintergrund oder zum Leeren der Memtables auf den Datenträger angehalten hat.
rocksdbGesamtanzahlBytesGelesenDurchIterator Einige zustandsbehaftete Vorgänge (z. B. Timeoutverarbeitung in flatMapGroupsWithState oder Wasserzeichen für Aggregationen im Fenstermodus) erfordern das Lesen der gesamten Daten in der Datenbank über Iterator. Die Gesamtgröße der mit dem Iterator gelesenen unkomprimierten Daten.