Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Use a declaração AUTO CDC ... INTO para criar um fluxo que utiliza a funcionalidade de captura de dados de alteração (CDC) do Lakeflow Spark Declarative Pipelines. Esta declaração lê as alterações de uma fonte CDC e as aplica a um destino de streaming.
- Para saber mais sobre CDC, confira o que é CDC (captura de dados de alteração)?.
- Para obter mais detalhes sobre como usar
AUTO CDC, consulte as APIs AUTO CDC: Simplificação da captura de dados de alteração com fluxos de trabalho. - Para obter mais detalhes sobre
CREATE FLOW, consulte CREATE FLOW (pipelines).
Sintaxe
CREATE OR REFRESH STREAMING TABLE table_name;
CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
Você define restrições de qualidade de dados para o destino usando a mesma CONSTRAINT cláusula que outras consultas de pipeline. Confira Gerenciar a qualidade dos dados com as expectativas do pipeline.
O comportamento padrão dos eventos INSERT e UPDATE é realizar um upsert dos eventos CDC da origem: atualizar quaisquer linhas na tabela de destino que correspondam às chaves especificadas ou inserir uma nova linha quando um registro correspondente não existir na tabela de destino. A manipulação de DELETE eventos pode ser especificada com a APPLY AS DELETE WHEN condição.
Importante
Você deve declarar uma tabela de streaming de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Para tabelas SCD tipo 2, ao especificar o esquema da tabela de destino, você também deve incluir as colunas __START_AT e __END_AT com o mesmo tipo de dados do campo sequence_by.
Confira as APIs AUTO CDC: Simplifique a captura de alterações de dados com pipelines.
Parâmetros
flow_nameO nome do fluxo a ser criado.
sourceA fonte dos dados. A origem deve ser uma fonte de streaming . Use a palavra-chave STREAM para usar a semântica de streaming para ler a fonte. Se a leitura encontrar uma alteração ou exclusão em um registro existente, um erro será gerado. É mais seguro ler de fontes estáticas ou somente de acréscimos. Para ingerir dados que tenham confirmações de alterações, você pode usar Python e a opção
SkipChangeCommitspara lidar com erros.Para obter mais informações sobre dados de fluxo, consulte Transformar dados com pipelines.
KEYSA coluna ou combinação de colunas que identificam exclusivamente uma linha nos dados de origem. Os valores nessas colunas são usados para identificar quais eventos CDC se aplicam a registros específicos na tabela de destino.
Para definir uma combinação de colunas, use uma lista separada por vírgulas de colunas.
Essa cláusula ia necessária.
IGNORE NULL UPDATESPermite a ingestão de atualizações que contêm um subconjunto das colunas de destino. Quando um evento CDC corresponde a uma linha existente e "IGNORAR ATUALIZAÇÕES NULAS" está especificado, colunas com um valor de
nullmanterão seus valores existentes no destino. Isso também se aplica a colunas aninhadas com valornull.Essa cláusula é opcional.
O padrão é substituir as colunas existentes pelos valores de
null.APPLY AS DELETE WHENEspecifica quando um evento CDC deve ser tratado como um
DELETEem vez de um upsert.Para fontes scd tipo 2, para lidar com dados fora de ordem, a linha excluída é temporariamente mantida como uma pedra de tumba na tabela Delta subjacente e uma exibição é criada no metastore que filtra essas pedras de tumba. O intervalo de retenção pode ser configurado com a propriedade da
pipelines.cdc.tombstoneGCThresholdInSecondstabela.Essa cláusula é opcional.
APPLY AS TRUNCATE WHENEspecifica quando um evento CDC deve ser tratado como uma tabela
TRUNCATEcompleta. Como essa cláusula dispara um truncamento completo da tabela de destino, ela deve ser usada apenas para casos específicos que requerem essa funcionalidade.A
APPLY AS TRUNCATE WHENcláusula tem suporte apenas para SCD tipo 1. O SCD tipo 2 não dá suporte à operação de truncação.Essa cláusula é opcional.
SEQUENCE BYO nome da coluna que especifica a ordem lógica dos eventos CDC nos dados de origem. O processamento em pipeline usa esse sequenciamento para gerenciar eventos de alteração que chegam fora de ordem.
Se várias colunas forem necessárias para sequenciamento, use uma
STRUCTexpressão: ela será ordenada primeiro pelo primeiro campo de struct, depois pelo segundo campo, se houver um empate e assim por diante.As colunas especificadas devem ser tipos de dados classificáveis.
Essa cláusula é necessária.
COLUMNSEspecifica um subconjunto de colunas a serem incluídas na tabela de destino. Você pode:
- Especifique a lista completa de colunas a serem incluídas:
COLUMNS (userId, name, city). - Especifique uma lista de colunas a serem excluídas:
COLUMNS * EXCEPT (operation, sequenceNum)
Essa cláusula é opcional.
O padrão é incluir todas as colunas na tabela de destino quando a
COLUMNScláusula não for especificada.- Especifique a lista completa de colunas a serem incluídas:
STORED ASSe deseja armazenar registros como SCD tipo 1 ou SCD tipo 2.
Essa cláusula é opcional.
O padrão é SCD tipo 1.
TRACK HISTORY ONEspecifica um subconjunto de colunas de saída para gerar registros de histórico quando houver alterações nessas colunas especificadas. Você pode:
- Especifique a lista completa de colunas a serem rastreadas:
COLUMNS (userId, name, city). - Especifique uma lista de colunas a serem excluídas do acompanhamento:
COLUMNS * EXCEPT (operation, sequenceNum)
Essa cláusula é opcional. O padrão é acompanhar o histórico de todas as colunas de saída quando houver alterações, equivalentes a
TRACK HISTORY ON *.- Especifique a lista completa de colunas a serem rastreadas:
Exemplos
-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flow
AS AUTO CDC INTO
target
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);