Udostępnij przez


Przesyłanie danych strumieniowych do lakehouse za pomocą Spark Structured Streaming

Przesyłanie strumieniowe ze strukturą to skalowalny i odporny na uszkodzenia aparat przetwarzania strumieniowego oparty na platformie Spark. Spark zajmuje się uruchamianiem operacji przesyłania strumieniowego przyrostowo i nieprzerwanie wraz z ciągłym napływem danych.

Przesyłanie strumieniowe ze strukturą stało się dostępne na platformie Spark 2.2. Od tego czasu jest to zalecane podejście do przesyłania strumieniowego danych. Podstawową zasadą strumienia strukturalnego jest traktowanie strumienia danych na żywo jako tabeli, w której nowe dane są zawsze dołączane w sposób ciągły, podobnie jak nowy wiersz w tabeli. Istnieje kilka zdefiniowanych wbudowanych źródeł plików przesyłania strumieniowego, takich jak CSV, JSON, ORC, Parquet i wbudowana obsługa usług obsługi komunikatów, takich jak Kafka i Event Hubs.

Ten artykuł zawiera szczegółowe informacje na temat optymalizacji przetwarzania i przyjmowania zdarzeń poprzez strukturalne przesyłanie strumieniowe w Spark w środowiskach produkcyjnych o wysokiej wydajności. Sugerowane podejścia obejmują:

  • Optymalizacja przepływności przesyłania strumieniowego danych
  • Optymalizowanie operacji zapisu w tabeli delty i
  • Grupowanie zdarzeń

Definicje zadań platformy Spark i notesy platformy Spark

Notesy platformy Spark to doskonałe narzędzie do weryfikowania pomysłów i wykonywania eksperymentów w celu uzyskania szczegółowych informacji z danych lub kodu. Notatniki są szeroko używane do przygotowania danych, wizualizacji, uczenia maszynowego i innych zastosowań w analizie dużych zbiorów danych. Definicje zadań platformy Spark to nieinterakcyjne zadania zorientowane na kod uruchomione w klastrze Spark przez długi czas. Definicje zadań platformy Spark zapewniają niezawodność i dostępność.

Notatniki Spark są doskonałym źródłem do testowania logiki kodu i aby spełnić wszystkie wymagania biznesowe. Jednak aby zachować działanie w scenariuszu produkcyjnym, definicje zadań platformy Spark z włączonymi zasadami ponawiania są najlepszym rozwiązaniem.

Zasady ponawiania definicji zadań Spark

W Microsoft Fabric jest możliwość ustawienia polityki ponawiania dla zadań definicji Spark. Chociaż skrypt w zadaniu może być nieskończony, infrastruktura uruchamiana przez skrypt może spowodować wystąpienie problemu wymagającego zatrzymania zadania. Zadanie można też wyeliminować z powodu podstawowych potrzeb związanych z stosowaniem poprawek infrastruktury. Zasady ponawiania umożliwiają użytkownikowi ustawienie reguł automatycznego ponownego uruchamiania zadania, jeśli zostanie ono zatrzymane z powodu problemów źródłowych. Parametry określają częstotliwość ponownego uruchamiania zadania, maksymalnie nieskończone ponawianie prób i ustawianie czasu między ponownymi próbami. Dzięki temu użytkownicy mogą mieć pewność, że ich zadania definicji zadań platformy Spark będą działać w nieskończoność, dopóki użytkownik nie zdecyduje się ich zatrzymać.

Źródła przesyłania strumieniowego

Konfigurowanie przesyłania strumieniowego za pomocą usługi Event Hubs wymaga podstawowej konfiguracji, która obejmuje nazwę przestrzeni nazw usługi Event Hubs, nazwę centrum, nazwę klucza dostępu współdzielonego i grupę odbiorców. Grupa konsumencka to przegląd całego centrum zdarzeń. Umożliwia to wielu aplikacjom konsumującym strumienie mieć oddzielny widok strumienia zdarzeń i odczytywać strumień niezależnie we własnym tempie i z ich przesunięciami.

Partycje są istotną częścią obsługi dużej ilości danych. Pojedynczy procesor ma ograniczoną pojemność do obsługi zdarzeń na sekundę, podczas gdy wiele procesorów może wykonać lepsze zadanie podczas równoległego wykonywania. Partycje umożliwiają równoległe przetwarzanie dużych ilości zdarzeń.

Jeśli korzysta się z zbyt wielu partycji przy niskim tempie wczytywania danych, to czytniki partycji przetwarzają tylko niewielką część tych danych, co prowadzi do nieoptymalnego przetwarzania. Idealna liczba partycji zależy bezpośrednio od żądanej szybkości przetwarzania. Jeśli chcesz skalować przetwarzanie zdarzeń, rozważ dodanie większej liczby partycji. Nie ma określonego limitu przepływności dla partycji. Jednak zagregowana przepływność w przestrzeni nazw jest ograniczona przez liczbę jednostek przepływności. W miarę zwiększania liczby jednostek przepływności w przestrzeni nazw, możesz potrzebować dodatkowych partycji, aby umożliwić czytnikom współbieżnym osiągnięcie maksymalnej przepływności.

Zaleceniem jest zbadanie i przetestowanie najlepszej liczby partycji dla scenariusza przepływności. Często jednak można zobaczyć scenariusze o wysokiej przepływności przy użyciu 32 lub większej liczby partycji.

Zaleca się użycie łącznika usługi Azure Event Hubs dla platformy Apache Spark (azure-event-hubs-spark) do połączenia aplikacji Spark z usługą Azure Event Hubs.

Lakehouse jako ujście przesyłania strumieniowego

Delta Lake to otwarta warstwa magazynowa, która zapewnia transakcje ACID (niepodzielność, spójność, izolacja i trwałość) na bazie rozwiązań magazynu danych typu data lake. Usługa Delta Lake obsługuje również skalowalną obsługę metadanych, ewolucję schematu, podróż w czasie (przechowywanie wersji danych), format otwarty i inne funkcje.

W inżynierii danych Fabric, Delta Lake służy do:

  • Łatwe wstawianie lub aktualizowanie oraz usuwanie danych za pomocą Spark SQL.
  • Kompaktowanie danych w celu zminimalizowania czasu spędzonego na wykonywanie zapytań dotyczących danych.
  • Wyświetl stan tabel przed wykonaniem operacji i po nim.
  • Pobierz historię operacji wykonywanych na tabelach.

Funkcja Delta jest dodawana jako jeden z możliwych formatów ujścia danych wyjściowych używanych w funkcji writeStream. Aby uzyskać więcej informacji na temat istniejących ujść danych wyjściowych, zobacz Przewodnik programowania przesyłania strumieniowego ze strukturą platformy Spark.

W poniższym przykładzie pokazano, jak można przesyłać strumieniowo dane do usługi Delta Lake.

import pyspark.sql.functions as f 
from pyspark.sql.types import * 

df = spark \ 
  .readStream \ 
  .format("eventhubs") \ 
  .options(**ehConf) \ 
  .load()  

Schema = StructType([StructField("<column_name_01>", StringType(), False), 
                     StructField("<column_name_02>", StringType(), False), 
                     StructField("<column_name_03>", DoubleType(), True), 
                     StructField("<column_name_04>", LongType(), True), 
                     StructField("<column_name_05>", LongType(), True)]) 

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .toTable("deltaeventstable") 

Informacje o fragmencie kodu w przykładzie:

  • format() to instrukcja, która definiuje format wyjściowy danych.
  • outputMode() definiuje sposób zapisywania nowych wierszy w strumieniu (czyli dołączania, zastępowania).
  • toTable() utrwala przesyłane strumieniowo dane do tabeli delty utworzonej przy użyciu wartości przekazanej jako parametr.

Optymalizowanie zapisów Delta

Partycjonowanie danych jest krytyczną częścią tworzenia niezawodnego rozwiązania do przesyłania strumieniowego: partycjonowanie poprawia sposób organizowania danych, a także poprawia przepływność. Pliki łatwo fragmentują się po operacjach Delta, co skutkuje powstaniem zbyt wielu małych plików. I zbyt duże pliki są również problemem, ze względu na długi czas zapisywania ich na dysku. Wyzwaniem podczas partycjonowania danych jest znalezienie odpowiedniej równowagi, która skutkuje optymalnymi rozmiarami plików. Platforma Spark obsługuje partycjonowanie w pamięci i na dysku. Prawidłowo partycjonowane dane mogą zapewnić najlepszą wydajność podczas utrwalania danych w usłudze Delta Lake i wykonywania zapytań dotyczących danych z usługi Delta Lake.

  • Podczas partycjonowania danych na dysku można wybrać sposób partycjonowania danych na podstawie kolumn przy użyciu partitionBy(). partitionBy() to funkcja używana do partycjonowania dużego modelu semantycznego na mniejsze pliki na podstawie jednej lub wielu kolumn dostarczonych podczas zapisywania na dysku. Partycjonowanie to sposób na zwiększenie wydajności zapytań podczas pracy z dużym modelem semantycznym. Unikaj wybierania kolumny, która generuje zbyt małe lub zbyt duże partycje. Zdefiniuj partycję na podstawie zestawu kolumn z dobrą kardynalnością i podziel dane na pliki o optymalnym rozmiarze.
  • Partycjonowanie danych w pamięci można wykonywać przy użyciu przekształceń repartition() lub coalesce(), dystrybuując dane na wielu węzłach roboczych i tworząc wiele zadań, które mogą równolegle odczytywać i przetwarzać dane w oparciu o zasady Resilient Distributed Dataset (RDD). Umożliwia podzielenie modelu semantycznego na partycje logiczne, które można obliczyć na różnych węzłach klastra.
    • funkcja repartition() służy do zwiększania lub zmniejszania liczby partycji w pamięci. Ponowne partycjonowanie przetasowuje całe dane w sieci i równoważy je we wszystkich partycjach.
    • coalesce() służy tylko do efektywnego zmniejszania liczby partycji. Jest to zoptymalizowana wersja funkcji repartition(), w której przenoszenie danych pomiędzy wszystkimi partycjami jest niższe przy użyciu funkcji coalesce().

Połączenie obu metod partycjonowania jest dobrym rozwiązaniem w scenariuszu o wysokiej przepływności. repartition() tworzy określoną liczbę partycji w pamięci, podczas gdy partitionBy() zapisuje pliki na dysku dla każdej partycji pamięci i kolumny partycjonowania. W poniższym przykładzie pokazano użycie obu strategii partycjonowania w tym samym zadaniu platformy Spark: dane są najpierw podzielone na 48 partycji w pamięci (przy założeniu, że mamy łącznie 48 rdzeni procesora CPU), a następnie partycjonowane na dysku na podstawie dwóch istniejących kolumn w ładunku.

import pyspark.sql.functions as f 
from pyspark.sql.types import * 
import json 

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .repartition(48) \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .toTable("deltaeventstable") 

Zoptymalizowany zapis

Inną opcją optymalizacji zapisów w usłudze Delta Lake jest użycie zoptymalizowanego zapisu. Zoptymalizowany zapis to opcjonalna funkcja, która poprawia sposób zapisywania danych w tabeli delty. Platforma Spark scala lub dzieli partycje przed zapisaniem danych, maksymalizując przepływność danych zapisywanych na dysku. Jednak wiąże się z pełnym tasowaniem, więc w przypadku niektórych zadań może spowodować obniżenie wydajności. Zadania korzystające z funkcji coalesce() i/lub repartition() w celu partycjonowania danych na dysku można refaktoryzować, aby zamiast tego rozpocząć korzystanie z zoptymalizowanego zapisu.

Poniższy kod jest przykładem użycia Zoptymalizowanego Zapisu. Należy pamiętać, że partitionBy() jest nadal używane.

spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", true) 
 
rawData = df \ 
 .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .toTable("deltaeventstable") 

Grupowanie zdarzeń

Aby zminimalizować liczbę operacji i poprawić czas spędzony na pozyskiwaniu danych do Delta Lake, grupowanie zdarzeń w partie jest praktycznym rozwiązaniem.

Wyzwalacze określają częstotliwość, z jaką powinno być wykonywane (wyzwalane) zapytanie strumieniowe i emitowane nowe dane. Skonfigurowanie ich definiuje okresowy interwał czasu przetwarzania dla mikropakietów, akumulując dane i grupując zdarzenia w kilka trwałych operacji, zamiast ciągle zapisywać na dysku.

W poniższym przykładzie pokazano zapytanie przesyłania strumieniowego, w którym zdarzenia są okresowo przetwarzane w interwałach jednej minuty.

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .repartition(48) \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .trigger(processingTime="1 minute") \ 
  .toTable("deltaeventstable") 

Zaletą łączenia przetwarzania wsadowego zdarzeń w operacjach zapisywania tabeli Delta jest to, że tworzy większe pliki Delta z większą ilością danych, unikając małych plików. Należy przeanalizować ilość pozyskiwanych danych i znaleźć najlepszy czas przetwarzania, aby zoptymalizować rozmiar plików Parquet utworzonych przez bibliotekę delty.

Monitorowanie

Platforma Spark 3.1 i nowsze wersje mają wbudowany ustrukturyzowany interfejs użytkownika przesyłania strumieniowego zawierający następujące metryki przesyłania strumieniowego:

  • Szybkość wprowadzania
  • Szybkość procesu
  • Wiersze wejściowe
  • Czas trwania serii
  • Czas trwania operacji