次の方法で共有


Microsoft Dynamics 365 インジェストのデータ ソースを構成する

Important

この機能は パブリック プレビュー段階です。 プレビューの登録は、[ プレビュー ] ページで確認できます。 Azure Databricks プレビューの管理を参照してください。

このページでは、Lakeflow Connect を使用して Azure Databricks に取り込むためのデータ ソースとして Microsoft Dynamics 365 を設定する方法について説明します。 Dynamics 365 コネクタは、Azure Synapse Link for Dataverse を使用して Azure Data Lake Storage (ADLS) Gen2 にデータをエクスポートし、Azure Databricks が取り込みます。

[前提条件]

D365 データ ソースを構成する前に、次のものが必要です。

  • リソースを作成するアクセス許可を持つアクティブな Azure サブスクリプション。
  • 管理者アクセス権を持つ Microsoft Dynamics 365 環境。
  • D365 インスタンスに関連付けられている Dataverse 環境。
  • Azure Databricks のワークスペース管理者またはメタストア管理者のアクセス許可。
  • Dataverse 環境で Azure Synapse Link を作成および構成するためのアクセス許可。
  • ADLS Gen2 ストレージ アカウント (またはアカウントを作成するためのアクセス許可)。
  • Microsoft Entra ID アプリケーションを作成および構成するためのアクセス許可。
  • Dataverse API v9.2 以降。
  • Azure Storage REST API バージョン 2021-08-06。
  • Azure Synapse Link for Dataverse バージョン 1.0 以降。

どの Dynamics 365 アプリケーションがサポートされていますか?

Dynamics 365 コネクタでは、次の 2 つのカテゴリのアプリケーションがサポートされています。

Dataverse ネイティブ アプリケーション: 次のようなアプリはデータを Dataverse に直接格納し、仮想エンティティや直接テーブルは必要ありません。

  • Dynamics 365 Sales
  • Dynamics 365 顧客サービス
  • Dynamics 365 Marketing
  • Dynamics 365 Field Service

Dataverse 以外のネイティブ アプリケーション: 次のようなアプリでは、Dataverse でデータを公開するために仮想エンティティまたは直接テーブルが必要です。

  • Dynamics 365 Finance & Operations (F&O)

手順 1: 仮想エンティティまたは直接テーブルを構成する (省略可能)

仮想エンティティと直接テーブルを使用すると、Dataverse 以外のソース (D365 Finance & Operations など) のデータを、データをコピーせずに Dataverse で使用できるようになります。 Dataverse 以外のソースの場合は、Azure Synapse Link を設定する前に、仮想エンティティまたは直接テーブルを構成する必要があります。

仮想エンティティを構成するには:

  1. Power Apps で、[環境] ページに移動し、[Dynamics 365 アプリ] をクリックします。
  2. Dataverse で F&O エンティティを仮想エンティティとしてリンクするには、 Finance and Operations Virtual Entity ソリューションをインストールします。
  3. Dataverse と F&O アプリケーションの間でサービス間 (S2S) 承認を設定します。 これにより、Dataverse はアプリケーションと通信できます。 詳細については、Microsoft のドキュメントを参照してください。
  1. 取り込む仮想エンティティごとに、[詳細プロパティ] で [変更履歴の追跡] を有効にします。
  2. 既定では、F&O 仮想エンティティ ソリューションは、Dataverse テーブルの一覧で既定で一部の仮想エンティティを公開します。 ただし、追加のエンティティは手動で公開できます。
    1. Dataverse 環境の [詳細設定] ページに移動します。
    2. 右上にあるフィルター アイコンをクリックして、高度な検索にアクセスします。
    3. ドロップダウン メニューから [利用可能な財務エンティティ] と [操作エンティティ ] を選択し、[ 結果] をクリックします。
    4. 公開する仮想エンティティを選択します。
    5. [ エンティティ管理者 ] ページで、[ 表示][True] に切り替え、[ 保存して閉じる] をクリックします。

これで、dataverse テーブルの一覧に、 mserp_で始まる名前のエンティティを表示できるようになりました。

Important

仮想エンティティと直接テーブルは、Azure Synapse Link に表示される前に適切に構成され、同期されている必要があります。 それらを利用できるようになるまでしばらく時間を置く必要があります。

この手順では、 Azure Data Lake への Synapse Link for Dataverse を 使用して、取り込むテーブルを選択します。

Dataverse から Azure Data Lake への Synapse Link は、以前は Azure Data Lake Storage Gen2 へのデータのエクスポートと呼ばれるサービスを置き換えます。 名前付けにもかかわらず、この機能は Azure Synapse Analytics を使用したり依存したりしません。Dataverse から ADLS Gen 2 への継続的なエクスポート サービスです。

既定では、現在、Microsoft は Synapse Link プロファイルあたり最大 1,000 個のテーブルをサポートしています。 アプリケーションでさらにテーブルが選択されている場合は、複数のプロファイルを作成する必要があります。

  • Power Apps ポータルで、[分析] をクリックし、[Azure Synapse にリンク] をクリックします。

  • [ 新しいリンク] をクリックします。 Dataverse は、同じテナントから有効なサブスクリプションを自動的に反映します。 ドロップダウンから適切なサブスクリプションを選択します。

  • [ Connect to your Azure Synapse Analytics Workspace]\(Azure Synapse Analytics ワークスペースに接続 する\) チェック ボックスをオンにしないでください。 (コネクタは現在 Parquet をサポートしていないため、データは CSV に格納されている必要があります)。

    このコネクタを使用するには、別の Synapse Link プロファイルに既にリンクされている既存のストレージ アカウントに Dataverse テーブルを追加することはできません。 新しい Synapse Link プロファイルを作成するには、リンクされていない Azure サブスクリプションにアクセスできる必要があります。

  • [ Synapse Link Creation]\(Synapse リンクの作成\) ページで、[ 詳細設定] をクリックします。 次に、[ 詳細な構成設定の表示] を切り替えます。

  • [ 増分更新フォルダー構造を有効にする] を切り替え、目的の Synapse Link 更新間隔を設定します。 最小値は 5 分です。 この間隔は、この Synapse Link に含まれるすべてのテーブルに適用されます。 (別の手順で Databricks パイプラインのスケジュールを設定します)。

  • 同期するテーブルを選択します。 [追加のみ] と [ パーティション ] の設定は既定値のままにします。

    • Dataverse ネイティブ アプリから取り込む場合は、Dataverse セクションから関連する Dataverse テーブルを直接選択します。
    • F&O から取り込む場合は、 D365 Finance & Operations セクションから直接テーブルを選択するか、 Dataverse セクション (プレフィックス mserp_) から仮想エンティティを選択できます。 仮想エンティティの詳細については、 手順 1 を参照してください。
  • [保存] をクリックします。

  • 変更を保存すると、 最初の Synapse Link 同期が開始されます。

    F&O ユーザーの場合、この初期同期は、数百ギガバイトの大きなテーブルに対して数時間かかる場合があります。 ただし、エンティティの初期同期に時間がかかりすぎる場合は、F&Oアプリを使用してテーブルにインデックスを作成することで、同期プロセスを高速化できます。

    • F&O 環境でインデックスを作成するテーブルに移動します。
    • テーブルの拡張機能を作成します。
    • テーブル拡張機能内で、新しいインデックスを定義します。
    • インデックスに含めるフィールドを追加します。 (これは、これらのフィールドに基づいてデータベース検索を高速化するのに役立ちます)。
    • 変更を保存して F&O 環境にデプロイします。

手順 3: インジェスト用の Entra ID アプリケーションを作成する

この手順では、Databricks へのインジェストをサポートする Unity カタログ接続を作成するために必要な Entra ID 情報を収集します。

  • 右側のパネルに表示されている Entra ID テナントの テナント ID (portal.azure.com>>Microsoft Entra ID>>Overview タブ>>テナント ID) を収集します。

  • Azure Synapse Link を作成すると、選択したテーブルを同期するための ADLS コンテナーが Azure Synapse によって作成されます。 Synapse Link の管理ページにアクセスして、ADLS コンテナー名を見つけます。

  • ADLS コンテナーのアクセス資格情報を収集します。

    • Microsoft Entra ID アプリをまだ作成していない場合は作成します。
    • クライアント シークレットを収集します。
    • アプリ ID (portal.azure.com>>Microsoft Entra ID>>管理>>アプリ登録) を収集します。
  • ADLS コンテナーへのアクセス権を Entra ID アプリに付与します (まだアクセス権がない場合)。

    Entra ID アプリケーションが、各 Synapse Link プロファイルに関連付けられている ADLS コンテナーにアクセスできることを確認します。 複数の環境またはアプリケーションからデータを取り込む場合は、アプリケーションに関連するすべてのコンテナーにロールの割り当てがあることを確認します。

    • Azure ストレージ アカウントに移動し、コンテナーまたはストレージ アカウントを選択します。 (Databricks では、最小限の特権を維持するためにコンテナー レベルを推奨しています)。

    • [ アクセス制御 (IAM)]、[ ロールの割り当ての追加] の順にクリックします。

    • ストレージ BLOB 共同作成者→読み取り/書き込み/削除アクセス ロールを選択します。 (組織でこれを許可していない場合は、Databricks アカウント チームに連絡してください)。

    • [ 次へ]、[メンバーの選択] の順に クリックします

    • [ ユーザー、グループ、またはサービス プリンシパル] を選択し、[ アプリの登録を検索] を選択します。 (検索結果にアプリが存在しない場合は、検索バーにオブジェクト ID を明示的に入力し、 Enter キーを押します)。

    • [ Review + Assign をクリックします。

    • アクセス許可が正しく構成されていることを確認するには、コンテナーの アクセス制御を確認します。

手順 4: Dynamics 365 パイプラインを作成する

オプション A: UI を使用する

左側のメニューで、[ 新規] をクリックし、[ データの追加またはアップロード] をクリックします。

[ データの追加] ページで、[ Dynamics 365 ] タイルをクリックします。

そこからウィザードの指示に従います。

オプション B: API を使用する

B1. Dynamics 365 接続を作成する この手順では、D365 資格情報を安全に格納し、Databricks へのインジェストを開始するための Unity カタログ接続を作成します。

  • ワークスペースで、[ Catalog>>External Data>>Connections>>Create Connection] をクリックします。

  • 一意の接続名を指定し、接続の種類として Dynamics 365 を選択します。

  • 前の手順で作成した Entra ID アプリの クライアント シークレットクライアント ID を入力します。 スコープを変更しないでください。 [次へ] をクリックします。

  • Azure ストレージ アカウント名テナント IDADLS コンテナー名を入力し、[接続の作成] をクリックします。

  • 接続名を書き留めます。

B2。 Dynamics 365 パイプラインを作成する

この手順では、インジェスト パイプラインを設定します。 取り込まれた各テーブルは、宛先内で同じ名前の対応するストリーミング テーブルを取得します。

インジェスト パイプラインを作成するには、次の 2 つのオプションがあります。

  • ノートブックを使用する
  • Databricks コマンド ライン インターフェイスを使用する (CLI)

どちらの方法でも、パイプラインを作成する Databricks サービスへの API 呼び出しが行われます。

ノートブックを使用する場合:

  1. 付録のノートブック テンプレートをコピーします。
  2. ノートブックの最初のセルを変更せずに実行します。
  3. パイプラインの詳細 (取り込むテーブル、データを格納する場所など) を使用して、ノートブックの 2 番目のセルを変更します。
  4. テンプレート ノートブックの 2 番目のセルを実行します。これにより、create_pipeline が実行されます。
  5. list_pipelineを実行して、パイプライン ID とその詳細を表示できます。
  6. edit_pipelineを実行してパイプライン定義を編集できます。
  7. delete_pipelineを実行してパイプラインを削除できます。

Databricks CLI を使用する場合:

パイプラインを作成するために

databricks pipelines create --json "<pipeline_definition OR json file path>"

パイプラインを編集するには:

databricks pipelines update --json "<<pipeline_definition OR json file path>"

パイプライン定義を取得するには:

databricks pipelines get "<your_pipeline_id>"

パイプラインを削除するには:

databricks pipelines delete "<your_pipeline_id>"

詳細については、いつでも次のコマンドを実行できます。

databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help

B3. パイプラインを開始し、スケジュールを設定し、アラートを設定する

パイプラインが作成されたら、Databricks ワークスペースに再びアクセスします。

  1. 左側のパネルで、[ パイプライン] をクリックします。 新しいパイプラインがこの一覧に表示されます。
  2. パイプラインの詳細を表示するには、パイプラインの名前をクリックします。
  3. パイプラインの詳細ページで、[ 開始] をクリックすると、パイプラインをすぐに実行できます。 [スケジュール] をクリックしてパイプラインを スケジュールすることもできます。 スケジュール設定の詳細については、ジョブ用のパイプラインタスクを参照してください。

スケジュールに通知を追加して、パイプラインに通知を設定することもできます。

  1. パイプラインの詳細ページで、[ スケジュール] をクリックします。 次に、スケジュールのいずれかを選択します。
  2. パイプラインのスケジュールを管理するジョブに到達します。 右側のパネルで、[ジョブ通知] で必要な通知を設定 します

最後に、任意のスケジュールの通知を追加できます。

  1. 新しいスケジュールを作成するには、[ スケジュール] をクリックします。
  2. そのスケジュールに通知を追加するには、[ その他のオプション] をクリックします。

手順 5: 追加機能を構成する (省略可能)

コネクタには、履歴追跡用の SCD タイプ 2、列レベルの選択と選択解除、CI/CD 用の Databricks アセット バンドルなどの追加機能が用意されています。 例については、 付録 を参照してください。

付録

ノートブック テンプレート

セル 1

このセルは変更しないでください。

# DO NOT MODIFY

# This sets up the API utils for creating managed ingestion pipelines in Databricks.

import requests
import json

notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
api_token = notebook_context.apiToken().get()
workspace_url = notebook_context.apiUrl().get()
api_url = f"{workspace_url}/api/2.0/pipelines"

headers = {
    'Authorization': 'Bearer {}'.format(api_token),
    'Content-Type': 'application/json'
}

def check_response(response):
    if response.status_code == 200:
        print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
    else:
        print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")

def create_pipeline(pipeline_definition: str):
  response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
  check_response(response)

def edit_pipeline(id: str, pipeline_definition: str):
  response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
  check_response(response)

def delete_pipeline(id: str):
  response = requests.delete(url=f"{api_url}/{id}", headers=headers)
  check_response(response)

def list_pipeline(filter: str):
  body = "" if len(filter) == 0 else f"""{{"filter": "{filter}"}}"""
  response = requests.get(url=api_url, headers=headers, data=body)
  check_response(response)

def get_pipeline(id: str):
  response = requests.get(url=f"{api_url}/{id}", headers=headers)
  check_response(response)

def start_pipeline(id: str, full_refresh: bool=False):
  body = f"""
  {{
    "full_refresh": {str(full_refresh).lower()},
    "validate_only": false,
    "cause": "API_CALL"
  }}
  """
  response = requests.post(url=f"{api_url}/{id}/updates", headers=headers, data=body)
  check_response(response)

def stop_pipeline(id: str):
  print("cannot stop pipeline")

セル 2

次のコードでは、プレビュー チャネルを変更しないでください。

Azure Synapse Link によって同期されたすべてのテーブルを取り込む場合は、スキーマ レベルの仕様を使用します (ただし、パイプラインごとに 250 を超えるテーブルを追加することは Databricks では推奨されません)。特定のテーブルのみを取り込む場合は、テーブル レベルの仕様を使用します。

ソース テーブル名が、Synapse Link 管理ページの [名前] 列に表示されるテーブル名と一致していることを確認します。

# Option A: schema-level spec
pipeline_spec = """
{
 "name": "<YOUR_PIPELINE_NAME>",
 "ingestion_definition": {
     "connection_name": "<YOUR_CONNECTION_NAME>",
     "objects": [
        {
          "schema": {
            "source_schema": "objects",
            "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
            "destination_schema": "<YOUR_DATABRICKS_SCHEMA>"
          }
        }
      ]
 },
 "channel": "PREVIEW"
}
"""

create_pipeline(pipeline_spec)
# Option B: table-level spec
pipeline_spec = """
{
 "name": "<YOUR_PIPELINE_NAME>",
 "ingestion_definition": {
     "connection_name": "<YOUR_CONNECTION_NAME>",
     "objects": [
        {
          "table": {
            "source_schema": "objects",
            "source_table": "<YOUR_F_AND_O_TABLE_NAME>",
            "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
            "destination_schema": "<YOUR_DATABRICKS_SCHEMA>"
          }
        }
      ]
 },
 "channel": "PREVIEW"
}
"""

create_pipeline(pipeline_spec)

例: SCD タイプ 2

既定では、API は SCD 型 1 を使用します。 ソースでデータが編集された場合、それに応じて宛先のデータが上書きされます。 履歴データを保持し、SCD タイプ 2 を使用する場合は、構成で指定します。 例えば次が挙げられます。

# Schema-level spec with SCD type 2
pipeline_spec = """
{
 "name": "<YOUR_PIPELINE_NAME>",
 "ingestion_definition": {
     "connection_name": "<YOUR_CONNECTION_NAME>",
     "objects": [
        {
          "schema": {
            "source_schema": "objects",
            "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
            "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
        "table_configuration": {
              "scd_type": "SCD_TYPE_2"
            }
          }
        }
      ]
 },
 "channel": "PREVIEW"
}
"""

create_pipeline(pipeline_spec)
# Table-level spec with SCD type 2
pipeline_spec = """
{
 "name": "<YOUR_PIPELINE_NAME>",
 "ingestion_definition": {
     "connection_name": "<YOUR_CONNECTION_NAME>",
     "objects": [
        {
          "table": {
            "source_schema": "objects",
            "source_table": "<YOUR_F_AND_O_TABLE_NAME>",
            "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
            "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
        "table_configuration": {
              "scd_type": "SCD_TYPE_2"
            }
          }
        }
      ]
 },
 "channel": "PREVIEW"
}
"""

create_pipeline(pipeline_spec)

例: 列レベルの選択と選択解除

既定では、選択したテーブルのすべての列が API によって取り込まれます。 ただし、特定の列を含めるか除外するかを選択できます。 例えば次が挙げられます。

# Table spec with included and excluded columns.
pipeline_spec = """
{
 "name": "<YOUR_PIPELINE_NAME>",
 "ingestion_definition": {
     "connection_name": "<YOUR_CONNECTON_NAME>",
     "objects": [
        {
          "table": {
            "source_schema": "objects",
            "source_table": "<YOUR_F_AND_O_TABLE_NAME>",
            "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
            "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
        "table_configuration": {
          "include_columns": ["<COLUMN_A>", "<COLUMN_B>", "<COLUMN_C>"]
            }
          }
        }
      ]
 },
 "channel": "PREVIEW"
}
"""

create_pipeline(pipeline_spec)