Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Dieser Artikel enthält Codebeispiele und Erläuterungen grundlegender Konzepte, die zum Ausführen Ihrer ersten strukturierten Streaming-Abfragen auf Azure Databricks erforderlich sind. Sie können strukturiertes Streaming für nahezu Echtzeit- und inkrementelle Verarbeitungsworkloads verwenden.
Strukturiertes Streaming ist eine von mehreren Technologien, die in Lakeflow Spark Declarative Pipelines Streaming-Tabellen betreiben. Databricks empfiehlt die Verwendung von Lakeflow Spark Declarative Pipelines für alle neuen ETL-, Aufnahme- und strukturierten Streaming-Workloads. Siehe Lakeflow Spark Declarative Pipelines.
Hinweis
Während Lakeflow Spark Declarative Pipelines eine leicht modifizierte Syntax zum Deklarieren von Streamingtabellen bereitstellt, gilt die allgemeine Syntax zum Konfigurieren von Streaminglesevorgängen und Transformationen für alle Streaming-Anwendungsfälle in Azure Databricks. Lakeflow Spark Declarative Pipelines vereinfacht auch das Streaming durch die Verwaltung von Statusinformationen, Metadaten und zahlreichen Konfigurationen.
Verwenden des automatischen Ladens zum Lesen von Streamingdaten aus dem Objektspeicher
Im folgenden Beispiel wird das Laden von JSON-Daten mit Auto Loader veranschaulicht, das zum Kennzeichnen von Format und Optionen cloudFiles verwendet. Die schemaLocation-Option ermöglicht die Schemaableitung und Evolution. Fügen Sie den folgenden Code in eine Zelle des Databricks-Notebooks ein und führen Sie die Zelle aus, um einen Streaming-DataFrame mit dem Namen raw_df zu erstellen:
file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
raw_df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
)
Wie bei anderen Lesevorgängen in Azure Databricks lädt das Konfigurieren eines Streaming-Lesevorgangs tatsächlich keine Daten. Sie müssen eine Aktion für die Daten auslösen, bevor der Datenstrom beginnt.
Hinweis
Das Aufrufen von display() bei einem Streaming DataFrame startet einen Streamingauftrag. Bei den meisten Anwendungsfällen für strukturiertes Streaming sollte die Aktion, die einen Stream auslöst, das Schreiben von Daten in eine Senke sein. Weitere Informationen finden Sie unter Produktionsüberlegungen für strukturiertes Streaming.
Durchführen einer Streamingtransformation
Strukturiertes Streaming unterstützt die meisten Transformationen, die in Azure Databricks und Spark SQL verfügbar sind. Sie können sogar MLflow-Modelle als UDFs laden und Streamingvorhersagen als Transformation erstellen.
Im folgenden Codebeispiel wird eine einfache Transformation abgeschlossen, um die aufgenommenen JSON-Daten mit zusätzlichen Informationen mithilfe von Spark SQL-Funktionen zu erweitern:
from pyspark.sql.functions import col, current_timestamp
transformed_df = (raw_df.select(
"*",
col("_metadata.file_path").alias("source_file"),
current_timestamp().alias("processing_time")
)
)
Das resultierende transformed_df enthält Abfrageanweisungen zum Laden und Transformieren jedes Datensatzes, sobald er in der Datenquelle eingeht.
Hinweis
Strukturiertes Streaming behandelt Datenquellen als ungebundene oder unendliche Datasets. Daher werden einige Transformationen in strukturierten Streaming-Workloads nicht unterstützt, da sie eine unendliche Anzahl von Elementen sortieren müssen.
Die meisten Aggregationen und viele Verknüpfungen erfordern die Verwaltung von Zustandsinformationen mit Wasserzeichen, Fenstern und Ausgabemodus. Siehe Anwenden von Wasserzeichen zum Steuern von Schwellenwerten für die Datenverarbeitung.
Durchführen eines inkrementellen Batchschreibvorgangs in Delta Lake
Im folgenden Beispiel wird mithilfe eines angegebenen Dateipfads und Prüfpunkts in Delta Lake geschrieben.
Wichtig
Stellen Sie immer sicher, dass Sie für jeden von Ihnen konfigurierten Streaming-Writer einen eindeutigen Prüfpunktspeicherort angeben. Der Prüfpunkt stellt die eindeutige Identität für Ihren Datenstrom bereit, wobei alle verarbeiteten Datensätze und Statusinformationen nachverfolgt werden, die Ihrer Streamingabfrage zugeordnet sind.
Die availableNow-Einstellung für den Trigger weist strukturiertes Streaming an, alle zuvor unverarbeiteten Datensätze aus dem Quelldatensatz zu verarbeiten und dann herunterzufahren, sodass Sie den folgenden Code sicher ausführen können, ohne sich Gedanken über das Verlassen eines Datenstroms machen zu müssen:
target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
transformed_df.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.option("path", target_path)
.start()
In diesem Beispiel kommen keine neuen Datensätze in unserer Datenquelle an, sodass die Ausführung dieses Codes keine neuen Datensätze verarbeitet.
Warnung
Die Ausführung von strukturiertem Streaming kann verhindern, dass die automatische Abschaltung die Computerressourcen herunterfährt. Um unerwartete Kosten zu vermeiden, müssen Sie Streamingabfragen beenden.
Lesen von Daten aus Delta Lake, Transformation und Schreiben in Delta Lake
Delta Lake verfügt über umfangreiche Unterstützung für die Arbeit mit strukturiertem Streaming sowohl als Quelle als auch als Senke. Weitere Informationen finden Sie unter Delta-Tabelle: Streaming für Lese- und Schreibvorgänge.
Das folgende Beispiel zeigt eine Beispielsyntax, um alle neuen Datensätze aus einer Delta-Tabelle inkrementell zu laden, sie mit einer Momentaufnahme einer anderen Delta-Tabelle zu verbinden und in eine Delta-Tabelle zu schreiben:
(spark.readStream
.table("<table-name1>")
.join(spark.read.table("<table-name2>"), on="<id>", how="left")
.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", "<checkpoint-path>")
.toTable("<table-name3>")
)
Sie müssen über die erforderlichen Berechtigungen verfügen, um Quelltabellen zu lesen und in Zieltabellen und den angegebenen Prüfpunktspeicherort zu schreiben. Verwenden Sie die relevanten Werte aus Ihren Datenquellen und Datenempfängern, um alle mit spitzen Klammern (<>) gekennzeichneten Parameter auszufüllen.
Hinweis
Lakeflow Spark Declarative Pipelines bietet eine vollständig deklarative Syntax zum Erstellen von Delta Lake-Pipelines und verwaltet Eigenschaften wie Trigger und Prüfpunkte automatisch. Siehe Lakeflow Spark Declarative Pipelines.
Lesen von Daten aus Kafka, Transformieren und Schreiben in Kafka
Apache Kafka und andere Nachrichtenbusse bieten mit die niedrigste Latenz, die für große Datasets verfügbar ist. Sie können Azure Databricks verwenden, um Transformationen auf Daten anzuwenden, die aus Kafka aufgenommen wurden, und dann Daten zurück in Kafka schreiben.
Hinweis
Das Schreiben von Daten in den Cloudobjektspeicher erhöht zusätzlichen Latenzaufwand. Wenn Sie Daten aus einem Nachrichtenbus in Delta Lake speichern möchten, aber die niedrigste Latenz für Streaming-Workloads erfordern, empfiehlt Databricks, separate Streamingaufträge für das Aufnehmen von Daten in das Lakehouse zu konfigurieren und nahezu Echtzeittransformationen für Downstream-Nachrichtenbussenken anzuwenden.
Das folgende Codebeispiel veranschaulicht ein einfaches Muster zum Anreichern von Daten aus Kafka durch Verknüpfen mit Daten in einer Delta-Tabelle und anschließendes Zurückschreiben in Kafka:
(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.join(spark.read.table("<table-name>"), on="<id>", how="left")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.option("checkpointLocation", "<checkpoint-path>")
.start()
)
Sie müssen über die erforderlichen Berechtigungen für den Zugriff auf Ihren Kafka-Dienst verfügen. Verwenden Sie die relevanten Werte aus Ihren Datenquellen und Datenempfängern, um alle mit spitzen Klammern (<>) gekennzeichneten Parameter auszufüllen. Siehe Stream-Verarbeitung mit Apache Kafka und Azure Databricks.