Udostępnij przez


Odzyskaj potok po awarii punktu kontrolnego przesyłania strumieniowego

Na tej stronie opisano sposób odzyskiwania potoku w deklaratywnych potokach Lakeflow Spark, gdy punkt kontrolny przesyłania strumieniowego staje się nieprawidłowy lub uszkodzony.

Co to jest punkt kontrolny przesyłania strumieniowego?

W strumieniu ze strukturą platformy Apache Spark punkt kontrolny jest mechanizmem służącym do utrwalania stanu zapytania przesyłania strumieniowego. Ten stan obejmuje:

  • Informacje o postępie: które przesunięcia ze źródła zostały przetworzone.
  • Stan pośredni: dane, które muszą być przechowywane w mikrosadach na potrzeby operacji stanowych (na przykład agregacji, mapGroupsWithState).
  • Metadane: informacje o wykonywaniu zapytania przesyłania strumieniowego.

Punkty kontrolne są niezbędne do zapewnienia odporności na uszkodzenia i spójności danych w aplikacjach przesyłania strumieniowego:

  • Odporność na uszkodzenia: jeśli aplikacja przesyłania strumieniowego zakończy się niepowodzeniem (na przykład z powodu awarii węzła, awarii aplikacji), punkt kontrolny umożliwia aplikacji ponowne uruchomienie z ostatniego pomyślnego stanu punktu kontrolnego zamiast ponownego przetwarzania wszystkich danych od początku. Zapobiega to utracie danych i zapewnia przetwarzanie przyrostowe.
  • Przetwarzanie dokładnie jednokrotne: w przypadku wielu źródeł przesyłania strumieniowego punkty kontrolne w połączeniu z ujściami idempotentnymi umożliwiają dokładnie jednokrotne przetwarzanie gwarantuje, że każdy rekord jest przetwarzany dokładnie raz, nawet w obliczu awarii, zapobiegając duplikatom lub pominięciem.
  • Zarządzanie stanem: w przypadku przekształceń stanowych punkty kontrolne utrzymują stan wewnętrzny tych operacji, co umożliwia prawidłowe kontynuowanie przetwarzania nowych danych na podstawie skumulowanego stanu historycznego.

Punkty kontrolne potoku

Potoki bazują na Structured Streaming i upraszczają podstawowe zarządzanie punktami kontrolnymi, oferując podejście deklaratywne. Podczas definiowania tabeli przesyłania strumieniowego w potoku istnieje stan punktu kontrolnego dla każdego przepływu zapisu w tabeli przesyłania strumieniowego. Te lokalizacje punktów kontrolnych są wewnętrzne dla potoku i nie są dostępne dla użytkowników.

Zwykle nie trzeba zarządzać podstawowymi punktami kontrolnymi dla tabel przesyłania strumieniowego ani nie rozumieć ich, z wyjątkiem następujących przypadków:

  • Przewijanie i odtwarzanie: jeśli chcesz ponownie przetworzyć dane z określonego punktu w czasie przy zachowaniu bieżącego stanu tabeli, musisz zresetować punkt kontrolny tabeli przesyłania strumieniowego.
  • Odzyskiwanie po niepowodzeniu lub uszkodzeniu punktu kontrolnego: jeśli zapisywanie zapytania w tabeli przesyłania strumieniowego nie powiodło się z powodu błędów związanych z punktem kontrolnym, powoduje to trudną awarię, a zapytanie nie może kontynuować dalszych postępów. Istnieją trzy podejścia, których można użyć do odzyskania po tej klasie awarii:
    • Odświeżanie pełnej tabeli: spowoduje zresetowanie tabeli i wyczyszczenie istniejących danych.
    • Pełne odświeżanie tabeli za pomocą kopii zapasowej i wypełniania kopii zapasowej: należy utworzyć kopię zapasową tabeli przed wykonaniem pełnego odświeżania tabeli i wypełniania starych danych, ale jest to bardzo kosztowne i powinno być ostatnią metodą.
    • Zresetuj punkt kontrolny i kontynuuj przyrostowo: jeśli nie możesz sobie pozwolić na utratę istniejących danych, musisz przeprowadzić selektywne resetowanie punktu kontrolnego dla przepływów przesyłania strumieniowego, których dotyczy problem.

Przykład: Niepowodzenie potoku z powodu zmiany kodu

Rozważmy scenariusz, w którym masz potok, który przetwarza strumień danych zmian wraz z początkową migawką tabeli z systemu przechowywania danych w chmurze, takim jak Amazon S3, i zapisuje w tabeli przesyłania strumieniowego SCD-1.

Potok ma dwa przepływy przesyłania strumieniowego:

  • customers_incremental_flow: przyrostowo odczytuje źródło danych CDC tabeli źródłowej customer , filtruje zduplikowane rekordy i umieszcza je w tabeli docelowej.
  • customers_snapshot_flow: Jednorazowo odczytaj początkową migawkę tabeli źródłowej customers i upserts rekordy do tabeli docelowej.

Przykład potoków CDC do odzyskiwania po awarii punktu kontrolnego

@dp.temporary_view(name="customers_incremental_view")
  def query():
    return (
    spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.includeExistingFiles", "true")
        .load(customers_incremental_path)
        .dropDuplicates(["customer_id"])
    )

@dp.temporary_view(name="customers_snapshot_view")
def full_orders_snapshot():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.includeExistingFiles", "true")
        .option("cloudFiles.inferColumnTypes", "true")
        .load(customers_snapshot_path)
        .select("*")
    )

dp.create_streaming_table("customers")

dp.create_auto_cdc_flow(
    flow_name = "customers_incremental_flow",
    target = "customers",
    source = "customers_incremental_view",
    keys = ["customer_id"],
    sequence_by = col("sequenceNum"),
    apply_as_deletes = expr("operation = 'DELETE'"),
    apply_as_truncates = expr("operation = 'TRUNCATE'"),
    except_column_list = ["operation", "sequenceNum"],
    stored_as_scd_type = 1
)
dp.create_auto_cdc_flow(
    flow_name = "customers_snapshot_flow",
    target = "customers",
    source = "customers_snapshot_view",
    keys = ["customer_id"],
    sequence_by = lit(0),
    stored_as_scd_type = 1,
    once = True
)

Po wdrożeniu tego potoku zostanie uruchomiony pomyślnie i rozpocznie przetwarzanie zestawienia zmian danych i początkowej migawki.

Później zdajesz sobie sprawę, że logika deduplikacji w customers_incremental_view zapytaniu jest nadmiarowa i powoduje wąskie gardło wydajności. Usuń element , dropDuplicates() aby zwiększyć wydajność:

@dp.temporary_view(name="customers_raw_view")
  def query():
    return (
    spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.includeExistingFiles", "true")
        .load()
        # .dropDuplicates()
    )

Po usunięciu interfejsu API i ponownym uruchomieniu dropDuplicates() potoku aktualizacja kończy się niepowodzeniem z powodu następującego błędu:

Streaming stateful operator name does not match with the operator in state metadata.
This is likely to happen when a user adds/removes/changes stateful operators of existing streaming query.
Stateful operators in the metadata: [(OperatorId: 0 -> OperatorName: dedupe)];
Stateful operators in current batch: []. SQLSTATE: 42K03 SQLSTATE: XXKST

Ten błąd wskazuje, że zmiana nie jest dozwolona z powodu niezgodności między stanem punktu kontrolnego a bieżącą definicją zapytania, co uniemożliwia dalsze postępy potoku.

Błędy związane z punktem kontrolnym mogą wystąpić z różnych powodów poza usunięciem interfejsu dropDuplicates API. Typowe scenariusze obejmują:

  • Dodawanie lub usuwanie operatorów stanowych (na przykład wprowadzenie lub upuszczanie dropDuplicates() lub agregacje) w istniejącym zapytaniu przesyłanym strumieniowo.
  • Dodawanie, usuwanie lub łączenie źródeł przesyłania strumieniowego w wcześniej określonym zapytaniu kontrolnym (na przykład łączenie istniejącego zapytania przesyłania strumieniowego z nowym lub dodawanie/usuwanie źródeł z istniejącej operacji unii).
  • Modyfikowanie schematu stanu stanowych operacji przesyłania strumieniowego (takich jak zmiana kolumn używanych do deduplikacji lub agregacji).

Aby uzyskać pełną listę obsługiwanych i nieobsługiwanych zmian, zobacz Przewodnik przesyłania strumieniowego ze strukturą platformy Spark i typy zmian w zapytaniach przesyłania strumieniowego ze strukturą.

Opcje odzyskiwania

Istnieją trzy strategie odzyskiwania, w zależności od wymagań dotyczących trwałości danych i ograniczeń zasobów:

Methods Złożoność Koszt Potencjalna utrata danych Potencjalne duplikowanie danych Wymaga początkowej migawki Resetowanie pełnej tabeli
Odświeżanie pełnej tabeli Low Średni Tak (jeśli nie jest dostępna żadna początkowa migawka lub jeśli nieprzetworzone pliki zostały usunięte ze źródła). Nie (Aby zastosować tabelę docelową zmian). Tak Tak
Odświeżanie pełnej tabeli za pomocą kopii zapasowej i wypełniania kopii zapasowych Średni High Nie. Nie (w przypadku ujściów idempotentnych. Na przykład auto CDC). Nie. Nie.
Resetuj punkt kontrolny tabeli Medium-High (Średni dla źródeł tylko do dołączania, które zapewniają niezmienne przesunięcia). Low Nie (wymaga starannego rozważenia). Nie (dla idempotentnych składników zapisywania. Na przykład automatyczne cdC tylko do tabeli docelowej. Nie. Nie.

Medium-High złożoność zależy od typu źródła przesyłania strumieniowego i złożoności zapytania.

Rekomendacje

  • Użyj odświeżania pełnej tabeli, jeśli nie chcesz zajmować się złożonością resetowania punktu kontrolnego i możesz ponownie skompilować całą tabelę. Umożliwi to również wprowadzanie zmian w kodzie.
  • Użyj pełnego odświeżania tabeli z kopiami zapasowymi i wypełnianiem, jeśli nie chcesz zajmować się złożonością resetowania punktów kontrolnych, i jesteś w porządku z dodatkowym kosztem tworzenia kopii zapasowej i wypełniania danych historycznych.
  • Użyj punktu kontrolnego tabeli resetowania, jeśli musisz zachować istniejące dane w tabeli i kontynuować przetwarzanie nowych danych przyrostowo. Jednak takie podejście wymaga starannej obsługi resetowania punktu kontrolnego w celu sprawdzenia, czy istniejące dane w tabeli nie zostaną utracone i że potok może kontynuować przetwarzanie nowych danych.

Resetowanie punktu kontrolnego i kontynuowanie przyrostowe

Aby zresetować punkt kontrolny i kontynuować przetwarzanie przyrostowo, wykonaj następujące kroki:

  1. Zatrzymaj potok: upewnij się, że potok nie ma uruchomionych aktywnych aktualizacji.

  2. Określ pozycję początkową dla nowego punktu kontrolnego: zidentyfikuj ostatnie pomyślne przesunięcie lub znacznik czasu, z którego chcesz kontynuować przetwarzanie. Zazwyczaj jest to najnowsze przesunięcie pomyślnie przetworzone przed wystąpieniem błędu.

    W powyższym przykładzie, ponieważ odczytujesz pliki JSON przy użyciu autoloadera, możesz użyć modifiedAfter opcji , aby określić pozycję początkową dla nowego punktu kontrolnego. Ta opcja umożliwia ustawienie znacznika czasu, gdy autoloader powinien rozpocząć przetwarzanie nowych plików.

    W przypadku źródeł platformy Kafka można użyć startingOffsets opcji , aby określić przesunięcia, z których zapytanie przesyłane strumieniowo powinno rozpocząć przetwarzanie nowych danych.

    W przypadku źródeł usługi Delta Lake można użyć startingVersion opcji , aby określić wersję, z której zapytanie przesyłane strumieniowo powinno rozpocząć przetwarzanie nowych danych.

  3. Wprowadź zmiany w kodzie: możesz zmodyfikować zapytanie przesyłane strumieniowo, dropDuplicates() aby usunąć interfejs API lub wprowadzić inne zmiany. Sprawdź również, czy dodano opcję do ścieżki modifiedAfter odczytu autoloadera.

    @dp.temporary_view(name="customers_incremental_view")
    def query():
        return (
        spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .option("cloudFiles.includeExistingFiles", "true")
            .option("modifiedAfter", "2025-04-09T06:15:00")
            .load(customers_incremental_path)
            # .dropDuplicates(["customer_id"])
        )
    

    Uwaga / Notatka

    Podanie nieprawidłowego modifiedAfter znacznika czasu może prowadzić do utraty lub duplikowania danych. Sprawdź, czy znacznik czasu jest poprawnie ustawiony, aby uniknąć ponownego przetwarzania starych danych lub brakujących nowych danych.

    Jeśli zapytanie ma połączenie sprzężenia strumienia lub strumienia strumienia, należy zastosować powyżej strategię dla wszystkich uczestniczących źródeł przesyłania strumieniowego. Przykład:

    cdc_1 = spark.readStream.format("cloudFiles")...
    cdc_2 = spark.readStream.format("cloudFiles")...
    cdc_source = cdc_1..union(cdc_2)
    
  4. Zidentyfikuj nazwy przepływu skojarzone z tabelą przesyłania strumieniowego, dla której chcesz zresetować punkt kontrolny. W tym przykładzie jest to customers_incremental_flow. Nazwę przepływu można znaleźć w kodzie potoku lub sprawdzając interfejs użytkownika potoku lub dzienniki zdarzeń potoku.

  5. Zresetuj punkt kontrolny: utwórz notes języka Python i dołącz go do klastra usługi Azure Databricks.

    Aby zresetować punkt kontrolny, potrzebne będą następujące informacje:

    • Adres URL obszaru roboczego usługi Azure Databricks
    • Identyfikator potoku
    • Nazwy przepływu, dla których resetujesz punkt kontrolny
    import requests
    import json
    
    # Define your Databricks instance and pipeline ID
    databricks_instance = "<DATABRICKS_URL>"
    token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
    pipeline_id = "<YOUR_PIPELINE_ID>"
    flows_to_reset = ["<YOUR_FLOW_NAME>"]
    # Set up the API endpoint
    endpoint = f"{databricks_instance}/api/2.0/pipelines/{pipeline_id}/updates"
    
    
    # Set up the request headers
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    
    # Define the payload
    payload = {
        "reset_checkpoint_selection": flows_to_reset
    }
    
    # Make the POST request
    response = requests.post(endpoint, headers=headers, data=json.dumps(payload))
    
    # Check the response
    if response.status_code == 200:
        print("Pipeline update started successfully.")
    else:
        print(f"Error: {response.status_code}, {response.text}")
    
  6. Uruchom potok: potok rozpoczyna przetwarzanie nowych danych z określonej pozycji początkowej przy użyciu nowego punktu kontrolnego, zachowując istniejące dane tabeli przy jednoczesnym kontynuowaniu przetwarzania przyrostowego.

Najlepsze rozwiązania

  • Unikaj używania prywatnych funkcji w wersji zapoznawczej w środowisku produkcyjnym.
  • Przetestuj zmiany przed wprowadzeniem zmian w środowisku produkcyjnym.
    • Utwórz potok testowy, najlepiej w niższym środowisku. Jeśli nie jest to możliwe, spróbuj użyć innego katalogu i schematu dla testu.
    • Odtwórz błąd.
    • Zastosuj wprowadzone zmiany.
    • Zweryfikuj wyniki i podjąć decyzję wno-go.
    • Wprowadzanie zmian w potokach produkcyjnych.