Partilhar via


Manter os pipelines de ingestão do PostgreSQL

Importante

O conector PostgreSQL para Lakeflow Connect está em Visualização Pública. Entre em contato com a sua equipa de conta Databricks para se inscrever na Pré-visualização Pública.

Esta página descreve as operações em curso para manter os pipelines de ingestão do PostgreSQL.

Manutenção geral de tubulações

As tarefas de manutenção de pipeline nesta seção se aplicam a todos os conectores gerenciados no Lakeflow Connect.

Para tarefas gerais de manutenção de oleodutos, veja Tarefas comuns de manutenção de oleodutos.

Remover ficheiros de preparação não utilizados

Para os canais de ingestão que criar após 6 de janeiro de 2025, os dados de volume staging são automaticamente agendados para eliminação após 25 dias e removidos fisicamente após 30 dias. Um processo de ingestão que não foi concluído com êxito há 25 dias ou mais pode resultar em lacunas de dados nas tabelas de destino. Para evitar lacunas, acione uma atualização completa das tabelas-alvo.

Para pipelines de ingestão criados antes de 6 de janeiro de 2025, entre em contato com o Suporte Databricks para solicitar a ativação manual do gerenciamento automático de retenção para o faseamento de dados CDC.

Os seguintes dados são limpos automaticamente:

  • Arquivos de dados CDC
  • Arquivos de instantâneo
  • Dados da tabela de preparo

Manutenção de tubulação específica do conector

As tarefas de manutenção do pipeline nesta secção são específicas do conector PostgreSQL.

Adicionar novas tabelas à replicação

Para adicionar novas tabelas a um fluxo de replicação existente:

  1. Conceder os privilégios necessários ao utilizador de replicação. Para uma lista completa de privilégios obrigatórios, consulte os requisitos do utilizador da base de dados PostgreSQL.

  2. Defina a identidade da réplica para as novas tabelas com base na sua estrutura. Consulte Definir identidade de réplica para tabelas para orientações sobre como escolher a definição correta de identidade de réplica.

  3. Adicione as tabelas à publicação:

    ALTER PUBLICATION databricks_publication ADD TABLE schema_name.new_table;
    
  4. Atualize a configuração do pipeline de ingestão para incluir as novas tabelas. Pode fazer isto através do interface do Azure Databricks ou ao atualizar o ingestion_definition no seu pacote de Databricks Asset Bundles ou no seu comando de CLI.

  5. Reinicie o gateway de ingestão para descobrir as novas tabelas. O gateway verifica periodicamente a existência de novas tabelas, mas reiniciar o gateway acelera o processo de descoberta.

Limpar os slots de replicação

Quando elimina um pipeline de ingestão, ** o slot de replicação não é automaticamente removido da base de dados PostgreSQL de origem **. Slots de replicação não utilizados podem causar a acumulação de ficheiros de Write-Ahead Log (WAL), podendo potencialmente preencher o espaço em disco na base de dados de origem.

Para listar todos os slots de replicação:

SELECT slot_name, slot_type, active, restart_lsn, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;

Para eliminar um slot de replicação que já não é necessário:

SELECT pg_drop_replication_slot('slot_name');

Limpar o rastreamento DDL integrado

Se desativar o rastreamento DDL inline, execute os passos abaixo para cada base de dados para limpar os objetos criados pelo script de auditoria.

  1. Elimina os gatilhos de eventos:

    DROP EVENT TRIGGER IF EXISTS lakeflow_ddl_trigger CASCADE;
    
  2. Remover a tabela de auditoria da publicação:

    ALTER PUBLICATION databricks_publication DROP TABLE public.lakeflow_ddl_audit;
    
  3. Abandone a tabela de auditoria:

    DROP TABLE IF EXISTS public.lakeflow_ddl_audit CASCADE;
    

Slots de replicação do monitor

Monitorize o estado dos slots de replicação para garantir que estão ativos e a consumir dados WAL:

SELECT slot_name,
       active,
       wal_status,
       active_pid,
       restart_lsn,
       confirmed_flush_lsn,
       pg_current_wal_lsn() AS current_lsn,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS replication_lag
FROM pg_replication_slots
WHERE slot_name LIKE 'databricks%';

Valores elevados de atraso de replicação podem indicar um dos seguintes problemas:

  • O gateway de ingestão não acompanha as alterações na base de dados de origem.
  • O portal de ingestão foi interrompido por um período prolongado.
  • Problemas de conectividade de rede entre o gateway e a base de dados de origem.

Se um slot de replicação estiver inativo (active = false) e tiver confirmado que o pipeline correspondente já não é necessário, elimine o slot de replicação para libertar os recursos. Consulte Limpeza dos slots de replicação.

Monitorizar a utilização do disco WAL

Monitorizar a utilização do disco Write-Ahead Log (WAL) para evitar problemas de espaço no disco:

SELECT pg_size_pretty(sum(size)) AS wal_size
FROM pg_ls_waldir();

Para verificar a retenção do WAL para um slot específico de replicação:

SELECT slot_name,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS pending_wal
FROM pg_replication_slots
WHERE slot_name = 'your_slot_name';

Observação

Se max_slot_wal_keep_size estiver devidamente configurado durante a configuração da fonte (conforme recomendado em Limitar a retenção WAL para slots de replicação), os slots de replicação inativos não causarão crescimento WAL ilimitado. O slot será invalidado quando o limite for atingido, prevenindo falhas na base de dados.

Se o uso de disco WAL for elevado, execute os seguintes passos:

  1. Verifique se o gateway de ingestão está a funcionar continuamente.

  2. Verifique os registos do gateway para erros que possam estar a impedir o consumo de dados WALL.

  3. Considere definir max_slot_wal_keep_size para limitar a retenção de WAL (PostgreSQL 13 ou superior):

    ALTER SYSTEM SET max_slot_wal_keep_size = '10GB';
    SELECT pg_reload_conf();
    

    Advertência

    A configuração max_slot_wal_keep_size pode invalidar os slots de replicação se for ultrapassado o limite de retenção de WAL, exigindo uma atualização completa de todas as tabelas.

Reinicie o gateway de ingestão

Para diminuir a carga no banco de dados de origem, o gateway de ingestão verifica apenas periodicamente se há novas tabelas. Pode demorar até 6 horas para o gateway descobrir novas mesas. Se você quiser acelerar esse processo, reinicie o gateway.

Adicionalmente, reinicie o gateway nas seguintes situações:

  • Fizeste alterações de configuração à base de dados de origem.
  • O gateway está a apresentar erros ou problemas de desempenho.

Atualizar publicações

Se precisar de modificar quais as tabelas incluídas na replicação:

-- Add a table to the publication
ALTER PUBLICATION databricks_publication ADD TABLE schema_name.table_name;

-- Remove a table from the publication
ALTER PUBLICATION databricks_publication DROP TABLE schema_name.table_name;

-- List all tables in a publication
SELECT schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'databricks_publication';

Após atualizar a publicação, reinicie o gateway de ingestão para aplicar as alterações.