Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Zarządzanie stanem opartym na bazie danych RocksDB można włączyć, ustawiając następującą konfigurację w usłudze SparkSession przed rozpoczęciem zapytania przesyłania strumieniowego.
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
Można włączyć RocksDB w Lakeflow Spark Declarative Pipelines. Zobacz Optymalizowanie konfiguracji potoku na potrzeby przetwarzania stanowego.
Włącz tworzenie punktów kontrolnych dziennika zmian
W środowisku Databricks Runtime 13.3 LTS i nowszym można włączyć punkt kontrolny dziennika zmian w celu obniżenia czasu trwania punktu kontrolnego oraz ogólnego opóźnienia obciążeń związanych ze strumieniowym przesyłaniem danych. Databricks zaleca włączenie punktów kontrolnych do logowania zmian dla wszystkich stanowych zapytań w usłudze Structured Streaming.
Tradycyjnie RocksDB State Store wykonuje migawki i wysyła pliki danych podczas tworzenia punktów kontrolnych. Aby uniknąć tego kosztu, punkty kontrolne w dzienniku zmian zapisują do trwałego magazynu tylko te rekordy, które uległy zmianie od ostatniego punktu kontrolnego.
Punkty kontrolne dziennika zmian są domyślnie wyłączone. Punkty kontrolne dziennika zmian można włączyć na poziomie SparkSession przy użyciu następującej składni:
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
Możesz włączyć punkty kontrolne dziennika zmian w istniejącym strumieniu i utrzymywać informacje o stanie przechowywane w punkcie kontrolnym.
Ważne
Zapytania, które mają włączone tworzenie punktów kontrolnych dziennika zmian, można uruchamiać tylko w środowisku Databricks Runtime 13.3 LTS lub nowszym. Możesz wyłączyć zapisywanie stanu dziennika zmian, aby przywrócić wcześniejszy sposób działania punktów kontrolnych, ale nadal musisz uruchamiać te zapytania w środowisku Databricks Runtime 13.3 LTS lub nowszym. Aby te zmiany zostały wprowadzone, należy ponownie uruchomić zadanie.
Metryki przechowalni stanów RocksDB
Każdy operator stanu zbiera metryki związane z operacjami zarządzania stanami wykonywanymi w wystąpieniu bazy danych RocksDB, aby obserwować magazyn stanów i potencjalnie pomóc w debugowaniu spowolnienia zadań.
W Databricks Runtime od wersji 16.4 LTS wzwyż metryki dla określonego instancji magazynu stanów są oznaczone identyfikatorem partycji oraz nazwą magazynu, aby zapewnić ich oddzielność. Wszystkie inne metryki są zgłaszane jako suma agregacji dla każdego operatora stanu we wszystkich zadaniach, w których jest uruchomiony operator stanu.
Te metryki są częścią mapy w polach customMetrics, stateOperators, StreamingQueryProgress. Poniżej przedstawiono przykład StreamingQueryProgress w formacie JSON (uzyskany przy użyciu StreamingQueryProgress.json()).
{
"id": "6774075e-8869-454b-ad51-513be86cfd43",
"runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId": 7,
"stateOperators": [
{
"numRowsTotal": 20000000,
"numRowsUpdated": 20000000,
"memoryUsedBytes": 31005397,
"numRowsDroppedByWatermark": 0,
"customMetrics": {
"SnapshotLastUploaded.partition_0_default": 7,
"SnapshotLastUploaded.partition_1_default": 7,
"SnapshotLastUploaded.partition_2_default": 6,
"SnapshotLastUploaded.partition_3_default": 6,
"SnapshotLastUploaded.partition_4_default": -1,
"rocksdbBytesCopied": 141037747,
"rocksdbCommitCheckpointLatency": 2,
"rocksdbCommitCompactLatency": 22061,
"rocksdbCommitFileSyncLatencyMs": 1710,
"rocksdbCommitFlushLatency": 19032,
"rocksdbCommitPauseLatency": 0,
"rocksdbCommitWriteBatchLatency": 56155,
"rocksdbFilesCopied": 2,
"rocksdbFilesReused": 0,
"rocksdbGetCount": 40000000,
"rocksdbGetLatency": 21834,
"rocksdbPutCount": 1,
"rocksdbPutLatency": 56155599000,
"rocksdbReadBlockCacheHitCount": 1988,
"rocksdbReadBlockCacheMissCount": 40341617,
"rocksdbSstFileSize": 141037747,
"rocksdbTotalBytesReadByCompaction": 336853375,
"rocksdbTotalBytesReadByGet": 680000000,
"rocksdbTotalBytesReadThroughIterator": 0,
"rocksdbTotalBytesWrittenByCompaction": 141037747,
"rocksdbTotalBytesWrittenByPut": 740000012,
"rocksdbTotalCompactionLatencyMs": 21949695000,
"rocksdbWriterStallLatencyMs": 0,
"rocksdbZipFileBytesUncompressed": 7038
}
}
],
"sources": [{}],
"sink": {}
}
Szczegółowe opisy metryk są następujące:
| Nazwa metryki | opis |
|---|---|
| OpóźnienieZatwierdzaniaWriteBatchRocksdb | Czas (w milisekundach), jaki zajął proces zastosowania etapowych zapisów w strukturze pamięci (WriteBatch) do natywnej bazy danych RocksDB. |
| latencja zatwierdzenia i zapisu RocksDB | Czas (w milisekundach) potrzebny na opróżnienie zmian w pamięci RocksDB na lokalny dysk. |
| rocksdbCommitCompactLatency | Czas (w milisekundach) trwający na kompresję podczas zatwierdzania punktu kontrolnego (opcjonalnie). |
| rocksdbCommitPauseLatency | Czas (w milisekundach), który zajęło zatrzymanie wątków roboczych działających w tle, na przykład podczas kompaktowania itp., jako część zatwierdzenia punktu kontrolnego. |
| Opóźnienie punktu kontrolnego zatwierdzenia w RocksDB | Czas (w ms) potrzebny na utworzenie migawki natywnej bazy danych RocksDB i zapisanie jej w katalogu lokalnym. |
| rocksdbCommitFileSyncLatencyMs | Czas (w milisekundach) potrzebny na synchronizację plików związanych z natywną migawką bazy danych RocksDB do zewnętrznego przechowywania (lokalizacja punktu kontrolnego). |
| rocksdbGetLatency | Średni czas (w nanosekundach) potrzebny na wykonanie bazowego wywołania natywnego RocksDB::Get. |
| rocksdbPutCount | Średni czas (w nanosekundach) potrzebny na wykonanie bazowego wywołania natywnego RocksDB::Put. |
| rocksdbGetCount | Liczba wywołań natywnych RocksDB::Get (nie obejmuje Gets z WriteBatch - partii w pamięci używanej do tymczasowego przechowywania zapisów). |
| rocksdbPutCount | Liczba natywnych wywołań RocksDB::Put (nie obejmuje wywołań Puts do WriteBatch - partii w pamięci używanej do przejściowego zapisywania). |
| rocksdb Łączna Liczba Bajtów Przeczytanych Przez Get | Liczba nieskompresowanych bajtów odczytanych za pośrednictwem wywołań natywnych RocksDB::Get . |
| rocksdbCałkowitaLiczbaBajtówNapisanychPrzezPut | Liczba nieskompresowanych bajtów napisanych za pośrednictwem wywołań natywnych RocksDB::Put . |
| rocksdbReadBlockCacheHitCount (liczba trafień w pamięci podręcznej bloków odczytu rocksdb) | Liczba przypadków użycia natywnej pamięci podręcznej blokowej Bazy danych RocksDB w celu uniknięcia odczytywania danych z dysku lokalnego. |
| rocksdbReadBlockCacheMissCount (liczba nieudanych odczytów z bufora bloków w RocksDB) | Liczba przypadków, gdy natywna pamięć podręczna bloków RocksDB nie znalazła danych i konieczne było odczytanie ich z lokalnego dysku. |
| rocksdbCałkowitaLiczbaBajtówPrzeczytywanychPodczasKompresji | Liczba bajtów odczytanych z dysku lokalnego przez natywny proces kompaktowania bazy danych RocksDB. |
| rocksdb-CałkowitaLiczbaBajtówZapisanychPrzezKompakcję | Liczba bajtów zapisanych na dysku lokalnym przez natywny proces kompaktowania bazy danych RocksDB. |
| rocksdbTotalCompactionLatencyMs | Czas (w milisekundach) potrzebny na kompaktowanie bazy danych RocksDB (zarówno w tle, jak i opcjonalne kompaktowanie zainicjowane podczas zatwierdzenia). |
| rocksdbWriterStallLatencyMs (latencja zatrzymania zapisu w rocksdb w ms) | Czas (w milisekundach), w którym proces zapisu został zatrzymany z powodu kompaktowania w tle lub opróżniania tablic pamięci do dysku. |
| rocksdbCałkowitaLiczbaPrzeczytanychBajtówPrzezIterator | Niektóre operacje z zachowaniem stanu (takie jak zarządzanie czasem w flatMapGroupsWithState lub watermarkowanie w agregacjach okiennych) wymagają odczytywania całych danych w bazie danych przez iterator. Całkowity rozmiar nieskompresowanych danych odczytywanych przy użyciu iteratora. |