你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

为数据流图开发 WebAssembly (WASM) 模块和图形定义

本文介绍如何为 Azure IoT 操作图开发自定义 WebAssembly (WASM) 模块和图定义。 在 Rust 或 Python 中创建模块以实现自定义处理逻辑。 定义图配置,指定模块如何连接到完整的处理工作流。

重要

数据流图目前仅支持 MQTT、Kafka 和 OpenTelemetry 终结点。 不支持其他终结点类型,例如 Azure Data Lake、Microsoft Fabric OneLake、Azure 数据资源管理器和本地存储。 有关详细信息,请参阅 已知问题

若要了解如何使用 VS Code 扩展开发 WASM 模块,请参阅 使用 VS Code 扩展生成 WASM 模块

概述

Azure IoT 操作数据流图通过可配置的运算符对流数据进行处理,这些运算符是以 WebAssembly 模块的形式实现的。 每个运算符会在保持时间顺序的同时处理带时间戳的数据,从而实现具有确定性结果的实时分析。

主要优势

  • 实时处理:以一致的低延迟处理流数据
  • 事件时间语义:根据事件发生的时间而非处理时间处理数据
  • 容错:内置故障处理和数据一致性保证功能
  • 可伸缩性:在保持顺序保证的前提下,将处理工作分布到多个节点上
  • 多语言支持:可使用 Rust 或 Python 进行开发,并拥有一致的接口

体系结构基础知识

数据流图建立在 Timely Dataflow 计算模型之上,该模型源自 Microsoft Research 的 Naiad 项目。 此方法可确保实现以下目标:

  • 确定性处理:同一输入始终生成相同的输出
  • 进度跟踪:系统知道计算何时完成
  • 分布式协调:多个处理节点保持同步

为何使用及时数据流?

传统的流处理系统存在一些挑战。 乱序数据意味着事件到达时间可能晚于预期。 由于结果不完整,很难知道计算何时完成。 同步分布式处理时会发生协调问题。

Timely Dataflow 通过以下方法解决这些问题:

时间戳和进度跟踪

每个数据项都带有表示其逻辑时间的时间戳。 系统通过时间戳跟踪进度,从而实现多个关键功能:

  • 确定性处理:同一输入始终生成相同的输出
  • 仅一次语义:无重复或遗漏处理
  • 水印:能够知晓在特定时间段内不会传入更多数据

混合逻辑时钟

时间戳机制使用混合方法:

pub struct HybridLogicalClock {
    pub physical_time: u64,  // Wall-clock time when event occurred
    pub logical_time: u64,   // Logical ordering for events at same physical time
}

混合逻辑时钟方法可确保实现多种功能:

  • 因果顺序:结果在原因之后发生
  • 进度保证:系统知道处理何时完成
  • 分布式协调:多个节点保持同步

了解运算符和模块

了解运算符和模块之间的区别对于 WASM 开发至关重要:

运营商

运算符是基于 Timely Dataflow 运算符构建的基本处理单元。 每个运算符类型都有其特定的功能:

  • 映射:转换每个数据项(如转换温度单位)
  • 筛选器:仅允许某些数据项根据条件传递(例如删除无效读取)
  • 分支:根据条件将数据路由到不同的路径(例如分离温度和湿度数据)
  • 累积:在时间范围内收集和聚合数据(例如计算统计摘要)
  • 连接:在保留临时顺序时合并多个数据流
  • 延迟:通过推进时间戳来控制计时

模块

模块是以 WASM 代码的形式实现的运算符逻辑。 单个模块可以实现多个运算符类型。 例如,温度模块可能会提供:

  • 用于单位转换的 map 运算符
  • 用于阈值检查的 filter 运算符
  • 用于路由决策的 branch 运算符
  • 用于统计汇总的 accumulate 运算符

关系

图定义、模块和运算符之间的关系遵循特定模式:

Graph Definition → References Module → Provides Operator → Processes Data
     ↓                    ↓               ↓              ↓
"temperature:1.0.0" → temperature.wasm → map function → °F to °C

此分离可实现以下目的:

  • 模块复用:可以在不同的图配置中部署相同的 WASM 模块
  • 独立版本控制:可以在不重新生成模块的情况下更新图定义
  • 动态配置:可以为同一个模块传递不同的参数以实现不同的行为

先决条件

选择开发语言并设置所需的工具:

  • Rust 工具链提供 cargorustc 和 编译运算符所需的标准库。 安装方式如下:

    curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
    
  • 生成 Azure IoT Operations WASM 组件所需的 WASM wasm32-wasip2 目标。 使用以下方式添加:

    rustup target add wasm32-wasip2
    
  • 生成工具提供给构建器和 CI 用于验证和打包 WASM 工件的实用工具。 安装方式如下:

    cargo install wasm-tools --version '=1.201.0' --locked
    

配置开发环境

可以通过自定义Microsoft Azure DevOps 注册表访问 WASM Rust SDK。 使用工作区配置文件配置访问权限,而不是使用环境变量:

# .cargo/config.toml (at your workspace root)
[registries]
aio-wg = { index = "sparse+https://pkgs.dev.azure.com/azure-iot-sdks/iot-operations/_packaging/preview/Cargo/index/" }

[net]
git-fetch-with-cli = true

此设置复制 samples/wasm/.cargo/config.toml 处的示例布局,并将注册表设置保留在版本控制中。

创建项目

首先,为运算符模块创建新的项目目录。 项目结构取决于所选语言。

cargo new --lib temperature-converter
cd temperature-converter

配置 Cargo.toml

编辑 Cargo.toml 文件以包含 WASM SDK 和其他库的依赖项:

[package]
name = "temperature-converter"
version = "0.1.0"
edition = "2021"

[dependencies]
# WebAssembly Interface Types (WIT) code generation
wit-bindgen = "0.22"

# Azure IoT Operations WASM SDK - provides operator macros and host APIs
wasm_graph_sdk = { version = "=1.1.3", registry = "aio-wg" }

# JSON serialization/deserialization for data processing
serde = { version = "1", default-features = false, features = ["derive"] }
serde_json = { version = "1", default-features = false, features = ["alloc"] }

[lib]
# Required for WASM module compilation
crate-type = ["cdylib"]

有关版本和注册表的说明:

  • SDK 版本(=1.1.3)与当前示例保持一致;将其锁定可避免重大变更。
  • registry = "aio-wg" 匹配 .cargo/config.toml 中定义的注册表项。

关键依赖项说明:

  • wit-bindgen:从 WebAssembly 接口类型(WIT)定义生成 Rust 绑定,使代码能够与 WASM 运行时进行交互。
  • wasm_graph_sdk:Azure IoT Operations SDK 提供运算符宏,例如 #[map_operator],以及用于日志记录、指标和状态管理的主机 API #[filter_operator]
  • serde + serde_json:用于分析和生成数据有效负载的 JSON 处理库。 default-features = false 针对 WASM 大小约束进行优化。
  • crate-type = ["cdylib"]:将 Rust 库编译为 C 兼容的动态库,这是 WASM 模块生成所必需的。

创建简单模块

创建一个简单的模块,用于将温度从摄氏度转换为华氏度。 此示例演示 Rust 和 Python 实现的基本结构和处理逻辑。

use serde_json::{json, Value};

use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::macros::map_operator;

fn fahrenheit_to_celsius_init(_configuration: ModuleConfiguration) -> bool {
    logger::log(Level::Info, "temperature-converter", "Init invoked"); // one-time module init
    true
}

#[map_operator(init = "fahrenheit_to_celsius_init")]
fn fahrenheit_to_celsius(input: DataModel) -> Result<DataModel, Error> {
    let DataModel::Message(mut result) = input else {
        return Err(Error {
            message: "Unexpected input type".to_string(),
        });
    };

    let payload = &result.payload.read(); // payload bytes from inbound message
    if let Ok(data_str) = std::str::from_utf8(payload) {
        if let Ok(mut data) = serde_json::from_str::<Value>(data_str) {
            if let Some(temp) = data["temperature"]["value"].as_f64() {
                let fahrenheit = (temp * 9.0 / 5.0) + 32.0; // Celsius -> Fahrenheit
                data["temperature"] = json!({
                    "value_fahrenheit": fahrenheit,
                    "original_celsius": temp
                });

                if let Ok(output_str) = serde_json::to_string(&data) {
                    // Replace payload with owned bytes so the host receives the updated JSON
                    result.payload = BufferOrBytes::Bytes(output_str.into_bytes());
                }
            }
        }
    }

    Ok(DataModel::Message(result))
}

生成模块

可根据开发工作流和环境需求,在本地开发构建和容器化构建之间进行选择。

本地生成

在开发过程中以及需要对构建环境进行完全控制时,可直接在开发计算机上进行构建,以实现最快的迭代速度。

# Build WASM module
cargo build --release --target wasm32-wasip2  # target required for Azure IoT Operations WASM components

# Find your module  
ls target/wasm32-wasip2/release/*.wasm

Docker 生成

使用预配置所有依赖项和架构的容器化环境进行生成。 这些 Docker 映像在不同的环境中提供一致的生成,非常适合 CI/CD 管道。

Azure IoT操作示例存储库维护Rust Docker构建器,并包含所有必要的依赖项。 有关详细文档,请参阅 Rust Docker 生成器使用情况

# Build release version (optimized for production)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter

# Build debug version (includes debugging symbols and less optimization)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter --build-mode debug

Docker 生成选项:

  • --app-name:必须与 Cargo.toml 中的 Rust Crate 名称一致
  • --build-mode:选择 release(默认)以进行优化生成,或选择 debug 以生成包含符号的开发生成

更多示例

有关综合示例,请参阅示例存储库中的 Rust 示例。 完整的实现包括:

  • map 运算符:数据转换和转换逻辑
  • filter 运算符:条件数据处理和验证
  • branch 运算符:基于数据内容的多路径路由
  • accumulate 运算符:按照时间范围执行的汇总和统计处理
  • delay 运算符:基于时间的处理控件

这些示例展示了可运行的实现,显示了每种运算符类型的完整结构,包括正确的错误处理和日志记录模式。

SDK 参考和 API

WASM Rust SDK 提供全面的开发工具:

运算符宏

use wasm_graph_sdk::macros::{map_operator, filter_operator, branch_operator};
use wasm_graph_sdk::{DataModel, HybridLogicalClock};

// Map operator - transforms each data item
#[map_operator(init = "my_init_function")]
fn my_map(input: DataModel) -> Result<DataModel, Error> {
    // Transform logic here
}

// Filter operator - allows/rejects data based on predicate  
#[filter_operator(init = "my_init_function")]
fn my_filter(input: DataModel) -> Result<bool, Error> {
    // Return true to pass data through, false to filter out
}

// Branch operator - routes data to different arms
#[branch_operator(init = "my_init_function")]
fn my_branch(input: DataModel, timestamp: HybridLogicalClock) -> Result<bool, Error> {
    // Return true for "True" arm, false for "False" arm
}

模块配置参数

WASM 运算符可以通过传递给 ModuleConfiguration 函数的 init 结构体接收运行时配置参数。 可以在图形定义中定义这些参数,这样就可以在不重新生成模块的情况下自定义运行时。

use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::ModuleConfiguration;

fn my_operator_init(configuration: ModuleConfiguration) -> bool {
    // Access required parameters
    if let Some(threshold_param) = configuration.parameters.get("temperature_threshold") {
        let threshold: f64 = threshold_param.parse().unwrap_or(25.0);
        logger::log(Level::Info, "my-operator", &format!("Using threshold: {}", threshold));
    }
    
    // Access optional parameters with defaults
    let unit = configuration.parameters
        .get("output_unit")
        .map(|s| s.as_str())
        .unwrap_or("celsius");
    
    logger::log(Level::Info, "my-operator", &format!("Output unit: {}", unit));
    true
}

有关在图形定义中定义配置参数的详细信息,请参阅模块配置参数

托管 API

使用 SDK 处理分布式服务:

持久数据的状态存储:

use wasm_graph_sdk::state_store;

// Set value
state_store::set(key.as_bytes(), value.as_bytes(), None, None, options)?;

// Get value  
let response = state_store::get(key.as_bytes(), None)?;

// Delete key
state_store::del(key.as_bytes(), None, None)?;

结构化日志记录:

use wasm_graph_sdk::logger::{self, Level};

logger::log(Level::Info, "my-operator", "Processing started");
logger::log(Level::Error, "my-operator", &format!("Error: {}", error));

OpenTelemetry 兼容的指标:

use wasm_graph_sdk::metrics;

// Increment counter
metrics::add_to_counter("requests_total", 1.0, Some(labels))?;

// Record histogram value
metrics::record_to_histogram("processing_duration", duration_ms, Some(labels))?;

ONNX 推理与 WASM

若要在模块中嵌入和运行小型 ONNX 模型进行带内推理,请参阅 WebAssembly 数据流图中的“运行 ONNX 推理”。 本文介绍使用模块打包模型、在图形定义中启用 wasi-nn 功能以及限制。

WebAssembly 接口类型 (WIT)

所有运算符均实现使用 WebAssembly 接口类型 (WIT) 定义的标准化接口。 WIT 提供与语言无关的接口定义,确保 WASM 模块与主机运行时之间的兼容性。

可以在 示例存储库中找到 Azure IoT作的完整 WIT 架构。 这些架构定义开发 WASM 模块时使用的所有接口、类型和数据结构。

数据模型和接口

所有 WASM 运算符都使用 WebAssembly 接口类型(WIT)定义的标准化数据模型。

核心数据模型

// Core timestamp structure using hybrid logical clock
record timestamp {
    timestamp: timespec,     // Physical time (seconds + nanoseconds)
    node-id: buffer-or-string,  // Logical node identifier
}

// Union type supporting multiple data formats
variant data-model {
    buffer-or-bytes(buffer-or-bytes),    // Raw byte data
    message(message),                    // Structured messages with metadata
    snapshot(snapshot),                  // Video/image frames with timestamps
}

// Structured message format
record message {
    timestamp: timestamp,
    content_type: buffer-or-string,
    payload: message-payload,
}

WIT 接口定义

每种运算符类型都实现了特定的 WIT 接口:

// Core operator interfaces
interface map {
    use types.{data-model};
    process: func(message: data-model) -> result<data-model, error>;
}

interface filter {
    use types.{data-model};
    process: func(message: data-model) -> result<bool, error>;
}

interface branch {
    use types.{data-model, hybrid-logical-clock};
    process: func(timestamp: hybrid-logical-clock, message: data-model) -> result<bool, error>;
}

interface accumulate {
    use types.{data-model};
    process: func(staged: data-model, message: list<data-model>) -> result<data-model, error>;
}

图定义和 WASM 集成

图形定义显示 WASM 模块如何连接到处理工作流。 它们指定了创建完整数据处理管道的操作、连接和参数。

有关创建和配置图定义的全面信息,包括简单和复杂工作流的详细示例,请参阅配置数据流图的 WebAssembly 图定义

图定义指南中涵盖的主要主题:

  • 图定义结构:了解 YAML 架构和所需组件
  • 简单的图示例:基本的三段式温度转换管道
  • 复杂图形示例:具有分支和聚合的多传感器处理
  • 模块配置参数:WASM 运算符的运行时自定义
  • 注册表部署:将图定义打包和存储为 OCI 项目

后续步骤