Udostępnij przez


Wybierz tryb danych wyjściowych dla Structured Streaming

W tym artykule omówiono wybieranie trybu wyjściowego dla procesu stanowego przesyłania strumieniowego. Tylko strumienie stanowe zawierające agregacje wymagają konfiguracji trybu wyjściowego.

Łączenia obsługują tylko tryb dołączania wyników, a tryb wyjściowy nie ma wpływu na deduplikację. Dowolne operatory mapGroupsWithState stanowe i flatMapGroupsWithState emitują rekordy przy użyciu własnej logiki niestandardowej, więc tryb wyjściowy strumienia nie ma wpływu na ich zachowanie.

W przypadku przesyłania strumieniowego bezstanowego wszystkie tryby wyjściowe zachowują się tak samo.

Aby poprawnie skonfigurować tryb wyjściowy, musisz zrozumieć stanowe przesyłanie strumieniowe, znaki wodne i wyzwalacze. Odwiedź następujące artykuły:

Co to jest tryb wyjściowy?

Tryb wyjściowy zapytania przesyłania strumieniowego ze strukturą określa, które rekordy operatory zapytania emitują podczas każdego wyzwalacza. Trzy typy rekordów, które mogą być emitowane, to:

  • Rejestry, które przyszłe przetwarzanie nie zmienia.
  • Rekordy, które uległy zmianie od ostatniego wyzwalacza.
  • Wszystkie rekordy w tabeli stanów.

Znajomość emisji typów rekordów jest ważna dla operatorów z pamięcią, ponieważ określony wiersz generowany przez operator z pamięcią może się zmieniać w zależności od wyzwalacza. Na przykład, gdy operator agregacji przesyłania strumieniowego otrzymuje więcej wierszy dla określonego okna, wartości agregacji tego okna mogą ulec zmianie między wyzwalaczami.

W przypadku operatorów bezstanowych rozróżnienie między typami rekordów nie ma wpływu na zachowanie operatora. Rekordy emitowane przez operatora bezstanowego podczas wyzwalania są zawsze rekordami źródłowymi przetwarzanymi podczas tego wyzwalania.

Dostępne tryby wyjściowe

Istnieją trzy tryby wyjściowe, które informują operatora, które rekordy mają być wysyłane podczas określonego wyzwalacza.

Tryb wyjściowy opis
Tryb dołączania (ustawienie domyślne) Domyślnie zapytania przesyłane strumieniowo działają w trybie dołączania. W tym trybie operatory emitują tylko wiersze, które nie zmieniają się w przyszłych wyzwalaczach. Operatory stanowe używają wodowskazu, aby określić, kiedy tak się stanie.
tryb aktualizacji W trybie aktualizacji operatory emitują wszystkie wiersze, które zmieniły się podczas wyzwalacza, nawet jeśli emitowany rekord może ulec zmianie w kolejnym wyzwalaczu.
Tryb pełny Tryb kompletny działa tylko z agregacjami przesyłania strumieniowego. W trybie pełnym wszystkie wiersze wynikowe wytworzone przez operatora są przekazywane dalej.

Zagadnienia dotyczące środowiska produkcyjnego

W przypadku wielu operacji przesyłania strumieniowego zachowujących stan, musisz wybrać między trybem dołączania a trybem aktualizacji. W poniższych sekcjach opisano zagadnienia, które mogą informować o twojej decyzji.

Uwaga

Tryb kompletny ma pewne aplikacje, ale może działać słabo w miarę skalowania danych. Databricks zaleca używanie zmaterializowanych widoków, aby uzyskać gwarancje semantyczne związane z kompletnym trybem działania w kontekście przetwarzania przyrostowego dla wielu operacji stanowych. Zobacz zmaterializowane widoki.

Semantyka aplikacji

Semantyka aplikacji opisuje sposób używania danych przesyłanych strumieniowo przez aplikacje podrzędne.

Jeśli usługi podrzędne muszą wykonać jedną akcję dla każdego podrzędnego zapisu, w większości przypadków użyj trybu dołączania. Jeśli na przykład masz usługę powiadomień podrzędnych wysyłającą powiadomienia dla każdego nowego rekordu zapisanego w ujściu, tryb dołączania gwarantuje, że każdy rekord jest zapisywany tylko raz. Tryb aktualizacji zapisuje rekord za każdym razem, gdy zmienia się informacje o stanie, co spowodowałoby liczne aktualizacje.

Jeśli usługi podrzędne potrzebują nowych wyników, włączenie trybu aktualizacji gwarantuje, że ujście pozostanie tak aktualne, jak to możliwe. Przykłady obejmują model uczenia maszynowego, który odczytuje funkcje w czasie rzeczywistym lub pulpit nawigacyjny analizy śledzący agregacje w czasie rzeczywistym.

Zgodność operatora i ujścia

Przesyłanie strumieniowe ze strukturą nie obsługuje wszystkich operacji dostępnych na platformie Apache Spark, a niektóre operacje przesyłania strumieniowego nie są obsługiwane we wszystkich trybach wyjściowych. Aby uzyskać więcej informacji na temat ograniczeń operatora, zobacz dokumentację przesyłania strumieniowego OSS.

Nie wszystkie ujścia obsługują wszystkie tryby wyjściowe. Zarówno Delta Lake, która obsługuje wszystkie tabele zarządzane przez Unity Catalog, jak i Kafka obsługują wszystkie tryby wyjściowe. Aby uzyskać więcej informacji na temat zgodności obsługi ujścia, zobacz dokumentację przesyłania strumieniowego OSS.

Opóźnienie i koszty

Tryb danych wyjściowych ma wpływ na czas, jaki musi upłynąć przed zapisaniem rekordu, a częstotliwość i ilość zapisanych danych mogą mieć wpływ na koszty związane z potokami przesyłania strumieniowego.

Tryb dołączania zmusza operatorów stanowych do emitowania wyników tylko po sfinalizowaniu wyników stanowych, co trwa co najmniej tyle, co opóźnienie znacznika wody. Opóźnienie znaku wodnego 1 hour w trybie dołączania danych wyjściowych oznacza, że rekordy mają co najmniej 1-godzinne opóźnienie, zanim zostaną wysłane dalej.

Tryb aktualizacji skutkuje jednym zapisem na wyzwalacz dla każdej wartości zagregowanej. Jeśli Twój system pobiera opłaty za zapis na rekord, może to być kosztowne, jeśli rekordy są aktualizowane wiele razy przed upływem opóźnienia tzw. znacznika wodnego.

Przykłady konfiguracji

W poniższych przykładach kodu pokazano konfigurowanie trybu danych wyjściowych dla strumieniowego przesyłania aktualizacji do tabel Unity Catalog.

Python

# Append output mode (default)
(df.writeStream
  .toTable("target_table")
)

# Append output mode (same as default behavior)
(df.writeStream
  .outputMode("append")
  .toTable("target_table")
)

# Update output mode
(df.writeStream
  .outputMode("update")
  .toTable("target_table")
)

# Complete output mode
(df.writeStream
  .outputMode("complete")
  .toTable("target_table")
)

Skala

// Append output mode (default)
df.writeStream
  .toTable("target_table")

// Append output mode (same as default behavior)
df.writeStream
  .outputMode("append")
  .toTable("target_table")

// Update output mode
df.writeStream
  .outputMode("update")
  .toTable("target_table")

// Complete output mode
df.writeStream
  .outputMode("complete")
  .toTable("target_table")

Zobacz dokumentację systemu operacyjnego dla PySpark DataStreamWriter.outputMode lub Scala DataStreamWriter.outputMode.

Przykład stanowego przesyłania strumieniowego i trybów wyjściowych

Poniższy przykład ma pomóc Ci zrozumieć, jak tryb wyjściowy współdziała ze znacznikami wodnymi w przypadku przesyłania strumieniowego z zachowaniem stanu.

Rozważ agregację przesyłania strumieniowego, która oblicza całkowity przychód wygenerowany co godzinę w sklepie z opóźnieniem znacznika czasu wynoszącym 15 minut. Pierwszy mikrobatch przetwarza następujące rekordy:

  • $15 o 14.40
  • 10 USD o 14:30
  • $30 o 15:10

W tym momencie znak czasowy silnika to 14:55, ponieważ odejmuje 15 minut (opóźnienie) od maksymalnego widocznego czasu (15:10). Operator agregacji przesyłania strumieniowego ma następujące elementy w swoim stanie:

  • [2pm, 3pm]: $25
  • [3pm, 4pm]: $30

W poniższej tabeli przedstawiono, co się stanie w każdym trybie wyjściowym.

Tryb wyjścia Wynik i przyczyna
Dołącz Operator agregacji strumieniowej nie przekazuje żadnych danych dalej. Wynika to z faktu, że oba te okna mogą ulec zmianie w miarę pojawiania się nowych wartości z kolejnym wyzwalaczem: znak wodny 2:55pm wskazuje, że rekordy po 23:55 mogą nadal przybywać, a te rekordy mogą należeć do okna [2pm, 3pm] lub okna [3pm, 4pm].
Aktualizacja Operator emituje oba rekordy, ponieważ oba rekordy otrzymały aktualizacje.
Ukończ Operator emituje wszystkie rekordy.

Teraz załóżmy, że strumień odbiera jeszcze jeden rekord:

  • 20 dolarów o 15:20

Znak wodny aktualizuje się na 15:05, ponieważ silnik odejmuje 15 minut od 15:20. W tym momencie operator agregacji strumieniowej ma następujące elementy w swoim stanie:

  • [2pm, 3pm]: $25
  • [3pm, 4pm]: $50

W poniższej tabeli przedstawiono, co się stanie w każdym trybie wyjściowym.

Tryb wyjścia Wynik i przyczyna
Dołącz Operator agregacji strumieniowej zauważa, że znacznik czasu 15:05 przekracza koniec okna [2pm, 3pm]. Zgodnie z definicją znaku wodnego to okno nie może już ulec zmianie, więc emituje okno [2pm, 3pm].
Aktualizacja Operator agregacji strumieniowej emituje okno [3pm, 4pm], ponieważ wartość stanu zmieniła się z $30 na $50.
Ukończ Operator emituje wszystkie rekordy.

Poniżej przedstawiono podsumowanie zachowania operatorów stanowych w każdym trybie dołączania:

  • W trybie dołączania zapisuj rekordy raz po opóźnieniu czasowym.
  • W trybie aktualizacji zapisuj rekordy, które uległy zmianie od poprzedniego wyzwalacza.
  • W trybie pełnym zapisz wszystkie rekordy, które kiedykolwiek zostały wygenerowane przez operator stanowy.