Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Este artigo mostra como desenvolver módulos personalizados de WebAssembly (WASM) e definições de grafo para grafos de fluxo de dados das Operações do Azure IoT. Crie módulos no Rust ou Python para implementar a lógica de processamento personalizada. Defina as configurações de grafo que especificam como seus módulos se conectam a fluxos de trabalho de processamento completos.
Importante
Atualmente, os grafos de fluxo de dados suportam apenas pontos de extremidade MQTT, Kafka e OpenTelemetry. Não há suporte para outros tipos de ponto de extremidade, como Azure Data Lake, Microsoft Fabric OneLake, Azure Data Explorer e armazenamento local. Para obter mais informações, consulte Problemas conhecidos.
Para saber como desenvolver módulos WASM usando a extensão do VS Code, consulte Compilar módulos WASM com a extensão VS Code.
Para saber mais sobre grafos e WASM nas Operações de IoT do Azure, confira:
- Usar um grafo de fluxo de dados com módulos WebAssembly
- Transformar dados de entrada com módulos WebAssembly
Visão geral
Os grafos de fluxo de dados das Operações do Azure IoT processam dados de streaming por meio de operadores configuráveis implementados como os módulos WebAssembly. Cada operador processa dados com carimbo de data/hora, mantendo a ordenação temporal, habilitando a análise em tempo real com resultados determinísticos.
Principais benefícios
- Processamento em tempo real: manipular dados de streaming com baixa latência consistente
- Semântica de tempo de evento: processar dados com base em quando os eventos ocorreram, não quando eles são processados
- Tolerância a falhas: suporte interno para lidar com falhas e garantir consistência de dados
- Escalabilidade: distribuir o processamento entre vários nós e, ao mesmo tempo, manter as garantias de ordem
- Suporte a vários idiomas: desenvolver no Rust ou Python com interfaces consistentes
Base de arquitetura
Os grafos de fluxo de dados se baseiam no modelo computacional de Fluxo de dados oportuno, que se originou do projeto Naiad do Microsoft Research. Essa abordagem garante:
- Processamento determinístico: a mesma entrada sempre produz a mesma saída
- Acompanhamento de progresso: o sistema sabe quando os cálculos são concluídos
- Coordenação distribuída: vários nós de processamento permanecem sincronizados
Por que usar o fluxo de dados oportuno?
Os sistemas tradicionais de processamento de fluxo têm vários desafios. Os dados fora de ordem significam que os eventos podem chegar mais tarde do que o esperado. Os resultados parciais dificultam a conclusão dos cálculos. Os problemas de coordenação ocorrem ao sincronizar o processamento distribuído.
O fluxo de dados oportuno resolve esses problemas por meio de:
Carimbos de data/hora e acompanhamento de progresso
Cada item de dados carrega um carimbo de data/hora que representa seu tempo lógico. O sistema acompanha o progresso por meio de carimbos de data/hora, habilitando vários recursos principais:
- Processamento determinístico: a mesma entrada sempre produz a mesma saída
- Semântica de exatamente uma vez: nenhum processamento duplicado ou perdido
- Marcas d'água: saiba quando não haverá mais dados para um determinado momento
Relógio lógico híbrido
O mecanismo de carimbo de data/hora usa uma abordagem híbrida:
pub struct HybridLogicalClock {
pub physical_time: u64, // Wall-clock time when event occurred
pub logical_time: u64, // Logical ordering for events at same physical time
}
A abordagem do relógio lógico híbrido garante várias funcionalidades:
- Ordenação causal: os efeitos seguem as causas
- Garantias de progresso: o sistema sabe quando o processamento é concluído
- Coordenação distribuída: vários nós permanecem sincronizados
Entender os operadores e módulos
Entender a distinção entre operadores e módulos é essencial para o desenvolvimento do WASM:
Operadores
Os operadores são as unidades de processamento fundamentais com base em Operadores de fluxo de dados oportunos. Cada tipo de operador serve a uma finalidade específica:
- Mapa: Transformar cada item de dados (como converter unidades de temperatura)
- Filtro: permitir que apenas determinados itens de dados passem com base em condições (como remover leituras inválidas)
- Branch: Rotear dados para caminhos diferentes com base em condições (como separar dados de temperatura e umidade)
- Acumular: coletar e agregar dados em janelas de tempo (como resumos estatísticos de computação)
- Concatenar: mesclar vários fluxos de dados ao preservar a ordem temporal
- Atraso: Controlar o timing avançando os carimbos de data/hora
Módulos
Os módulos são a implementação da lógica do operador como o código do WASM. Um único módulo pode implementar vários tipos de operador. Por exemplo, um módulo de temperatura pode fornecer:
- Um operador de mapa para conversão de unidade
- Um operador de filtro para verificação de limite
- Um operador de ramificação para decisões de roteamento
- Um operador acumulado para agregação estatística
O relacionamento
O relacionamento entre definições de grafo, módulos e operadores segue um padrão específico:
Graph Definition → References Module → Provides Operator → Processes Data
↓ ↓ ↓ ↓
"temperature:1.0.0" → temperature.wasm → map function → °F to °C
Essa separação permite:
- Reutilização do módulo: implantar o mesmo módulo WASM em diferentes configurações de grafo
- Controle de versão independente: atualizar as definições de grafo sem recompilar os módulos
- Configuração dinâmica: passar parâmetros diferentes para o mesmo módulo para comportamentos diferentes
Pré-requisitos
Escolha sua linguagem de desenvolvimento e configure as ferramentas necessárias:
A cadeia de ferramentas Rust fornece
cargo,rustce a biblioteca padrão necessária para compilar operadores. Instale com:curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -yDestino WASM
wasm32-wasip2necessário para criar componentes WASM das Operações do Azure IoT. Adicione com:rustup target add wasm32-wasip2As ferramentas de compilação fornecem utilitários usados pelos compiladores e pela integração contínua (CI) para validar e empacotar artefatos WASM. Instale com:
cargo install wasm-tools --version '=1.201.0' --locked
Configurar o ambiente de desenvolvimento
Você pode acessar o SDK do RUST do WASM por meio de um registro personalizado do Microsoft Azure DevOps. Em vez de usar variáveis de ambiente, configure o acesso com um arquivo de configuração do workspace:
# .cargo/config.toml (at your workspace root)
[registries]
aio-wg = { index = "sparse+https://pkgs.dev.azure.com/azure-iot-sdks/iot-operations/_packaging/preview/Cargo/index/" }
[net]
git-fetch-with-cli = true
Essa configuração espelha o layout de exemplo em samples/wasm/.cargo/config.toml e mantém as configurações do Registro sob controle de versão.
Criar projeto
Comece criando um novo diretório de projeto para o módulo do operador. A estrutura do projeto depende do idioma escolhido.
cargo new --lib temperature-converter
cd temperature-converter
Configurar Cargo.toml
Edite o arquivo Cargo.toml para incluir dependências para o SDK do WASM e outras bibliotecas:
[package]
name = "temperature-converter"
version = "0.1.0"
edition = "2021"
[dependencies]
# WebAssembly Interface Types (WIT) code generation
wit-bindgen = "0.22"
# Azure IoT Operations WASM SDK - provides operator macros and host APIs
wasm_graph_sdk = { version = "=1.1.3", registry = "aio-wg" }
# JSON serialization/deserialization for data processing
serde = { version = "1", default-features = false, features = ["derive"] }
serde_json = { version = "1", default-features = false, features = ["alloc"] }
[lib]
# Required for WASM module compilation
crate-type = ["cdylib"]
Anotações sobre versões e registro:
- A versão do SDK (
=1.1.3) se alinha com os exemplos atuais; mantê-la fixada evita alterações significativas. -
registry = "aio-wg"corresponde à entrada do Registro definida em.cargo/config.toml.
As principais dependências explicadas:
-
wit-bindgen: gera associações Rust de definições WIT (Tipos de Interface WebAssembly), permitindo que seu código interaja com o runtime WASM. -
wasm_graph_sdk: SDK de Operações de IoT do Azure fornecendo macros de operador, como#[map_operator]e#[filter_operator], e APIs de host para registro em log, métricas e gerenciamento de estado. -
serde+serde_json: bibliotecas de processamento JSON para analisar e gerar cargas de dados.default-features = falseotimiza para restrições de tamanho do WASM. -
crate-type = ["cdylib"]: compila a biblioteca Rust como uma biblioteca dinâmica compatível com C, que é necessária para a geração do módulo WASM.
Criar um módulo simples
Crie um módulo simples que converte a temperatura de Celsius em Fahrenheit. Esse exemplo demonstra a estrutura básica e a lógica de processamento para implementações do Rust e do Python.
use serde_json::{json, Value};
use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::macros::map_operator;
fn fahrenheit_to_celsius_init(_configuration: ModuleConfiguration) -> bool {
logger::log(Level::Info, "temperature-converter", "Init invoked"); // one-time module init
true
}
#[map_operator(init = "fahrenheit_to_celsius_init")]
fn fahrenheit_to_celsius(input: DataModel) -> Result<DataModel, Error> {
let DataModel::Message(mut result) = input else {
return Err(Error {
message: "Unexpected input type".to_string(),
});
};
let payload = &result.payload.read(); // payload bytes from inbound message
if let Ok(data_str) = std::str::from_utf8(payload) {
if let Ok(mut data) = serde_json::from_str::<Value>(data_str) {
if let Some(temp) = data["temperature"]["value"].as_f64() {
let fahrenheit = (temp * 9.0 / 5.0) + 32.0; // Celsius -> Fahrenheit
data["temperature"] = json!({
"value_fahrenheit": fahrenheit,
"original_celsius": temp
});
if let Ok(output_str) = serde_json::to_string(&data) {
// Replace payload with owned bytes so the host receives the updated JSON
result.payload = BufferOrBytes::Bytes(output_str.into_bytes());
}
}
}
}
Ok(DataModel::Message(result))
}
Criar módulo
Escolha entre builds de desenvolvimento local ou builds conteinerizados com base em seus requisitos de ambiente e fluxo de trabalho de desenvolvimento.
Construção local
Crie diretamente em seu computador de desenvolvimento para iteração mais rápida durante o desenvolvimento e quando você precisar de controle total sobre o ambiente de build.
# Build WASM module
cargo build --release --target wasm32-wasip2 # target required for Azure IoT Operations WASM components
# Find your module
ls target/wasm32-wasip2/release/*.wasm
Build do Docker
Crie usando ambientes conteinerizados com todas as dependências e esquemas pré-configurados. Essas imagens do Docker fornecem builds consistentes em diferentes ambientes e são ideais para pipelines de CI/CD.
O repositório de exemplos de Operações IoT do Azure mantém o construtor do Rust Docker e inclui todas as dependências necessárias. Para obter uma documentação detalhada, veja Uso do Construtor do Docker para Rust.
# Build release version (optimized for production)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter
# Build debug version (includes debugging symbols and less optimization)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter --build-mode debug
Opções de build do Docker:
-
--app-name: deve corresponder ao seu nome de crate do Rust deCargo.toml -
--build-mode: escolharelease(padrão) para builds otimizados oudebugpara builds de desenvolvimento com símbolos
Mais exemplos
Para obter exemplos abrangentes, veja os exemplos do Rust no repositório de exemplos. As implementações completas incluem:
- Operadores de mapa: transformação de dados e lógica de conversão
- Operadores de filtro: processamento e validação de dados condicionais
- Operadores de ramificação: roteamento de vários caminhos com base no conteúdo de dados
- Operadores acumulados: agregação em janelas de tempo e processamento estatístico
- Operadores de atraso: controle de processamento baseado em tempo
Os exemplos demonstram implementações de trabalho que mostram a estrutura completa para cada tipo de operador, incluindo o tratamento de erro e padrões de log adequados.
Referência e APIs do SDK
O SDK do WASM Rust fornece ferramentas de desenvolvimento abrangentes:
Macros de operador
use wasm_graph_sdk::macros::{map_operator, filter_operator, branch_operator};
use wasm_graph_sdk::{DataModel, HybridLogicalClock};
// Map operator - transforms each data item
#[map_operator(init = "my_init_function")]
fn my_map(input: DataModel) -> Result<DataModel, Error> {
// Transform logic here
}
// Filter operator - allows/rejects data based on predicate
#[filter_operator(init = "my_init_function")]
fn my_filter(input: DataModel) -> Result<bool, Error> {
// Return true to pass data through, false to filter out
}
// Branch operator - routes data to different arms
#[branch_operator(init = "my_init_function")]
fn my_branch(input: DataModel, timestamp: HybridLogicalClock) -> Result<bool, Error> {
// Return true for "True" arm, false for "False" arm
}
Parâmetros de configuração do módulo
Seus operadores WASM podem receber parâmetros de configuração de runtime por meio do struct ModuleConfiguration passado para a função init. Você define esses parâmetros na definição de grafo, que permite personalizar o runtime sem recompilar módulos.
use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::ModuleConfiguration;
fn my_operator_init(configuration: ModuleConfiguration) -> bool {
// Access required parameters
if let Some(threshold_param) = configuration.parameters.get("temperature_threshold") {
let threshold: f64 = threshold_param.parse().unwrap_or(25.0);
logger::log(Level::Info, "my-operator", &format!("Using threshold: {}", threshold));
}
// Access optional parameters with defaults
let unit = configuration.parameters
.get("output_unit")
.map(|s| s.as_str())
.unwrap_or("celsius");
logger::log(Level::Info, "my-operator", &format!("Output unit: {}", unit));
true
}
Para obter informações detalhadas sobre como definir os parâmetros de configuração em definições de grafo, veja Parâmetros de configuração do módulo.
APIs de Host
Use o SDK para trabalhar com serviços distribuídos:
Repositório de estado para dados persistentes:
use wasm_graph_sdk::state_store;
// Set value
state_store::set(key.as_bytes(), value.as_bytes(), None, None, options)?;
// Get value
let response = state_store::get(key.as_bytes(), None)?;
// Delete key
state_store::del(key.as_bytes(), None, None)?;
Registro em log estruturado:
use wasm_graph_sdk::logger::{self, Level};
logger::log(Level::Info, "my-operator", "Processing started");
logger::log(Level::Error, "my-operator", &format!("Error: {}", error));
Métricas compatíveis com OpenTelemetry:
use wasm_graph_sdk::metrics;
// Increment counter
metrics::add_to_counter("requests_total", 1.0, Some(labels))?;
// Record histogram value
metrics::record_to_histogram("processing_duration", duration_ms, Some(labels))?;
Inferência ONNX com WASM
Para inserir e executar pequenos modelos ONNX dentro de seus módulos para inferência em banda, consulte Executar inferência ONNX em grafos de fluxo de dados do WebAssembly. Esse artigo aborda modelos de empacotamento com módulos, habilitando o recurso wasi-nn em definições de grafo, e limitações.
WIT (Tipos de Interface WebAssembly)
Todos os operadores implementam interfaces padronizadas definidas usando WIT (Tipos de Interface WebAssembly). O WIT fornece definições de interface independente de linguagem que garantem a compatibilidade entre os módulos WASM e o runtime do host.
Você pode encontrar os esquemas WIT completos para operações de IoT do Azure no repositório de exemplos. Esses esquemas definem todas as interfaces, tipos e estruturas de dados com as quais você trabalha ao desenvolver módulos WASM.
Modelo e interfaces de dados
Todos os operadores WASM funcionam com modelos de dados padronizados definidos usando WIT (Tipos de Interface WebAssembly):
Modelo de dados principais
// Core timestamp structure using hybrid logical clock
record timestamp {
timestamp: timespec, // Physical time (seconds + nanoseconds)
node-id: buffer-or-string, // Logical node identifier
}
// Union type supporting multiple data formats
variant data-model {
buffer-or-bytes(buffer-or-bytes), // Raw byte data
message(message), // Structured messages with metadata
snapshot(snapshot), // Video/image frames with timestamps
}
// Structured message format
record message {
timestamp: timestamp,
content_type: buffer-or-string,
payload: message-payload,
}
Definições de interface WIT
Cada tipo de operador implementa uma interface WIT específica:
// Core operator interfaces
interface map {
use types.{data-model};
process: func(message: data-model) -> result<data-model, error>;
}
interface filter {
use types.{data-model};
process: func(message: data-model) -> result<bool, error>;
}
interface branch {
use types.{data-model, hybrid-logical-clock};
process: func(timestamp: hybrid-logical-clock, message: data-model) -> result<bool, error>;
}
interface accumulate {
use types.{data-model};
process: func(staged: data-model, message: list<data-model>) -> result<data-model, error>;
}
Definições do grafo e integração de WASM
As definições de grafo mostram como os módulos WASM se conectam aos fluxos de trabalho de processamento. Elas especificam as operações, as conexões e os parâmetros que criam pipelines de processamento de dados completos.
Para obter informações abrangentes sobre como criar e configurar definições de grafo, incluindo exemplos detalhados de fluxos de trabalho simples e complexos, veja Configurar as definições de grafo WebAssembly para os grafos de fluxo de dados.
Principais tópicos abordados no guia de definições do grafo:
- Estrutura de definição do grafo: noções básicas sobre o esquema YAML e os componentes necessários
- Exemplo de grafo simples: pipeline básico de conversão de temperatura de três estágios
- Exemplo de grafo complexo: processamento de vários sensores com ramificação e agregação
- Parâmetros de configuração do módulo: personalização de runtime de operadores WASM
- Implantação do registro: empacotamento e armazenamento de definições de grafo como artefatos OCI
Próximas etapas
- Veja exemplos completos e padrões avançados no repositório de exemplos WASM das Operações do Azure IoT.
- Saiba como implantar seus módulos em Use WebAssembly com grafos de fluxo de dados.
- Configure seus pontos de extremidade de fluxo de dados em Configurar pontos de extremidade de fluxo de dados.