Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
A captura de dados de alteração (CDC) é um padrão de integração de dados que captura as alterações feitas nos dados em um sistema de origem, como inserções, atualizações e exclusões. Essas alterações, representadas como uma lista, são comumente chamadas de feed do CDC. Você pode processar seus dados muito mais rapidamente se operar em um feed CDC, em vez de ler todo o conjunto de dados de origem. Sistemas de gestão de bancos de dados transacionais, como SQL Server, MySQL e Oracle, geram feeds CDC. As tabelas delta geram seu próprio feed CDC, conhecido como feed de dados de alteração (CDF).
O diagrama a seguir mostra que, quando uma linha em uma tabela de origem que contém dados de funcionários é atualizada, ela gera um novo conjunto de linhas em um feed CDC que contém apenas as alterações. Cada linha do feed CDC normalmente contém metadados adicionais, incluindo a operação como UPDATE e uma coluna que pode ser usada para ordenar deterministicamente cada linha no feed CDC para que você possa lidar com atualizações fora de ordem. Por exemplo, a sequenceNum coluna no diagrama a seguir determina a ordem das linhas no feed CDC:
Processando um feed de dados de alteração: mantenha apenas os dados mais recentes versus mantenha versões históricas de dados
O processamento de um feed de dados alterado é conhecido como dimensões lentamente variáveis (SCD). Ao processar um feed CDC, você tem uma escolha a fazer:
- Você mantém apenas os dados mais recentes (ou seja, substitui os dados existentes)? Isso é conhecido como SCD Tipo 1.
- Ou você mantém um histórico de alterações nos dados? Isso é conhecido como SCD Tipo 2.
O processamento SCD Tipo 1 envolve a substituição de dados antigos por novos dados sempre que ocorre uma alteração. Isso significa que nenhum histórico das mudanças é mantido. Apenas a versão mais recente dos dados está disponível. É uma abordagem simples e é frequentemente usada quando o histórico de alterações não é importante, como corrigir erros ou atualizar campos não críticos, como endereços de e-mail de clientes.
O processamento SCD Tipo 2 mantém um registro histórico das alterações de dados criando registros adicionais para capturar diferentes versões dos dados ao longo do tempo. Cada versão dos dados é marcada com data e hora ou marcada com metadados que permitem aos usuários rastrear quando ocorreu uma alteração. Isso é útil quando é importante acompanhar a evolução dos dados, como acompanhar as alterações de endereço do cliente ao longo do tempo para fins de análise.
Exemplos de processamento SCD Tipo 1 e Tipo 2 com Lakeflow Spark Declarative Pipelines
Os exemplos nesta seção mostram como usar o SCD Tipo 1 e Tipo 2.
Etapa 1: Preparar dados de exemplo
Neste exemplo, você gerará um feed CDC de exemplo. Primeiro, crie um bloco de anotações e cole o seguinte código nele. Atualize as variáveis no início do bloco de código para um catálogo e esquema onde você tem permissão para criar tabelas e exibições.
Esse código cria uma nova tabela Delta que contém vários registros de alteração. O esquema é o seguinte:
-
id- Inteiro, identificador único deste funcionário -
name- String, nome do funcionário -
role- String, função do empregado -
country- String, código do país, onde o funcionário trabalha -
operation- Alterar o tipo (por exemplo,INSERT,UPDATE, ouDELETE) -
sequenceNum- Inteiro, identifica a ordem lógica dos eventos CDC nos dados de origem. O Lakeflow Spark Declarative Pipelines usa esse sequenciamento para manipular eventos de alteração que chegam fora de ordem.
# update these to the catalog and schema where you have permissions
# to create tables and views.
catalog = "mycatalog"
schema = "myschema"
employees_cdf_table = "employees_cdf"
def write_employees_cdf_to_delta():
data = [
(1, "Alex", "chef", "FR", "INSERT", 1),
(2, "Jessica", "owner", "US", "INSERT", 2),
(3, "Mikhail", "security", "UK", "INSERT", 3),
(4, "Gary", "cleaner", "UK", "INSERT", 4),
(5, "Chris", "owner", "NL", "INSERT", 6),
# out of order update, this should be dropped from SCD Type 1
(5, "Chris", "manager", "NL", "UPDATE", 5),
(6, "Pat", "mechanic", "NL", "DELETE", 8),
(6, "Pat", "mechanic", "NL", "INSERT", 7)
]
columns = ["id", "name", "role", "country", "operation", "sequenceNum"]
df = spark.createDataFrame(data, columns)
df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.{employees_cdf_table}")
write_employees_cdf_to_delta()
Você pode visualizar esses dados usando o seguinte comando SQL:
SELECT *
FROM mycatalog.myschema.employees_cdf
Etapa 2: Use o SCD Tipo 1 para manter apenas os dados mais recentes
Recomendamos usar a AUTO CDC API em um Lakeflow Spark Declarative Pipelines para processar um feed de dados de alteração em uma tabela SCD Tipo 1.
- Crie um novo bloco de notas.
- Cole o seguinte código nele.
- Crie e ligue-se a um pipeline.
A employees_cdf função lê a tabela que acabamos de criar acima como um fluxo porque a API, que você usará para alterar o create_auto_cdc_flow processamento de captura de dados, espera um fluxo de alterações como entrada. Você embrulha com um decorador @dp.temporary_view porque não quer materializar esse fluxo em uma tabela.
Em seguida, você usa dp.create_target_table para criar uma tabela de streaming que contém o resultado do processamento desse feed de dados de alteração.
Finalmente, usa dp.create_auto_cdc_flow para processar o fluxo de dados de alteração. Vejamos cada argumento:
-
target- A tabela de streaming de destino, que você definiu anteriormente. -
source- A visão sobre o fluxo de registos de mudança que definiste anteriormente. -
keys- Identifica linhas exclusivas no feed de alterações. Como você está usandoidcomo um identificador exclusivo, basta forneceridcomo a única coluna de identificação. -
sequence_by- O nome da coluna que especifica a ordem lógica dos eventos CDC nos dados de origem. Você precisa desse sequenciamento para lidar com eventos de alteração que chegam fora de ordem. ForneçasequenceNumcomo a coluna de sequenciamento. -
apply_as_deletes- Como os dados de exemplo contêm operações de eliminação, utiliza-seapply_as_deletespara indicar quando um evento CDC deve ser tratado como umDELETEe não como um upsert. -
except_column_list- Contém uma lista de colunas que você não deseja incluir na tabela de destino. Neste exemplo, você usará esse argumento para excluirsequenceNumeoperation. -
stored_as_scd_type- Indica o tipo de SCD que pretende utilizar.
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr, lit, when
from pyspark.sql.types import StringType, ArrayType
catalog = "mycatalog"
schema = "myschema"
employees_cdf_table = "employees_cdf"
employees_table_current = "employees_current"
employees_table_historical = "employees_historical"
@dp.temporary_view
def employees_cdf():
return spark.readStream.format("delta").table(f"{catalog}.{schema}.{employees_cdf_table}")
dp.create_target_table(f"{catalog}.{schema}.{employees_table_current}")
dp.create_auto_cdc_flow(
target=f"{catalog}.{schema}.{employees_table_current}",
source=employees_cdf_table,
keys=["id"],
sequence_by=col("sequenceNum"),
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
Execute esse pipeline clicando em Iniciar.
Em seguida, execute a seguinte consulta no editor SQL para verificar se os registros de alteração foram processados corretamente:
SELECT *
FROM mycatalog.myschema.employees_current
Observação
A atualização fora de ordem para o funcionário Chris foi descartada corretamente, pois sua função ainda está definida como Proprietário em vez de Gerente.
Etapa 3: Usar o SCD Tipo 2 para manter dados históricos
Neste exemplo, você cria uma segunda tabela de destino, chamada employees_historical, que contém um histórico completo de alterações nos registros de funcionários.
Adicione este código ao seu pipeline. A única diferença aqui é que stored_as_scd_type é definido como 2 em vez de 1.
dp.create_target_table(f"{catalog}.{schema}.{employees_table_historical}")
dp.create_auto_cdc_flow(
target=f"{catalog}.{schema}.{employees_table_historical}",
source=employees_cdf_table,
keys=["id"],
sequence_by=col("sequenceNum"),
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 2
)
Execute esse pipeline clicando em Iniciar.
Em seguida, execute a seguinte consulta no editor SQL para verificar se os registros de alteração foram processados corretamente:
SELECT *
FROM mycatalog.myschema.employees_historical
Você verá todas as alterações nos funcionários, incluindo os funcionários que foram excluídos, como Pat.
Etapa 4: Limpar recursos
Quando terminar, limpe os recursos seguindo estas etapas:
Exclua o pipeline:
Observação
Quando elimina o pipeline, ele elimina automaticamente as tabelas
employeeseemployees_historical.- Clique em Jobs & Pipelines e localize o nome do pipeline a ser excluído.
- Clique no
na mesma linha do nome do pipeline e, em seguida, clique em Eliminar.
Remova o caderno de notas.
Remova a tabela que contém o fluxo de dados de alteração.
- Clique em Nova > consulta.
- Cole e execute o seguinte código SQL, ajustando o catálogo e o esquema conforme apropriado:
DROP TABLE mycatalog.myschema.employees_cdf
Desvantagens de usar MERGE INTO e foreachBatch para captura de dados de alterações
O Databricks fornece um MERGE INTO comando SQL que você pode usar com a foreachBatch API para atualizar linhas em uma tabela Delta. Esta seção explora como essa técnica pode ser usada para casos de uso simples, mas esse método se torna cada vez mais complexo e frágil quando aplicado a cenários do mundo real.
Neste exemplo, você usará o mesmo feed de dados de alteração de exemplo usado nos exemplos anteriores.
Implementação ingênua com MERGE INTO e foreachBatch
Crie um bloco de anotações e copie o código a seguir para ele. Altere as variáveis catalog, schema e employees_table conforme apropriado. As catalog variáveis e schema devem ser definidas para locais no Unity Catalog onde você pode criar tabelas.
Quando você executa o bloco de anotações, ele faz o seguinte:
- Cria a tabela de destino no
create_table. Ao contrário decreate_auto_cdc_flow, que lida com esta etapa automaticamente, você precisa especificar o esquema. - Lê o feed de dados de alteração como um fluxo. Cada microlote é processado usando o
upsertToDeltamétodo, que executa umMERGE INTOcomando.
catalog = "jobs"
schema = "myschema"
employees_cdf_table = "employees_cdf"
employees_table = "employees_merge"
def upsertToDelta(microBatchDF, batchId):
microBatchDF.createOrReplaceTempView("updates")
microBatchDF.sparkSession.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
def create_table():
spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{employees_table}")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{employees_table}
(id INT, name STRING, age INT, country STRING)
""")
create_table()
cdcData = spark.readStream.table(f"{catalog}.{schema}.{employees_cdf_table}")
cdcData.writeStream \
.foreachBatch(upsertToDelta) \
.outputMode("append") \
.start()
Para ver os resultados, execute a seguinte consulta SQL:
SELECT *
FROM mycatalog.myschema.employees_merge
Infelizmente, os resultados estão incorretos, como mostrado a seguir:
Várias atualizações para a mesma chave no mesmo microlote
O primeiro problema é que o código não lida com várias atualizações para a mesma chave no mesmo microlote. Por exemplo, utiliza-se INSERT para inserir o funcionário Chris e posteriormente atualizar a função dele de Proprietário para Gerente. Isso deve resultar em uma linha, mas em vez disso há duas linhas.
Qual alteração vence quando há várias atualizações para a mesma chave em um microlote?
A lógica torna-se mais complexa. O exemplo de código a seguir recupera a linha mais recente usando sequenceNum e mescla apenas esses dados na tabela de destino conforme descrito abaixo:
- Grupos pela chave primária,
id. - Seleciona todas as colunas da linha que tem o máximo
sequenceNumno lote dessa chave. - Explode a linha de volta.
Atualize o upsertToDelta método como mostrado a seguir e, em seguida, execute o código:
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
Quando você consulta a tabela de destino, você vê que o funcionário chamado Chris tem a função correta, mas ainda há outros problemas para resolver porque você ainda tem registros excluídos aparecendo na tabela de destino.
Atualizações fora de ordem em micro-lotes
Esta seção explora o problema das atualizações fora de ordem em microlotes. O diagrama a seguir ilustra o problema: e se a linha para Chris tiver uma UPDATE operação no primeiro microlote seguida por uma INSERT num microlote subsequente? O código não lida com isso corretamente.
Qual alteração vence quando há atualizações fora de ordem para a mesma chave em vários microlotes?
Para corrigir isso, expanda o código para armazenar uma versão em cada linha da seguinte maneira:
- Armazene a
sequenceNumdata em que uma linha foi atualizada pela última vez. - Para cada nova linha, verifique se o timestamp é superior ao armazenado e, em seguida, aplique a seguinte lógica:
- Se maior, use os novos dados do destino.
- Caso contrário, mantenha os dados na fonte.
Primeiro, atualize o createTable método para armazenar o sequenceNum já que você o usará para a versão de cada linha:
def create_table():
spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{employees_table}")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{employees_table}
(id INT, name STRING, age INT, country STRING, sequenceNum INT)
""")
Em seguida, atualize upsertToDelta para lidar com versões de linha. A cláusula UPDATE SET de MERGE INTO precisa lidar separadamente com cada coluna.
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED THEN UPDATE SET
name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
WHEN NOT MATCHED THEN INSERT *
""")
Tratamento de exclusões
Infelizmente, o código ainda tem um problema. Não processa operações de DELETE, como evidenciado pelo fato de que o empregado Pat ainda está na tabela de destino.
Vamos supor que as exclusões cheguem no mesmo microlote. Para manipulá-los, atualize o upsertToDelta método novamente para excluir a linha quando o registro de dados de alteração indicar exclusão, conforme mostrado a seguir:
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED AND s.operation = 'DELETE' THEN DELETE
WHEN MATCHED THEN UPDATE SET
name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
WHEN NOT MATCHED THEN INSERT *
""")
Tratamento de atualizações que chegam fora de ordem após exclusões
Infelizmente, o código acima ainda não está correto porque não lida com casos em que um DELETE é seguido por um fora de ordem UPDATE em microlotes.
O algoritmo para lidar com este caso precisa manter registro de exclusões para que possa gerir atualizações subsequentes fora de ordem. Para tal:
- Em vez de excluir linhas imediatamente, exclua-as com um carimbo de data/hora ou
sequenceNum. As linhas apagadas de forma suave são marcadas para eliminação. - Redirecione todos os utilizadores para uma vista que filtre as lápides.
- Crie um trabalho de limpeza que remova as lápides ao longo do tempo.
Utilize o seguinte código:
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED AND s.operation = 'DELETE' THEN UPDATE SET DELETED_AT=now()
WHEN MATCHED THEN UPDATE SET
name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
WHEN NOT MATCHED THEN INSERT *
""")
Seus usuários não podem usar a tabela de destino diretamente, portanto, crie uma exibição que eles possam consultar:
CREATE VIEW employees_v AS
SELECT * FROM employees_merge
WHERE DELETED_AT = NULL
Por fim, crie um trabalho de limpeza que remova periodicamente as linhas com lápides:
DELETE FROM employees_merge
WHERE DELETED_AT < now() - INTERVAL 1 DAY