Important
Lakeflow Connect 用 PostgreSQL コネクタはパブリック プレビュー段階です。 パブリック プレビューに登録する場合は、Databricks アカウント チームにお問い合わせください。
このページでは、PostgreSQL インジェスト パイプラインを維持するための継続的な操作について説明します。
パイプラインの一般的なメンテナンス
このセクションのパイプライン メンテナンス タスクは、Lakeflow Connect のすべてのマネージド コネクタに適用されます。
一般的なパイプラインメンテナンスタスクについては、 一般的なパイプラインメンテナンスタスクを参照してください。
未使用のステージング ファイルを削除する
2025 年 1 月 6 日以降に作成したインジェスト パイプラインの場合、ボリューム ステージング データは 25 日後に自動的に削除されるようにスケジュールされ、30 日後に物理的に削除されます。 25 日以上正常に完了していないインジェスト パイプラインでは、宛先テーブル内にデータ ギャップが発生する可能性があります。 ギャップを回避するには、ターゲット テーブルの完全な更新をトリガーします。
2025 年 1 月 6 日より前に作成されたインジェスト パイプラインの場合、Databricks サポートに問い合わせて、ステージング CDC データの自動保持管理の手動有効化を依頼してください。
次のデータが自動的にクリーンアップされます。
- CDC データ ファイル
- スナップショット ファイル
- ステージング テーブル データ
コネクタ固有のパイプライン メンテナンス
このセクションのパイプライン メンテナンス タスクは、PostgreSQL コネクタに固有のものです。
レプリケーションに新しいテーブルを追加する
既存のレプリケーション フローに新しいテーブルを追加するには:
レプリケーション ユーザーに必要な特権を付与します。 必要な特権の完全な一覧については、 PostgreSQL データベースのユーザー要件に関する説明を参照してください。
構造に基づいて、新しいテーブルのレプリカ ID を設定します。 正しい レプリカ ID 設定の 選択に関するガイダンスについては、テーブルのレプリカ ID の設定を参照してください。
パブリケーションにテーブルを追加します。
ALTER PUBLICATION databricks_publication ADD TABLE schema_name.new_table;インジェスト パイプラインの構成を更新して、新しいテーブルを含めます。 これを行うには、Azure Databricks UI を使用するか、Databricks Asset Bundles バンドルまたは CLI コマンドで
ingestion_definitionを更新します。インジェスト ゲートウェイを再起動して、新しいテーブルを検出します。 ゲートウェイは定期的に新しいテーブルをチェックしますが、ゲートウェイを再起動すると検出プロセスが高速化されます。
レプリケーション スロットをクリーンアップする
インジェスト パイプラインを削除しても、** レプリケーション スロットはソース PostgreSQL データベースから自動的に削除されません**。 未使用のレプリケーション スロットにより、Write-Ahead ログ (WAL) ファイルが蓄積され、ソース データベースのディスク領域がいっぱいになる可能性があります。
すべてのレプリケーション スロットを一覧表示するには:
SELECT slot_name, slot_type, active, restart_lsn, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;
不要になったレプリケーション スロットを削除するには:
SELECT pg_drop_replication_slot('slot_name');
インライン DDL トラッキングを整理する
インライン DDL 追跡を無効にする場合は、各データベースに対して以下の手順を実行して、監査スクリプトによって作成されたオブジェクトをクリーンアップします。
イベント トリガーを削除します。
DROP EVENT TRIGGER IF EXISTS lakeflow_ddl_trigger CASCADE;パブリケーションから監査テーブルを削除します。
ALTER PUBLICATION databricks_publication DROP TABLE public.lakeflow_ddl_audit;監査テーブルを削除します。
DROP TABLE IF EXISTS public.lakeflow_ddl_audit CASCADE;
レプリケーション スロットの監視
レプリケーション スロットの状態を監視して、それらがアクティブで WAL データを使用していることを確認します。
SELECT slot_name,
active,
wal_status,
active_pid,
restart_lsn,
confirmed_flush_lsn,
pg_current_wal_lsn() AS current_lsn,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS replication_lag
FROM pg_replication_slots
WHERE slot_name LIKE 'databricks%';
レプリケーションのラグ値が大きいと、次のいずれかの問題が示される場合があります。
- インジェスト ゲートウェイは、ソース データベースの変更に対応していません。
- インジェスト ゲートウェイが長期間停止されました。
- ゲートウェイとソース データベース間のネットワーク接続の問題。
レプリケーション スロットが非アクティブ (active = false) で、対応するパイプラインが不要になっていることを確認した場合は、レプリケーション スロットを削除してリソースを解放します。
レプリケーション スロットのクリーンアップを参照してください。
WAL ディスクの使用状況を監視する
Write-Ahead ログ (WAL) ディスクの使用状況を監視して、ディスク領域の問題を防ぎます。
SELECT pg_size_pretty(sum(size)) AS wal_size
FROM pg_ls_waldir();
特定のレプリケーション スロットの WAL リテンション期間を確認するには:
SELECT slot_name,
active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS pending_wal
FROM pg_replication_slots
WHERE slot_name = 'your_slot_name';
注
ソースのセットアップ中に max_slot_wal_keep_size が適切に構成されている場合 ( レプリケーション スロットの WAL リテンション期間の制限で推奨)、非アクティブなレプリケーション スロットは無制限の WAL 増加を引き起こしません。 制限に達するとスロットは無効になり、データベースのクラッシュを防ぎます。
WAL ディスク使用率が高い場合は、次の手順を実行します。
インジェスト ゲートウェイが継続的に実行されていることを確認します。
ゲートウェイ ログで、WAL データの使用を妨げている可能性のあるエラーを確認します。
WAL リテンション期間 (PostgreSQL 13 以上) を制限するように
max_slot_wal_keep_sizeを設定することを検討してください。ALTER SYSTEM SET max_slot_wal_keep_size = '10GB'; SELECT pg_reload_conf();Warnung
max_slot_wal_keep_size設定すると、WAL リテンション期間の制限を超えた場合にレプリケーション スロットが無効になり、すべてのテーブルを完全に更新する必要があります。
インジェスト ゲートウェイを再起動する
ソース データベースの負荷を減らすため、インジェスト ゲートウェイは定期的に新しいテーブルのみをチェックします。 ゲートウェイが新しいテーブルを検出するまでに最大 6 時間かかる場合があります。 このプロセスを高速化する場合は、ゲートウェイを再起動します。
さらに、次の状況でゲートウェイを再起動します。
- ソース データベースの構成を変更しました。
- ゲートウェイでエラーまたはパフォーマンスの問題が発生しています。
パブリケーションを更新する
レプリケーションに含めるテーブルを変更する必要がある場合:
-- Add a table to the publication
ALTER PUBLICATION databricks_publication ADD TABLE schema_name.table_name;
-- Remove a table from the publication
ALTER PUBLICATION databricks_publication DROP TABLE schema_name.table_name;
-- List all tables in a publication
SELECT schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'databricks_publication';
パブリケーションを更新した後、インジェスト ゲートウェイを再起動して変更を適用します。