Freigeben über


Was ist zustandsbehaftetes Streaming?

Eine zustandsbehaftete strukturierte Streaming-Abfrage erfordert inkrementelle Aktualisierungen von Zwischenzustandsinformationen, während eine zustandslose strukturierte Streaming-Abfrage nur Informationen darüber verfolgt, welche Datensätze von der Datenquelle zur Senke verarbeitet wurden.

Zustandsbehaftete Operationen umfassen Streaming-Aggregation, Streaming dropDuplicates, Stream-Stream-Verknüpfungen und benutzerdefinierte zustandsbehaftete Anwendungen.

Die für zustandsbehafteten Strukturierten Streaming-Abfragen erforderlichen Zwischenstatusinformationen können zu unerwarteten Latenz- und Produktionsproblemen führen, wenn sie falsch konfiguriert sind.

In Databricks Runtime 13.3 LTS und höher können Sie mit RocksDB Prüfpunkte im Änderungsprotokoll aktivieren, um die Prüfpunktdauer und End-to-End-Latenz für strukturierte Streamingworkloads zu verringern. Databricks empfiehlt, Änderungsprotokollprüfpunkte für alle zustandsbehafteten Abfragen von strukturiertem Streaming zu aktivieren. Weitere Informationen unter Aktivieren der Änderungsprotokollprüfpunkte.

Optimieren zustandsbehafteter strukturierter Streaming-Abfragen

Das Verwalten der Zwischenstatusinformationen von zustandsbehafteten strukturierten Streaming-Abfragen kann dazu beitragen, unerwartete Latenz- und Produktionsprobleme zu verhindern.

Databricks empfiehlt:

  • Verwenden Sie computeoptimierte Instanzen als Worker,
  • Legen Sie die Anzahl der Shuffle-Partitionen auf 1-2 mal die Anzahl der Kerne im Cluster fest.
  • Legen Sie die Konfiguration von spark.sql.streaming.noDataMicroBatches.enabled in der SparkSession-Instanz auf false fest. Dadurch wird verhindert, dass die Streaming-Microbatch-Engine Microbatches verarbeitet, die keine Daten enthalten. Beachten Sie auch, dass das Festlegen dieser Konfiguration false zu zustandsbehafteten Vorgängen führen kann, die Wasserzeichen oder Verarbeitungstimeouts verwenden, sodass die Datenausgabe nicht erfolgt, bis neue Daten eintreffen, anstatt sofort.

Databricks empfiehlt die Verwendung von RocksDB mit Changelog-Prüfpunkten, um den Zustand für zustandsbehaftete Datenströme zu verwalten. Weitere Informationen finden Sie unter Konfigurieren des RocksDB-Statusspeichers auf Azure Databricks.

Hinweis

Das Zustandsverwaltungsschema kann zwischen Abfrageneustarts nicht geändert werden. Wenn eine Abfrage mit der Standardverwaltung gestartet wurde, müssen Sie sie von Grund auf neu mit einem neuen Prüfpunktspeicher neu starten, um den Zustandsspeicher zu ändern.

Arbeiten mit mehreren zustandsbehafteten Operatoren im strukturierten Streaming

In Databricks Runtime 13.3 LTS und höher bietet Azure Databricks erweiterte Unterstützung für zustandsbehaftete Operatoren in Workloads für strukturiertes Streaming. Sie können jetzt mehrere zustandsbehaftete Operatoren miteinander verketten, was bedeutet, dass Sie die Ausgabe eines Vorgangs, z. B. eine Fensteraggregation, in einen anderen zustandsbehafteten Vorgang, z. B. eine Verknüpfung, übertragen können.

In Databricks Runtime 16.2 und höher können Sie in Workloads mit mehreren zustandsbehafteten Operatoren verwenden transformWithState . Siehe Erstellen einer benutzerdefinierten zustandsbehafteten Anwendung.

Die folgenden Beispiele veranschaulichen mehrere Muster, die Sie verwenden können.

Wichtig

Beim Arbeiten mit mehreren zustandsbehafteten Operatoren bestehen die folgenden Einschränkungen:

  • Benutzerdefinierte statusbehaftete Operatoren (FlatMapGroupWithState und applyInPandasWithState) werden nicht unterstützt.
  • Nur der Modus „Ausgabe anfügen“ wird unterstützt.

Aggregation verketteter Zeitfenster

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

Aggregation von Zeitfenstern in zwei verschiedenen Streams, gefolgt von Stream-Stream-Fenster-Verknüpfung

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

Stream-Stream-Zeitintervallverknüpfung gefolgt von Zeitfensteraggregation

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

Statusausgleich für strukturiertes Streaming

Die Zustandsneuausgleich ist standardmäßig für alle Streaming-Arbeitslasten in Lakeflow Spark Declarative Pipelines aktiviert. In Databricks Runtime 11.3 LTS und höher können Sie die folgende Konfigurationsoption in der Spark-Clusterkonfiguration festlegen, um das erneute Ausgleichen des Zustands zu aktivieren:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

Von der Zustandsneuordnung profitieren Pipelines für strukturiertes Streaming mit Clustergrößenänderungen. Zustandslose Streamingvorgänge profitieren nicht davon, auch wenn sich die Größe des Clusters ändert.

Hinweis

Die automatische Computeskalierung hat Einschränkungen beim Herunterskalieren der Clustergröße für strukturierten Streaming-Workloads. Databricks empfiehlt die Verwendung von Lakeflow Spark Declarative Pipelines mit verbesserter automatischer Skalierung für Streaming-Workloads. Siehe Optimieren der Clusternutzung von Lakeflow Spark Declarative Pipelines mit automatischer Skalierung.

Cluster-Resizing-Ereignisse lösen die Rebalancierung des Zustands aus. Mikrobatches können bei rebalancing-Ereignissen eine höhere Latenz haben, da der Zustand vom Cloudspeicher an die neuen Executoren geladen wird.