Compartilhar via


Recuperar um pipeline após falha de ponto de verificação em streaming

Esta página descreve como recuperar um pipeline no Lakeflow Spark Declarative Pipelines quando um ponto de verificação de streaming fica inválido ou é corrompido.

O que é um ponto de verificação de streaming?

No Apache Spark Structured Streaming, um ponto de verificação é um mecanismo usado para manter o estado de uma consulta de streaming. Esse estado inclui:

  • Informações de progresso: quais deslocamentos da origem foram processados.
  • Estado intermediário: dados que precisam ser mantidos em microlotes para operações com estado (por exemplo, agregações, mapGroupsWithState).
  • Metadados: informações sobre a execução da consulta de streaming.

Os pontos de verificação são essenciais para garantir a tolerância a falhas e a consistência de dados em aplicativos de streaming:

  • Tolerância a falhas: se um aplicativo de streaming falhar (por exemplo, devido a uma falha de nó, falha do aplicativo), o ponto de verificação permitirá que o aplicativo reinicie do último estado de ponto de verificação bem-sucedido em vez de reprocessar todos os dados desde o início. Isso evita a perda de dados e garante o processamento incremental.
  • Processamento exatamente uma vez: para muitas fontes de streaming, pontos de verificação, em conjunto com coletores idempotentes, habilite o processamento exatamente uma vez garante que cada registro seja processado exatamente uma vez, mesmo diante de falhas, impedindo duplicatas ou omissões.
  • Gerenciamento de estado: para transformações com estado, os pontos de verificação persistem o estado interno dessas operações, permitindo que a consulta de streaming continue processando corretamente novos dados com base no estado histórico acumulado.

Pontos de controle do pipeline

Os pipelines se baseiam no Streaming Estruturado e abstraem grande parte do gerenciamento de ponto de verificação subjacente, oferecendo uma abordagem declarativa. Quando você define uma tabela de streaming em seu pipeline, há um estado de ponto de verificação para cada gravação de fluxo na tabela de streaming. Esses locais de ponto de verificação são internos para o pipeline e não são acessíveis aos usuários.

Normalmente, você não precisa gerenciar ou entender os pontos de verificação subjacentes para tabelas de streaming, exceto nos seguintes casos:

  • Retroceder e reproduzir: se você quiser reprocessar os dados de um ponto específico no tempo, preservando o estado atual da tabela, será necessário redefinir o ponto de verificação da tabela de streaming.
  • Recuperando-se de uma falha de ponto de verificação ou corrupção: se uma consulta que está gravando na tabela de streaming falhou devido a erros relacionados ao ponto de verificação, isso causará uma falha grave e a consulta não poderá progredir ainda mais. Há três abordagens que você pode usar para se recuperar dessa classe de falha:
    • Atualização completa da tabela: isso redefine a tabela e apaga os dados existentes.
    • Atualização de tabela completa com backup e backfill: você faz um backup da tabela antes de executar uma atualização de tabela completa e fazer backup de dados antigos, mas isso é muito caro e deve ser o último recurso.
    • Redefina o ponto de verificação e continue de forma incremental: se você não puder perder dados existentes, deverá executar uma redefinição de ponto de verificação seletiva para os fluxos de streaming afetados.

Exemplo: falha de pipeline devido à alteração de código

Considere um cenário em que você tem um pipeline que processa um fluxo de dados de alterações juntamente com o instantâneo da tabela inicial de um sistema de armazenamento em nuvem, como o Amazon S3, e escreve em uma tabela de transmissão SCD-1.

O pipeline tem dois fluxos de streaming:

  • customers_incremental_flow: lê incrementalmente o feed CDC da tabela de origem customer , filtra registros duplicados e os coloca na tabela de destino.
  • customers_snapshot_flow: uma vez, leia o instantâneo inicial da tabela de origem customers e insira os registros na tabela de destino.

Exemplo CDC de Pipelines para recuperação de falha de ponto de checagem

@dp.temporary_view(name="customers_incremental_view")
  def query():
    return (
    spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.includeExistingFiles", "true")
        .load(customers_incremental_path)
        .dropDuplicates(["customer_id"])
    )

@dp.temporary_view(name="customers_snapshot_view")
def full_orders_snapshot():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.includeExistingFiles", "true")
        .option("cloudFiles.inferColumnTypes", "true")
        .load(customers_snapshot_path)
        .select("*")
    )

dp.create_streaming_table("customers")

dp.create_auto_cdc_flow(
    flow_name = "customers_incremental_flow",
    target = "customers",
    source = "customers_incremental_view",
    keys = ["customer_id"],
    sequence_by = col("sequenceNum"),
    apply_as_deletes = expr("operation = 'DELETE'"),
    apply_as_truncates = expr("operation = 'TRUNCATE'"),
    except_column_list = ["operation", "sequenceNum"],
    stored_as_scd_type = 1
)
dp.create_auto_cdc_flow(
    flow_name = "customers_snapshot_flow",
    target = "customers",
    source = "customers_snapshot_view",
    keys = ["customer_id"],
    sequence_by = lit(0),
    stored_as_scd_type = 1,
    once = True
)

Depois de implantar esse pipeline, ele é executado com êxito e começa a processar o feed de dados de alterações e o instantâneo inicial.

Posteriormente, você percebe que a lógica de eliminação de duplicação na customers_incremental_view consulta é redundante e causa um gargalo de desempenho. Você remove o para melhorar o dropDuplicates() desempenho:

@dp.temporary_view(name="customers_raw_view")
  def query():
    return (
    spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.includeExistingFiles", "true")
        .load()
        # .dropDuplicates()
    )

Depois de remover a dropDuplicates() API e reimplantar o pipeline, a atualização falhará com o seguinte erro:

Streaming stateful operator name does not match with the operator in state metadata.
This is likely to happen when a user adds/removes/changes stateful operators of existing streaming query.
Stateful operators in the metadata: [(OperatorId: 0 -> OperatorName: dedupe)];
Stateful operators in current batch: []. SQLSTATE: 42K03 SQLSTATE: XXKST

Esse erro indica que a alteração não é permitida devido a uma incompatibilidade entre o estado do ponto de verificação e a definição de consulta atual, impedindo que o pipeline progrida ainda mais.

Falhas relacionadas ao ponto de verificação podem ocorrer por vários motivos além de apenas remover a dropDuplicates API. Cenários comuns incluem:

  • Adicionando ou removendo operadores com estado (por exemplo, introduzindo ou descartando dropDuplicates() ou agregando) em uma consulta de streaming existente.
  • Adicionando, removendo ou combinando fontes de streaming em uma consulta com ponto de verificação anteriormente (por exemplo, a união de uma consulta de streaming existente com uma nova ou a adição/remoção de fontes de uma operação de união existente).
  • Modificando o esquema de estado de operações de streaming com estado (como alterar as colunas usadas para eliminação de duplicação ou agregação).

Para obter uma lista abrangente de alterações com suporte e sem suporte, consulte o Guia de Streaming Estruturado do Spark e tipos de alterações em consultas de Streaming Estruturado.

Opções de recuperação

Há três estratégias de recuperação, dependendo dos requisitos de durabilidade dos dados e das restrições de recursos:

Methods Complexidade Custo Possível perda de dados Duplicação de dados em potencial Requer instantâneo inicial Redefinição de tabela completa
Atualização completa da tabela Low Medium Sim (se nenhum instantâneo inicial estiver disponível ou se os arquivos brutos tiverem sido excluídos na origem.) Não (Para aplicar alterações na tabela de destino.) Yes Yes
Atualização completa da tabela com backup e backfill Medium High Não Não (para coletores idempotentes. Por exemplo, CDC automático.) Não Não
Redefinir ponto de verificação da tabela Medium-High (Médio para fontes somente acréscimo que fornecem deslocamentos imutáveis.) Low Não (Requer consideração cuidadosa.) Não (Para escritores idempotentes. Por exemplo, CDC automático somente para a tabela de destino.) Não Não

Medium-High complexidade depende do tipo de origem de streaming e da complexidade da consulta.

Recommendations

  • Use uma atualização de tabela completa se você não quiser lidar com a complexidade de uma redefinição de ponto de verificação e poderá recompilar a tabela inteira. Isso também lhe dará uma opção para fazer alterações de código.
  • Use a atualização completa da tabela com backup e backfill se você não quiser lidar com a complexidade da redefinição de ponto de verificação e se estiver bem com o custo adicional de fazer um backup e fazer backup dos dados históricos.
  • Use o ponto de verificação da tabela de redefinição se você precisar preservar os dados existentes na tabela e continuar processando novos dados incrementalmente. No entanto, essa abordagem requer um tratamento cuidadoso da redefinição de ponto de verificação para verificar se os dados existentes na tabela não foram perdidos e se o pipeline pode continuar processando novos dados.

Redefinir ponto de verificação e continuar incrementalmente

Para redefinir o ponto de verificação e continuar o processamento incrementalmente, siga estas etapas:

  1. Pare o pipeline: verifique se o pipeline não tem atualizações ativas em execução.

  2. Determine a posição inicial do novo ponto de verificação: identifique o último deslocamento bem-sucedido ou carimbo de data/hora do qual você deseja continuar o processamento. Normalmente, esse é o deslocamento mais recente processado com êxito antes da falha ocorrer.

    No exemplo acima, como você está lendo os arquivos JSON usando o carregador automático, você pode usar a opção modifiedAfter para especificar a posição inicial para o novo ponto de verificação. Essa opção permite que você defina um carimbo de data/hora para quando o carregador automático deve começar a processar novos arquivos.

    Para fontes kafka, você pode usar a opção startingOffsets para especificar os deslocamentos dos quais a consulta de streaming deve começar a processar novos dados.

    Para fontes delta lake, você pode usar a opção startingVersion para especificar a versão da qual a consulta de streaming deve começar a processar novos dados.

  3. Faça alterações de código: você pode modificar a consulta de streaming para remover a dropDuplicates() API ou fazer outras alterações. Além disso, e verifique se você adicionou a opção modifiedAfter ao caminho de leitura do carregador automático.

    @dp.temporary_view(name="customers_incremental_view")
    def query():
        return (
        spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .option("cloudFiles.includeExistingFiles", "true")
            .option("modifiedAfter", "2025-04-09T06:15:00")
            .load(customers_incremental_path)
            # .dropDuplicates(["customer_id"])
        )
    

    Observação

    Fornecer um carimbo de data/hora incorreto modifiedAfter pode levar à perda ou duplicação de dados. Verifique se o carimbo de data/hora está definido corretamente para evitar o processamento de dados antigos novamente ou a falta de novos dados.

    Se a consulta tiver uma união de stream-stream join ou stream-stream, você deverá aplicar a estratégia acima para todas as fontes de streaming participantes. Por exemplo:

    cdc_1 = spark.readStream.format("cloudFiles")...
    cdc_2 = spark.readStream.format("cloudFiles")...
    cdc_source = cdc_1..union(cdc_2)
    
  4. Identifique os nomes de fluxo associados à tabela de streaming para a qual você deseja redefinir o ponto de verificação. No exemplo, é customers_incremental_flow. Você pode encontrar o nome do fluxo no código do pipeline ou verificando a interface do usuário do pipeline ou os logs de eventos do pipeline.

  5. Redefinir o ponto de verificação: crie um notebook Python e anexe-o a um cluster do Azure Databricks.

    Você precisará das seguintes informações para poder redefinir o ponto de verificação:

    • URL do espaço de trabalho do Azure Databricks
    • Pipeline ID
    • Nome do fluxo para o qual você está redefinindo o ponto de verificação
    import requests
    import json
    
    # Define your Databricks instance and pipeline ID
    databricks_instance = "<DATABRICKS_URL>"
    token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
    pipeline_id = "<YOUR_PIPELINE_ID>"
    flows_to_reset = ["<YOUR_FLOW_NAME>"]
    # Set up the API endpoint
    endpoint = f"{databricks_instance}/api/2.0/pipelines/{pipeline_id}/updates"
    
    
    # Set up the request headers
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    
    # Define the payload
    payload = {
        "reset_checkpoint_selection": flows_to_reset
    }
    
    # Make the POST request
    response = requests.post(endpoint, headers=headers, data=json.dumps(payload))
    
    # Check the response
    if response.status_code == 200:
        print("Pipeline update started successfully.")
    else:
        print(f"Error: {response.status_code}, {response.text}")
    
  6. Execute o Pipeline: o pipeline começa a processar novos dados da posição inicial especificada com um novo ponto de verificação, preservando os dados de tabela existentes e continuando o processamento incremental.

Práticas recomendadas

  • Evite usar recursos de visualização privada em produção.
  • Teste suas alterações antes de fazer alterações em seu ambiente de produção.
    • Crie um pipeline de teste, idealmente em um ambiente inferior. Se isso não for possível, tente usar um catálogo e um esquema diferentes para seu teste.
    • Reproduza o erro.
    • Aplique as alterações.
    • Valide os resultados e tome uma decisão em qualquer lugar/no-go.
    • Implemente as alterações nos pipelines de produção.