Udostępnij przez


Uruchom pierwsze zadanie Streamingu Strukturalnego

Ten artykuł zawiera przykłady kodu i wyjaśnienie podstawowych pojęć niezbędnych do uruchamiania pierwszych zapytań przesyłania strumieniowego ze strukturą w usłudze Azure Databricks. Możesz użyć Structured Streaming do obsługi obciążeń przetwarzania przyrostowego i niemal w czasie rzeczywistym.

Przesyłanie strumieniowe ze strukturą to jedna z kilku technologii, które zasilają tabele przesyłania strumieniowego w potokach deklaratywnych platformy Lakeflow. Databricks zaleca używanie Lakeflow Spark Declarative Pipelines dla wszystkich nowych obciążeń ETL, przetwarzania i strumieniowego przetwarzania danych o zdefiniowanej strukturze. Zobacz Potoki deklaratywne platformy Spark w usłudze Lakeflow.

Uwaga

Chociaż Lakeflow Spark deklaratywne potoki udostępniają nieco zmodyfikowaną składnię do deklarowania tabel streamingu, ogólna składnia konfigurowania odczytów i przekształceń przesyłania strumieniowego dotyczy wszystkich zastosowań streamingu w usłudze Azure Databricks. Potoki deklaratywne platformy Lakeflow upraszczają również przesyłanie strumieniowe, zarządzając informacjami o stanie, metadanymi i wieloma konfiguracjami.

Użyj Auto Loader do odczytania danych przesyłanych strumieniowo z magazynu obiektów

W poniższym przykładzie pokazano ładowanie danych JSON za pomocą modułu automatycznego ładującego, które używa cloudFiles do oznaczania formatu i opcji. Opcja schemaLocation umożliwia wnioskowanie i ewolucję schematu. Wklej następujący kod w komórce notesu usługi Databricks i uruchom komórkę, aby utworzyć strumieniową ramkę danych o nazwie raw_df:

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

Podobnie jak w przypadku innych operacji odczytu na platformie Azure Databricks, konfigurowanie odczytu strumieniowego nie powoduje faktycznego załadowania danych. Musisz wykonać akcję na danych przed rozpoczęciem strumienia.

Uwaga

Wywołanie display() ramki danych przesyłania strumieniowego uruchamia zadanie przesyłania strumieniowego. W przypadku większości zastosowań streamingu strukturalnego akcja wyzwalająca strumień powinna polegać na zapisywaniu danych do odbiornika. Zobacz Zagadnienia dotyczące produkcji Structured Streaming.

Przeprowadź transformację strumieniową

Przesyłanie strumieniowe ze strukturą obsługuje większość przekształceń dostępnych w usługach Azure Databricks i Spark SQL. Modele MLflow można nawet załadować jako funkcje UDF i dokonywać prognoz w przesyłaniu strumieniowym jako transformację.

Poniższy przykład kodu wykonuje prostą transformację, aby wzbogacić pozyskane dane JSON o dodatkowe informacje przy użyciu funkcji Spark SQL:

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

transformed_df Wynik zawiera instrukcje zapytania dotyczące ładowania i przekształcania każdego rekordu w momencie jego nadejścia do źródła danych.

Uwaga

Przesyłanie strumieniowe ze strukturą traktuje źródła danych jako niezwiązane lub nieskończone zestawy danych. W związku z tym niektóre przekształcenia nie są obsługiwane w obciążeniach pracy dla usługi Strukturalnego Przesyłania Strumieniowego, ponieważ wymagają one sortowania nieskończonej ilości elementów.

Większość agregacji i wielu łączeń wymaga zarządzania informacjami o stanie za pomocą znaków wodnych, okien i trybu wyjściowego. Zobacz Stosowanie wodnych znaków do kontroli progów przetwarzania danych.

Wykonaj przyrostowy zapis wsadowy do Delta Lake

Poniższy przykład zapisuje do Delta Lake przy użyciu określonej ścieżki pliku i punktu kontrolnego.

Ważne

Zawsze upewnij się, że wybierasz unikalną lokalizację punktu kontrolnego dla każdego pisarza strumieniowego, którego konfigurujesz. Punkt kontrolny zapewnia unikalną tożsamość strumienia, śledząc wszystkie przetworzone rekordy i informacje o stanie związane z zapytaniem przesyłanym strumieniowo.

Ustawienie availableNow wyzwalacza powoduje, że Strukturyzowane Przesyłanie Strumieniowe przetwarza wszystkie wcześniej nieprzetworzone rekordy ze źródłowego zestawu danych, a następnie zamyka się, aby można było bezpiecznie wykonać następujący kod bez obaw o pozostawienia uruchomionego strumienia.

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

W tym przykładzie żadne nowe rekordy nie docierają do naszego źródła danych, więc powtórzenie wykonywania tego kodu nie powoduje pozyskiwania nowych rekordów.

Ostrzeżenie

Structured Streaming może zapobiec automatycznemu zakończeniu pracy zasobów obliczeniowych. Aby uniknąć nieoczekiwanych kosztów, pamiętaj o przerwaniu zapytań przesyłania strumieniowego.

Odczytywanie danych z usługi Delta Lake, przekształcanie i zapisywanie w usłudze Delta Lake

Delta Lake ma rozbudowaną obsługę pracy z Strukturalnym Strumieniowaniem zarówno jako źródło danych, jak i odbiornik. Zobacz tabelę Delta odczytów i zapisów strumieniowych.

W poniższym przykładzie pokazano przykładową składnię umożliwiającą przyrostowe ładowanie wszystkich nowych rekordów z tabeli delty, łączenie ich z migawką innej tabeli delty i zapisywanie ich w tabeli delty:

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

Musisz mieć skonfigurowane uprawnienia do odczytu tabel źródłowych, zapisu w tabelach docelowych oraz dostępu do lokalizacji punktu kontrolnego. Wypełnij wszystkie parametry oznaczone nawiasami kątowymi (<>) przy użyciu odpowiednich wartości dla źródeł danych i ujść.

Uwaga

Lakeflow Spark Deklaratywne Potoki zapewnia w pełni deklaratywną składnię do tworzenia potoków Delta Lake i automatycznie zarządza właściwościami, takimi jak wyzwalacze i punkty kontrolne. Zobacz Potoki deklaratywne platformy Spark w usłudze Lakeflow.

Odczytywanie danych z platformy Kafka, przekształcanie i zapisywanie na platformie Kafka

Platforma Apache Kafka i inne magistrale obsługi komunikatów zapewniają jedne z najniższych opóźnień dostępnych dla dużych zestawów danych. Za pomocą usługi Azure Databricks można zastosować przekształcenia do danych pozyskanych z platformy Kafka, a następnie zapisywać dane z powrotem na platformie Kafka.

Uwaga

Zapisywanie danych w magazynie obiektów w chmurze zwiększa dodatkowe obciążenie związane z opóźnieniami. Jeśli chcesz przechowywać dane z magistrali komunikatów w Delta Lake, ale wymagasz jak najmniejszego opóźnienia w przypadku obciążeń przesyłanych strumieniowo, Databricks zaleca skonfigurowanie oddzielnych zadań przesyłania strumieniowego w celu zapisywania danych do Lakehouse i stosowania przekształceń niemal w czasie rzeczywistym dla podrzędnych wyjść magistrali komunikatów.

Poniższy przykład kodu przedstawia prosty wzorzec wzbogacania danych z platformy Kafka przez dołączenie ich do danych w tabeli delty, a następnie zapisanie z powrotem na platformie Kafka:

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

Musisz mieć odpowiednie uprawnienia skonfigurowane do uzyskiwania dostępu do usługi Kafka. Wypełnij wszystkie parametry oznaczone nawiasami kątowymi (<>) przy użyciu odpowiednich wartości dla źródeł danych i ujść. Zobacz Przetwarzanie strumieniowe przy użyciu platform Apache Kafka i Azure Databricks.