次の方法で共有


データ フロー グラフで WebAssembly (WASM) を使用する

Von Bedeutung

このページには、プレビュー段階にある Kubernetes デプロイ マニフェストを使用して Azure IoT Operations コンポーネントを管理する手順が含まれています。 この機能はいくつかの制限を設けて提供されており、運用環境のワークロードには使用しないでください。

ベータ版、プレビュー版、または一般提供としてまだリリースされていない Azure の機能に適用される法律条項については、「Microsoft Azure プレビューの追加使用条件」を参照してください。

Azure IoT Operations のデータ フロー グラフは、エッジでのカスタム データ処理用 WebAssembly (WASM) モジュールをサポートします。 データ フロー パイプラインの一部として、カスタム ビジネス ロジックとデータ変換をデプロイできます。

ヒント

AI をインバンドで実行しますか? WASM 演算子内で小さな ONNX モデルをパッケージ化して実行するには、 WebAssembly データ フロー グラフで ONNX 推論を実行する方法に関するページを参照してください。

Von Bedeutung

データ フロー グラフは、現在、MQTT、Kafka、OpenTelemetry エンドポイントのみをサポートしています。 Data Lake、Microsoft Fabric OneLake、Azure Data Explorer、Local Storage などの他のエンドポイントの種類はサポートされていません。 詳細については、「既知の問題の」を参照してください。

[前提条件]

  • Arc 対応 Kubernetes クラスターに Azure IoT Operations インスタンスをデプロイします。 詳細については、「Deploy Azure IoT Operations」をご覧ください。
  • コンテナー レジストリを構成し、 WebAssembly (WASM) モジュールとグラフ定義のデプロイに関するページのガイダンスに従って、サンプル グラフ定義と WASM モジュールを追加します。

概要

Azure IoT Operations データ フロー グラフの WebAssembly (WASM) モジュールを使用すると、エッジでハイ パフォーマンスかつ安全にデータを処理できます。 WASM はサンドボックス環境で実行され、Rust と Python をサポートします。

WASM データ フロー グラフのしくみ

WASM データ フローの実装は次のワークフローに従います。

  1. WASM モジュールを開発する: サポートされている言語でカスタム処理ロジックを記述し、WebAssembly コンポーネント モデル形式にコンパイルします。 その他の詳細については、次をご覧ください。
  2. グラフ定義を開発する: YAML 構成ファイルを使用して、モジュール間でのデータの移行方法を定義します。 詳細については、「 WebAssembly グラフ定義の構成」を参照してください。
  3. レジストリに成果物を格納する: OCI 互換ツール (ORAS など) を使用して、コンパイル済みの WASM モジュールとグラフ定義をコンテナー レジストリにプッシュします。 詳細については、「 WebAssembly (WASM) モジュールとグラフ定義のデプロイ」を参照してください。
  4. レジストリ エンドポイントを構成する: Azure IoT Operations からコンテナー レジストリにアクセスできるように、認証と接続の詳細を設定します。 詳細については、 レジストリ エンドポイントの構成に関するページを参照してください。
  5. データ フロー グラフの作成: 操作エクスペリエンス Web UI または Bicep ファイルを使用して、グラフ定義を使用するデータ フローを定義します。
  6. デプロイと実行: Azure IoT Operations は、コンテナー レジストリからグラフ定義と WASM モジュールをプルして実行します。

次の例は、一般的なシナリオ用に WASM データ フロー グラフを構成する方法を示しています。 例では、ハードコーディングされた値と簡略化された構成を使用しているため、すぐに始めることができます。

例 1: 1 つの WASM モジュールを使用する基本的なデプロイ

この例では、WASM モジュールを使用して温度データを華氏から摂氏に変換します。 温度モジュールのソース コードは GitHub で入手できます。 WebAssembly (WASM) モジュールとグラフ定義のデプロイの手順の例に従った場合、graph-simple:1.0.0グラフ定義とプリコンパイル済みtemperature:1.0.0 モジュールは既にコンテナー レジストリにあります。

動作方法

グラフ定義で、単純な 3 ステージのパイプラインを作成します。

  1. ソース: MQTT から温度データを受け取ります
  2. マップ: 温度 WASM モジュールを使用してデータを処理します
  3. シンク: 変換されたデータを MQTT に送り返します

単純なグラフ定義のしくみとその構造の詳細については、「 例 1: 単純なグラフ定義」を参照してください。

入力形式:

{"temperature": {"value": 100.0, "unit": "F"}}

出力形式:

{"temperature": {"value": 37.8, "unit": "C"}}

次の構成では、この温度変換パイプラインを使用するデータ フロー グラフを作成します。 データ フロー グラフは、 graph-simple:1.0.0 YAML グラフ定義を参照し、コンテナー レジストリから温度モジュールをプルします。

データ フロー グラフを構成する

この構成では、温度変換ワークフローを実装する 3 つのノードを定義します。入力温度データをサブスクライブするソース ノード、WASM モジュールを実行するグラフ処理ノード、変換された結果を発行する宛先ノードです。

データ フロー グラフ リソースは、グラフ定義成果物をラップし、その抽象ソース/シンク操作を具象エンドポイントに接続します。

  • グラフ定義の source 操作は、データ フローのソース ノード (MQTT トピック) に接続します
  • グラフ定義の sink 操作は、データ フローの宛先ノード (MQTT トピック) に接続します
  • グラフ定義の処理操作はグラフ処理ノード内で実行されます

この分離により、処理ロジックを変更せずに、この分離により、処理ロジックを変更せずに、同じグラフ定義を異なる環境で異なるエンドポイントと共にデプロイできます。

  1. 操作エクスペリエンスでデータ フロー グラフを作成するには、[データ フロー] タブに移動します。

  2. [+ 作成] の横にあるドロップダウン メニューを選択し、[データ フロー グラフの作成] を選択します

    データ フロー グラフを作成する方法を示す操作エクスペリエンス インターフェイスのスクリーンショット。

  3. プレースホルダー名 new-data-flow を選択して、データ フローのプロパティを設定します。 データ フロー グラフの名前を入力し、使用するデータ フロー プロファイルを選択します。

  4. データ フロー図で、[ ソース ] を選択してソース ノードを構成します。 [ ソースの詳細] で、[ 資産 ] または [ データ フロー エンドポイント] を選択します。

    データ フロー グラフのソースを選択する方法を示す操作エクスペリエンス インターフェイスのスクリーンショット。

    1. [資産] を選択した場合は、データをプルする資産を選択し、[適用] をクリックします。

    2. [データ フロー エンドポイント] を選択した場合は、次の詳細を入力し、[適用] をクリックします。

      Setting Description
      データ フロー エンドポイント 既定の MQTT メッセージ ブローカー エンドポイントを使用するには、"既定値" を選択します。
      トピック 着信メッセージをサブスクライブするためのトピック フィルター。 [トピック]>[行の追加] を使用して、複数のトピックを追加します。
      メッセージ スキーマ 受信メッセージの逆シリアル化に使用するスキーマ。
  5. データ フロー ダイアグラムで、[ グラフ変換の追加 ] (省略可能) を選択して、グラフ処理ノードを追加します。 [グラフの選択] ウィンドウで、グラフの単純な 1 を選択し、[適用] をクリックします。

    単純なデータ フロー グラフを作成する方法を示す操作エクスペリエンス インターフェイスのスクリーンショット。

  6. ダイアグラム内のグラフ ノードを選択することで、グラフ演算子の設定を構成できます。 たとえば、モジュール温度/マップ演算子を選択し、key2example-value-2入力できます。 [ 適用 ] をクリックして変更を保存します。

    単純なデータ フロー グラフを構成する方法を示す操作エクスペリエンス インターフェイスのスクリーンショット。

  7. データ フローダイアグラムで、[ 宛先 ] を選択して宛先ノードを構成します。

  8. データ フロー グラフ名の下にある [保存] を選択して、データ フロー グラフを保存します。

データ フローをテストする

データ フローをテストするには、クラスター内から MQTT メッセージを送信します。 まず、「MQTT クライアントを使用して MQTT ブローカーへの接続をテストする」の手順に従って、MQTT クライアント ポッドをデプロイします。 ブローカーへの接続に必要な認証トークンと証明書は、この MQTT クライアントから提供されます。 MQTT クライアントをデプロイするには、次のコマンドを実行します。

kubectl apply -f https://raw.githubusercontent.com/Azure-Samples/explore-iot-operations/main/samples/quickstarts/mqtt-client.yaml

温度メッセージを送信する

最初のターミナル セッションで、華氏で温度データを送信するスクリプトを作成して実行します。

# Connect to the MQTT client pod
kubectl exec --stdin --tty mqtt-client -n azure-iot-operations -- sh -c '
# Create and run temperature.sh from within the MQTT client pod
while true; do
  # Generate a random temperature value between 0 and 6000 Fahrenheit
  random_value=$(shuf -i 0-6000 -n 1)
  payload="{\"temperature\":{\"value\":$random_value,\"unit\":\"F\"}}"

  echo "Publishing temperature: $payload"

  # Publish to the input topic
  mosquitto_pub -h aio-broker -p 18883 \
    -m "$payload" \
    -t "sensor/temperature/raw" \
    -d \
    --cafile /var/run/certs/ca.crt \
    -D PUBLISH user-property __ts $(date +%s)000:0:df \
    -D CONNECT authentication-method 'K8S-SAT' \
    -D CONNECT authentication-data $(cat /var/run/secrets/tokens/broker-sat)

  sleep 1
done'

MQTT ユーザー プロパティ __ts は、ハイブリッド論理クロック (HLC) を使用してメッセージがタイムリーに処理されるように、メッセージにタイムスタンプを追加するために使用されます。 タイムスタンプがあると、データ フローがメッセージを受け入れるかドロップするかを決定するのに役立ちます。 このプロパティの形式は <timestamp>:<counter>:<nodeid> です。 これにより、データ フロー処理の精度が向上しますが、必須ではありません。

このスクリプトを実行すると、ランダムな温度データが 1 秒ごとに sensor/temperature/raw トピックに発行されます。 次のようになります。

Publishing temperature: {"temperature":{"value":1234,"unit":"F"}}
Publishing temperature: {"temperature":{"value":5678,"unit":"F"}}

温度データの発行を継続するため、スクリプトは実行したままにしておきます。

プロセス済みのメッセージをサブスクライブする

2 つ目のターミナル セッション (MQTT クライアント ポッドにも接続されています) で、出力トピックをサブスクライブして、変換された温度値を確認します。

# Connect to the MQTT client pod
kubectl exec --stdin --tty mqtt-client -n azure-iot-operations -- sh -c '
mosquitto_sub -h aio-broker -p 18883 -t "sensor/temperature/processed" --cafile /var/run/certs/ca.crt \
-D CONNECT authentication-method "K8S-SAT" \
-D CONNECT authentication-data "$(cat /var/run/secrets/tokens/broker-sat)"'

WASM モジュールによって華氏から摂氏に変換された温度データが表示されます。

{"temperature":{"value":1292.2222222222222,"count":0,"max":0.0,"min":0.0,"average":0.0,"last":0.0,"unit":"C","overtemp":false}}
{"temperature":{"value":203.33333333333334,"count":0,"max":0.0,"min":0.0,"average":0.0,"last":0.0,"unit":"C","overtemp":false}}

例 2: 複合グラフをデプロイする

この例では、温度、湿度、画像データなど複数のデータ型を処理する高度なデータ処理ワークフローを示します。 複雑なグラフ定義では、複数の WASM モジュールを調整して、高度な分析と物体検出を実行します。

動作方法

この複雑なグラフでは、3 つのデータ ストリームを処理し、それらを組み合わせてエンリッチされたセンサー分析を生成します。

  • 温度処理: 華氏を摂氏に変換し、無効な読み取り値をフィルター処理し、統計値を計算します
  • 湿度処理: 一定間隔で湿度の測定値を蓄積します
  • 画像処理: カメラのスナップショットで物体検出を実行し、結果を書式設定します

複雑なグラフ定義のしくみ、その構造、および複数の処理ステージを通じたデータ フローの詳細については、「 例 2: 複雑なグラフ定義」を参照してください。

このグラフでは、 Rust 演算子のコレクションから特殊化されたモジュールを使用します。

複雑なデータ フロー グラフを構成する

この構成では、 graph-complex:1.0.0 YAML グラフ定義を使用して、マルチセンサー処理ワークフローを実装します。 データ フロー グラフのデプロイが 例 1 と似ている点に注目してください。処理ロジックが異なる場合でも、どちらも同じ 3 ノード パターン (ソース、グラフ プロセッサ、宛先) を使用します。

この類似性は、データ フロー グラフ リソースがグラフ定義を読み込んで実行するホスト環境として機能しているためです。 実際の処理ロジックは、WASM モジュール間の操作と接続の YAML 仕様を含むグラフ定義 (graph-simple:1.0.0 または graph-complex:1.0.0) に存在します。 データ フロー グラフ リソースは、グラフ定義をプルし、モジュールをインスタンス化し、定義されたワークフローを介してデータをルーティングするためのランタイム インフラストラクチャを提供します。

  1. 操作エクスペリエンスでデータ フロー グラフを作成するには、[データ フロー] タブに移動します。

  2. [+ 作成] の横にあるドロップダウン メニューを選択し、[データ フロー グラフの作成] を選択します

    データ フローの複雑なグラフを作成する方法を示す操作エクスペリエンス インターフェイスのスクリーンショット。

  3. プレースホルダー名 new-data-flow を選択して、データ フローのプロパティを設定します。 データ フロー グラフの名前を入力し、使用するデータ フロー プロファイルを選択します。

  4. データ フロー図で、[ ソース ] を選択してソース ノードを構成します。 [ ソースの詳細] で、[ 資産 ] または [ データ フロー エンドポイント] を選択します。

    データ フロー グラフのソースを選択する方法を示す操作エクスペリエンス インターフェイスのスクリーンショット。

    1. [資産] を選択した場合は、データをプルする資産を選択し、[適用] をクリックします。

    2. [データ フロー エンドポイント] を選択した場合は、次の詳細を入力し、[適用] をクリックします。

      Setting Description
      データ フロー エンドポイント 既定の MQTT メッセージ ブローカー エンドポイントを使用するには、"既定値" を選択します。
      トピック 着信メッセージをサブスクライブするためのトピック フィルター。 [トピック]>[行の追加] を使用して、複数のトピックを追加します。
      メッセージ スキーマ 受信メッセージの逆シリアル化に使用するスキーマ。
  5. データ フロー ダイアグラムで、[ グラフ変換の追加 ] (省略可能) を選択して、グラフ処理ノードを追加します。 [グラフの選択] ウィンドウで、graph-complex:1 を選択し、[適用] をクリックします。

    複雑なデータ フロー グラフを作成する方法を示す操作エクスペリエンス インターフェイスのスクリーンショット。

  6. ダイアグラム内のグラフ ノードを選択することで、グラフ演算子の設定を構成できます。

    複雑なデータ フロー グラフを構成する方法を示す操作エクスペリエンス インターフェイスのスクリーンショット。

    Operator Description
    module-snapshot/branch 画像に対してオブジェクト検出を実行するように snapshot モジュールを構成します。 snapshot_topic構成キーを設定して、画像データの入力トピックを指定できます。
    モジュール-温度/マップ key2温度値を別のスケールに変換します。
  7. [ 適用 ] をクリックして変更を保存します。

  8. データ フローダイアグラムで、[ 宛先 ] を選択して宛先ノードを構成します。

  9. データ フロー グラフ名の下にある [保存] を選択して、データ フロー グラフを保存します。

複雑なデータ フローをテストする

出力を表示する前に、ソース データを設定します。

RAW 画像ファイルを mqtt-client ポッドにアップロードする

画像ファイルは、snapshot モジュールが画像内のオブジェクトを検出するためのものです。 これらは GitHub の images フォルダーにあります。

まず、リポジトリをクローンして画像ファイルにアクセスします。

git clone https://github.com/Azure-Samples/explore-iot-operations.git
cd explore-iot-operations

RAW 画像ファイルを ./samples/wasm/images フォルダーから mqtt-client ポッドにアップロードするには、次のコマンドを使用します。

kubectl cp ./samples/wasm/images azure-iot-operations/mqtt-client:/tmp

ファイルがアップロードされていることを確認します。

kubectl exec -it mqtt-client -n azure-iot-operations -- ls /tmp/images

/tmp/images フォルダー内のファイルの一覧が表示されます。

beaker.raw          laptop.raw          sunny2.raw
binoculars.raw      lawnmower.raw       sunny4.raw
broom.raw           milkcan.raw         thimble.raw
camera.raw          photocopier.raw     tripod.raw
computer_mouse.raw  radiator.raw        typewriter.raw
daisy3.raw          screwdriver.raw     vacuum_cleaner.raw
digital_clock.raw   sewing_machine.raw
hammer.raw          sliding_door.raw

シミュレーションされた温度、湿度データを発行し、画像を送信する

温度、湿度データの発行、画像の送信を行うコマンドを 1 つのスクリプトにまとめることができます。 次のコマンドを使用します。

# Connect to the MQTT client pod and run the script
kubectl exec --stdin --tty mqtt-client -n azure-iot-operations -- sh -c '
while true; do 
  # Generate a random temperature value between 0 and 6000
  temp_value=$(shuf -i 0-6000 -n 1)
  temp_payload="{\"temperature\":{\"value\":$temp_value,\"unit\":\"F\"}}"
  echo "Publishing temperature: $temp_payload"
  mosquitto_pub -h aio-broker -p 18883 \
    -m "$temp_payload" \
    -t "sensor/temperature/raw" \
    --cafile /var/run/certs/ca.crt \
    -D CONNECT authentication-method "K8S-SAT" \
    -D CONNECT authentication-data "$(cat /var/run/secrets/tokens/broker-sat)" \
    -D PUBLISH user-property __ts $(date +%s)000:0:df

  # Generate a random humidity value between 30 and 90
  humidity_value=$(shuf -i 30-90 -n 1)
  humidity_payload="{\"humidity\":{\"value\":$humidity_value}}"
  echo "Publishing humidity: $humidity_payload"
  mosquitto_pub -h aio-broker -p 18883 \
    -m "$humidity_payload" \
    -t "sensor/humidity/raw" \
    --cafile /var/run/certs/ca.crt \
    -D CONNECT authentication-method "K8S-SAT" \
    -D CONNECT authentication-data "$(cat /var/run/secrets/tokens/broker-sat)" \
    -D PUBLISH user-property __ts $(date +%s)000:0:df

  # Send an image every 2 seconds
  if [ $(( $(date +%s) % 2 )) -eq 0 ]; then
    file=$(ls /tmp/images/*.raw | shuf -n 1)
    echo "Sending file: $file"
    mosquitto_pub -h aio-broker -p 18883 \
      -f $file \
      -t "sensor/images/raw" \
      --cafile /var/run/certs/ca.crt \
      -D CONNECT authentication-method "K8S-SAT" \
      -D CONNECT authentication-data "$(cat /var/run/secrets/tokens/broker-sat)" \
      -D PUBLISH user-property __ts $(date +%s)000:0:df
  fi

  # Wait for 1 second before the next iteration
  sleep 1
done'

出力をチェックする

新しいターミナルで、出力トピックをサブスクライブします。

kubectl exec --stdin --tty mqtt-client -n azure-iot-operations -- sh -c '
mosquitto_sub -h aio-broker -p 18883 -t "analytics/sensor/processed" --cafile /var/run/certs/ca.crt \
-D CONNECT authentication-method "K8S-SAT" \
-D CONNECT authentication-data "$(cat /var/run/secrets/tokens/broker-sat)"'

出力は次の例のようになります。

{"temperature":[{"count":9,"max":2984.4444444444443,"min":248.33333333333337,"average":1849.6296296296296,"last":2612.222222222222,"unit":"C","overtemp":true}],"humidity":[{"count":10,"max":76.0,"min":30.0,"average":49.7,"last":38.0}],"object":[{"result":"milk can; broom; screwdriver; binoculars, field glasses, opera glasses; toy terrier"}]}
{"temperature":[{"count":10,"max":2490.5555555555557,"min":430.55555555555554,"average":1442.6666666666667,"last":1270.5555555555557,"unit":"C","overtemp":true}],"humidity":[{"count":9,"max":87.0,"min":34.0,"average":57.666666666666664,"last":42.0}],"object":[{"result":"broom; Saint Bernard, St Bernard; radiator"}]}

この出力には、温度と湿度のデータと、画像内で検出されたオブジェクトが含まれます。

カスタム データ フロー グラフの構成

このセクションでは、WASM モジュールを使用してデータ フロー グラフを構成する方法について詳しく説明します。 すべての構成オプション、データ フロー エンドポイント、詳細設定について説明します。

データ フロー グラフの概要

データ フロー グラフでは、処理のために WebAssembly モジュールをデータがどのように流れるかを定義します。 各グラフは次の要素で構成されます。

  • グラフの有効/無効を制御するモード
  • スケーリングとリソース設定を定義するデータ フロー プロファイルにリンクするプロファイル参照
  • 必要に応じてグラフ状態の永続ストレージを有効にするディスク永続化
  • ソース、処理、宛先コンポーネントを定義するノード
  • ノード間のデータ フローを指定するノード接続

モードの構成

mode プロパティにより、データ フロー グラフがアクティブにデータを処理しているかどうかを決定します。 モードは Enabled または Disabled (大文字と小文字の区別はされません) に設定できます。 無効にすると、グラフのデータ処理は停止しますが、構成は保持されます。

データ フロー グラフを作成または編集するときに、[ データ フローのプロパティ ] ウィンドウで [ データ フローを有効にする][はい ] に設定して、モードを Enabledに設定できます。 オフのままにすると、モードは Disabledに設定されます。

モード構成を有効または無効にする方法を示す操作エクスペリエンス インターフェイスのスクリーンショット。

プロファイル参照

プロファイル参照により、データ フロー グラフをデータ フロー プロファイルに接続し、スケーリング設定、インスタンス数、リソース制限を定義します。 プロファイル参照を指定しない場合は、代わりに Kubernetes 所有者参照を使用する必要があります。 ほとんどのシナリオでは、Azure IoT Operations が提供する既定のプロファイルを使用します。

データ フロー グラフを作成または編集するときに、[データ フローのプロパティ ] ウィンドウで、データ フロー プロファイルを選択します。 既定では、既定のデータ フロー プロファイルが選択されています。 データ フロー プロファイルの詳細については、「データ フロー プロファイルの構成」を参照してください。

Von Bedeutung

データ フロー グラフの作成時にのみ、データ フロー プロファイルを選択できます。 データ フロー グラフの作成後にデータ フロー プロファイルを変更することはできません。 既存のデータ フロー グラフのデータ フロー プロファイルを変更する場合は、元のデータ フロー グラフを削除し、新しいデータ フロー プロファイルを使用して新しいデータ フロー グラフを作成します。

ディスク永続化を要求する

ディスク永続化を要求すると、データ フロー グラフは再起動後も状態を維持できます。 この機能を有効にすると、接続されたブローカーが再起動した場合にグラフは処理状態を回復できます。 この機能は、中間データの損失が問題となるステートフル処理のシナリオで役立ちます。 ディスク永続化の要求を有効にすると、サブスクライバー キュー内のメッセージなどの MQTT データはブローカーによってディスクに永続化されます。 このアプローチにより、停電時やブローカーの再起動時にデータ フローのデータ ソースでデータ損失が発生しなくなります。 ブローカーは最適なパフォーマンスを維持します。なぜなら、永続化はデータ フロー単位で構成され、永続化を必要とするデータ フローのみがこの機能を使用するからです。

データ フロー グラフは、サブスクリプション中に MQTTv5 ユーザー プロパティを使用してこの永続化要求を行います。 この機能は次の場合にのみ機能します。

  • データ フローは MQTT ブローカーをソース (MQTT エンドポイントを持つソース ノード) として使用している
  • MQTT ブローカーは、サブスクライバー キューなどのデータ型に対して動的永続化モードが Enabled に設定され、永続化が有効になっている

この構成により、データ フロー グラフなどの MQTT クライアントは、MQTTv5 ユーザー プロパティを使用してサブスクリプションのディスク永続化を要求できます。 MQTT ブローカーの永続化の構成の詳細については、MQTT ブローカーの永続化の構成に関する記事を参照してください。

設定には Enabled または Disabled を使用できます。既定値は Disabled です。

データ フロー グラフを作成または編集する場合、[ データ フローのプロパティ ] ウィンドウで 、[ データの永続化の要求 ] を [ はい ] にチェックして、要求ディスクの永続化を Enabledに設定できます。 未チェックのままにすると、設定は Disabled

ノード構成

ノードはデータ フロー グラフの構成要素です。 各ノードはグラフ内で一意の名前を持ち、特定の機能を実行します。 ノードには次の 3 種類があります。

ソース ノード

ソース ノードは、データがグラフに入る場所を定義します。 MQTT ブローカーまたは Kafka トピックからデータを受信するデータ フロー エンドポイントに接続します。 各ソース ノードでは以下を指定する必要があります。

  • 構成済みのデータ フロー エンドポイントを指すエンドポイント参照
  • サブスクライブする MQTT トピックまたは Kafka トピックの一覧形式のデータ ソース
  • スキーマ推論用の Azure デバイス レジストリ資産にリンクする資産参照 (省略可能)

データ ソース配列を使用すると、エンドポイント構成を変更せずに複数のトピックをサブスクライブできます。 この柔軟性により、さまざまなデータ フロー間でエンドポイントを再利用できます。

現在、データ フロー グラフのデータ ソースとしてサポートされているのは MQTT エンドポイントと Kafka エンドポイントのみです。 詳細については、「 データ フロー エンドポイントの構成」を参照してください。

データ フロー図で、[ ソース ] を選択してソース ノードを構成します。 [ ソースの詳細] で [ データ フロー エンドポイント] を選択し、[ トピック] フィールドを使用して、受信メッセージをサブスクライブする MQTT トピック フィルターを指定します。 [行の追加] を選択し、新しいトピックを入力することで、複数の MQTT トピックを追加できます。

グラフ処理ノード

グラフ処理ノードには、データを変換する WebAssembly モジュールが含まれています。 これらのノードは、コンテナー レジストリから WASM 成果物をプルし、指定された構成パラメーターを使用して実行します。 各グラフ ノードには以下が必要です。

  • 成果物をプルするレジストリ エンドポイントを指すレジストリ エンドポイント参照
  • プルするモジュール名とバージョンを定義する成果物の仕様
  • WASM モジュールに渡されるキーと値のペア形式の構成パラメーター

構成配列を使用すると、WASM 成果物をリビルドせずにモジュールの動作をカスタマイズできます。 一般的な構成オプションとして、処理パラメーター、しきい値、変換設定、機能フラグなどがあります。

データ フロー ダイアグラムで、[ グラフ変換の追加 ] (省略可能) を選択して、グラフ処理ノードを追加します。 [グラフの選択] ウィンドウで、目的のグラフ成果物 (単純グラフまたは複雑グラフ) を選択し、[適用] をクリックします。 ダイアグラム内のグラフ ノードを選択することで、グラフ演算子の設定を構成できます。

構成のキーと値のペアは、実行時に WASM モジュールに渡されます。 モジュールからこれらの値にアクセスして動作をカスタマイズできます。 このアプローチにより、次のことが可能になります。

  • 同じ WASM モジュールを異なる構成でデプロイする
  • モジュールをリビルドせずに処理パラメーターを調整する
  • デプロイ要件に基づいて機能を有効または無効にする
  • しきい値やエンドポイントなどの環境固有の値を設定する

宛先ノード

宛先ノードでは、処理されたデータが送信される場所を定義します。 これらは、MQTT ブローカー、クラウド ストレージ、またはその他のシステムにデータを送信するデータ フロー エンドポイントに接続します。 各宛先ノードでは以下を指定します。

  • 構成済みのデータ フロー エンドポイントを指すエンドポイント参照
  • 出力データの特定のトピック、パス、または場所としてのデータの出力先
  • シリアル化形式とスキーマ検証を定義する出力スキーマ設定 (省略可能)

Azure Data Lake や Fabric OneLake などのストレージの宛先では、出力スキーマ設定を指定して、データのシリアル化と検証の方法を制御できます。

現時点では、MQTT、Kafka、OpenTelemetry エンドポイントのみがデータ フロー グラフのデータ変換先としてサポートされています。 詳細については、「 データ フロー エンドポイントの構成」を参照してください。

  1. データ フロー図で、[ 宛先 ] ノードを選択します。
  2. [データ フロー エンドポイントの詳細] ドロップダウンから目的の データ フロー エンドポイントを 選択します。
  3. [続行] を選択して、宛先を構成します。
  4. データの送信先となるトピックやテーブルなど、宛先に 必要な設定 を入力します。 データ変換先フィールドは、エンドポイントの種類に基づいて自動的に解釈されます。 たとえば、データ フロー エンドポイントが MQTT エンドポイントの場合、宛先の詳細ページでトピックを入力するように求められます。

ノード接続

ノード接続では、ノード間のデータ フロー パスを定義します。 各接続では、ソース ノードと宛先ノードが指定され、処理パイプラインが作成されます。 必要に応じて、初期化時にスキーマをモジュールに提供するスキーマを接続に含めることができます。 これにより、次の例のようなスキーマ検証が可能になります。

グラフ処理ノードを選択すると、操作エクスペリエンスによってノード接続が自動的に作成されます。 グラフの作成後に接続を変更することはできません。

データ フロー エンドポイント

データ フロー グラフは、データ フロー エンドポイントを介して外部システムに接続します。 エンドポイントの種類によって、ソース、宛先、またはその両方として使用できるかがきまります。

MQTT エンドポイント

MQTT エンドポイントは、ソースと宛先の両方として機能できます。 次のような MQTT ブローカーに接続します。

  • Azure IoT Operations ローカル MQTT ブローカー (すべてのデータ フローで必須)
  • Azure Event Grid MQTT
  • カスタム MQTT ブローカー

詳細な構成情報については、MQTT データ フロー エンドポイントの構成に関する記事を参照してください。

Kafka エンドポイント

Kafka エンドポイントは、ソースと宛先の両方として機能できます。 次のような Kafka 互換システムに接続します。

  • Azure Event Hubs (Kafka 互換)
  • Apache Kafka クラスター
  • Confluent Cloud

詳細な構成情報については、Azure Event Hubs と Kafka データ フロー エンドポイントの構成に関する記事を参照してください。

ストレージ エンドポイント

ストレージ エンドポイントは宛先としてのみ機能できます。 長期的なデータ保持と分析のためにクラウド ストレージ システムに接続します。

  • Azure Data Lake Storage
  • Microsoft Fabric OneLake
  • ローカル ストレージ

ストレージ エンドポイントには、通常、データのシリアル化形式を定義するために出力スキーマ設定が必要です。

レジストリ エンドポイント

レジストリ エンドポイントは、WASM モジュールとグラフ定義をプルするコンテナー レジストリへのアクセスを提供します。 データ フローで直接使用されることはありませんが、グラフ処理ノードから参照されます。

詳細な構成情報については、レジストリ エンドポイントの構成に関する記事を参照してください。