Freigeben über


Lesen von strukturierten Streamingzustandsinformationen

Sie können DataFrame-Vorgänge oder SQL-Tabellenwertfunktionen verwenden, um Strukturierte Streaming-Statusdaten und Metadaten abzufragen. Verwenden Sie diese Funktionen, um Zustandsinformationen für strukturierte Streaming-Zustandsabfragen zu beobachten, die für die Überwachung und das Debuggen nützlich sein können.

Sie benötigen Lesezugriff auf den Prüfpunktpfad für eine Streamingabfrage, um Zustandsdaten oder Metadaten abzufragen. Die in diesem Artikel beschriebenen Funktionen bieten schreibgeschützten Zugriff auf Zustandsdaten und Metadaten. Sie können nur die Batchlesesemantik zum Abfragen von Zustandsinformationen verwenden.

Hinweis

Sie können keine Statusinformationen für Lakeflow Spark Declarative Pipelines, Streamingtabellen oder materialisierte Ansichten abfragen. Sie können keine Statusinformationen mit serverlosem Computing oder Computing abfragen, das im Standardzugriffsmodus konfiguriert ist.

Anforderungen

  • Verwenden Sie eine der folgenden Berechnungskonfigurationen:
    • Databricks Runtime 16.3 und höher auf einem Rechner, der mit dem Standardzugriffsmodus konfiguriert ist.
    • Databricks Runtime 14.3 LTS und höher auf Compute-Ressourcen mit dediziertem oder keinem Isolationszugriffsmodus.
  • Lesezugriff auf den Prüfpunktpfad, der von der Streamingabfrage verwendet wird.

Lesen des Zustandsspeichers für strukturiertes Streaming

Sie können Zustandsspeicherinformationen für strukturierte Streaming-Abfragen lesen, die in unterstützten Databricks Runtime ausgeführt werden. Verwenden Sie die folgende Syntax:

Python

df = (spark.read
  .format("statestore")
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore('/checkpoint/path')

Parameter der Statusleser-API

Die Statusleser-API unterstützt die folgenden optionalen Konfigurationen:

Auswahlmöglichkeit Typ Standardwert Beschreibung
batchId Lang aktuellste Batch ID Stellt den Zielbatch dar, aus dem gelesen werden soll. Geben Sie diese Option an, um Statusinformationen für einen früheren Status der Abfrage abzufragen. Der Batch muss committet, aber noch nicht bereinigt werden.
operatorId Lang 0 Stellt den Zieloperator dar, aus dem gelesen werden soll. Diese Option wird verwendet, wenn die Abfrage mehrere zustandsbehaftete Operatoren verwendet.
storeName Schnur „STANDARD“ Stellt den Namen des Ziel-Zustandsspeichers dar, aus dem gelesen werden soll. Diese Option wird verwendet, wenn der zustandsbehaftete Operator mehrere Zustandsspeicherinstanzen verwendet. Für einen Stream-Steam-Join muss entweder storeName oder joinSide angegeben werden, aber nicht beide.
joinSide String („links“ oder „rechts“) Stellt die Zielseite dar, aus der gelesen werden soll. Diese Option wird verwendet, wenn Benutzer den Zustand aus einer Stream-Stream-Verknüpfung lesen möchten.
stateVarName Schnur Nichts Der Statusvariablenname, der als Teil dieser Abfrage gelesen werden soll. Der Name der Statusvariablen ist der eindeutige Name, der jeder Variablen innerhalb der init-Funktion eines StatefulProcessor zugewiesen wird, die vom transformWithState-Operator verwendet wird. Diese Option ist eine erforderliche Option, wenn der transformWithState Operator verwendet wird. Diese Option gilt nur für den transformWithState Operator und wird für andere Operatoren ignoriert. Verfügbar in Databricks Runtime 16.2 und höher.
readRegisteredTimers Boolescher Typ (Boolean) Falsch Stellen Sie true ein, um registrierte Zeitgeber zu lesen, die innerhalb des transformWithState Operators verwendet werden. Diese Option gilt nur für den transformWithState Operator und wird für andere Operatoren ignoriert. Verfügbar in Databricks Runtime 16.2 und höher.
flattenCollectionTypes Boolescher Typ (Boolean) Wahr Wenn truedie Datensätze, die für Karten- und Listenzustandsvariablen zurückgegeben werden, abgeflächt werden. Wenn false, werden die Datensätze mithilfe von Spark SQL Array oder Map zurückgegeben. Diese Option gilt nur für den transformWithState Operator und wird für andere Operatoren ignoriert. Verfügbar in Databricks Runtime 16.2 und höher.

Die zurückgegebenen Daten haben das folgende Schema:

Spalte Typ Beschreibung
key Struct (weiterer Typ, der vom Zustandsschlüssel abgeleitet wurde) Der Schlüssel für einen zustandsbehafteten Operatordatensatz im Zustandsprüfpunkt.
value Struct (weiterer Typ, der vom Zustandswert abgeleitet wurde) Der Wert für einen zustandsbehafteten Operatordatensatz im Zustandsprüfpunkt.
partition_id Ganze Zahl Die Partition des Zustandsprüfpunkts, die den Datensatz des zustandsbehafteten Operatoren enthält.

Siehe read_statestore Tabellenwertfunktion.

Lesen von strukturierten Streamingzustandsmetadaten

Wichtig

Sie müssen Streamingabfragen auf Databricks Runtime 14.2 oder höher ausführen, um Zustandsmetadaten aufzuzeichnen. Zustandsmetadatendateien unterbrechen die Abwärtskompatibilität nicht. Wenn Sie eine Streamingabfrage auf Databricks Runtime 14.1 oder niedriger ausführen, werden vorhandene Zustandsmetadatendateien ignoriert und es werden keine neuen Zustandsmetadatendateien geschrieben.

Sie können Zustandsmetadateninformationen für strukturierte Streamingabfragen lesen, die auf Databricks Runtime 14.2 oder höher ausgeführt werden. Verwenden Sie die folgende Syntax:

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

Die zurückgegebenen Daten haben das folgende Schema:

Spalte Typ Beschreibung
operatorId Ganze Zahl Die ganzzahlige ID des zustandsbehafteten Streamingoperatoren.
operatorName Ganze Zahl Name des zustandsbehafteten Streamingoperatoren.
stateStoreName Schnur Name des Zustandsspeichers des Operators.
numPartitions Ganze Zahl Anzahl der Partitionen des Zustandsspeichers.
minBatchId Lang Die niedrigste Batch-ID, die für den Abfragezustand verfügbar ist.
maxBatchId Lang Die höchste Batch-ID, die für den Abfragezustand verfügbar ist.

Hinweis

Die von minBatchId und maxBatchId bereitgestellten Batch-ID-Werte spiegeln den Zustand zum Zeitpunkt wider, zu dem der Prüfpunkt erstellt wurde. Alte Batches werden bei der Ausführung von Mikrobatches automatisch bereinigt, so dass nicht garantiert werden kann, dass der hier angegebene Wert noch verfügbar ist.

Siehe read_state_metadata Tabellenwertfunktion.

Beispiel: Eine Seite einer Stream-Stream-Verknüpfung abfragen

Verwenden Sie die folgende Syntax, um die linke Seite einer Stream-Stream-Verknüpfung abzufragen:

Python

left_df = (spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    joinSide => 'left'
);

Beispiel: Abfragestatusspeicher für Datenstrom mit mehreren zustandsbehafteten Operatoren

In diesem Beispiel wird der Statusmetadatenleser verwendet, um Metadatendetails einer Streamingabfrage mit mehreren zustandsbehafteten Operatoren zu sammeln, und verwendet dann die Metadatenergebnisse als Optionen für den Statusleser.

Der Statusmetadatenleser verwendet den Prüfpunktpfad als einzige Option, wie im folgenden Syntaxbeispiel gezeigt:

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

Die folgende Tabelle stellt eine Beispielausgabe der Zustandsspeichermetadaten dar:

operatorId operatorName Zustandspeichername AnzahlPartitionen minBatchId maxBatchId
0 stateStoreSave Standardeinstellung 200 0 13
1 dedupeWithinWatermark Standardeinstellung 200 0 13

Um Ergebnisse für den dedupeWithinWatermark Operator zu erhalten, fragen Sie den Statusleser mit der operatorId Option ab, wie im folgenden Beispiel gezeigt:

Python

left_df = (spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    operatorId => 1
);