Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Użyj oczekiwań, aby zastosować ograniczenia jakości, które weryfikują dane podczas przepływów przez potoki ETL. Oczekiwania zapewniają lepszy wgląd w metryki jakości danych i umożliwiają niepowodzenie aktualizacji lub porzucanie rekordów podczas wykrywania nieprawidłowych rekordów.
Ten artykuł zawiera omówienie oczekiwań, w tym przykłady składni i opcje zachowania. Aby uzyskać bardziej zaawansowane przypadki użycia i zalecane najlepsze rozwiązania, zobacz Zalecenia dotyczące oczekiwań i zaawansowane wzorce.
Jakie są oczekiwania?
Oczekiwania są klauzulami opcjonalnymi w widoku zmaterializowanym potoku, tabeli przesyłania strumieniowego lub instrukcjami tworzenia widoku, które stosują kontrole jakości danych dla każdego rekordu przechodzącego przez zapytanie. Oczekiwania używają standardowych instrukcji logicznych SQL do określania ograniczeń. Można połączyć wiele oczekiwań dla pojedynczego zestawu danych i ustawić oczekiwania we wszystkich deklaracjach zestawów danych w potoku.
W poniższych sekcjach przedstawiono trzy składniki oczekiwań i podano przykłady składni.
Nazwa oczekiwania
Każde oczekiwanie musi mieć nazwę, która jest używana jako identyfikator do śledzenia i monitorowania oczekiwań. Wybierz nazwę, która komunikuje weryfikowane metryki. W poniższym przykładzie zdefiniowano oczekiwanie valid_customer_age na potwierdzenie, że wiek wynosi od 0 do 120 lat:
Ważne
Nazwa oczekiwania musi być unikatowa dla danego zestawu danych. Możesz ponownie użyć oczekiwań w wielu zestawach danych w pipeline'u. Zobacz Oczekiwania dotyczące przenoszenia i wielokrotnego użytku.
Python
@dp.table
@dp.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
return spark.readStream.table("datasets.samples.raw_customers")
SQL
CREATE OR REFRESH STREAMING TABLE customers(
CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);
Ograniczenie do oceny
Klauzula ograniczenia jest instrukcją warunkową SQL, która musi mieć wartość true lub false dla każdego rekordu. Ograniczenie zawiera rzeczywistą logikę sprawdzania poprawności. Gdy rekord zakończy się niepowodzeniem tego warunku, zostanie wyzwolone oczekiwanie.
Ograniczenia muszą używać prawidłowej składni SQL i nie mogą zawierać następujących elementów:
- Niestandardowe funkcje języka Python
- Wywołania usług zewnętrznych
- Podzapytania odwołujące się do innych tabel
Poniżej przedstawiono przykłady ograniczeń, które można dodać do instrukcji tworzenia zestawu danych:
Python
Składnia ograniczenia w języku Python to:
@dp.expect(<constraint-name>, <constraint-clause>)
Można określić wiele ograniczeń:
@dp.expect(<constraint-name>, <constraint-clause>)
@dp.expect(<constraint2-name>, <constraint2-clause>)
Examples:
# Simple constraint
@dp.expect("non_negative_price", "price >= 0")
# SQL functions
@dp.expect("valid_date", "year(transaction_date) >= 2020")
# CASE statements
@dp.expect("valid_order_status", """
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
""")
# Multiple constraints
@dp.expect("non_negative_price", "price >= 0")
@dp.expect("valid_purchase_date", "date <= current_date()")
# Complex business logic
@dp.expect(
"valid_subscription_dates",
"""start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'"""
)
# Complex boolean logic
@dp.expect("valid_order_state", """
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")
SQL
Składnia ograniczenia w języku SQL to:
CONSTRAINT <constraint-name> EXPECT ( <constraint-clause> )
Wiele ograniczeń musi być rozdzielonych przecinkami:
CONSTRAINT <constraint-name> EXPECT ( <constraint-clause> ),
CONSTRAINT <constraint2-name> EXPECT ( <constraint2-clause> )
Examples:
-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)
-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)
-- CASE statements
CONSTRAINT valid_order_status EXPECT (
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
)
-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0),
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())
-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'
)
-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)
Akcja dotycząca nieprawidłowego rekordu
Należy określić akcję, aby określić, co się stanie, gdy rekord zakończy się niepowodzeniem sprawdzania poprawności. W poniższej tabeli opisano dostępne akcje:
| Akcja | Składnia SQL | Składnia języka Python | Wynik |
|---|---|---|---|
| ostrzegaj (ustawienie domyślne) | EXPECT |
dp.expect |
Nieprawidłowe rekordy są zapisywane do celu docelowego. |
| upuść | EXPECT ... ON VIOLATION DROP ROW |
dp.expect_or_drop |
Nieprawidłowe rekordy są porzucane, zanim dane zostaną zapisane w obiekcie docelowym. Liczba porzuconych rekordów jest rejestrowana wraz z innymi metrykami zestawu danych. |
| niepowodzenie | EXPECT ... ON VIOLATION FAIL UPDATE |
dp.expect_or_fail |
Nieprawidłowe rekordy uniemożliwiają pomyślne zaktualizowanie. Interwencja ręczna jest wymagana przed ponownym przetworzeniem. To oczekiwanie powoduje awarię pojedynczego przepływu danych i nie powoduje niepowodzenia innych przepływów w pipeline'u. |
Możesz również zaimplementować zaawansowaną logikę w celu kwarantanny nieprawidłowych rekordów bez niepowodzenia lub porzucania danych. Przejrzyj nieprawidłowe rekordy w kwarantannie .
Metryki śledzenia oczekiwań
Możesz zobaczyć metryki śledzenia dla warn lub drop działań z poziomu interfejsu użytkownika rurociągu. Ponieważ fail aktualizacja kończy się niepowodzeniem po wykryciu nieprawidłowego rekordu, metryki nie są rejestrowane.
Aby wyświetlić metryki oczekiwań, wykonaj następujące kroki:
- Na pasku bocznym obszaru roboczego usługi Azure Databricks kliknij pozycję Zadania i potoki.
- Kliknij nazwę swojego potoku.
- Kliknij zestaw danych ze zdefiniowanym oczekiwaniem.
- Wybierz kartę Jakość danych na prawym pasku bocznym.
Można wyświetlić metryki jakości danych, wysyłając zapytanie do dziennika zdarzeń deklaratywnych potoków Lakeflow Spark. Zobacz Zapytania dotyczące jakości danych lub metryki oczekiwań.
Zachowaj nieprawidłowe rekordy
Zachowywanie nieprawidłowych rekordów jest domyślnym zachowaniem oczekiwań. Użyj operatora expect w sytuacji, gdy chcesz zachować rekordy naruszające oczekiwania, ale jednocześnie zbierać metryki dotyczące liczby rekordów spełniających lub niespełniających ograniczeń. Rekordy naruszające oczekiwania są dodawane do docelowego zestawu danych wraz z prawidłowymi rekordami:
Python
@dp.expect("valid timestamp", "timestamp > '2012-01-01'")
SQL
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
Usuwanie nieprawidłowych rekordów
Użyj operatora , expect_or_drop aby zapobiec dalszemu przetwarzaniu nieprawidłowych rekordów. Rekordy naruszające oczekiwania są porzucane z docelowego zestawu danych:
Python
@dp.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
SQL
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
Niepowodzenie w nieprawidłowych rekordach
Jeśli nieprawidłowe rekordy są niedopuszczalne, użyj expect_or_fail operatora , aby zatrzymać wykonywanie natychmiast, gdy rekord zakończy się niepowodzeniem weryfikacji. Jeśli operacja jest aktualizacją tabeli, system atomowo cofa transakcję.
Python
@dp.expect_or_fail("valid_count", "count > 0")
SQL
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
Ważne
Jeśli masz wiele przepływów równoległych zdefiniowanych w rurociągu, awaria pojedynczego przepływu nie wpływa na awarię innych przepływów.
Rozwiązywanie problemów z nieudanymi aktualizacjami, które nie spełniają oczekiwań
Gdy potok zakończy się niepowodzeniem z powodu naruszenia założeń, należy naprawić kod potoku, aby poprawnie obsłużyć nieprawidłowe dane, zanim uruchomisz potok ponownie.
pl-PL: Oczekiwania skonfigurowane tak, aby obsługiwać niepowodzenia potoków, modyfikują plan zapytań Spark dla transformacji w celu śledzenia informacji niezbędnych do wykrywania i zgłaszania naruszeń. Te informacje umożliwiają określenie, który rekord wejściowy spowodował naruszenie wielu zapytań. Potoki Deklaratywne Lakeflow Spark dostarczają dedykowany komunikat o błędzie do zgłaszania takich naruszeń. Oto przykład komunikatu o naruszeniu oczekiwań:
[EXPECTATION_VIOLATION.VERBOSITY_ALL] Flow 'sensor-pipeline' failed to meet the expectation. Violated expectations: 'temperature_in_valid_range'. Input data: '{"id":"TEMP_001","temperature":-500,"timestamp_ms":"1710498600"}'. Output record: '{"sensor_id":"TEMP_001","temperature":-500,"change_time":"2024-03-15 10:30:00"}'. Missing input data: false
Zarządzanie wieloma oczekiwaniami
Uwaga / Notatka
Chociaż język SQL i Python obsługują wiele oczekiwań w jednym zestawie danych, tylko język Python umożliwia grupowanie wielu oczekiwań i określanie akcji zbiorczych.
Istnieje możliwość grupowania wielu oczekiwań i określania zbiorczych akcji przy użyciu funkcji expect_all, expect_all_or_dropi expect_all_or_fail.
Te dekoratory akceptują słownik języka Python jako argument, gdzie klucz jest nazwą oczekiwania, a wartość jest ograniczeniem oczekiwania. Możesz wykorzystać ponownie ten sam zestaw oczekiwań w wielu zestawach danych w potoku danych. Poniżej przedstawiono przykłady poszczególnych operatorów języka expect_all Python:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dp.table
@dp.expect_all(valid_pages)
def raw_data():
# Create a raw dataset
@dp.table
@dp.expect_all_or_drop(valid_pages)
def prepared_data():
# Create a cleaned and prepared dataset
@dp.table
@dp.expect_all_or_fail(valid_pages)
def customer_facing_data():
# Create cleaned and prepared to share the dataset
Ograniczenia
- Ponieważ tylko tabele przesyłania strumieniowego i zmaterializowane widoki obsługują oczekiwania, metryki jakości danych są obsługiwane tylko dla tych typów obiektów.
- Metryki jakości danych nie są dostępne, gdy:
- W zapytaniu nie zdefiniowano żadnych oczekiwań.
- Przepływ używa operatora, który nie obsługuje oczekiwań.
- Typ przepływu, taki jak ujścia, nie obsługuje oczekiwań.
- Brak aktualizacji skojarzonej tabeli przesyłania strumieniowego ani zmaterializowanego widoku dla danego przebiegu przepływu.
- Konfiguracja potoku nie obejmuje wymaganych ustawień umożliwiających przechwytywanie metryk, takich jak
pipelines.metrics.flowTimeReporter.enabled.
- W niektórych przypadkach
COMPLETEDprzepływ może nie zawierać metryk. Zamiast tego, metryki są raportowane w każdej mikro-partii w zdarzeniuflow_progressze stanemRUNNING.