Udostępnij przez


Optymalizowanie przetwarzania stanowego przy użyciu punktów odniesienia

Aby skutecznie zarządzać danymi przechowywanymi w stanie aplikacji, użyj znaczników wodnych przy realizacji stanowego przetwarzania strumieniowego w deklaratywnych potokach Lakeflow Spark, z uwzględnieniem agregacji, połączeń i deduplikacji. W tym artykule opisano, jak używać znaków wodnych w zapytaniach kanału i zawiera przykłady zalecanych operacji.

Uwaga / Notatka

Aby zapewnić, że zapytania wykonujące agregacje są przetwarzane przyrostowo i nie są w pełni ponownie obliczane przy każdej aktualizacji, należy użyć znaków wodnych.

Co to jest znak wodny?

W przypadku przetwarzania strumienia znak wodny to funkcja platformy Apache Spark, która umożliwia zdefiniowanie progu opartego na czasie przetwarzania danych podczas wykonywania operacji stanowych, takich jak agregacje. Dane przychodzące są przetwarzane do momentu osiągnięcia progu, w którym przedział czasu zdefiniowany przez próg zostanie zamknięty. Znaki wodne mogą służyć do unikania problemów podczas przetwarzania zapytań, głównie podczas przetwarzania większych zestawów danych lub długotrwałego przetwarzania. Te problemy mogą obejmować duże opóźnienie w tworzeniu wyników, a nawet błędy braku pamięci (OOM) ze względu na ilość danych przechowywanych w pamięci stanowej podczas przetwarzania. Ponieważ dane przesyłane strumieniowo są z natury nieuporządkowane, znaki wodne również wspierają prawidłowe obliczanie operacji, takich jak agregacje okien czasowych.

Aby dowiedzieć się więcej na temat używania znaków wodnych w przetwarzaniu strumieniowym, zobacz Watermarking in Apache Spark Structured Streaming (Znaki wodne w Apache Spark Structured Streaming) i Apply watermarks to control data processing thresholds (Stosowanie znaków wodnych w celu kontrolowania progów przetwarzania danych).

Jak zdefiniować znak wodny?

Możesz zdefiniować znak wodny, określając pole znacznika czasu oraz wartość reprezentującą próg czasu, w którym mogą dotrzeć dane opóźnione. Dane są uznawane za opóźnione, jeśli docierają po zdefiniowanym progu czasu. Jeśli na przykład próg jest zdefiniowany jako 10 minut, rekordy przychodzące po progu 10 minut mogą zostać usunięte.

Ponieważ rekordy dostarczane po zdefiniowanym progu mogą zostać porzucone, wybranie progu spełniającego wymagania dotyczące opóźnień i poprawności jest ważne. Wybranie mniejszego progu powoduje, że rekordy są emitowane wcześniej, ale także oznacza, że późne rekordy mogą zostać porzucone. Większy próg oznacza dłuższe oczekiwanie, ale prawdopodobnie większą kompletność danych. Ze względu na większy rozmiar stanu większy próg może również wymagać dodatkowych zasobów obliczeniowych. Ponieważ wartość progowa zależy od wymagań dotyczących danych i przetwarzania, testowanie i monitorowanie przetwarzania jest ważne, aby określić optymalny próg.

Używasz funkcji withWatermark() w języku Python, aby zdefiniować znak wodny. W języku SQL użyj klauzuli WATERMARK , aby zdefiniować znak wodny:

Python

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

Używanie znaków wodnych w sprzężeniach strumień-strumień

W przypadku sprzężeń strumieniowych należy zdefiniować znacznik po obu stronach sprzężenia oraz klauzulę interwału czasowego. Ponieważ każde źródło przyłączenia ma niepełny widok danych, klauzula przedziału czasowego jest konieczna, aby poinformować silnik strumieniowy, kiedy dalsze dopasowania nie są możliwe. Klauzula interwału czasu musi używać tych samych pól używanych do definiowania znaków wodnych.

Ze względu na to, że każdy strumień wymaga różnych progów dla znaków wodnych, strumienie nie muszą mieć tych samych progów. Aby uniknąć brakujących danych, silnik przesyłania strumieniowego utrzymuje globalny znak wodny oparty na najwolniejszym strumieniu.

Poniższy przykład łączy strumień wyświetleń reklam i strumień kliknięć użytkowników na reklamach. W tym przykładzie kliknięcie musi nastąpić w ciągu 3 minut od wyświetlenia. Po upływie 3-minutowego interwału czasu wiersze ze stanu, którego nie można już dopasować, zostaną porzucone.

Python

from pyspark import pipelines as dp

dp.create_streaming_table("adImpressionClicks")
@dp.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

Wykonaj agregacje okienne za pomocą znaczników czasowych

Typową operacją stanową na danych przesyłanych strumieniowo jest agregacja z użyciem okien. Agregacje okienne są podobne do agregacji pogrupowanych, z tą różnicą, że wartości agregujące są zwracane dla zestawu wierszy będących częścią zdefiniowanego okna.

Okno można zdefiniować jako określoną długość, a operację agregacji można wykonać na wszystkich wierszach, które są częścią tego okna. Spark Streaming obsługuje trzy typy okien:

  • Okna stałoczasowe (stałe): szereg stałych rozmiarów, nienakładających się i ciągłych interwałów czasowych. Rekord wejściowy należy tylko do jednego okna.
  • Okna przesuwne: Podobnie jak okna wirujące, okna przesuwne są o stałym rozmiarze, ale okna mogą się nakładać, a rekord może znajdować się w wielu oknach.

Gdy dane docierają po końcu okna plus długości znaku wodnego, żadne nowe dane nie są akceptowane dla okna, wynik agregacji zostaje wygenerowany, a stan okna zostaje usunięty.

Poniższy przykład oblicza sumę wyświetleń co 5 minut przy użyciu stałego okna. W tym przykładzie klauzula select używa aliasu impressions_window, a następnie samo okno jest definiowane jako część klauzuli GROUP BY . Okno musi być oparte na tej samej kolumnie znacznika czasu co znak wodny— kolumna clickTimestamp w tym przykładzie.

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, window(clickTimestamp, "5 minutes") as impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

W języku Python podobny przykład do obliczania zysku w stałych oknach godzinowych:

from pyspark import pipelines as dp

@dp.table()
def profit_by_hour():
  return (
    spark.readStream.table("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

Deduplikacja rekordów przesyłania strumieniowego

Strukturalne przesyłanie strumieniowe ma gwarancje dokładnego jednokrotnego przetwarzania, ale nie deduplikuje automatycznie rekordów ze źródeł danych. Na przykład, ponieważ wiele kolejek komunikatów ma gwarancje co najmniej jednokrotnego dostarczenia, podczas odczytu z jednej z tych kolejek komunikatów należy oczekiwać duplikatów zapisów. Za pomocą dropDuplicatesWithinWatermark() funkcji można deduplikować rekordy w dowolnym określonym polu, usuwając duplikaty ze strumienia, nawet jeśli niektóre pola różnią się (np. czas zdarzenia lub czas przybycia). Aby użyć funkcji dropDuplicatesWithinWatermark(), należy określić znak wodny. Wszystkie zduplikowane dane, które docierają w zakresie czasu określonym przez znak wodny, są odrzucane.

Uporządkowane dane są ważne, ponieważ dane poza kolejnością powodują nieprawidłowe skoki wartości znacznika postępu. Następnie, gdy pojawią się starsze dane, są uznawane za opóźnione i porzucone. Użyj opcji withEventTimeOrder, aby przetworzyć początkową migawkę zgodnie z kolejnością znacznika czasu określonego w znaku wodnym. Tę withEventTimeOrder opcję można zadeklarować w kodzie definiującym zestaw danych lub w ustawieniach potoku przy użyciu polecenia spark.databricks.delta.withEventTimeOrder.enabled. Przykład:

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

Uwaga / Notatka

Opcja withEventTimeOrder jest obsługiwana tylko w języku Python.

W poniższym przykładzie dane są uporządkowane według clickTimestamp, a rekordy przychodzące w ciągu 5 sekund od siebie nawzajem, które zawierają zduplikowane kolumny userId i clickAdId, są pomijane.

clicksDedupDf = (
  spark.readStream.table
    .option("withEventTimeOrder", "true")
    .table("rawClicks")
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

Optymalizowanie konfiguracji potoku pod kątem przetwarzania stanowego

Aby zapobiec problemom produkcyjnym i nadmiernemu opóźnieniu, usługa Databricks zaleca włączenie zarządzania stanem opartym na bazie bazy danych RocksDB na potrzeby przetwarzania strumienia stanowego, szczególnie jeśli przetwarzanie wymaga zaoszczędzenia dużej ilości stanu pośredniego.

Potoki bezserwerowe automatycznie zarządzają konfiguracjami przechowywania stanu.

Zarządzanie stanem opartym na bazie bazy danych RocksDB można włączyć, ustawiając następującą konfigurację przed wdrożeniem potoku:

{
  "configuration": {
    "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

Aby dowiedzieć się więcej na temat magazynu stanów bazy danych RocksDB, w tym zaleceń dotyczących konfiguracji bazy danych RocksDB, zobacz Configure RocksDB state store on Azure Databricks (Konfigurowanie magazynu stanów bazy danych RocksDB w usłudze Azure Databricks).