你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

在 VS Code 中为数据流生成 WASM 模块

Azure IoT 运营中的自定义 WebAssembly (WASM) 数据处理功能可以在 Azure IoT 运营集群中实现实时遥测数据处理。 通过部署自定义 WASM 模块,可以定义和执行数据转换,作为数据流图或 HTTP/REST 连接器的一部分。

本文介绍如何使用 Azure IoT 操作数据流 VS Code 扩展在本地开发、测试和调试 WASM 模块,然后再将其部署到 Azure IoT 操作群集。 你将了解如何:

  • 为了了解基本工作流,使用示例数据在本地执行预生成图形来运行图形应用程序。
  • 通过在 Python 和 Rust 中生成具有地图和筛选器功能的新运算符来创建自定义 WASM 模块。
  • 在消息处理中使用状态存储来维护状态。
  • 在处理之前,使用架构注册表中的验证功能,通过 JSON 架构来确认消息格式。
  • 通过在本地开发中使用断点和单步调试来调试 WASM 模块。

以下平台上支持该扩展:

  • Linux
  • 适用于 Linux 的 Windows 子系统 (WSL)
  • Windows操作系统

若要详细了解 Azure IoT作中的图形和 WASM,请参阅 将数据流图与 WebAssembly (WASM) 模块配合使用

先决条件

开发环境:

Docker 映像:

docker pull mcr.microsoft.com/azureiotoperations/processor-app:1.1.4
docker tag mcr.microsoft.com/azureiotoperations/processor-app:1.1.4 host-app

docker pull mcr.microsoft.com/azureiotoperations/devx-runtime:0.1.8
docker tag mcr.microsoft.com/azureiotoperations/devx-runtime:0.1.8 devx

docker pull mcr.microsoft.com/azureiotoperations/statestore-cli:0.0.2
docker tag mcr.microsoft.com/azureiotoperations/statestore-cli:0.0.2 statestore-cli

docker pull eclipse-mosquitto

使用 VS Code 扩展在本地运行图形应用程序

此示例使用一个示例工作区,其中包含使用 VS Code 扩展在本地生成和运行图形应用程序所需的所有资源。

在 VS Code 中打开示例工作区

如果尚未克隆 “探索 IoT 操作” 存储库。

在 Visual Studio Code 中打开samples/wasm文件夹,方法是选择“文件>打开文件夹”选项,并导航到samples/wasm文件夹。

生成运算符

Ctrl+Shift+P 打开命令面板并搜索 Azure IoT 操作:生成所有数据流运算符。 选择 发布 作为生成模式。

此命令生成工作区中的所有运算符,并在文件夹中创建 .wasm 文件 operators 。 使用 .wasm 文件在本地运行图形应用程序。

在本地运行图形应用程序

Ctrl+Shift+P 打开命令面板并搜索 Azure IoT 操作:运行应用程序图。 选择 发布 作为运行模式。 此命令使用本地执行环境在工作区中包含的graph.dataflow.yaml文件本地运行图形应用程序。

它还从中读取 hostapp.env.list 以设置数据流操作员配置参数的环境变量 TK_CONFIGURATION_PARAMETERS

当系统提示输入数据时,请选择 data-and-images 工作区中的文件夹。 此文件夹包含图形应用程序的输入数据文件,包括温度和湿度数据,以及快照模块的某些图像。

等待,直到看到 VS Code 通知,日志已准备就绪: Log files for the run can be found at ...\wasm\data-and-images\output\logs

输出位于 output 文件夹下的 data-and-images 文件夹中。 可以在工作区中打开 output 文件夹以查看输出文件。 .txt文件名日期和时间的文件包含已处理的数据,如以下示例所示:

{"tst":"2025-09-19T04:19:13.530381+0000","topic":"sensors","qos":0,"retain":0,"payloadlen":312,"properties":{"payload-format-indicator":1,"message-expiry-interval":10,"correlation-data":"...","user-properties":{"__ts":"001758255553528:00000:...","__protVer":"1.0","__srcId":"mqtt-source"},"content-type":"application/json"},"payload":{"temperature":[{"count":2,"max":653.888888888889,"min":204.44444444444449,"average":429.16666666666669,"last":204.44444444444449,"unit":"C","overtemp":true}],"humidity":[{"count":3,"max":85,"min":45,"average":69.666666666666671,"last":79}],"object":[{"result":"notebook, notebook computer; sliding door"}]}}

输出显示图形应用程序处理了输入数据并生成了输出。 输出包括温度和湿度数据,以及图像中检测到的对象。

使用自定义 WASM 模块创建新图形

此方案演示如何使用自定义 WASM 模块创建新的图形应用程序。 图形应用程序由两个运算符组成:将 map 温度值从华氏度转换为摄氏度的运算符,以及筛选 filter 出温度值超过 500°C 的消息的运算符。

无需使用现有示例工作区,而是从头开始创建新的工作区。 此过程可让你了解如何创建新的图形应用程序和在 Python 和 Rust 中对运算符进行编程。

运算符命名约束

目前,请勿在运算符名称中使用连字符(-)或下划线(_)。 VS Code 扩展强制实施此要求,但如果手动创建或重命名模块,则会导致问题。 对模块(例如filtermapstateenrichschemafilter)使用简单的字母数字名称。

在 Python 中创建新的图形应用程序项目

Ctrl+Shift+P 键,在 VS Code 命令面板中并搜索 Azure IoT 操作:创建新的数据流应用程序:

  1. 对于文件夹,请选择用于创建项目的文件夹。 可以为此项目创建新文件夹。
  2. 输入 my-graph 为名称。
  3. 选择 Python 作为语言。
  4. 选择 “映射 ”作为类型。
  5. 输入 map 为名称。

现在有了一个新的 VS Code 工作区,其中包含基本项目结构和初学者文件。 起始文件包括 graph.dataflow.yaml 文件和映射运算符模板源代码。

重要

若要在已部署的 Azure IoT 操作实例中使用 Python 模块,必须将实例部署为代理内存配置文件设置为“中”“高”。 如果将内存配置文件设置为 “低 ”或“ Tiny”,则实例无法拉取 Python 模块。

为地图运算符模块添加 Python 代码

operators/map/map.py打开文件,将内容替换为以下代码,将传入的温度值从华氏度转换为摄氏度:

import json
from map_impl import exports
from map_impl import imports
from map_impl.imports import types

class Map(exports.Map):
    def init(self, configuration) -> bool:
        imports.logger.log(imports.logger.Level.INFO, "module4/map", "Init invoked")
        return True

    def process(self, message: types.DataModel) -> types.DataModel:
        # TODO: implement custom logic for map operator
        imports.logger.log(imports.logger.Level.INFO, "module4/map", "processing from python")

        # Ensure the input is of the expected type
        if not isinstance(message, types.DataModel_Message):
            raise ValueError("Unexpected input type: Expected DataModel_Message")

        # Extract and decode the payload
        payload_variant = message.value.payload
        if isinstance(payload_variant, types.BufferOrBytes_Buffer):
            # It's a Buffer handle - read from host
            imports.logger.log(imports.logger.Level.INFO, "module4/map", "Reading payload from Buffer")
            payload = payload_variant.value.read()
        elif isinstance(payload_variant, types.BufferOrBytes_Bytes):
            # It's already bytes
            imports.logger.log(imports.logger.Level.INFO, "module4/map", "Reading payload from Bytes")
            payload = payload_variant.value
        else:
            raise ValueError("Unexpected payload type")

        decoded = payload.decode("utf-8")

        # Parse the JSON data
        json_data = json.loads(decoded)

        # Check and update the temperature value
        if "temperature" in json_data and "value" in json_data["temperature"]:
            temp_f = json_data["temperature"]["value"]
            if isinstance(temp_f, int):
                # Convert Fahrenheit to Celsius
                temp_c = round((temp_f - 32) * 5.0 / 9.0)

                # Update the JSON data
                json_data["temperature"]["value"] = temp_c
                json_data["temperature"]["unit"] = "C"

                # Serialize the updated JSON back to bytes
                updated_payload = json.dumps(json_data).encode("utf-8")

                # Update the message payload
                message.value.payload = types.BufferOrBytes_Bytes(value=updated_payload)

        return message

确保 Docker 正在运行。 然后,按 Ctrl+Shift+P 打开命令面板并搜索 Azure IoT 操作:生成所有数据流算子。 创建 发布 模块。

构建过程将map.wasm运算符的map文件放在operators/map/bin/release文件夹中。

为筛选器运算符模块添加 Rust 代码

通过按 Ctrl+Shift+P 打开命令面板,搜索 Azure IoT 操作:创建数据流运算符,创建新的运算符:

  1. 选择 Rust 作为语言。
  2. 选择 “筛选器 ”作为运算符类型。
  3. 输入 filter 为名称。

operators/filter/src/lib.rs打开文件,将内容替换为以下代码,以筛选出温度高于 500°C 的值:

mod filter_operator {
    use wasm_graph_sdk::macros::filter_operator;
    use serde_json::Value;

    fn filter_init(_configuration: ModuleConfiguration) -> bool {
        // Add code here to process the module init properties and module schemas from the configuration

        true
    }

    #[filter_operator(init = "filter_init")]
    fn filter(input: DataModel) -> Result<bool, Error> {

        // Extract payload from input to process
        let payload = match input {
            DataModel::Message(Message {
                payload: BufferOrBytes::Buffer(buffer),
                ..
            }) => buffer.read(),
            DataModel::Message(Message {
                payload: BufferOrBytes::Bytes(bytes),
                ..
            }) => bytes,
            _ => return Err(Error { message: "Unexpected input type".to_string() }),
        };

        // ... perform filtering logic here and return boolean
        if let Ok(payload_str) = std::str::from_utf8(&payload) {
            if let Ok(json) = serde_json::from_str::<Value>(payload_str) {
                if let Some(temp_c) = json["temperature"]["value"].as_i64() {
                    // Return true if temperature is above 500°C
                    return Ok(temp_c > 500);
                }
            }
        }

        Ok(false)
   }
}

operators/filter/Cargo.toml打开该文件并添加以下依赖项:

[dependencies]
# ...
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

确保 Docker 正在运行。 然后,按 Ctrl+Shift+P 打开命令面板并搜索 Azure IoT 操作:生成所有数据流算子。 创建 发布 模块。

构建过程将filter.wasm运算符的filter文件放在operators/filter/bin/release文件夹中。

使用示例数据在本地运行图形应用程序

打开 graph.dataflow.yaml 文件并将内容替换为以下代码:

metadata:
    $schema: "https://www.schemastore.org/aio-wasm-graph-config-1.0.0.json"
    name: "Temperature Monitoring"
    description: "A graph that converts temperature from Fahrenheit to Celsius, if temperature is above 500°C, then sends the processed data to the sink."
    version: "1.0.0"
    vendor: "Microsoft"
moduleRequirements:
    apiVersion: "1.1.0"
    runtimeVersion: "1.1.0"
operations:
  - operationType: source
    name: source
  - operationType: map
    name: map
    module: map
  - operationType: filter
    name: filter
    module: filter
  - operationType: sink
    name: sink
connections:
  - from:
      name: source
    to:
      name: map
  - from:
      name: map
    to:
      name: filter
  - from:
      name: filter
    to:
      name: sink

data 包含示例数据的文件夹从克隆的示例存储库 explore-iot-operations\samples\wasm\data 复制到当前工作区。 该 data 文件夹包含三个 JSON 文件,其中包含示例输入温度数据。

Ctrl+Shift+P 打开命令面板并搜索 Azure IoT 操作:运行 Application Graph

  1. 选择 graph.dataflow.yaml 图形文件。
  2. 选择 发布 作为运行模式。
  3. 选择您复制到工作区的data文件夹。

DevX 容器启动以运行图形。 处理的结果保存在 data/output 文件夹中。

若要了解如何将自定义 WASM 模块和图形部署到 Azure IoT作实例,请参阅 部署 WASM 模块和数据流图

对 WASM 运算符的状态存储的支持

此示例演示如何将状态存储与 WASM 运算符一起使用。 状态存储器允许运算符在消息处理过程中持久保存和检索数据,并在数据流图中启用有状态操作。

打开状态存储示例工作区

如果尚未克隆 “探索 IoT 操作” 存储库。

在 Visual Studio Code 中打开samples/wasm/statestore-scenario文件夹,方法是选择“文件>打开文件夹”选项,并导航到samples/wasm/statestore-scenario文件夹。 此文件夹包含以下资源:

  • graph.dataflow.yaml - 带有启用状态存储运算符的数据流图配置。
  • statestore.json - 具有键值对的状态存储配置。
  • data/ - 用于测试的示例输入数据。
  • operators/ - otel-enrich和筛选器运算符的源代码。

配置状态存储

  1. statestore.json打开该文件以查看当前状态存储配置。

  2. 您可以修改factoryIdmachineId的值进行测试。 扩充参数引用这些键。

  3. 打开 graph.dataflow.yaml 并查看扩充配置。 此文件的重要部分显示在以下代码片段中:

    moduleConfigurations:
      - name: module-otel-enrich/map
        parameters:
          enrichKeys:
            name: enrichKeys
            description: Comma separated list of DSS keys which will be fetched and added as attributes
            default: factoryId,machineId
    operations:
      - operationType: "source"
        name: "source"
      - operationType: "map"
        name: "module-otel-enrich/map"
        module: "otel-enrich"
      - operationType: "sink"
        name: "sink"
    

    参数 enrichKeys 的默认值 (factoryId,machineId) 确定从状态存储中提取的键并将其添加为属性。

  4. 确保enrichKeys中列出的每个键都存在于statestore.json中。 如果在 statestore.json 中添加或删除密钥,请更新 default 的值,或者使用 TK_CONFIGURATION_PARAMETERS 环境变量重写该值。

更新测试数据(可选)

可以修改文件夹中的测试数据 data/ 以试验不同的输入值。 示例数据包括温度读数。

生成并运行状态存储方案

  1. Ctrl+Shift+P 打开命令面板并搜索 Azure IoT 操作:生成所有数据流运算符

  2. 选择 发布 作为生成模式。 等待生成完成。

  3. 再次按 Ctrl+Shift+P 并搜索 Azure IoT 操作:运行 Application Graph

  4. 选择 发布 作为运行模式。

  5. 在 VS Code 工作区中选择 data 文件夹作为您的输入数据。 DevX 容器启动以运行带有示例输入的图。

验证状态存储功能

图形执行完成后:

  1. data/output/ 文件夹中查看结果。

  2. 打开生成的 .txt 文件以查看已处理的数据。 user properties输出消息部分包括factoryId以及machineId从状态存储中检索到的值。

  3. 检查日志 data/output/logs/host-app.log 以验证操作员是否 otel-enrich 从状态存储中检索了值,并将其作为用户属性添加到消息中。

WASM 模块的架构注册表支持

此示例演示如何将架构注册表与 WASM 模块配合使用。 通过架构注册表,可以验证消息格式并确保数据流处理中的数据一致性。

打开示例架构注册表工作区

如果尚未克隆 “探索 IoT 操作” 存储库。

在 Visual Studio Code 中打开samples/wasm/schema-registry-scenario文件夹,方法是选择“文件>打开文件夹”选项,并导航到samples/wasm/schema-registry-scenario文件夹。 此文件夹包含以下资源:

  • graph.dataflow.yaml - 数据流图配置。
  • tk_schema_config.json - 主机应用程序本地使用的 JSON 模式,用于在消息有效负载到达下游操作员之前进行验证。 将此文件与发布到 Microsoft Azure 环境的任何架构保持同步。
  • data/ - 使用不同消息格式进行测试的示例输入数据。
  • operators/filter/ - 筛选器运算符的源代码。
  • (可选)hostapp.env.list - VS Code 扩展在运行时自动生成并添加TK_SCHEMA_CONFIG_PATH=tk_schema_config.json。 如果提供自己的架构文件,请确保变量指向它。

了解架构配置

tk_schema_config.json打开文件以查看架构定义:

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "properties": {
    "humidity": {
      "type": "integer"
    },
    "temperature": {
      "type": "number"
    }
  }
}

此架构定义一个顶级 JSON 对象,该对象可以包含两个数值属性: humidity (整数)和 temperature (数字)。 "required": ["humidity", "temperature"]添加数组以使两者都为必需项,或者可以随着负载格式的发展而扩展properties节。

以下限制适用于本地架构文件(tk_schema_config.json)。 架构文件:

  • 使用标准 JSON 架构草稿-07 语法,即 Azure IoT 操作架构注册表支持的相同草稿。
  • 功能上,它与您在云 schema 注册表中注册的内容相同。 可以在两者之间复制和粘贴以保持一致。
  • 由主机应用通过环境变量 TK_SCHEMA_CONFIG_PATH引用。

目前,本地开发运行时中存在以下限制:

  • 仅加载一个架构配置文件。 不支持多文件包含或目录扫描。
  • $ref 本地不支持指向外部文件或 URL 的链接。 保持模式的自包含性。 可以使用内部 JSON 指针引用,例如 {"$ref":"#/components/..."}
  • 常用的草稿07关键字,例如typepropertiesrequiredenumminimummaximumallOfanyOfoneOfnotitems 都可以使用。 基础验证程序可能会忽略不太常见的或高级功能,例如 contentEncodingcontentMediaType
  • 对于快速冷启动,请将架构的大小保持在小于 1 MB。
  • 版本控制与架构演变策略不会在本地强制执行。 你负责与云注册表保持一致。

如果依赖于在本地验证失败的高级构造,请在发布后针对云注册表验证相同的架构,以确保奇偶校验。

若要了解详细信息,请参阅 Azure IoT作架构注册表概念

查看测试数据

data/ 文件夹包含三个测试文件:

  • temperature_humidity_payload_1.json - 包含温度和湿度数据,并通过验证。 但是,湿度值不是架构中指定的整数,因此数据将被筛选掉。
  • temperature_humidity_payload_2.json - 仅包含湿度数据,因此被过滤掉。
  • temperature_humidity_payload_3.json - 包含温度和湿度数据,并通过验证。

生成并运行架构注册表方案

  1. Ctrl+Shift+P 打开命令面板并搜索 Azure IoT 操作:生成所有数据流运算符

  2. 选择 发布 作为生成模式。 等待生成完成。

  3. 再次按 Ctrl+Shift+P 并搜索 Azure IoT 操作:运行 Application Graph

  4. 选择 graph.dataflow.yaml 图形文件。

  5. 选择 发布 作为运行模式。

  6. 在 VS Code 工作区中选择 data 文件夹作为您的输入数据。 DevX 容器启动以运行带有示例输入的图。

验证架构验证

处理完成后:

  1. 检查 data/output/ 文件夹以获取结果。

  2. 输出仅包含消息的 temperature_humidity_payload_3.json 已处理版本,因为它符合架构。

  3. 该文件 data/output/logs/host-app.log 包含日志条目,指示根据架构验证接受或拒绝哪些消息。

此示例演示架构注册表如何验证传入消息并筛选出不符合定义的架构的消息。

使用 VS Code 扩展调试 WASM 模块

此示例演示如何使用断点和 VS Code 中的集成调试器在本地调试 WASM 模块。

先决条件

完成 WASM 模块示例的架构注册表支持 ,以设置示例工作区。

设置调试环境

  1. 打开工作区中的operators/filter/src/lib.rs文件schema-registry-scenario

  2. 定位filter函数,并通过单击行号旁边的边缘或按F9来设置断点。

    fn filter(message: DataModel) -> Result<bool, Error> {
        let DataModel::Message(message) = input else {
            return Err(Error {message: "Unexpected input type.".to_string()});
        };
        // ... rest of function
    }
    

用于调试的生成

  1. Ctrl+Shift+P 打开命令面板并搜索 Azure IoT 操作:生成所有数据流运算符

  2. 选择 “调试 ”作为生成模式。 等待生成完成。

在启用调试模式的情况下运行

  1. Ctrl+Shift+P 并搜索 Azure IoT操作:运行应用程序图

  2. 选择 lldb-debug.graph.dataflow.yaml 图形文件。

  3. 选择 “调试 ”作为运行模式。

  4. 在 VS Code 工作区中选择 data 文件夹作为您的输入数据。 DevX 容器启动以运行带有示例输入的图。

  5. DevX 容器启动后,您会看到主机应用容器以 lldb-server 启动以便调试。

使用特殊的调试图形文件,因为当前调试器只能附加到调试会话中的一个自定义 WASM 运算符。 使用专用调试图可以保持graph.dataflow.yaml的正常工作状态,并防止在存在多个模块时调试器出现不可预知的附着情况。

若要创建自己的调试图表文件,请执行以下操作:

  1. 将常规 graph.dataflow.yaml 复制到新文件。 使用 lldb- 前缀是约定,但名称是任意名称。
  2. 删除除要调试的运算符以外的所有其他自定义 WASM 运算符。
  3. 调试 模式下重新生成目标运算符,以便符号可用。
  4. 运行调试图,并将运行模式设置为 调试。 该扩展会自动启动lldb-server并附加到 VS Code。

调试 WASM 模块

  1. 执行会在函数中设置的 filter 断点处自动停止。

  2. 使用 VS Code 调试接口可以:

    • “变量 ”面板中检查变量值。
    • 使用 F10F11 单步执行代码。
    • “调用堆栈”面板中查看调用堆栈
    • 您可以为特定变量或表达式添加监视功能。
  3. 通过按 F5 或选择 “继续 ”按钮继续执行。

  4. 调试器在正在处理的每个消息的断点处停止,使你能够检查数据流。

调试提示

  • 使用 调试控制台 计算表达式并检查运行时状态。
  • 通过右键单击断点并添加条件来设置条件断点。
  • 使用F9在不删除断点的情况下启用和停用断点。
  • 变量 ”面板显示局部变量和函数参数的当前状态。

借助此调试功能,可以在部署到生产环境之前排查问题、了解数据流并验证 WASM 模块逻辑。

已知问题

数据流配置

  • YAML 中的布尔值:必须将布尔值引用为字符串,以避免验证错误。 例如,使用 "True""False" 代替 truefalse

    使用未引用的布尔值时的示例错误:

    * spec.connections[2].from.arm: Invalid value: "boolean": spec.connections[2].from.arm in body must be of type string: "boolean"
    * spec.connections[2].from.arm: Unsupported value: false: supported values: "False", "True"
    
  • Python 模块要求:若要使用 Python 模块,必须将 Azure IoT Operations 部署在配置为使用 中等 内存配置文件的 MQTT 代理服务器上。 当内存配置文件设置为 “低 ”或“ Tiny”时,无法拉取 Python 模块。

  • 模块部署计时:拉取和应用 WASM 模块可能需要一些时间,通常大约一分钟,具体取决于网络条件和模块大小。

VS Code 扩展

  • 生成错误详细信息:生成失败时,弹出通知中的错误消息可能无法提供足够的详细信息。 检查终端输出以获取更具体的错误信息。

  • Windows 兼容性:在 Windows 上,首次运行图形应用程序时,可能会遇到错误“命令失败并出现退出代码 1”。如果发生此错误,请重试该作,并应正常工作。

  • 主机应用稳定性:本地执行环境偶尔可能会停止工作,需要重启才能恢复。

Troubleshooting

查看日志

若要查看数据流服务日志,请运行以下命令:

kubectl logs -n azure-iot-operations -l app.kubernetes.io/name=microsoft-iotoperations-dataflows

若要查看数据处理服务日志,请运行以下命令:

kubectl logs -n azure-iot-operations -l app.kubernetes.io/instance=aio-dataflow-default

若要查看模块管理日志,请运行以下命令:

kubectl logs -n azure-iot-operations -l app.kubernetes.io/instance=aio-dataflow-graph-controller

恢复过程

VS Code 扩展重置:如果 VS Code 扩展的行为意外,请尝试卸载并重新安装它,然后重启 VS Code。