Compartilhar via


O que é a CDA (captura de dados de alterações)?

A CDC (captura de dados de alteração) é um padrão de integração de dados que captura alterações feitas em dados em um sistema de origem, como inserções, atualizações e exclusões. Essas alterações, representadas como uma lista, são comumente conhecidas como um feed CDC. Você pode processar seus dados muito mais rapidamente se operar em um feed CDC, em vez de ler todo o conjunto de dados de origem. Bancos de dados transacionais como SQL Server, MySQL e Oracle geram feeds CDC. As tabelas delta geram seu próprio feed CDC, conhecido como CDF (feed de dados de alteração).

O diagrama a seguir mostra que, quando uma linha em uma tabela de origem que contém dados de funcionário é atualizada, ela gera um novo conjunto de linhas em um feed CDC que contém apenas as alterações. Cada linha do feed CDC normalmente contém metadados adicionais, incluindo a operação, como UPDATE e uma coluna que pode ser usada para ordenar deterministicamente cada linha no feed CDC para que você possa lidar com atualizações fora de ordem. Por exemplo, a sequenceNum coluna no diagrama a seguir determina a ordem de linha no feed CDC:

Visão geral da captura de dados de alterações.

Processando um feed de dados de alterações: mantenha apenas os dados mais recentes versus mantenha versões históricas dos dados

O processamento de um feed de dados alterado é conhecido como SCD (dimensões de alteração lenta). Ao processar um feed de CDC, você precisa tomar uma decisão:

  • Você mantém apenas os dados mais recentes (ou seja, substituir dados existentes)? Isso é conhecido como SCD Tipo 1.
  • Ou você mantém um histórico de alterações nos dados? Isso é conhecido como SCD Tipo 2.

O processamento scd tipo 1 envolve a substituição de dados antigos com novos dados sempre que ocorre uma alteração. Isso significa que nenhum histórico das alterações é mantido. Somente a versão mais recente dos dados está disponível. É uma abordagem simples e geralmente é usada quando o histórico de alterações não é importante, como corrigir erros ou atualizar campos não críticos, como endereços de email do cliente.

Visão geral do SCD Tipo 1 de captura de dados de alteração.

O processamento scd tipo 2 mantém um registro histórico de alterações de dados criando registros adicionais para capturar diferentes versões dos dados ao longo do tempo. Cada versão dos dados é carimbada com data/hora e/ou marcada com metadados que permitem que os usuários rastreiem quando ocorreu uma alteração. Isso é útil quando é importante acompanhar a evolução dos dados, como acompanhar as alterações de endereço do cliente ao longo do tempo para fins de análise.

Visão geral sobre a captura de alteração de dados do Tipo SCD 2.

Exemplos de processamento SCD Tipo 1 e Tipo 2 com Pipelines Declarativos do Lakeflow Spark

Os exemplos nesta seção mostram como usar o SCD Tipo 1 e o Tipo 2.

Etapa 1: Preparar dados de exemplo

Neste exemplo, você gerará um feed CDC de exemplo. Primeiro, crie um notebook e cole o código a seguir nele. Atualize as variáveis no início do bloco de código para um catálogo e um esquema em que você tem permissão para criar tabelas e exibições.

Esse código cria uma nova tabela Delta que contém vários registros de alteração. O esquema é o seguinte:

  • id - Inteiro, identificador exclusivo deste funcionário
  • name - Cadeia de caracteres, nome do funcionário
  • role - Cadeia de caracteres, função de funcionário
  • country - Cadeia de caracteres, código do país, onde o funcionário trabalha
  • operation - Alterar tipo(por exemplo, INSERT, UPDATEou DELETE)
  • sequenceNum – Tipo inteiro, identifica a ordem lógica dos eventos de CDC nos dados de origem. O Lakeflow Spark Declarative Pipelines usa esse sequenciamento para lidar com eventos de alteração que chegam fora de ordem.
# update these to the catalog and schema where you have permissions
# to create tables and views.

catalog = "mycatalog"
schema = "myschema"
employees_cdf_table = "employees_cdf"

def write_employees_cdf_to_delta():
 data = [
   (1, "Alex", "chef", "FR", "INSERT", 1),
   (2, "Jessica", "owner", "US", "INSERT", 2),
   (3, "Mikhail", "security", "UK", "INSERT", 3),
   (4, "Gary", "cleaner", "UK", "INSERT", 4),
   (5, "Chris", "owner", "NL", "INSERT", 6),
   # out of order update, this should be dropped from SCD Type 1
   (5, "Chris", "manager", "NL", "UPDATE", 5),
   (6, "Pat", "mechanic", "NL", "DELETE", 8),
   (6, "Pat", "mechanic", "NL", "INSERT", 7)
 ]
 columns = ["id", "name", "role", "country", "operation", "sequenceNum"]
 df = spark.createDataFrame(data, columns)
 df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.{employees_cdf_table}")

write_employees_cdf_to_delta()

Você pode visualizar esses dados usando o seguinte comando SQL:

SELECT *
FROM mycatalog.myschema.employees_cdf

Etapa 2: Usar o SCD Tipo 1 para manter apenas os dados mais recentes

É recomendável usar a AUTO CDC API no Lakeflow Spark Declarative Pipelines para processar um feed de dados de alteração em uma tabela SCD Tipo 1.

  1. Criar um notebook novo.
  2. Cole o código a seguir nele.
  3. Crie e conecte-se a um pipeline.

A employees_cdf função lê a tabela que acabamos de criar acima como um fluxo porque a create_auto_cdc_flow API, que você usará para processamento de captura de dados de alterações, espera um fluxo de alterações como entrada. Você o encapsula com um decorador @dp.temporary_view porque não deseja converter este fluxo em uma tabela.

Em seguida, você usa dp.create_target_table para criar uma tabela de streaming que contém o resultado do processamento desse feed de dados de alterações.

Por fim, você usa dp.create_auto_cdc_flow para processar o fluxo de dados de alteração. Vamos dar uma olhada em cada argumento:

  • target - A tabela de streaming de destino, que você definiu anteriormente.
  • source - A exibição do fluxo de registros de alteração, que você definiu anteriormente.
  • keys - Identifica linhas exclusivas no feed de alterações. Como você está usando id como um identificador exclusivo, basta fornecer id como a única coluna de identificação.
  • sequence_by - O nome da coluna que especifica a ordem lógica dos eventos CDC nos dados de origem. Você precisa desse sequenciamento para lidar com eventos de alteração que chegam fora de ordem. Forneça sequenceNum como a coluna de sequenciamento.
  • apply_as_deletes - Como os dados de exemplo contêm operações de exclusão, você usa apply_as_deletes para indicar quando um evento CDC deve ser tratado como um DELETE em vez de um upsert.
  • except_column_list - Contém uma lista de colunas que você não deseja incluir na tabela de destino. Neste exemplo, você usará esse argumento para excluir sequenceNum e operation.
  • stored_as_scd_type - Indica o tipo SCD que você deseja usar.
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr, lit, when
from pyspark.sql.types import StringType, ArrayType

catalog = "mycatalog"
schema = "myschema"
employees_cdf_table = "employees_cdf"
employees_table_current = "employees_current"
employees_table_historical = "employees_historical"

@dp.temporary_view
def employees_cdf():
 return spark.readStream.format("delta").table(f"{catalog}.{schema}.{employees_cdf_table}")

dp.create_target_table(f"{catalog}.{schema}.{employees_table_current}")

dp.create_auto_cdc_flow(
 target=f"{catalog}.{schema}.{employees_table_current}",
 source=employees_cdf_table,
 keys=["id"],
 sequence_by=col("sequenceNum"),
 apply_as_deletes=expr("operation = 'DELETE'"),
 except_column_list = ["operation", "sequenceNum"],
 stored_as_scd_type = 1
)

Execute este pipeline clicando em Iniciar.

Em seguida, execute a seguinte consulta no editor do SQL para verificar se os registros de alteração foram processados corretamente:

SELECT *
FROM mycatalog.myschema.employees_current

Observação

A atualização fora de ordem para o funcionário Chris foi descartada corretamente, pois sua função ainda está definida como Proprietário em vez de Gerente.

Exemplo de SCD Tipo 1 para captura de dados de alteração.

Etapa 3: Usar o SCD Tipo 2 para manter dados históricos

Neste exemplo, você cria uma segunda tabela de destino, chamada employees_historical, que contém um histórico completo de alterações nos registros de funcionários.

Adicione esse código ao pipeline. A única diferença aqui é que stored_as_scd_type é definido como 2 em vez de 1.

dp.create_target_table(f"{catalog}.{schema}.{employees_table_historical}")

dp.create_auto_cdc_flow(
 target=f"{catalog}.{schema}.{employees_table_historical}",
 source=employees_cdf_table,
 keys=["id"],
 sequence_by=col("sequenceNum"),
 apply_as_deletes=expr("operation = 'DELETE'"),
 except_column_list = ["operation", "sequenceNum"],
 stored_as_scd_type = 2
)

Execute este pipeline clicando em Iniciar.

Em seguida, execute a seguinte consulta no editor do SQL para verificar se os registros de alteração foram processados corretamente:

SELECT *
FROM mycatalog.myschema.employees_historical

Você verá todas as alterações nos funcionários, incluindo os funcionários que foram excluídos, como Pat.

Exemplo de Captura de Dados de Alteração SCD Tipo 2.

Etapa 4: Limpar recursos

Quando terminar, limpe os recursos seguindo estas etapas:

  1. Exclua o pipeline:

    Observação

    Quando você exclui o pipeline, ele exclui automaticamente as tabelas employees e employees_historical.

    1. Clique em Jobs e Pipelines e depois encontre o nome do pipeline a ser deletado.
    2. Clique no ícone de mais opções. Na mesma linha do nome do pipeline, e clique em Excluir.
  2. Exclua o bloco de notas.

  3. Exclua a tabela que contém o fluxo de dados de alteração:

    1. Clique em Nova > Consulta.
    2. Cole e execute o seguinte código SQL, ajustando o catálogo e o esquema conforme apropriado:
DROP TABLE mycatalog.myschema.employees_cdf

Desvantagens de usar MERGE INTO e foreachBatch para a captura de dados de alteração

O Databricks fornece um comando SQL MERGE INTO que pode ser usado com a API foreachBatch para fazer upsert de linhas em uma tabela Delta. Esta seção explora como essa técnica pode ser usada para casos de uso simples, mas esse método se torna cada vez mais complexo e frágil quando aplicado a cenários do mundo real.

Neste exemplo, você usará o mesmo feed de dados de alteração de exemplo usado nos exemplos anteriores.

Implementação naive com MERGE INTO e foreachBatch

Crie um bloco de anotações e copie o código a seguir nele. Altere as variáveis catalog, schema e employees_table conforme apropriado. As variáveis catalog e schema devem ser definidas como locais no Catálogo do Unity onde você pode criar tabelas.

Quando você executa o notebook, ele faz o seguinte:

  • Cria a tabela de destino na create_table. Ao contrário de create_auto_cdc_flow, que trata essa etapa automaticamente, você precisa especificar o esquema.
  • Lê o feed de alterações de dados como um fluxo. Cada microbatch é processado usando o upsertToDelta método, que executa um MERGE INTO comando.
catalog = "jobs"
schema = "myschema"
employees_cdf_table = "employees_cdf"
employees_table = "employees_merge"

def upsertToDelta(microBatchDF, batchId):
 microBatchDF.createOrReplaceTempView("updates")
 microBatchDF.sparkSession.sql(f"""
   MERGE INTO {catalog}.{schema}.{employees_table} t
   USING updates s
   ON s.id = t.id
   WHEN MATCHED THEN UPDATE SET *
   WHEN NOT MATCHED THEN INSERT *
 """)

def create_table():
 spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{employees_table}")
  spark.sql(f"""
   CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{employees_table}
   (id INT, name STRING, age INT, country STRING)
 """)

create_table()

cdcData = spark.readStream.table(f"{catalog}.{schema}.{employees_cdf_table}")

cdcData.writeStream \
.foreachBatch(upsertToDelta) \
.outputMode("append") \
.start()

Para ver os resultados, execute a seguinte consulta SQL:

SELECT *
FROM mycatalog.myschema.employees_merge

Infelizmente, os resultados estão incorretos, conforme mostrado a seguir:

Exemplo de captura de dados alterados MERGE INTO.

Várias atualizações para a mesma chave na mesma microbatch

O primeiro problema é que o código não manipula várias atualizações para a mesma chave na mesma microbatch. Por exemplo, você usa INSERT para inserir o funcionário Chris e, em seguida, atualizou sua função de Proprietário para Gerente. Isso deve resultar em uma linha, mas, em vez disso, há duas linhas.

Qual alteração prevalece quando há várias atualizações na mesma chave em um micro-lote?

Os dados de alteração capturam várias atualizações para a mesma chave no mesmo exemplo de microbatch.

A lógica torna-se mais complexa. O exemplo de código a seguir recupera a linha sequenceNum mais recente e mescla apenas os dados na tabela de destino da seguinte maneira:

  • Agrupa pela chave primária, id.
  • Toma todas as colunas da linha que contém o valor máximo de sequenceNum no lote correspondente a essa chave.
  • Explode a linha de volta.

Atualize o upsertToDelta método conforme mostrado a seguir e execute o código:

def upsertToDelta(microBatchDF, batchId):
 microBatchDF = microBatchDF.groupBy("id").agg(
   max_by(struct("*"), "sequenceNum").alias("row")
 ).select("row.*").createOrReplaceTempView("updates")

 spark.sql(f"""
   MERGE INTO {catalog}.{schema}.{employees_table} t
   USING updates s
   ON s.id = t.id
   WHEN MATCHED THEN UPDATE SET *
   WHEN NOT MATCHED THEN INSERT *
 """)

Ao consultar a tabela de destino, você verá que o funcionário chamado Chris tem a função correta, mas ainda há outros problemas a serem resolvidos porque você ainda excluiu registros que aparecem na tabela de destino.

Os dados de alteração capturam várias atualizações para a mesma chave no mesmo resultado de exemplo de microbatch.

Atualizações fora de ordem em micro-lotes

Esta seção aborda o problema das atualizações fora de ordem em micro-lotes. O diagrama a seguir ilustra o problema: e se a linha de Chris tiver uma UPDATE operação na primeira microbatch seguida por uma INSERT em uma microbatch subsequente? O código não lida com isso corretamente.

Qual alteração prevalece quando há atualizações fora de sequência para a mesma chave em vários microbatches?

Captura de mudança de dados em atualizações fora de ordem em microlotes: exemplo.

Para corrigir isso, expanda o código para armazenar uma versão em cada linha da seguinte maneira:

  • Armazene sequenceNum quando uma linha foi atualizada pela última vez.
  • Para cada nova linha, verifique se o timestamp é superior ao armazenado e então aplique a lógica seguinte:
    • Se for maior, use os novos dados do alvo.
    • Caso contrário, mantenha os dados na origem.

Primeiro, atualize o método createTable para armazenar o sequenceNum, pois você o usará para versionar cada linha.

def create_table():
 spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{employees_table}")
  spark.sql(f"""
   CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{employees_table}
   (id INT, name STRING, age INT, country STRING, sequenceNum INT)
 """)

Em seguida, atualize upsertToDelta para lidar com versões de linha. A cláusula de UPDATE SET precisa tratar cada coluna separadamente.

def upsertToDelta(microBatchDF, batchId):
 microBatchDF = microBatchDF.groupBy("id").agg(
   max_by(struct("*"), "sequenceNum").alias("row")
 ).select("row.*").createOrReplaceTempView("updates")

 spark.sql(f"""
   MERGE INTO {catalog}.{schema}.{employees_table} t
   USING updates s
   ON s.id = t.id
   WHEN MATCHED THEN UPDATE SET
     name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
     age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
     country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
   WHEN NOT MATCHED THEN INSERT *
 """)

Manipulando deleções

Infelizmente, o código ainda tem um problema. Ele não realiza operações de DELETE, como evidenciado pelo fato de que o funcionário "Pat" ainda está na tabela de destino.

Vamos supor que as exclusões cheguem na mesma microbatch. Para lidar com as mudanças, atualize novamente o método upsertToDelta para excluir a linha quando o registro de alteração de dados indicar a exclusão, conforme mostrado a seguir:

def upsertToDelta(microBatchDF, batchId):
 microBatchDF = microBatchDF.groupBy("id").agg(
   max_by(struct("*"), "sequenceNum").alias("row")
 ).select("row.*").createOrReplaceTempView("updates")

 spark.sql(f"""
   MERGE INTO {catalog}.{schema}.{employees_table} t
   USING updates s
   ON s.id = t.id
   WHEN MATCHED AND s.operation = 'DELETE' THEN DELETE
   WHEN MATCHED THEN UPDATE SET
     name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
     age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
     country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
   WHEN NOT MATCHED THEN INSERT *
 """)

Manipulando atualizações que chegam fora de ordem após exclusões

Infelizmente, o código acima ainda não está muito correto porque não lida com casos em que um DELETE é seguido por um fora de ordem UPDATE entre microbatches.

Alterar as atualizações de tratamento de captura de dados que chegam fora de ordem após o exemplo de exclusões.

O algoritmo para lidar com esse caso precisa armazenar informações sobre exclusões para que ele possa lidar com as atualizações fora de ordem subsequentes. Para fazer isso:

  • Em vez de excluir linhas imediatamente, faça uma exclusão suave usando um carimbo de data/hora ou sequenceNum. Linhas excluídas suavemente são marcadas como excluídas.
  • Redirecione todos os usuários para uma exibição que filtra as pedras de tombamento.
  • Crie um trabalho de limpeza que remova as pedras de tumba ao longo do tempo.

Use o seguinte código:

def upsertToDelta(microBatchDF, batchId):
 microBatchDF = microBatchDF.groupBy("id").agg(
   max_by(struct("*"), "sequenceNum").alias("row")
 ).select("row.*").createOrReplaceTempView("updates")

 spark.sql(f"""
   MERGE INTO {catalog}.{schema}.{employees_table} t
   USING updates s
   ON s.id = t.id
   WHEN MATCHED AND s.operation = 'DELETE' THEN UPDATE SET DELETED_AT=now()
   WHEN MATCHED THEN UPDATE SET
     name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
     age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
     country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
   WHEN NOT MATCHED THEN INSERT *
 """)

Os usuários não podem usar a tabela de destino diretamente, portanto, crie uma exibição que eles possam consultar:

CREATE VIEW employees_v AS
SELECT * FROM employees_merge
WHERE DELETED_AT = NULL

Por fim, crie um trabalho de limpeza que remova periodicamente linhas tombadas:

DELETE FROM employees_merge
WHERE DELETED_AT < now() - INTERVAL 1 DAY