다음을 통해 공유


스트리밍 체크포인트 장애로부터 파이프라인 복구

이 페이지에서는 스트리밍 검사점이 유효하지 않거나 손상될 때 Lakeflow Spark 선언적 파이프라인에서 파이프라인을 복구하는 방법을 설명합니다.

스트리밍 검사점이란?

Apache Spark 구조적 스트리밍에서 검사점은 스트리밍 쿼리의 상태를 유지하는 데 사용되는 메커니즘입니다. 이 상태에는 다음이 포함됩니다.

  • 진행률 정보: 처리된 원본의 오프셋입니다.
  • 중간 상태: 상태 저장 작업(예: 집계)을 위해 마이크로 일괄 처리에서 유지 관리해야 하는 데이터입니다 mapGroupsWithState.
  • 메타데이터: 스트리밍 쿼리의 실행에 대한 정보입니다.

검사점은 스트리밍 애플리케이션에서 내결함성 및 데이터 일관성을 보장하는 데 필수적입니다.

  • 내결함성: 스트리밍 애플리케이션이 실패하는 경우(예: 노드 오류, 애플리케이션 크래시) 검사점을 사용하면 처음부터 모든 데이터를 다시 처리하는 대신 마지막으로 성공한 검사점 상태에서 애플리케이션을 다시 시작할 수 있습니다. 이렇게 하면 데이터 손실이 방지되고 증분 처리가 보장됩니다.
  • 정확히 한 번 처리: 많은 스트리밍 원본의 경우 검사점은 idempotent 싱크와 함께 정확히 한 번 처리를 사용하도록 설정하면 오류가 발생하더라도 각 레코드가 정확히 한 번 처리되어 중복 또는 누락을 방지할 수 있습니다.
  • 상태 관리: 상태 저장 변환의 경우 검사점은 이러한 작업의 내부 상태를 유지하므로 스트리밍 쿼리가 누적된 기록 상태에 따라 새 데이터를 계속 올바르게 처리할 수 있습니다.

파이프라인 검사점

파이프라인은 구조적 스트리밍을 기반으로 하며 기본 검사점 관리의 대부분을 추상화하여 선언적 접근 방식을 제공합니다. 파이프라인에서 스트리밍 테이블을 정의할 때 스트리밍 테이블에 쓰는 각 흐름에 대한 검사점 상태가 있습니다. 이러한 검사점 위치는 파이프라인 내부이며 사용자가 액세스할 수 없습니다.

일반적으로 다음 경우를 제외하고 스트리밍 테이블의 기본 검사점을 관리하거나 이해할 필요가 없습니다.

  • 되감기 및 재생: 테이블의 현재 상태를 유지하면서 특정 시점의 데이터를 다시 처리하려면 스트리밍 테이블의 검사점을 다시 설정해야 합니다.
  • 검사점 오류 또는 손상에서 복구: 검사점 관련 오류로 인해 스트리밍 테이블에 쓰는 쿼리가 실패한 경우 하드 오류가 발생하며 쿼리가 더 이상 진행될 수 없습니다. 이 오류 클래스에서 복구하는 데 사용할 수 있는 세 가지 방법이 있습니다.
    • 전체 테이블 새로 고침: 테이블을 다시 설정하여 기존 데이터를 초기화합니다.
    • 백업 및 백필을 사용하여 전체 테이블 새로 고침: 전체 테이블 새로 고침을 수행하기 전에 테이블의 백업을 수행하고 이전 데이터를 백필하지만 비용이 많이 들며 최후의 수단이어야 합니다.
    • 검사점을 다시 설정하여 증분 방식으로 계속합니다. 기존 데이터를 잃을 여유가 없는 경우 영향을 받는 스트리밍 흐름에 대해 선택적 검사점 재설정을 수행해야 합니다.

예: 코드 변경으로 인한 파이프라인 오류

Amazon S3와 같은 클라우드 스토리지 시스템의 초기 테이블 스냅샷과 함께 변경 데이터 피드를 처리하고 SCD-1 스트리밍 테이블에 쓰는 파이프라인이 있는 시나리오를 고려합니다.

파이프라인에는 두 가지 스트리밍 흐름이 있습니다.

  • customers_incremental_flow: 원본 테이블 CDC 피드를 customer 증분 방식으로 읽고, 중복 레코드를 필터링하고, 대상 테이블에 업서트합니다.
  • customers_snapshot_flow: 원본 테이블의 초기 스냅샷을 customers 한 번 읽고 레코드를 대상 테이블에 업서트합니다.

검사점 오류를 복구하기 위한 파이프라인 CDC 예제

@dp.temporary_view(name="customers_incremental_view")
  def query():
    return (
    spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.includeExistingFiles", "true")
        .load(customers_incremental_path)
        .dropDuplicates(["customer_id"])
    )

@dp.temporary_view(name="customers_snapshot_view")
def full_orders_snapshot():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.includeExistingFiles", "true")
        .option("cloudFiles.inferColumnTypes", "true")
        .load(customers_snapshot_path)
        .select("*")
    )

dp.create_streaming_table("customers")

dp.create_auto_cdc_flow(
    flow_name = "customers_incremental_flow",
    target = "customers",
    source = "customers_incremental_view",
    keys = ["customer_id"],
    sequence_by = col("sequenceNum"),
    apply_as_deletes = expr("operation = 'DELETE'"),
    apply_as_truncates = expr("operation = 'TRUNCATE'"),
    except_column_list = ["operation", "sequenceNum"],
    stored_as_scd_type = 1
)
dp.create_auto_cdc_flow(
    flow_name = "customers_snapshot_flow",
    target = "customers",
    source = "customers_snapshot_view",
    keys = ["customer_id"],
    sequence_by = lit(0),
    stored_as_scd_type = 1,
    once = True
)

이 파이프라인을 배포한 후 성공적으로 실행되고 변경 데이터 피드 및 초기 스냅샷 처리를 시작합니다.

나중에 쿼리의 중복 제거 논리 customers_incremental_view 가 중복되어 성능 병목 현상이 발생한다는 것을 알게 됩니다. 다음을 dropDuplicates() 제거하여 성능을 향상시킵니다.

@dp.temporary_view(name="customers_raw_view")
  def query():
    return (
    spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.includeExistingFiles", "true")
        .load()
        # .dropDuplicates()
    )

API를 dropDuplicates() 제거하고 파이프라인을 다시 배포한 후 다음 오류와 함께 업데이트가 실패합니다.

Streaming stateful operator name does not match with the operator in state metadata.
This is likely to happen when a user adds/removes/changes stateful operators of existing streaming query.
Stateful operators in the metadata: [(OperatorId: 0 -> OperatorName: dedupe)];
Stateful operators in current batch: []. SQLSTATE: 42K03 SQLSTATE: XXKST

이 오류는 검사점 상태와 현재 쿼리 정의 간의 불일치로 인해 변경이 허용되지 않으므로 파이프라인이 더 이상 진행되지 않음을 나타냅니다.

검사점 관련 오류는 API 제거 dropDuplicates 외에 다양한 이유로 발생할 수 있습니다. 일반적인 시나리오는 다음과 같습니다.

  • 기존 스트리밍 쿼리에서 상태 저장 연산자(예: 도입 또는 삭제 dropDuplicates() 또는 집계)를 추가하거나 제거합니다.
  • 이전에 검사점이 지정된 쿼리에서 스트리밍 원본 추가, 제거 또는 결합(예: 기존 스트리밍 쿼리를 새 쿼리와 통합하거나 기존 공용 구조체 작업에서 원본 추가/제거).
  • 상태 저장 스트리밍 작업의 상태 스키마 수정(예: 중복 제거 또는 집계에 사용되는 열 변경).

지원되는 변경 내용과 지원되지 않는 변경 내용의 포괄적인 목록은 Spark 구조적 스트리밍 가이드구조적 스트리밍 쿼리의 변경 형식을 참조하세요.

복구 옵션

데이터 내구성 요구 사항 및 리소스 제약 조건에 따라 세 가지 복구 전략이 있습니다.

메서드 복잡성 비용 잠재적 데이터 손실 잠재적인 데이터 중복 초기 스냅샷 필요 전체 테이블 재설정
전체 테이블 새로 고침 Low 미디엄 예(초기 스냅샷을 사용할 수 없거나 원본에서 원시 파일이 삭제된 경우) 아니요(변경 내용 적용 대상 테이블의 경우) Yes Yes
백업 및 백필을 사용하여 전체 테이블 새로 고침 미디엄 High 아니오 아니요(idempotent 싱크의 경우) 예를 들어 자동 CDC입니다.) 아니오 아니오
테이블 검사점 다시 설정 Medium-High(변경할 수 없는 오프셋을 제공하는 추가 전용 원본의 경우 보통) Low 아니요(신중하게 고려해야 합니다.) 아니요(idempotent 기록기. 예를 들어 대상 테이블에만 CDC를 자동으로 지정합니다.) 아니오 아니오

Medium-High 복잡성은 스트리밍 원본 유형과 쿼리의 복잡성에 따라 달라집니다.

Recommendations

  • 검사점 재설정의 복잡성을 처리하지 않으려는 경우 전체 테이블 새로 고침을 사용하고 전체 테이블을 다시 계산할 수 있습니다. 이렇게 하면 코드를 변경할 수 있는 옵션도 제공됩니다.
  • 검사점 재설정의 복잡성을 처리하지 않으려면 백업 및 백필과 함께 전체 테이블 새로 고침을 사용하고 백업을 수행하고 기록 데이터를 백필하는 추가 비용을 감수해도 됩니다.
  • 테이블의 기존 데이터를 유지하고 새 데이터를 증분 방식으로 계속 처리해야 하는 경우 다시 설정 테이블 검사점을 사용합니다. 그러나 이 방법을 사용하려면 테이블의 기존 데이터가 손실되지 않고 파이프라인이 새 데이터를 계속 처리할 수 있는지 확인하기 위해 검사점 재설정을 신중하게 처리해야 합니다.

검사점 다시 설정 및 증분 방식으로 계속

검사점을 재설정하고 증분 방식으로 처리를 계속하려면 다음 단계를 수행합니다.

  1. 파이프라인 중지: 파이프라인에 활성 업데이트가 실행되고 있지 않은지 확인합니다.

  2. 새 검사점의 시작 위치를 결정합니다. 처리를 계속할 마지막 성공한 오프셋 또는 타임스탬프를 식별합니다. 일반적으로 오류가 발생하기 전에 성공적으로 처리된 최신 오프셋입니다.

    위의 예제에서는 자동 로더를 사용하여 JSON 파일을 읽고 있으므로 이 modifiedAfter 옵션을 사용하여 새 검사점의 시작 위치를 지정할 수 있습니다. 이 옵션을 사용하면 자동 로더가 새 파일 처리를 시작해야 하는 시기에 대한 타임스탬프를 설정할 수 있습니다.

    Kafka 원본의 startingOffsets 경우 이 옵션을 사용하여 스트리밍 쿼리가 새 데이터 처리를 시작해야 하는 오프셋을 지정할 수 있습니다.

    Delta Lake 원본의 경우 이 startingVersion 옵션을 사용하여 스트리밍 쿼리에서 새 데이터 처리를 시작해야 하는 버전을 지정할 수 있습니다.

  3. 코드 변경: 스트리밍 쿼리를 수정하여 API를 dropDuplicates() 제거하거나 다른 내용을 변경할 수 있습니다. 또한 자동 로더 읽기 경로에 modifiedAfter 옵션을 추가했음을 확인합니다.

    @dp.temporary_view(name="customers_incremental_view")
    def query():
        return (
        spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .option("cloudFiles.includeExistingFiles", "true")
            .option("modifiedAfter", "2025-04-09T06:15:00")
            .load(customers_incremental_path)
            # .dropDuplicates(["customer_id"])
        )
    

    비고

    잘못된 modifiedAfter 타임스탬프를 제공하면 데이터가 손실되거나 중복됩니다. 이전 데이터를 다시 처리하거나 새 데이터가 누락되지 않도록 타임스탬프가 올바르게 설정되어 있는지 확인합니다.

    쿼리에 스트림 스트림 조인 또는 스트림 스트림 공용 구조체가 있는 경우 참여하는 모든 스트리밍 원본에 대해 위의 전략을 적용해야 합니다. 다음은 그 예입니다.

    cdc_1 = spark.readStream.format("cloudFiles")...
    cdc_2 = spark.readStream.format("cloudFiles")...
    cdc_source = cdc_1..union(cdc_2)
    
  4. 검사점을 다시 설정하려는 스트리밍 테이블과 연결된 흐름 이름을 식별합니다. 이 예제에서는 다음과 같습니다 customers_incremental_flow. 파이프라인 코드에서 또는 파이프라인 UI 또는 파이프라인 이벤트 로그를 확인하여 흐름 이름을 찾을 수 있습니다.

  5. 검사점 다시 설정: Python Notebook을 만들고 Azure Databricks 클러스터에 연결합니다.

    검사점을 다시 설정하려면 다음 정보가 필요합니다.

    • Azure Databricks 작업 영역 URL
    • 파이프라인 ID
    • 검사점을 다시 설정하는 흐름 이름
    import requests
    import json
    
    # Define your Databricks instance and pipeline ID
    databricks_instance = "<DATABRICKS_URL>"
    token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
    pipeline_id = "<YOUR_PIPELINE_ID>"
    flows_to_reset = ["<YOUR_FLOW_NAME>"]
    # Set up the API endpoint
    endpoint = f"{databricks_instance}/api/2.0/pipelines/{pipeline_id}/updates"
    
    
    # Set up the request headers
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    
    # Define the payload
    payload = {
        "reset_checkpoint_selection": flows_to_reset
    }
    
    # Make the POST request
    response = requests.post(endpoint, headers=headers, data=json.dumps(payload))
    
    # Check the response
    if response.status_code == 200:
        print("Pipeline update started successfully.")
    else:
        print(f"Error: {response.status_code}, {response.text}")
    
  6. 파이프라인 실행: 파이프라인은 증분 처리를 계속하면서 기존 테이블 데이터를 유지하면서 새 검사점을 사용하여 지정된 시작 위치에서 새 데이터를 처리하기 시작합니다.

모범 사례

  • 프로덕션 환경에서 프라이빗 미리 보기 기능을 사용하지 마세요.
  • 프로덕션 환경을 변경하기 전에 변경 내용을 테스트합니다.
    • 더 낮은 환경에서 이상적으로 테스트 파이프라인을 만듭니다. 이것이 가능하지 않은 경우 테스트에 다른 카탈로그와 스키마를 사용하세요.
    • 오류를 재현합니다.
    • 변경 내용을 적용합니다.
    • 결과의 유효성을 검사하고 이동/no-go결정을 내립니다.
    • 프로덕션 파이프라인에 대한 변경 내용을 롤아웃합니다.