次の方法で共有


Azure Databricks へのインジェスト用に PostgreSQL を構成する

Important

Lakeflow Connect 用 PostgreSQL コネクタはパブリック プレビュー段階です。 パブリック プレビューに登録する場合は、Databricks アカウント チームにお問い合わせください。

このページでは、Lakeflow Connect を使用して PostgreSQL から Azure Databricks に取り込むためのソース セットアップ タスクについて説明します。

変更データ キャプチャの論理レプリケーション

PostgreSQL コネクタは、論理レプリケーションを使用してソース テーブルの変更を追跡します。 論理レプリケーションを使用すると、ソース データベースに対するトリガーや大きなオーバーヘッドを必要とせずに、コネクタでデータの変更 (挿入、更新、削除) をキャプチャできます。

Lakeflow PostgreSQL 論理レプリケーションには、次のものが必要です。

  1. Lakeflow Connect では、PostgreSQL バージョン 13 以降からのデータ レプリケーションがサポートされています。

  2. 論理レプリケーション用にデータベースを構成します。

    PostgreSQL パラメーター wal_levellogicalに設定する必要があります。

  3. レプリケートするすべてのテーブルを含むパブリケーションを作成します。

  4. レプリケートされるカタログごとにレプリケーション スロットを作成します。

レプリケーション スロットを作成する前に、パブリケーションを作成する必要があります。

論理レプリケーションの詳細については、PostgreSQL Web サイトの 論理レプリケーション に関するドキュメントを参照してください。

ソースセットアップタスクの概要

Azure Databricks にデータを取り込む前に、PostgreSQL で次のタスクを完了します。

  1. PostgreSQL 13 以降を確認する

  2. ネットワーク アクセス (セキュリティ グループ、ファイアウォール規則、または VPN) を構成する

  3. 論理レプリケーションを構成します。

    • 論理レプリケーションを有効にする (wal_level = logical)
  4. 省略可能: スキーマ変更の自動検出用にインライン DDL 追跡を構成します。 インライン DDL 追跡を選択する場合は、Databricks のサポートにお問い合わせください。

Important

複数の PostgreSQL データベースからレプリケートする場合は、データベースごとに個別のパブリケーションとレプリケーション スロットを作成する必要があります。 インライン DDL 追跡スクリプト (使用されている場合) も各データベースで実行する必要があります。

論理レプリケーションを構成する

PostgreSQL で論理レプリケーションを有効にするには、データベース設定を構成し、必要なオブジェクトを設定します。

WAL レベルを論理レベルに設定する

Write-Ahead ログ (WAL) は、論理レプリケーション用に構成する必要があります。 通常、この設定にはデータベースの再起動が必要です。

  1. 現在の wal_level 設定を確認します。

    SHOW wal_level;
    
  2. 値が logicalされていない場合は、サーバー構成で wal_level = logical 設定し、PostgreSQL サービスを再起動します。

レプリケーション ユーザーの作成

レプリケーション特権を持つ Databricks インジェスト専用ユーザーを作成します。

CREATE USER databricks_replication WITH PASSWORD 'your_secure_password';
GRANT CONNECT ON DATABASE your_database TO databricks_replication;
ALTER USER databricks_replication WITH REPLICATION;

特権の詳細な要件については、 PostgreSQL データベースのユーザー要件に関する記事を参照してください。

テーブルのレプリカ ID を設定する

レプリケートするテーブルごとに、レプリカ ID を構成します。 正しい設定は、テーブル構造によって異なります。

テーブル構造 必要なレプリカ ID Command
テーブルには主キーがあり、TOASTable 列 (たとえば、 TEXTBYTEA、大きな値を持つ VARCHAR(n) ) は含まれません。 DEFAULT ALTER TABLE schema_name.table_name REPLICA IDENTITY DEFAULT;
テーブルには主キーがありますが、大きな可変長 (TOASTable) 列が含まれています。 FULL ALTER TABLE schema_name.table_name REPLICA IDENTITY FULL;
テーブルに主キーがありません FULL ALTER TABLE schema_name.table_name REPLICA IDENTITY FULL;

レプリカ ID の設定の詳細については、PostgreSQL ドキュメントの レプリカ ID を 参照してください。

パブリケーションを作成する

レプリケートするテーブルを含むパブリケーションを各データベースに作成します。

-- Create a publication for specific tables
CREATE PUBLICATION databricks_publication FOR TABLE schema_name.table1, schema_name.table2;

-- Or create a publication for all tables in a database
CREATE PUBLICATION databricks_publication FOR ALL TABLES;

レプリケートする PostgreSQL データベースごとに個別のパブリケーションを作成する必要があります。

レプリケーション スロット のパラメーターを構成する

レプリケーション スロットを作成する前に、次のサーバー パラメーターを構成します。

レプリケーション スロットの WAL リテンション期間を制限する

パラメーター: max_slot_wal_keep_size

(既定値) にmax_slot_wal_keep_size-1。これにより、遅延または非アクティブなレプリケーション スロットによるリテンションにより無制限の WAL が肥大化する可能性があるためです。 ワークロードに応じて、このパラメーターを有限値に設定します。

max_slot_wal_keep_sizeパラメーターの詳細については、PostgreSQL の公式ドキュメントを参照してください。

一部のマネージド クラウド プロバイダーでは、このパラメーターの変更を許可せず、代わりに組み込みのスロット監視と自動クリーンアップに依存しています。 運用アラートを設定する前に、プラットフォームの動作を確認します。

詳細については、以下を参照してください。

レプリケーション スロットの容量を構成する

パラメーター: max_replication_slots

レプリケートされる各 PostgreSQL データベースには、1 つの論理レプリケーション スロットが必要です。 このパラメーターを、レプリケートするデータベースの数と、既存のレプリケーションのニーズに少なくとも設定します。

WAL 送信者を構成する

パラメーター: max_wal_senders

このパラメーターは、WAL データをサブスクライバーにストリーミングする同時 WAL 送信側プロセスの最大数を定義します。 ほとんどの場合、効率的で一貫性のあるデータ レプリケーションを確保するために、レプリケーション スロットごとに 1 つの WAL 送信側プロセスが必要です。

max_wal_sendersは、使用されているレプリケーション スロットの数と少なくとも同じになるように構成し、その他の既存の使用量を計算します。 運用の柔軟性を提供するために、少し高く設定することをお勧めします。

レプリケーション スロットを作成する

Databricks インジェスト ゲートウェイが変更の追跡に使用するレプリケーション スロットを各データベースに作成します。

-- Create a replication slot with the pgoutput plugin
SELECT pg_create_logical_replication_slot('databricks_slot', 'pgoutput');

Important

  • レプリケーション スロットは、コネクタによって使用されるまで WAL データを保持します。 WAL リテンション期間を制限し、無制限の WAL の増加を防ぐために、 max_slot_wal_keep_size パラメーターを構成します。 詳細については、 レプリケーション スロット パラメーターの構成 を参照してください。
  • インジェスト パイプラインを削除する場合は、関連付けられているレプリケーション スロットを手動で削除する必要があります。 レプリケーション スロットのクリーンアップを参照してください。

省略可能: インライン DDL 追跡を構成する

インライン DDL 追跡は、コネクタがソース データベースからのスキーマ変更を自動的に検出して適用できるようにするオプションの機能です。 この機能は、既定では無効化されています。

Warnung

インライン DDL 追跡は現在プレビュー段階であり、ワークスペースで有効にするには Databricks サポートに問い合わせる必要があります。

どのスキーマ変更が自動的に処理され、どのスキーマが完全に更新される必要があるかについては、「 マネージド コネクタでスキーマの進化をどのように処理するか」「スキーマの進化」を参照してください。

インライン DDL 追跡を設定する

ワークスペースでインライン DDL 追跡が有効になっている場合は、 各 PostgreSQL データベースで次の手順を実行します。

  1. lakeflow_pg_ddl_change_tracking.sql スクリプトをダウンロードして実行します。

    \i lakeflow_pg_ddl_change_tracking.sql
    
  2. トリガーと監査テーブルが正常に作成されたことを確認します。

    -- Check for the DDL audit table
    SELECT * FROM pg_tables WHERE tablename = 'lakeflow_ddl_audit';
    
    -- Check for the event triggers
    SELECT * FROM pg_event_trigger WHERE evtname LIKE 'lakeflow%';
    
  3. DDL 監査テーブルをパブリケーションに追加します。

    ALTER PUBLICATION databricks_publication ADD TABLE public.lakeflow_ddl_audit;
    

クラウド固有の構成に関する注意事項

AWS RDS と Aurora

  • rds.logical_replication パラメーターがパラメーター グループ内で1に設定されていることを確認します。

  • Databricks ワークスペースからの接続を許可するようにセキュリティ グループを構成します。

  • レプリケーション ユーザーには、 rds_replication ロールが必要です。

    GRANT rds_replication TO databricks_replication;
    

Azure Database for PostgreSQL

  • Azure portal または CLI を使用して、サーバー パラメーターで論理レプリケーションを有効にします。
  • Databricks ワークスペースからの接続を許可するようにファイアウォール規則を構成します。
  • フレキシブル サーバーでは、論理レプリケーションがサポートされています。 単一サーバーの場合は、サポートされているレベルを使用していることを確認します。

GCP Cloud SQL for PostgreSQL

  • インスタンス設定で cloudsql.logical_decoding フラグを有効にします。
  • Databricks ワークスペースからの接続を許可するように、承認されたネットワークを構成します。
  • pglogical 拡張機能を使用している場合は、 cloudsql.enable_pglogical フラグが on に設定されていることを確認します。

構成を確認する

セットアップ タスクが完了したら、論理レプリケーションが正しく構成されていることを確認します。

  1. wal_levellogicalに設定されていることを確認します。

    SHOW wal_level;
    
  2. レプリケーション ユーザーが replication 特権を持っていることを確認します。

    SELECT rolname, rolreplication FROM pg_roles WHERE rolname = 'databricks_replication';
    
  3. パブリケーションが存在することを確認します。

    SELECT * FROM pg_publication WHERE pubname = 'databricks_publication';
    
  4. レプリケーション スロットが存在することを確認します。

    SELECT slot_name, slot_type, active, restart_lsn
    FROM pg_replication_slots
    WHERE slot_name = 'databricks_slot';
    
  5. テーブルのレプリカ ID を確認します。

    SELECT schemaname, tablename, relreplident
    FROM pg_tables t
    JOIN pg_class c ON t.tablename = c.relname
    WHERE schemaname = 'your_schema';
    

    relreplident列には、FULL レプリカ ID のfが表示されます。

次のステップ

ソースのセットアップが完了したら、インジェスト ゲートウェイとパイプラインを作成して PostgreSQL からデータを取り込むことができます。 PostgreSQL からのデータの取り込みを参照してください。