Compartilhar via


Manter pipelines de ingestão do PostgreSQL

Importante

O conector do PostgreSQL para Lakeflow Connect está em Versão Prévia Pública. Entre em contato com sua equipe de conta do Databricks para se inscrever na Versão Prévia Pública.

Esta página descreve as operações em andamento para manter os pipelines de ingestão do PostgreSQL.

Manutenção geral do pipeline

As tarefas de manutenção de pipeline nesta seção se aplicam a todos os conectores gerenciados no Lakeflow Connect.

Para tarefas gerais de manutenção de pipeline, consulte tarefas comuns de manutenção de pipeline.

Remover arquivos de preparo não utilizados

Para pipelines de ingestão criados após 6 de janeiro de 2025, os dados de preparo de volume são agendados automaticamente para exclusão após 25 dias e removidos fisicamente após 30 dias. Um pipeline de ingestão que não foi concluído com êxito por 25 dias ou mais pode resultar em lacunas de dados nas tabelas de destino. Para evitar lacunas, dispare uma atualização completa das tabelas de destino.

Para pipelines de ingestão criados antes de 6 de janeiro de 2025, entre em contato com o Suporte do Databricks para solicitar a habilitação manual do gerenciamento automático de retenção para preparar dados CDC.

Os seguintes dados são limpos automaticamente:

  • Arquivos de dados CDC
  • Arquivos de instantâneo
  • Dados da tabela de preparo

Manutenção específica do conector para o pipeline

As tarefas de manutenção de pipeline nesta seção são específicas para o conector postgreSQL.

Adicionar novas tabelas à replicação

Para adicionar novas tabelas a um fluxo de replicação existente:

  1. Conceda os privilégios necessários ao usuário de replicação. Para obter uma lista completa de privilégios necessários, consulte os requisitos de usuário do banco de dados PostgreSQL.

  2. Defina a identidade da réplica para as novas tabelas com base em sua estrutura. Consulte Definir identidade de réplica para tabelas para obter diretrizes sobre como escolher a configuração correta de identidade de réplica.

  3. Adicione as tabelas à publicação:

    ALTER PUBLICATION databricks_publication ADD TABLE schema_name.new_table;
    
  4. Atualize a configuração do pipeline de ingestão para incluir as novas tabelas. Você pode fazer isso por meio da interface do usuário do Azure Databricks ou atualizando o ingestion_definition comando do pacote ou da CLI do Databricks Asset Bundles.

  5. Reinicie o gateway de ingestão para identificar as novas tabelas. O gateway verifica periodicamente se há novas tabelas, mas reiniciar o gateway acelera o processo de descoberta.

Limpar os slots de replicação

Quando você exclui um pipeline de ingestão, ** o slot de replicação não é removido automaticamente do banco de dados PostgreSQL de origem **. Os slots de replicação não utilizados podem fazer com que os arquivos WAL (Write-Ahead Log) se acumulem, potencialmente preenchendo espaço em disco no banco de dados de origem.

Para listar todos os slots de replicação:

SELECT slot_name, slot_type, active, restart_lsn, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;

Para remover um slot de replicação que não é mais necessário:

SELECT pg_drop_replication_slot('slot_name');

Limpar o acompanhamento de DDL integrada

Se você desabilitar o acompanhamento de DDL inline, execute as etapas abaixo para cada banco de dados para limpar os objetos criados pelo script de auditoria.

  1. Remova os desencadeadores de eventos:

    DROP EVENT TRIGGER IF EXISTS lakeflow_ddl_trigger CASCADE;
    
  2. Remova a tabela de auditoria da publicação:

    ALTER PUBLICATION databricks_publication DROP TABLE public.lakeflow_ddl_audit;
    
  3. Remova a tabela de auditoria:

    DROP TABLE IF EXISTS public.lakeflow_ddl_audit CASCADE;
    

Monitorar slots de replicação

Monitore o status dos slots de replicação para garantir que eles estejam ativos e consumindo dados WAL:

SELECT slot_name,
       active,
       wal_status,
       active_pid,
       restart_lsn,
       confirmed_flush_lsn,
       pg_current_wal_lsn() AS current_lsn,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS replication_lag
FROM pg_replication_slots
WHERE slot_name LIKE 'databricks%';

Valores de retardo de replicação grandes podem indicar um dos seguintes problemas:

  • O gateway de ingestão não está acompanhando as alterações no banco de dados de origem.
  • O gateway de ingestão foi suspenso por um longo período.
  • Problemas de conectividade de rede entre o gateway e o banco de dados de origem.

Se um slot de replicação estiver inativo (active = false) e você tiver confirmado que o pipeline correspondente não é mais necessário, solte o slot de replicação para liberar os recursos. Veja Limpeza dos slots de replicação.

Monitorar o uso do disco WAL

Monitore o uso do disco do WAL (log de gravação antecipada) para evitar problemas de espaço em disco.

SELECT pg_size_pretty(sum(size)) AS wal_size
FROM pg_ls_waldir();

Para verificar a retenção de WAL para um slot de réplica específico:

SELECT slot_name,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS pending_wal
FROM pg_replication_slots
WHERE slot_name = 'your_slot_name';

Observação

Se max_slot_wal_keep_size for configurado corretamente durante a configuração do sistema de origem (conforme recomendado em Limitar a retenção de WAL para slots de replicação), os slots de replicação inativos não causarão crescimento ilimitado do WAL. O slot será invalidado quando o limite for atingido, evitando falhas no banco de dados.

Se o uso do disco WAL for alto, execute as seguintes etapas:

  1. Verifique se o gateway de ingestão de dados está executado continuamente.

  2. Verifique se há erros nos logs de gateway que podem estar impedindo que ele consuma dados WAL.

  3. Considere configurar max_slot_wal_keep_size para limitar a retenção WAL (PostgreSQL 13 ou superior):

    ALTER SYSTEM SET max_slot_wal_keep_size = '10GB';
    SELECT pg_reload_conf();
    

    Aviso

    A configuração max_slot_wal_keep_size pode fazer com que os slots de replicação sejam invalidados se o limite de retenção do WAL for excedido, exigindo um novo carregamento completo de todas as tabelas.

Reiniciar o gateway de ingestão

Para diminuir a carga no banco de dados de origem, o gateway de ingestão verifica periodicamente se há novas tabelas. Pode levar até 6 horas para o gateway descobrir novas tabelas. Se você quiser acelerar esse processo, reinicie o gateway.

Além disso, reinicie o gateway nas seguintes situações:

  • Você fez alterações de configuração no banco de dados de origem.
  • O gateway está enfrentando erros ou problemas de desempenho.

Atualizar publicações

Se você precisar modificar quais tabelas estão incluídas na replicação:

-- Add a table to the publication
ALTER PUBLICATION databricks_publication ADD TABLE schema_name.table_name;

-- Remove a table from the publication
ALTER PUBLICATION databricks_publication DROP TABLE schema_name.table_name;

-- List all tables in a publication
SELECT schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'databricks_publication';

Depois de atualizar a publicação, reinicie o gateway de ingestão para aplicar as alterações.