你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
本文介绍如何为 Azure IoT 操作图开发自定义 WebAssembly (WASM) 模块和图定义。 在 Rust 或 Python 中创建模块以实现自定义处理逻辑。 定义图配置,指定模块如何连接到完整的处理工作流。
重要
数据流图目前仅支持 MQTT、Kafka 和 OpenTelemetry 终结点。 不支持其他终结点类型,例如 Data Lake、Microsoft Fabric OneLake、Azure 数据资源管理器和本地存储。 有关详细信息,请参阅 已知问题。
概述
Azure IoT 操作数据流图通过可配置的运算符对流数据进行处理,这些运算符是以 WebAssembly 模块的形式实现的。 每个运算符会在保持时间顺序的同时处理带时间戳的数据,从而实现具有确定性结果的实时分析。
主要优势
- 实时处理:以一致的低延迟处理流数据
- 事件时间语义:根据事件发生的时间而非处理时间处理数据
- 容错:内置故障处理和数据一致性保证功能
- 可伸缩性:在保持顺序保证的前提下,将处理工作分布到多个节点上
- 多语言支持:可使用 Rust 或 Python 进行开发,并拥有一致的接口
体系结构基础知识
数据流图建立在 Timely Dataflow 计算模型之上,该模型源自 Microsoft Research 的 Naiad 项目。 此方法可确保实现以下目标:
- 确定性处理:同一输入始终生成相同的输出
- 进度跟踪:系统知道计算何时完成
- 分布式协调:多个处理节点保持同步
为什么使用 Timely Dataflow?
传统的流处理系统存在一些挑战。 乱序数据意味着事件到达时间可能晚于预期。 由于结果不完整,很难知道计算何时完成。 同步分布式处理时会发生协调问题。
Timely Dataflow 通过以下方法解决这些问题:
时间戳和进度跟踪
每个数据项都带有表示其逻辑时间的时间戳。 系统通过时间戳跟踪进度,从而实现多个关键功能:
- 确定性处理:同一输入始终生成相同的输出
- 仅一次语义:无重复或遗漏处理
- 水印:能够知晓在特定时间段内不会传入更多数据
混合逻辑时钟
时间戳机制使用混合方法:
pub struct HybridLogicalClock {
pub physical_time: u64, // Wall-clock time when event occurred
pub logical_time: u64, // Logical ordering for events at same physical time
}
混合逻辑时钟方法可确保实现多种功能:
- 因果顺序:结果在原因之后发生
- 进度保证:系统知道处理何时完成
- 分布式协调:多个节点保持同步
了解运算符和模块
了解运算符和模块之间的区别对于 WASM 开发至关重要:
运营商
运算符是基于 Timely Dataflow 运算符构建的基本处理单元。 每个运算符类型都有其特定的功能:
- Map:转换各个数据项(如转换温度单位)
- Filter:根据条件(例如删除无效读数)仅允许某些数据项通过
- Branch:根据条件将数据路由到不同的路径(例如分离温度和湿度数据)
- Accumulate:收集并汇总特定时间范围内的数据(例如计算统计摘要)
- Concatenate:在保持时间顺序的前提下合并多个数据流
- Delay:通过推进时间戳来控制时间
模块
模块是以 WASM 代码的形式实现的运算符逻辑。 单个模块可以实现多个运算符类型。 例如,温度模块可能会提供:
- 用于单位转换的 map 运算符
- 用于阈值检查的 filter 运算符
- 用于路由决策的 branch 运算符
- 用于统计汇总的 accumulate 运算符
关系
图定义、模块和运算符之间的关系遵循特定模式:
Graph Definition → References Module → Provides Operator → Processes Data
↓ ↓ ↓ ↓
"temperature:1.0.0" → temperature.wasm → map function → °F to °C
这种分离带来了以下好处:
- 模块复用:可以在不同的图配置中部署相同的 WASM 模块
- 独立版本控制:可以在不重新生成模块的情况下更新图定义
- 动态配置:可以为同一个模块传递不同的参数以实现不同的行为
先决条件
选择开发语言并设置所需的工具:
-
Rust 工具链:安装方式:
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y -
WASM 目标:添加方式:
rustup target add wasm32-wasip2 -
生成工具:安装方式:
cargo install wasm-tools --version '=1.201.0' --locked
配置开发环境
WASM Rust SDK 可通过自定义 Azure DevOps 注册表获得。 通过设置以下环境变量来配置访问权限:
export CARGO_REGISTRIES_AZURE_VSCODE_TINYKUBE_INDEX="sparse+https://pkgs.dev.azure.com/azure-iot-sdks/iot-operations/_packaging/preview/Cargo/index/"
export CARGO_NET_GIT_FETCH_WITH_CLI=true
为了实现持久访问,请将以下环境变量添加到 shell 配置文件中:
echo 'export CARGO_REGISTRIES_AZURE_VSCODE_TINYKUBE_INDEX="sparse+https://pkgs.dev.azure.com/azure-iot-sdks/iot-operations/_packaging/preview/Cargo/index/"' >> ~/.bashrc
echo 'export CARGO_NET_GIT_FETCH_WITH_CLI=true' >> ~/.bashrc
source ~/.bashrc
创建项目
首先,为运算符模块创建新的项目目录。 项目结构取决于所选语言。
cargo new --lib temperature-converter
cd temperature-converter
配置 Cargo.toml
编辑 Cargo.toml 文件以包含 WASM SDK 和其他库的依赖项:
[package]
name = "temperature-converter"
version = "0.1.0"
edition = "2021"
[dependencies]
# WebAssembly Interface Types (WIT) code generation
wit-bindgen = "0.22"
# Azure IoT Operations WASM SDK - provides operator macros and host APIs
tinykube_wasm_sdk = { version = "0.2.0", registry = "azure-vscode-tinykube" }
# JSON serialization/deserialization for data processing
serde = { version = "1", default-features = false, features = ["derive"] }
serde_json = { version = "1", default-features = false, features = ["alloc"] }
[lib]
# Required for WASM module compilation
crate-type = ["cdylib"]
关键依赖项说明:
-
wit-bindgen:从 WebAssembly 接口类型 (WIT) 定义生成 Rust 绑定,使代码能够与 WASM 运行时进行交互 -
tinykube_wasm_sdk:Azure IoT 操作 SDK 提供用于日志记录、指标和状态管理的运算符宏(如#[map_operator]、#[filter_operator]等)和主机 API -
serde+serde_json:用于解析和生成数据有效负载的 JSON 处理库;default-features = false针对 WASM 大小约束进行优化 -
crate-type = ["cdylib"]:将 Rust 库编译为与 C 语言兼容的动态库,这是生成 WASM 模块的必要项
创建简单模块
创建一个简单的模块,用于将温度从摄氏度转换为华氏度。 此示例演示 Rust 和 Python 实现的基本结构和处理逻辑。
use serde_json::{json, Value};
use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::macros::map_operator;
fn fahrenheit_to_celsius_init(_configuration: ModuleConfiguration) -> bool {
logger::log(Level::Info, "temperature-converter", "Init invoked");
true
}
#[map_operator(init = "fahrenheit_to_celsius_init")]
fn fahrenheit_to_celsius(input: DataModel) -> Result<DataModel, Error> {
let DataModel::Message(mut result) = input else {
return Err(Error {
message: "Unexpected input type".to_string(),
});
};
let payload = &result.payload.read();
if let Ok(data_str) = std::str::from_utf8(payload) {
if let Ok(mut data) = serde_json::from_str::<Value>(data_str) {
if let Some(temp) = data["temperature"]["value"].as_f64() {
let fahrenheit = (temp * 9.0 / 5.0) + 32.0;
data["temperature"] = json!({
"value_fahrenheit": fahrenheit,
"original_celsius": temp
});
if let Ok(output_str) = serde_json::to_string(&data) {
result.payload = BufferOrBytes::Bytes(output_str.into_bytes());
}
}
}
}
Ok(DataModel::Message(result))
}
生成模块
可根据开发工作流和环境需求,在本地开发构建和容器化构建之间进行选择。
本地生成
在开发过程中以及需要对构建环境进行完全控制时,可直接在开发计算机上进行构建,以实现最快的迭代速度。
# Build WASM module
cargo build --release --target wasm32-wasip2
# Find your module
ls target/wasm32-wasip2/release/*.wasm
Docker 生成
使用预配置所有依赖项和架构的容器化环境进行生成。 这些 Docker 映像在不同的环境中提供一致的生成,非常适合 CI/CD 管道。
系统在 Azure IoT 操作示例存储库中维护 Rust Docker 生成器,其中包含所有必要的依赖项。 有关详细文档,请参阅 Rust Docker 生成器使用情况。
# Build release version (optimized for production)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter
# Build debug version (includes debugging symbols and less optimization)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter --build-mode debug
Docker 生成选项:
-
--app-name:必须与Cargo.toml中的 Rust Crate 名称一致 -
--build-mode:选择release(默认)以进行优化生成,或选择debug以生成包含符号的开发生成
更多示例
有关综合示例,请参阅示例存储库中的 Rust 示例。 完整的实现包括:
- map 运算符:数据转换和转换逻辑
- filter 运算符:条件数据处理和验证
- branch 运算符:基于数据内容的多路径路由
- accumulate 运算符:按照时间范围执行的汇总和统计处理
- delay 运算符:基于时间的处理控件
这些示例展示了可运行的实现,显示了每种运算符类型的完整结构,包括正确的错误处理和日志记录模式。
SDK 参考和 API
WASM Rust SDK 提供全面的开发工具:
运算符宏
use tinykube_wasm_sdk::macros::{map_operator, filter_operator, branch_operator};
use tinykube_wasm_sdk::{DataModel, HybridLogicalClock};
// Map operator - transforms each data item
#[map_operator(init = "my_init_function")]
fn my_map(input: DataModel) -> DataModel {
// Transform logic here
}
// Filter operator - allows/rejects data based on predicate
#[filter_operator(init = "my_init_function")]
fn my_filter(input: DataModel) -> bool {
// Return true to pass data through, false to filter out
}
// Branch operator - routes data to different arms
#[branch_operator(init = "my_init_function")]
fn my_branch(input: DataModel, timestamp: HybridLogicalClock) -> bool {
// Return true for "True" arm, false for "False" arm
}
模块配置参数
WASM 运算符可以通过传递给 init 函数的 ModuleConfiguration 结构体接收运行时配置参数。 这些参数在图定义中定义,可实现无需重新构建模块的运行时自定义。
use tinykube_wasm_sdk::logger::{self, Level};
use tinykube_wasm_sdk::ModuleConfiguration;
fn my_operator_init(configuration: ModuleConfiguration) -> bool {
// Access required parameters
if let Some(threshold_param) = configuration.parameters.get("temperature_threshold") {
let threshold: f64 = threshold_param.parse().unwrap_or(25.0);
logger::log(Level::Info, "my-operator", &format!("Using threshold: {}", threshold));
}
// Access optional parameters with defaults
let unit = configuration.parameters
.get("output_unit")
.map(|s| s.as_str())
.unwrap_or("celsius");
logger::log(Level::Info, "my-operator", &format!("Output unit: {}", unit));
true
}
有关在图形定义中定义配置参数的详细信息,请参阅模块配置参数。
托管 API
使用 SDK 处理分布式服务:
持久数据的状态存储:
use tinykube_wasm_sdk::state_store;
// Set value
state_store::set(key.as_bytes(), value.as_bytes(), None, None, options)?;
// Get value
let response = state_store::get(key.as_bytes(), None)?;
// Delete key
state_store::del(key.as_bytes(), None, None)?;
结构化日志记录:
use tinykube_wasm_sdk::logger::{self, Level};
logger::log(Level::Info, "my-operator", "Processing started");
logger::log(Level::Error, "my-operator", &format!("Error: {}", error));
OpenTelemetry 兼容的指标:
use tinykube_wasm_sdk::metrics;
// Increment counter
metrics::add_to_counter("requests_total", 1.0, Some(labels))?;
// Record histogram value
metrics::record_to_histogram("processing_duration", duration_ms, Some(labels))?;
使用 WASM 进行 ONNX 推理
若要在模块中嵌入和运行小型 ONNX 模型进行带内推理,请参阅 WebAssembly 数据流图中的“运行 ONNX 推理”。 本文介绍使用模块打包模型、在图形定义中启用 wasi-nn 功能以及限制。
WebAssembly 接口类型 (WIT)
所有运算符均实现使用 WebAssembly 接口类型 (WIT) 定义的标准化接口。 WIT 提供与语言无关的接口定义,确保 WASM 模块与主机运行时之间的兼容性。
示例存储库中提供了 Azure IoT 操作的完整 WIT 架构。 这些架构定义了在开发 WASM 模块时将使用的所有接口、类型和数据结构。
数据模型和接口
所有 WASM 运算符都使用基于 WebAssembly 接口类型 (WIT) 定义的标准化数据模型:
核心数据模型
// Core timestamp structure using hybrid logical clock
record timestamp {
timestamp: timespec, // Physical time (seconds + nanoseconds)
node-id: buffer-or-string, // Logical node identifier
}
// Union type supporting multiple data formats
variant data-model {
buffer-or-bytes(buffer-or-bytes), // Raw byte data
message(message), // Structured messages with metadata
snapshot(snapshot), // Video/image frames with timestamps
}
// Structured message format
record message {
timestamp: timestamp,
content_type: buffer-or-string,
payload: message-payload,
}
WIT 接口定义
每种运算符类型都实现了特定的 WIT 接口:
// Core operator interfaces
interface map {
use types.{data-model};
process: func(message: data-model) -> data-model;
}
interface filter {
use types.{data-model};
process: func(message: data-model) -> bool;
}
interface branch {
use types.{data-model, hybrid-logical-clock};
process: func(timestamp: hybrid-logical-clock, message: data-model) -> bool;
}
interface accumulate {
use types.{data-model};
process: func(staged: data-model, message: list<data-model>) -> data-model;
}
图定义和 WASM 集成
图定义定义了 WASM 模块与处理工作流的连接方式。 它们指定了创建完整数据处理管道的操作、连接和参数。
有关创建和配置图定义的全面信息,包括简单和复杂工作流的详细示例,请参阅配置数据流图的 WebAssembly 图定义。
图定义指南中涵盖的主要主题:
- 图定义结构:了解 YAML 架构和所需组件
- 简单的图示例:基本的三段式温度转换管道
- 复杂图形示例:具有分支和聚合的多传感器处理
- 模块配置参数:WASM 运算符的运行时自定义
- 注册表部署:将图定义打包和存储为 OCI 项目
后续步骤
- 请参阅 Azure IoT 操作 WASM 示例存储库中的完整示例和高级模式。
- 参阅“将 WebAssembly 与数据流图配合使用”,了解如何部署模块。
- 在“配置数据流终结点”中配置数据流终结点。