Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Um die im Zustand gespeicherten Daten effektiv zu verwalten, verwenden Sie Wasserzeichen bei der Durchführung der zustandsbehafteten Stream-Verarbeitung in Lakeflow Spark Declarative Pipelines, einschließlich Aggregationen, Verknüpfungen und Deduplizierung. In diesem Artikel wird beschrieben, wie Sie Wasserzeichen in Ihren Pipelineabfragen verwenden, und es werden Beispiele für die empfohlenen Vorgänge gegeben.
Hinweis
Um sicherzustellen, dass Abfragen, die Aggregationen ausführen, inkrementell verarbeitet und nicht vollständig mit jedem Update neu komputiert werden, müssen Sie Wasserzeichen verwenden.
Was ist ein Wasserzeichen?
Bei der Datenstromverarbeitung ist ein Wasserzeichen ein Feature von Apache Spark, das einen zeitbasierten Schwellenwert für die Verarbeitung von Daten definieren kann, wenn zustandsbehaftete Vorgänge, wie z.B. Aggregationen, ausgeführt werden. Die eingehenden Daten werden verarbeitet, bis der Schwellenwert erreicht ist, zu dem zeitpunkt, an dem das durch den Schwellenwert definierte Zeitfenster geschlossen wird. Wasserzeichen können verwendet werden, um Probleme während der Abfrageverarbeitung zu vermeiden, hauptsächlich bei der Verarbeitung größerer Datasets oder langer Verarbeitung. Diese Probleme können eine hohe Latenz bei der Verarbeitung von Ergebnissen und sogar Out-of-Memory-Fehler (OOM) aufgrund der Menge an Daten umfassen, die im Zustand während der Verarbeitung gespeichert werden. Da Streamingdaten inhärent ungeordnet sind, unterstützen Wasserzeichen auch die ordnungsgemäße Berechnung von Vorgängen wie Zeitfensteraggregationen.
Weitere Informationen zur Verwendung von Wasserzeichen in der Datenstromverarbeitung finden Sie unter Wasserzeichen in Apache Spark Structured Streaming und Anwenden von Wasserzeichen zur Steuerung von Datenverarbeitungsschwellenwerten.
Wie definieren Sie ein Wasserzeichen?
Sie definieren ein Wasserzeichen, indem Sie ein Zeitstempelfeld und einen Wert angeben, der den Zeitschwellenwert für verspätete Daten darstellt. Daten werden verspätet betrachtet, wenn sie nach dem definierten Zeitschwellenwert eintreffen. Wenn der Schwellenwert beispielsweise als 10 Minuten definiert ist, werden Datensätze, die nach dem Schwellenwert von 10 Minuten ankommen, möglicherweise verworfen.
Da Datensätze, die nach dem definierten Schwellenwert eingehen, möglicherweise verworfen werden, ist es wichtig, einen Schwellenwert auszuwählen, der Ihre Latenz im Vergleich zu den Korrektheitsanforderungen erfüllt. Die Auswahl eines kleineren Grenzwerts führt dazu, dass Datensätze früher ausgegeben werden, bedeutet aber auch, dass verzögerte Datensätze eher verworfen werden. Ein größerer Schwellenwert bedeutet eine längere Wartezeit, aber möglicherweise mehr Vollständigkeit der Daten. Aufgrund der größeren Zustandsgröße kann ein größerer Schwellenwert auch zusätzliche Computerressourcen erfordern. Da der Schwellenwert von Ihren Daten- und Verarbeitungsanforderungen abhängt, ist es wichtig, ihre Verarbeitung zu testen und zu überwachen, um einen optimalen Schwellenwert zu ermitteln.
Sie verwenden die withWatermark() Funktion in Python, um ein Wasserzeichen zu definieren. Verwenden Sie in SQL die WATERMARK Klausel, um ein Wasserzeichen zu definieren:
Python
withWatermark("timestamp", "3 minutes")
SQL
WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES
Verwenden Sie Wasserzeichen mit Stream-Stream-Verknüpfungen
Bei Datenstromverknüpfungen müssen Sie ein Wasserzeichen auf beiden Seiten der Verknüpfung und eine Zeitintervallklausel definieren. Da jede Verknüpfungsquelle eine unvollständige Ansicht der Daten enthält, ist die Zeitintervallklausel erforderlich, um dem Streamingmodul mitzuteilen, wann keine weiteren Übereinstimmungen vorgenommen werden können. Die Zeitintervallklausel muss dieselben Felder verwenden, die zum Definieren der Wasserzeichen verwendet werden.
Da es vorkommen kann, dass für jeden Datenstrom unterschiedliche Schwellenwerte für Wasserzeichen erforderlich sind, müssen die Ströme nicht über die gleichen Schwellenwerte verfügen. Um fehlende Daten zu vermeiden, verwaltet das Streamingmodul ein globales Wasserzeichen basierend auf dem langsamsten Datenstrom.
Im folgenden Beispiel wird ein Stream von Anzeigenimpressionen und ein Stream von Benutzerklicks auf Anzeigen verbunden. In diesem Beispiel muss innerhalb von 3 Minuten nach dem Eindruck ein Klick erfolgen. Nach Ablauf des 3-Minuten-Zeitintervalls werden Zeilen aus dem Zustand gelöscht, der nicht mehr abgeglichen werden kann.
Python
from pyspark import pipelines as dp
dp.create_streaming_table("adImpressionClicks")
@dp.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
clicksDf = (read_stream("rawClicks")
.withWatermark("clickTimestamp", "3 minutes")
)
impressionsDf = (read_stream("rawAdImpressions")
.withWatermark("impressionTimestamp", "3 minutes")
)
joinDf = impressionsDf.alias("imp").join(
clicksDf.alias("click"),
expr("""
imp.userId = click.userId AND
clickAdId = impressionAdId AND
clickTimestamp >= impressionTimestamp AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
"""),
"inner"
).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")
return joinDf
SQL
CREATE OR REFRESH STREAMING TABLE
silver.adImpressionClicks
AS SELECT
imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
(bronze.rawAdImpressions)
WATERMARK
impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
(bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
imp.userId = click.userId
AND
clickAdId = impressionAdId
AND
clickTimestamp >= impressionTimestamp
AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
Durchführen von Fensteraggregationen mit Wasserzeichen
Ein häufiger zustandsbehafteter Vorgang bei Streamingdaten ist eine fensterbasierte Aggregation. Fensteraggregationen ähneln gruppierten Aggregationen, mit der Ausnahme, dass Aggregatwerte für den Satz von Zeilen zurückgegeben werden, die Teil des definierten Fensters sind.
Ein Fenster kann als bestimmte Länge definiert werden, und ein Aggregationsvorgang kann für alle Zeilen ausgeführt werden, die Teil dieses Fensters sind. Spark Streaming unterstützt drei Arten von Fenstern:
- Tumbling (feste) Fenster: Eine Reihe von Zeitintervallen fester Größe, die nicht überlappen und aufeinanderfolgend sind. Ein Eingabedatensatz gehört nur zu einem einzelnen Fenster.
- Gleitfenster: Ähnlich wie bei stürzenden Fenstern sind gleitende Fenster fester Größe, fenster können sich jedoch überlappen, und ein Datensatz kann in mehrere Fenster fallen.
Wenn Daten über das Ende des Fensters und die Länge des Wasserzeichens hinaus gelangen, werden keine neuen Daten für das Fenster akzeptiert, das Ergebnis der Aggregation wird ausgegeben, und der Zustand für das Fenster wird gelöscht.
Im folgenden Beispiel wird alle 5 Minuten mithilfe eines festen Fensters eine Summe der Impressionen berechnet. In diesem Beispiel verwendet die Auswahlklausel den Alias impressions_window, und dann wird das Fenster selbst als Teil der GROUP BY Klausel definiert. Das Fenster muss auf derselben Zeitstempelspalte basieren wie das Wasserzeichen, die clickTimestamp Spalte in diesem Beispiel.
CREATE OR REFRESH STREAMING TABLE
gold.adImpressionSeconds
AS SELECT
impressionAdId, window(clickTimestamp, "5 minutes") as impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
(silver.adImpressionClicks)
WATERMARK
clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
impressionAdId, window(clickTimestamp, "5 minutes")
Ein ähnliches Beispiel in Python zum Berechnen des Gewinns über stündliche feste Fenster:
from pyspark import pipelines as dp
@dp.table()
def profit_by_hour():
return (
spark.readStream.table("sales")
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour").alias("time"))
.aggExpr("sum(profit) AS profit")
)
Deduplizieren von Streamingdatensätzen
Strukturiertes Streaming bietet genau einmal Verarbeitungsgarantien, führt aber nicht automatisch zu Duplikaten von Datensätzen aus Datenquellen. Da viele Nachrichtenwarteschlangen Garantien für mindestens einmalige Ausführung bieten, sollte man beim Lesen aus einer dieser Nachrichtenwarteschlangen mit doppelten Datensätzen rechnen. Sie können die dropDuplicatesWithinWatermark() Funktion verwenden, um Datensätze für jedes angegebene Feld zu deduplizieren und Duplikate aus einem Datenstrom zu entfernen, auch wenn sich einige Felder unterscheiden (z. B. Ereigniszeit oder Ankunftszeit). Sie müssen ein Wasserzeichen angeben, das die dropDuplicatesWithinWatermark() Funktion verwenden soll. Alle doppelten Daten, die innerhalb des durch das Wasserzeichen angegebenen Zeitraums eingehen, werden gelöscht.
Sortierte Daten sind wichtig, da out-of-order-Daten dazu führen, dass der Wasserzeichenwert falsch voranspringt. Wenn dann ältere Daten eintreffen, wird sie als verspätet betrachtet und verworfen. Verwenden Sie die withEventTimeOrder Option, um die anfängliche Momentaufnahme basierend auf dem im Wasserzeichen angegebenen Zeitstempel in der Reihenfolge zu verarbeiten. Die withEventTimeOrder Option kann im Code deklariert werden, der das Dataset definiert, oder in den Pipelineeinstellungen mithilfe spark.databricks.delta.withEventTimeOrder.enabledvon . Beispiel:
{
"spark_conf": {
"spark.databricks.delta.withEventTimeOrder.enabled": "true"
}
}
Hinweis
Die withEventTimeOrder Option wird nur mit Python unterstützt.
Im folgenden Beispiel werden die Daten geordnet nach clickTimestamp verarbeitet, und Datensätze, die innerhalb von 5 Sekunden voneinander ankommen und die doppelten userId- und clickAdId-Spalten enthalten, werden verworfen.
clicksDedupDf = (
spark.readStream.table
.option("withEventTimeOrder", "true")
.table("rawClicks")
.withWatermark("clickTimestamp", "5 seconds")
.dropDuplicatesWithinWatermark(["userId", "clickAdId"]))
Optimieren der Pipelinekonfiguration für zustandsbehaftete Verarbeitung
Um Produktionsprobleme und übermäßige Latenz zu verhindern, empfiehlt Databricks die Aktivierung der RocksDB-basierten Zustandsverwaltung für die zustandsbehaftete Datenstromverarbeitung, insbesondere, wenn ihre Verarbeitung eine große Menge an Zwischenzustand spart.
Serverlose Pipelines verwalten automatisch Zustandsspeicherkonfigurationen.
Sie können die RocksDB-basierte Zustandsverwaltung aktivieren, indem Sie die folgende Konfiguration festlegen, bevor Sie eine Pipeline bereitstellen:
{
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
}
}
Weitere Informationen zum RocksDB-Zustandsspeicher, einschließlich Konfigurationsempfehlungen für RocksDB, finden Sie unter Configure RocksDB state store on Azure Databricks.