Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
En este artículo, se muestra cómo desarrollar módulos personalizados de WebAssembly (WASM) y definiciones de grafos para gráficos de flujo de datos de Operaciones de IoT de Azure. Cree módulos en Rust o Python para implementar la lógica de procesamiento personalizada. Defina configuraciones de grafos que especifiquen cómo se conectan los módulos a flujos de trabajo de procesamiento completos.
Importante
Actualmente, los gráficos de flujo de datos solo admiten puntos de conexión MQTT, Kafka y OpenTelemetry. No se admiten otros tipos de punto de conexión, como Azure Data Lake, Microsoft Fabric OneLake, Azure Data Explorer y almacenamiento local. Para más información, consulte Problemas conocidos.
Para obtener información sobre cómo desarrollar módulos WASM mediante la extensión de VS Code, consulte Compilación de módulos WASM con la extensión de VS Code.
Para más información sobre los grafos y WASM en Azure IoT Operations, consulte:
- Uso de un gráfico de flujo de datos con módulos WebAssembly
- Transformación de datos entrantes con módulos WebAssembly
Información general
Los gráficos de flujo de datos de Operaciones de IoT de Azure procesan los datos de streaming a través de operadores configurables implementados como módulos WebAssembly. Cada operador procesa los datos con marcas de tiempo mientras se mantiene la ordenación temporal, lo que permite el análisis en tiempo real con resultados deterministas.
Ventajas clave
- Procesamiento en tiempo real: control de datos de streaming con baja latencia coherente
- Semántica en tiempo de evento: procese datos basados en cuándo se producen eventos, no cuándo se procesan
- Tolerancia a errores: compatibilidad integrada para controlar errores y garantizar la coherencia de los datos
- Escalabilidad: distribuir el procesamiento entre varios nodos a la vez que se mantienen garantías de orden
- Compatibilidad con varios lenguajes: desarrollo en Rust o Python con interfaces coherentes
Base de arquitectura
Los gráficos de flujo de datos se basan en el modelo de cálculo de flujo de datos a tiempo, que se originó en el proyecto Naiad de Microsoft Research. Este enfoque garantiza:
- Procesamiento determinista: la misma entrada siempre genera la misma salida.
- Seguimiento de progreso: el sistema sabe cuándo se completan los cálculos.
- Coordinación distribuida: varios nodos de procesamiento permanecen sincronizados
¿Por qué usar el flujo de datos puntual?
Los sistemas tradicionales de procesamiento de flujos tienen varios desafíos. Los datos desordenados significan que los eventos pueden llegar después de lo esperado. Los resultados parciales hacen que sea difícil saber cuándo finalizan los cálculos. Los problemas de coordinación se producen al sincronizar el procesamiento distribuido.
El flujo de datos a tiempo resuelve estos problemas a través de:
Marcas de tiempo y seguimiento de progreso
Cada elemento de datos lleva una marca de tiempo que representa su tiempo lógico. El sistema realiza un seguimiento del progreso a través de marcas de tiempo, lo que permite varias funcionalidades clave:
- Procesamiento determinista: la misma entrada siempre genera la misma salida.
- Semántica exactamente una vez: sin procesamiento duplicado o perdido
- Marcas de agua: saber cuándo no llegarán más datos durante un tiempo determinado
Reloj lógico híbrido
El mecanismo de marca de tiempo usa un enfoque híbrido:
pub struct HybridLogicalClock {
pub physical_time: u64, // Wall-clock time when event occurred
pub logical_time: u64, // Logical ordering for events at same physical time
}
El enfoque de reloj lógico híbrido garantiza varias funcionalidades:
- Ordenación causal: los efectos siguen las causas
- Garantías de progreso: el sistema sabe cuándo se completa el procesamiento
- Coordinación distribuida: varios nodos permanecen sincronizados
Descripción de operadores y módulos
Comprender la distinción entre operadores y módulos es esencial para el desarrollo de WASM:
Operadores
Los operadores son las unidades de procesamiento fundamentales basadas en operadores de flujo de datos a tiempo. Cada tipo de operador sirve para un propósito específico:
- Mapa: transformar cada elemento de datos (por ejemplo, convertir unidades de temperatura)
- Filtro: permitir que solo determinados elementos de datos pasen en función de las condiciones (como quitar lecturas no válidas)
- Rama: enruta los datos a diferentes rutas de acceso en función de las condiciones (como separar los datos de temperatura y humedad)
- Acumular: Reunir y agregar datos en ventanas de tiempo (como calcular resúmenes estadísticos)
- Concatenar: combinar varios flujos de datos al tiempo que se conserva el orden temporal
- Retraso: Controlar el tiempo adelantando las marcas de tiempo
Módulos
Los módulos son la implementación de la lógica del operador como código WASM. Un único módulo puede implementar varios tipos de operador. Por ejemplo, un módulo de temperatura podría proporcionar:
- Operador de mapa para la conversión de unidades
- Operador de filtro para la comprobación de umbrales
- Operador de sucursal para tomar decisiones de enrutamiento
- Operador acumulado para la agregación estadística
La relación
La relación entre las definiciones de grafos, los módulos y los operadores sigue un patrón específico:
Graph Definition → References Module → Provides Operator → Processes Data
↓ ↓ ↓ ↓
"temperature:1.0.0" → temperature.wasm → map function → °F to °C
Esta separación habilita:
- Reutilización de módulos: implementar el mismo módulo WASM en distintas configuraciones de grafos
- Control de versiones independiente: actualizar definiciones de grafos sin volver a generar módulos
- Configuración dinámica: pasar parámetros diferentes al mismo módulo para comportamientos diferentes
Prerrequisitos
Elija el lenguaje de desarrollo y configure las herramientas necesarias:
La cadena de herramientas de Rust proporciona
cargo,rustcy la biblioteca estándar necesaria para compilar operadores. Instale con:curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -yObjetivo
wasm32-wasip2de WASM necesario para compilar componentes WASM de Operaciones de IoT de Azure. Agregue con:rustup target add wasm32-wasip2Las herramientas de compilación proporcionan utilidades que usan los compiladores y CI para validar y empaquetar artefactos WASM. Instale con:
cargo install wasm-tools --version '=1.201.0' --locked
Configuración del entorno de desarrollo
Puede acceder al SDK de WASM Rust a través de un registro personalizado de Microsoft Azure DevOps. En lugar de usar variables de entorno, configure el acceso con un archivo de configuración del área de trabajo:
# .cargo/config.toml (at your workspace root)
[registries]
aio-wg = { index = "sparse+https://pkgs.dev.azure.com/azure-iot-sdks/iot-operations/_packaging/preview/Cargo/index/" }
[net]
git-fetch-with-cli = true
Esta configuración replica el diseño de ejemplo en samples/wasm/.cargo/config.toml y mantiene la configuración del Registro dentro del control de versiones.
Crear proyecto
Empiece por crear un nuevo directorio de proyecto para el módulo de operador. La estructura del proyecto depende del idioma elegido.
cargo new --lib temperature-converter
cd temperature-converter
Configuración de Cargo.toml
Edite el archivo Cargo.toml para incluir dependencias para el SDK de WASM y otras bibliotecas:
[package]
name = "temperature-converter"
version = "0.1.0"
edition = "2021"
[dependencies]
# WebAssembly Interface Types (WIT) code generation
wit-bindgen = "0.22"
# Azure IoT Operations WASM SDK - provides operator macros and host APIs
wasm_graph_sdk = { version = "=1.1.3", registry = "aio-wg" }
# JSON serialization/deserialization for data processing
serde = { version = "1", default-features = false, features = ["derive"] }
serde_json = { version = "1", default-features = false, features = ["alloc"] }
[lib]
# Required for WASM module compilation
crate-type = ["cdylib"]
Notas sobre las versiones y el registro:
- La versión del SDK (
=1.1.3) se alinea con los ejemplos actuales; mantenerla anclada evita cambios importantes. -
registry = "aio-wg"coincide con la entrada del Registro definida en.cargo/config.toml.
Dependencias clave explicadas:
-
wit-bindgen: genera vinculaciones de Rust a partir de definiciones de tipos de interfaz de WebAssembly (WIT), permitiendo que el código interactúe con el entorno de ejecución WASM. -
wasm_graph_sdk: SDK de operaciones de Azure IoT que proporciona macros de operador, como#[map_operator]y#[filter_operator], y API de host para el registro, las métricas y la administración de estado. -
serde+serde_json: bibliotecas de procesamiento JSON para analizar y generar cargas de datos.default-features = falseoptimiza las restricciones de tamaño de WASM. -
crate-type = ["cdylib"]: compila la biblioteca de Rust como una biblioteca dinámica compatible con C, que es necesaria para la generación de módulos WASM.
Creación de un módulo sencillo
Cree un módulo sencillo que convierta la temperatura de Celsius a Fahrenheit. En este ejemplo, se muestra la estructura básica y la lógica de procesamiento para las implementaciones de Rust y Python.
use serde_json::{json, Value};
use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::macros::map_operator;
fn fahrenheit_to_celsius_init(_configuration: ModuleConfiguration) -> bool {
logger::log(Level::Info, "temperature-converter", "Init invoked"); // one-time module init
true
}
#[map_operator(init = "fahrenheit_to_celsius_init")]
fn fahrenheit_to_celsius(input: DataModel) -> Result<DataModel, Error> {
let DataModel::Message(mut result) = input else {
return Err(Error {
message: "Unexpected input type".to_string(),
});
};
let payload = &result.payload.read(); // payload bytes from inbound message
if let Ok(data_str) = std::str::from_utf8(payload) {
if let Ok(mut data) = serde_json::from_str::<Value>(data_str) {
if let Some(temp) = data["temperature"]["value"].as_f64() {
let fahrenheit = (temp * 9.0 / 5.0) + 32.0; // Celsius -> Fahrenheit
data["temperature"] = json!({
"value_fahrenheit": fahrenheit,
"original_celsius": temp
});
if let Ok(output_str) = serde_json::to_string(&data) {
// Replace payload with owned bytes so the host receives the updated JSON
result.payload = BufferOrBytes::Bytes(output_str.into_bytes());
}
}
}
}
Ok(DataModel::Message(result))
}
Generar el módulo
Elija entre compilaciones de desarrollo local o compilaciones en contenedores en función del flujo de trabajo de desarrollo y los requisitos del entorno.
Compilación local
Compile directamente en la máquina de desarrollo para una iteración más rápida durante el desarrollo y cuando necesite un control total sobre el entorno de compilación.
# Build WASM module
cargo build --release --target wasm32-wasip2 # target required for Azure IoT Operations WASM components
# Find your module
ls target/wasm32-wasip2/release/*.wasm
Compilación de Docker
Compile mediante entornos en contenedor con todas las dependencias y esquemas preconfigurados. Estas imágenes de Docker proporcionan compilaciones coherentes en distintos entornos y son ideales para canalizaciones de CI/CD.
El repositorio de ejemplos de Azure IoT Operations mantiene el generador de Docker de Rust e incluye todas las dependencias necesarias. Para obtener documentación detallada, consulte Uso del generador de Docker de Rust.
# Build release version (optimized for production)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter
# Build debug version (includes debugging symbols and less optimization)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter --build-mode debug
Opciones de compilación de Docker:
-
--app-name: debe coincidir con el nombre de la caja de Rust desdeCargo.toml -
--build-mode: elijarelease(valor predeterminado) para compilaciones optimizadas odebugpara compilaciones de desarrollo con símbolos
Más ejemplos
Para obtener ejemplos completos, consulte los ejemplos de Rust en el repositorio de ejemplos. Las implementaciones completas incluyen:
- Operadores de mapa: lógica de transformación y conversión de datos
- Operadores de filtro: procesamiento y validación de datos condicionales
- Operadores de rama: enrutamiento de varias rutas en función del contenido de datos
- Operadores acumulados: agregación en período de tiempo y procesamiento estadístico
- Operadores de retraso: control de procesamiento basado en tiempo
Los ejemplos muestran implementaciones de trabajo que muestran la estructura completa de cada tipo de operador, incluidos los patrones de registro y control de errores adecuados.
Referencia del SDK y API
El SDK de WASM Rust proporciona herramientas de desarrollo completas:
Macros de operador
use wasm_graph_sdk::macros::{map_operator, filter_operator, branch_operator};
use wasm_graph_sdk::{DataModel, HybridLogicalClock};
// Map operator - transforms each data item
#[map_operator(init = "my_init_function")]
fn my_map(input: DataModel) -> Result<DataModel, Error> {
// Transform logic here
}
// Filter operator - allows/rejects data based on predicate
#[filter_operator(init = "my_init_function")]
fn my_filter(input: DataModel) -> Result<bool, Error> {
// Return true to pass data through, false to filter out
}
// Branch operator - routes data to different arms
#[branch_operator(init = "my_init_function")]
fn my_branch(input: DataModel, timestamp: HybridLogicalClock) -> Result<bool, Error> {
// Return true for "True" arm, false for "False" arm
}
Parámetros de configuración del módulo
Los operadores WASM pueden recibir parámetros de configuración en tiempo de ejecución a través de la estructura ModuleConfiguration que se pasa a la función init. Estos parámetros se definen en la definición del grafo, lo que le permite personalizar el entorno de ejecución sin volver a generar módulos.
use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::ModuleConfiguration;
fn my_operator_init(configuration: ModuleConfiguration) -> bool {
// Access required parameters
if let Some(threshold_param) = configuration.parameters.get("temperature_threshold") {
let threshold: f64 = threshold_param.parse().unwrap_or(25.0);
logger::log(Level::Info, "my-operator", &format!("Using threshold: {}", threshold));
}
// Access optional parameters with defaults
let unit = configuration.parameters
.get("output_unit")
.map(|s| s.as_str())
.unwrap_or("celsius");
logger::log(Level::Info, "my-operator", &format!("Output unit: {}", unit));
true
}
Para obtener información detallada sobre cómo definir parámetros de configuración en definiciones de grafos, consulte Parámetros de configuración del módulo.
API de host
Use el SDK para trabajar con servicios distribuidos:
Almacén de estado para datos persistentes:
use wasm_graph_sdk::state_store;
// Set value
state_store::set(key.as_bytes(), value.as_bytes(), None, None, options)?;
// Get value
let response = state_store::get(key.as_bytes(), None)?;
// Delete key
state_store::del(key.as_bytes(), None, None)?;
Registro estructurado:
use wasm_graph_sdk::logger::{self, Level};
logger::log(Level::Info, "my-operator", "Processing started");
logger::log(Level::Error, "my-operator", &format!("Error: {}", error));
Métricas compatibles con OpenTelemetry:
use wasm_graph_sdk::metrics;
// Increment counter
metrics::add_to_counter("requests_total", 1.0, Some(labels))?;
// Record histogram value
metrics::record_to_histogram("processing_duration", duration_ms, Some(labels))?;
Inferencia de ONNX con WASM
Para insertar y ejecutar modelos ONNX pequeños dentro de los módulos para la inferencia en banda, consulte Ejecución de la inferencia de ONNX en gráficos de flujo de datos de WebAssembly. En este artículo se tratan los modelos de empaquetado con módulos, habilitando la característica wasi-nn en las definiciones de grafos y las limitaciones.
Tipos de interfaz WebAssembly (WIT)
Todos los operadores implementan interfaces estandarizadas definidas mediante tipos de interfaz WebAssembly (WIT). WIT proporciona definiciones de interfaz independientes del lenguaje que garantizan la compatibilidad entre los módulos WASM y el entorno de ejecución del host.
Puede encontrar los esquemas WIT completos para operaciones de Azure IoT en el repositorio de ejemplos. Estos esquemas definen todas las interfaces, tipos y estructuras de datos con las que trabaja al desarrollar módulos WASM.
Modelo de datos e interfaces
Todos los operadores WASM funcionan con modelos de datos estandarizados definidos mediante tipos de interfaz WebAssembly (WIT):
Modelo de datos principales
// Core timestamp structure using hybrid logical clock
record timestamp {
timestamp: timespec, // Physical time (seconds + nanoseconds)
node-id: buffer-or-string, // Logical node identifier
}
// Union type supporting multiple data formats
variant data-model {
buffer-or-bytes(buffer-or-bytes), // Raw byte data
message(message), // Structured messages with metadata
snapshot(snapshot), // Video/image frames with timestamps
}
// Structured message format
record message {
timestamp: timestamp,
content_type: buffer-or-string,
payload: message-payload,
}
Definiciones de interfaz WIT
Cada tipo de operador implementa una interfaz WIT específica:
// Core operator interfaces
interface map {
use types.{data-model};
process: func(message: data-model) -> result<data-model, error>;
}
interface filter {
use types.{data-model};
process: func(message: data-model) -> result<bool, error>;
}
interface branch {
use types.{data-model, hybrid-logical-clock};
process: func(timestamp: hybrid-logical-clock, message: data-model) -> result<bool, error>;
}
interface accumulate {
use types.{data-model};
process: func(staged: data-model, message: list<data-model>) -> result<data-model, error>;
}
Definiciones de grafos e integración de WASM
Las definiciones de grafos muestran cómo los módulos WASM se conectan a los flujos de trabajo de procesamiento. Especifican las operaciones, las conexiones y los parámetros que crean canalizaciones de procesamiento de datos completas.
Para obtener información completa sobre cómo crear y configurar definiciones de grafos, incluidos ejemplos detallados de flujos de trabajo simples y complejos, consulte Configuración de definiciones de grafos WebAssembly para gráficos de flujo de datos.
Temas clave tratados en la guía de definiciones de grafos:
- Estructura de definición de grafos: descripción del esquema YAML y los componentes necesarios
- Ejemplo de grafo simple: canalización básica de conversión de temperatura de tres fases
- Ejemplo de grafo complejo: procesamiento de varios sensores con bifurcación y agregación
- Parámetros de configuración del módulo: personalización en tiempo de ejecución de operadores WASM
- Implementación del Registro: empaquetado y almacenamiento de definiciones de grafos como artefactos de OCI
Pasos siguientes
- Consulte ejemplos completos y patrones avanzados en el repositorio de ejemplos WASM de Operaciones de IoT de Azure.
- Aprenda a implementar los módulos en Uso de WebAssembly con gráficos de flujo de datos.
- Configure los puntos de conexión de flujo de datos en Configuración de puntos de conexión de flujo de datos.