次の方法で共有


PostgreSQL インジェスト パイプラインを維持する

Important

Lakeflow Connect 用 PostgreSQL コネクタはパブリック プレビュー段階です。 パブリック プレビューに登録する場合は、Databricks アカウント チームにお問い合わせください。

このページでは、PostgreSQL インジェスト パイプラインを維持するための継続的な操作について説明します。

パイプラインの一般的なメンテナンス

このセクションのパイプライン メンテナンス タスクは、Lakeflow Connect のすべてのマネージド コネクタに適用されます。

一般的なパイプラインメンテナンスタスクについては、 一般的なパイプラインメンテナンスタスクを参照してください。

未使用のステージング ファイルを削除する

2025 年 1 月 6 日以降に作成したインジェスト パイプラインの場合、ボリューム ステージング データは 25 日後に自動的に削除されるようにスケジュールされ、30 日後に物理的に削除されます。 25 日以上正常に完了していないインジェスト パイプラインでは、宛先テーブル内にデータ ギャップが発生する可能性があります。 ギャップを回避するには、ターゲット テーブルの完全な更新をトリガーします。

2025 年 1 月 6 日より前に作成されたインジェスト パイプラインの場合、Databricks サポートに問い合わせて、ステージング CDC データの自動保持管理の手動有効化を依頼してください。

次のデータが自動的にクリーンアップされます。

  • CDC データ ファイル
  • スナップショット ファイル
  • ステージング テーブル データ

コネクタ固有のパイプライン メンテナンス

このセクションのパイプライン メンテナンス タスクは、PostgreSQL コネクタに固有のものです。

レプリケーションに新しいテーブルを追加する

既存のレプリケーション フローに新しいテーブルを追加するには:

  1. レプリケーション ユーザーに必要な特権を付与します。 必要な特権の完全な一覧については、 PostgreSQL データベースのユーザー要件に関する説明を参照してください。

  2. 構造に基づいて、新しいテーブルのレプリカ ID を設定します。 正しい レプリカ ID 設定の 選択に関するガイダンスについては、テーブルのレプリカ ID の設定を参照してください。

  3. パブリケーションにテーブルを追加します。

    ALTER PUBLICATION databricks_publication ADD TABLE schema_name.new_table;
    
  4. インジェスト パイプラインの構成を更新して、新しいテーブルを含めます。 これを行うには、Azure Databricks UI を使用するか、Databricks Asset Bundles バンドルまたは CLI コマンドで ingestion_definition を更新します。

  5. インジェスト ゲートウェイを再起動して、新しいテーブルを検出します。 ゲートウェイは定期的に新しいテーブルをチェックしますが、ゲートウェイを再起動すると検出プロセスが高速化されます。

レプリケーション スロットをクリーンアップする

インジェスト パイプラインを削除しても、** レプリケーション スロットはソース PostgreSQL データベースから自動的に削除されません**。 未使用のレプリケーション スロットにより、Write-Ahead ログ (WAL) ファイルが蓄積され、ソース データベースのディスク領域がいっぱいになる可能性があります。

すべてのレプリケーション スロットを一覧表示するには:

SELECT slot_name, slot_type, active, restart_lsn, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;

不要になったレプリケーション スロットを削除するには:

SELECT pg_drop_replication_slot('slot_name');

インライン DDL トラッキングを整理する

インライン DDL 追跡を無効にする場合は、各データベースに対して以下の手順を実行して、監査スクリプトによって作成されたオブジェクトをクリーンアップします。

  1. イベント トリガーを削除します。

    DROP EVENT TRIGGER IF EXISTS lakeflow_ddl_trigger CASCADE;
    
  2. パブリケーションから監査テーブルを削除します。

    ALTER PUBLICATION databricks_publication DROP TABLE public.lakeflow_ddl_audit;
    
  3. 監査テーブルを削除します。

    DROP TABLE IF EXISTS public.lakeflow_ddl_audit CASCADE;
    

レプリケーション スロットの監視

レプリケーション スロットの状態を監視して、それらがアクティブで WAL データを使用していることを確認します。

SELECT slot_name,
       active,
       wal_status,
       active_pid,
       restart_lsn,
       confirmed_flush_lsn,
       pg_current_wal_lsn() AS current_lsn,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS replication_lag
FROM pg_replication_slots
WHERE slot_name LIKE 'databricks%';

レプリケーションのラグ値が大きいと、次のいずれかの問題が示される場合があります。

  • インジェスト ゲートウェイは、ソース データベースの変更に対応していません。
  • インジェスト ゲートウェイが長期間停止されました。
  • ゲートウェイとソース データベース間のネットワーク接続の問題。

レプリケーション スロットが非アクティブ (active = false) で、対応するパイプラインが不要になっていることを確認した場合は、レプリケーション スロットを削除してリソースを解放します。 レプリケーション スロットのクリーンアップを参照してください。

WAL ディスクの使用状況を監視する

Write-Ahead ログ (WAL) ディスクの使用状況を監視して、ディスク領域の問題を防ぎます。

SELECT pg_size_pretty(sum(size)) AS wal_size
FROM pg_ls_waldir();

特定のレプリケーション スロットの WAL リテンション期間を確認するには:

SELECT slot_name,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS pending_wal
FROM pg_replication_slots
WHERE slot_name = 'your_slot_name';

ソースのセットアップ中に max_slot_wal_keep_size が適切に構成されている場合 ( レプリケーション スロットの WAL リテンション期間の制限で推奨)、非アクティブなレプリケーション スロットは無制限の WAL 増加を引き起こしません。 制限に達するとスロットは無効になり、データベースのクラッシュを防ぎます。

WAL ディスク使用率が高い場合は、次の手順を実行します。

  1. インジェスト ゲートウェイが継続的に実行されていることを確認します。

  2. ゲートウェイ ログで、WAL データの使用を妨げている可能性のあるエラーを確認します。

  3. WAL リテンション期間 (PostgreSQL 13 以上) を制限するように max_slot_wal_keep_size を設定することを検討してください。

    ALTER SYSTEM SET max_slot_wal_keep_size = '10GB';
    SELECT pg_reload_conf();
    

    Warnung

    max_slot_wal_keep_size設定すると、WAL リテンション期間の制限を超えた場合にレプリケーション スロットが無効になり、すべてのテーブルを完全に更新する必要があります。

インジェスト ゲートウェイを再起動する

ソース データベースの負荷を減らすため、インジェスト ゲートウェイは定期的に新しいテーブルのみをチェックします。 ゲートウェイが新しいテーブルを検出するまでに最大 6 時間かかる場合があります。 このプロセスを高速化する場合は、ゲートウェイを再起動します。

さらに、次の状況でゲートウェイを再起動します。

  • ソース データベースの構成を変更しました。
  • ゲートウェイでエラーまたはパフォーマンスの問題が発生しています。

パブリケーションを更新する

レプリケーションに含めるテーブルを変更する必要がある場合:

-- Add a table to the publication
ALTER PUBLICATION databricks_publication ADD TABLE schema_name.table_name;

-- Remove a table from the publication
ALTER PUBLICATION databricks_publication DROP TABLE schema_name.table_name;

-- List all tables in a publication
SELECT schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'databricks_publication';

パブリケーションを更新した後、インジェスト ゲートウェイを再起動して変更を適用します。