Udostępnij przez


Konfigurowanie magazynu stanów bazy danych RocksDB w usłudze Azure Databricks

Zarządzanie stanem opartym na bazie danych RocksDB można włączyć, ustawiając następującą konfigurację w usłudze SparkSession przed rozpoczęciem zapytania przesyłania strumieniowego.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

Można włączyć RocksDB w Lakeflow Spark Declarative Pipelines. Zobacz Optymalizowanie konfiguracji potoku na potrzeby przetwarzania stanowego.

Włącz tworzenie punktów kontrolnych dziennika zmian

W środowisku Databricks Runtime 13.3 LTS i nowszym można włączyć punkt kontrolny dziennika zmian w celu obniżenia czasu trwania punktu kontrolnego oraz ogólnego opóźnienia obciążeń związanych ze strumieniowym przesyłaniem danych. Databricks zaleca włączenie punktów kontrolnych do logowania zmian dla wszystkich stanowych zapytań w usłudze Structured Streaming.

Tradycyjnie RocksDB State Store wykonuje migawki i wysyła pliki danych podczas tworzenia punktów kontrolnych. Aby uniknąć tego kosztu, punkty kontrolne w dzienniku zmian zapisują do trwałego magazynu tylko te rekordy, które uległy zmianie od ostatniego punktu kontrolnego.

Punkty kontrolne dziennika zmian są domyślnie wyłączone. Punkty kontrolne dziennika zmian można włączyć na poziomie SparkSession przy użyciu następującej składni:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

Możesz włączyć punkty kontrolne dziennika zmian w istniejącym strumieniu i utrzymywać informacje o stanie przechowywane w punkcie kontrolnym.

Ważne

Zapytania, które mają włączone tworzenie punktów kontrolnych dziennika zmian, można uruchamiać tylko w środowisku Databricks Runtime 13.3 LTS lub nowszym. Możesz wyłączyć zapisywanie stanu dziennika zmian, aby przywrócić wcześniejszy sposób działania punktów kontrolnych, ale nadal musisz uruchamiać te zapytania w środowisku Databricks Runtime 13.3 LTS lub nowszym. Aby te zmiany zostały wprowadzone, należy ponownie uruchomić zadanie.

Metryki przechowalni stanów RocksDB

Każdy operator stanu zbiera metryki związane z operacjami zarządzania stanami wykonywanymi w wystąpieniu bazy danych RocksDB, aby obserwować magazyn stanów i potencjalnie pomóc w debugowaniu spowolnienia zadań.

W Databricks Runtime od wersji 16.4 LTS wzwyż metryki dla określonego instancji magazynu stanów są oznaczone identyfikatorem partycji oraz nazwą magazynu, aby zapewnić ich oddzielność. Wszystkie inne metryki są zgłaszane jako suma agregacji dla każdego operatora stanu we wszystkich zadaniach, w których jest uruchomiony operator stanu.

Te metryki są częścią mapy w polach customMetrics, stateOperators, StreamingQueryProgress. Poniżej przedstawiono przykład StreamingQueryProgress w formacie JSON (uzyskany przy użyciu StreamingQueryProgress.json()).

{
  "id": "6774075e-8869-454b-ad51-513be86cfd43",
  "runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId": 7,
  "stateOperators": [
    {
      "numRowsTotal": 20000000,
      "numRowsUpdated": 20000000,
      "memoryUsedBytes": 31005397,
      "numRowsDroppedByWatermark": 0,
      "customMetrics": {
        "SnapshotLastUploaded.partition_0_default": 7,
        "SnapshotLastUploaded.partition_1_default": 7,
        "SnapshotLastUploaded.partition_2_default": 6,
        "SnapshotLastUploaded.partition_3_default": 6,
        "SnapshotLastUploaded.partition_4_default": -1,
        "rocksdbBytesCopied": 141037747,
        "rocksdbCommitCheckpointLatency": 2,
        "rocksdbCommitCompactLatency": 22061,
        "rocksdbCommitFileSyncLatencyMs": 1710,
        "rocksdbCommitFlushLatency": 19032,
        "rocksdbCommitPauseLatency": 0,
        "rocksdbCommitWriteBatchLatency": 56155,
        "rocksdbFilesCopied": 2,
        "rocksdbFilesReused": 0,
        "rocksdbGetCount": 40000000,
        "rocksdbGetLatency": 21834,
        "rocksdbPutCount": 1,
        "rocksdbPutLatency": 56155599000,
        "rocksdbReadBlockCacheHitCount": 1988,
        "rocksdbReadBlockCacheMissCount": 40341617,
        "rocksdbSstFileSize": 141037747,
        "rocksdbTotalBytesReadByCompaction": 336853375,
        "rocksdbTotalBytesReadByGet": 680000000,
        "rocksdbTotalBytesReadThroughIterator": 0,
        "rocksdbTotalBytesWrittenByCompaction": 141037747,
        "rocksdbTotalBytesWrittenByPut": 740000012,
        "rocksdbTotalCompactionLatencyMs": 21949695000,
        "rocksdbWriterStallLatencyMs": 0,
        "rocksdbZipFileBytesUncompressed": 7038
      }
    }
  ],
  "sources": [{}],
  "sink": {}
}

Szczegółowe opisy metryk są następujące:

Nazwa metryki opis
OpóźnienieZatwierdzaniaWriteBatchRocksdb Czas (w milisekundach), jaki zajął proces zastosowania etapowych zapisów w strukturze pamięci (WriteBatch) do natywnej bazy danych RocksDB.
latencja zatwierdzenia i zapisu RocksDB Czas (w milisekundach) potrzebny na opróżnienie zmian w pamięci RocksDB na lokalny dysk.
rocksdbCommitCompactLatency Czas (w milisekundach) trwający na kompresję podczas zatwierdzania punktu kontrolnego (opcjonalnie).
rocksdbCommitPauseLatency Czas (w milisekundach), który zajęło zatrzymanie wątków roboczych działających w tle, na przykład podczas kompaktowania itp., jako część zatwierdzenia punktu kontrolnego.
Opóźnienie punktu kontrolnego zatwierdzenia w RocksDB Czas (w ms) potrzebny na utworzenie migawki natywnej bazy danych RocksDB i zapisanie jej w katalogu lokalnym.
rocksdbCommitFileSyncLatencyMs Czas (w milisekundach) potrzebny na synchronizację plików związanych z natywną migawką bazy danych RocksDB do zewnętrznego przechowywania (lokalizacja punktu kontrolnego).
rocksdbGetLatency Średni czas (w nanosekundach) potrzebny na wykonanie bazowego wywołania natywnego RocksDB::Get.
rocksdbPutCount Średni czas (w nanosekundach) potrzebny na wykonanie bazowego wywołania natywnego RocksDB::Put.
rocksdbGetCount Liczba wywołań natywnych RocksDB::Get (nie obejmuje Gets z WriteBatch - partii w pamięci używanej do tymczasowego przechowywania zapisów).
rocksdbPutCount Liczba natywnych wywołań RocksDB::Put (nie obejmuje wywołań Puts do WriteBatch - partii w pamięci używanej do przejściowego zapisywania).
rocksdb Łączna Liczba Bajtów Przeczytanych Przez Get Liczba nieskompresowanych bajtów odczytanych za pośrednictwem wywołań natywnych RocksDB::Get .
rocksdbCałkowitaLiczbaBajtówNapisanychPrzezPut Liczba nieskompresowanych bajtów napisanych za pośrednictwem wywołań natywnych RocksDB::Put .
rocksdbReadBlockCacheHitCount (liczba trafień w pamięci podręcznej bloków odczytu rocksdb) Liczba przypadków użycia natywnej pamięci podręcznej blokowej Bazy danych RocksDB w celu uniknięcia odczytywania danych z dysku lokalnego.
rocksdbReadBlockCacheMissCount (liczba nieudanych odczytów z bufora bloków w RocksDB) Liczba przypadków, gdy natywna pamięć podręczna bloków RocksDB nie znalazła danych i konieczne było odczytanie ich z lokalnego dysku.
rocksdbCałkowitaLiczbaBajtówPrzeczytywanychPodczasKompresji Liczba bajtów odczytanych z dysku lokalnego przez natywny proces kompaktowania bazy danych RocksDB.
rocksdb-CałkowitaLiczbaBajtówZapisanychPrzezKompakcję Liczba bajtów zapisanych na dysku lokalnym przez natywny proces kompaktowania bazy danych RocksDB.
rocksdbTotalCompactionLatencyMs Czas (w milisekundach) potrzebny na kompaktowanie bazy danych RocksDB (zarówno w tle, jak i opcjonalne kompaktowanie zainicjowane podczas zatwierdzenia).
rocksdbWriterStallLatencyMs (latencja zatrzymania zapisu w rocksdb w ms) Czas (w milisekundach), w którym proces zapisu został zatrzymany z powodu kompaktowania w tle lub opróżniania tablic pamięci do dysku.
rocksdbCałkowitaLiczbaPrzeczytanychBajtówPrzezIterator Niektóre operacje z zachowaniem stanu (takie jak zarządzanie czasem w flatMapGroupsWithState lub watermarkowanie w agregacjach okiennych) wymagają odczytywania całych danych w bazie danych przez iterator. Całkowity rozmiar nieskompresowanych danych odczytywanych przy użyciu iteratora.