Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Este artigo mostra como desenvolver módulos WebAssembly (WASM) personalizados e definições de gráfico para gráficos de fluxo de dados do Azure IoT Operations. Crie módulos em Rust ou Python para implementar lógica de processamento personalizada. Defina configurações de gráficos que especifiquem como seus módulos se conectam a fluxos de trabalho de processamento completos.
Importante
Atualmente, os gráficos de fluxo de dados suportam apenas endpoints MQTT, Kafka e OpenTelemetry. Outros tipos de endpoints como Azure Data Lake, Microsoft Fabric OneLake, Azure Data Explorer e armazenamento local não são suportados. Para obter mais informações, consulte Problemas conhecidos.
Para aprender a desenvolver módulos WASM utilizando a extensão VS Code, veja Construir módulos WASM com extensão VS Code.
Para saber mais sobre grafos e WASM no Azure IoT Operations, veja:
- Use um grafo de fluxo de dados com módulos WebAssembly
- Transformar dados recebidos com módulos WebAssembly
Visão geral
Os gráficos de fluxo de dados das Operações IoT do Azure processam dados de streaming por meio de operadores configuráveis implementados como módulos WebAssembly. Cada operador processa dados com carimbo de data/hora, mantendo a ordenação temporal, permitindo análises em tempo real com resultados determinísticos.
Principais benefícios
- Processamento em tempo real: lide com dados de streaming com baixa latência consistente
- Semântica em tempo de evento: processe dados com base em quando os eventos ocorreram, não quando eles são processados
- Tolerância a falhas: Suporte integrado para lidar com falhas e garantir a consistência dos dados
- Escalabilidade: distribua o processamento entre vários nós enquanto mantém as garantias de pedidos
- Suporte multi-linguagem: Desenvolva em Rust ou Python com interfaces consistentes
Fundação da arquitetura
Os gráficos de fluxo de dados baseiam-se no modelo computacional de fluxo de dados oportuno , que se originou do projeto Naiad da Microsoft Research. Esta abordagem garante:
- Processamento determinístico: A mesma entrada produz sempre a mesma saída
- Acompanhamento do progresso: o sistema sabe quando os cálculos estão concluídos
- Coordenação distribuída: vários nós de processamento permanecem sincronizados
Por que usar fluxo de dados atempado?
Os sistemas tradicionais de processamento de fluxo têm vários desafios. Dados fora de ordem significam que os eventos podem chegar mais tarde do que o esperado. Resultados parciais tornam difícil saber quando os cálculos terminam. Problemas de coordenação acontecem ao sincronizar o processamento distribuído.
O fluxo de dados oportuno resolve esses problemas através de:
Carimbos de data/hora e acompanhamento do progresso
Cada item de dados carrega um carimbo de data/hora que representa sua hora lógica. O sistema rastreia o progresso através de carimbos de data/hora, permitindo vários recursos principais:
- Processamento determinístico: A mesma entrada produz sempre a mesma saída
- Semântica exatamente uma vez: Sem processamento duplicado ou perdido
- Marcas d'água: saiba quando não chegarão mais dados por um determinado tempo
Relógio lógico híbrido
O mecanismo de carimbo de data/hora usa uma abordagem híbrida:
pub struct HybridLogicalClock {
pub physical_time: u64, // Wall-clock time when event occurred
pub logical_time: u64, // Logical ordering for events at same physical time
}
A abordagem de relógio lógico híbrido garante vários recursos:
- Ordem causal: Os efeitos seguem as causas
- Garantias de progresso: O sistema sabe quando o processamento está concluído
- Coordenação distribuída: vários nós permanecem sincronizados
Compreender operadores e módulos
Compreender a distinção entre operadores e módulos é essencial para o desenvolvimento do WASM:
Operadores
Os operadores são as unidades de processamento fundamentais baseadas em operadores de fluxo de dados oportunos. Cada tipo de operador serve uma finalidade específica:
- Mapa: Transformar cada elemento de dados (como converter unidades de temperatura)
- Filtro: Permitir que apenas certos itens de dados passem com base em condições (como remover leituras inválidas)
- Ramificação: Encaminhar dados para diferentes caminhos com base nas condições (como separar dados de temperatura e humidade)
- Acumular: Recolher e agregar dados dentro de janelas temporais (como calcular resumos estatísticos)
- Concatenar: Fundir múltiplos fluxos de dados preservando a ordem temporal
- Atraso: Controlar a temporização avançando os carimbos temporais
Modules
Os módulos são a implementação da lógica do operador como código WASM. Um único módulo pode implementar vários tipos de operadores. Por exemplo, um módulo de temperatura pode fornecer:
- Um operador de mapa para conversão de unidades
- Um operador de filtro para verificação de limite
- Um operador de filial para decisões de roteamento
- Um operador de acumulação para agregação estatística
A relação
A relação entre definições de gráficos, módulos e operadores segue um padrão específico:
Graph Definition → References Module → Provides Operator → Processes Data
↓ ↓ ↓ ↓
"temperature:1.0.0" → temperature.wasm → map function → °F to °C
Esta separação permite:
- Reutilização do módulo: implante o mesmo módulo WASM em diferentes configurações de gráficos
- Controle de versão independente: atualize as definições de gráfico sem reconstruir módulos
- Configuração dinâmica: passe parâmetros diferentes para o mesmo módulo para comportamentos diferentes
Pré-requisitos
Escolha sua linguagem de desenvolvimento e configure as ferramentas necessárias:
A cadeia de ferramentas Rust fornece
cargo,rustc, e a biblioteca padrão necessária para compilar operadores. Instale com:curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -yÉ necessário o alvo WASM
wasm32-wasip2para construir os componentes WASM do Azure IoT Operations. Adicione com:rustup target add wasm32-wasip2As ferramentas de build fornecem utilitários usados pelos construtores e pela CI para validar e empacotar artefactos WASM. Instale com:
cargo install wasm-tools --version '=1.201.0' --locked
Configurar o ambiente de desenvolvimento
Pode aceder ao WASM Rust SDK através de um registo personalizado Microsoft Azure DevOps. Em vez de usar variáveis de ambiente, configure o acesso com um ficheiro de configuração de workspace:
# .cargo/config.toml (at your workspace root)
[registries]
aio-wg = { index = "sparse+https://pkgs.dev.azure.com/azure-iot-sdks/iot-operations/_packaging/preview/Cargo/index/" }
[net]
git-fetch-with-cli = true
Esta configuração reflete o layout do exemplo em samples/wasm/.cargo/config.toml e mantém as configurações do registo no controlo de versões.
Criar o projeto
Comece criando um novo diretório de projeto para o módulo do operador. A estrutura do projeto depende do idioma escolhido.
cargo new --lib temperature-converter
cd temperature-converter
Configurar o Cargo.toml
Edite o Cargo.toml arquivo para incluir dependências para o SDK do WASM e outras bibliotecas:
[package]
name = "temperature-converter"
version = "0.1.0"
edition = "2021"
[dependencies]
# WebAssembly Interface Types (WIT) code generation
wit-bindgen = "0.22"
# Azure IoT Operations WASM SDK - provides operator macros and host APIs
wasm_graph_sdk = { version = "=1.1.3", registry = "aio-wg" }
# JSON serialization/deserialization for data processing
serde = { version = "1", default-features = false, features = ["derive"] }
serde_json = { version = "1", default-features = false, features = ["alloc"] }
[lib]
# Required for WASM module compilation
crate-type = ["cdylib"]
Notas sobre versões e registo:
- A versão SDK (
=1.1.3) está alinhada com as amostras atuais; mantê-la fixada evita alterações incompatíveis. -
registry = "aio-wg"corresponde à entrada do registo definida em.cargo/config.toml.
Principais dependências explicadas:
-
wit-bindgen: Gera bindings Rust a partir das definições de Tipos de Interface WebAssembly (WIT), permitindo que o seu código interfaça com o runtime WASM. -
wasm_graph_sdk: Azure IoT Operations SDK que fornece macros de operador, como#[map_operator]e#[filter_operator], e APIs de host para registo, métricas e gestão de estados. -
serde+serde_json: Bibliotecas de processamento JSON para análise e geração de cargas úteis de dados.default-features = falseotimiza para as restrições de tamanho do WASM. -
crate-type = ["cdylib"]: Compila a biblioteca Rust como uma biblioteca dinâmica compatível com C, necessária para a geração de módulos WASM.
Criar um módulo simples
Crie um módulo simples que converta a temperatura de Celsius para Fahrenheit. Este exemplo demonstra a estrutura básica e a lógica de processamento para implementações Rust e Python.
use serde_json::{json, Value};
use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::macros::map_operator;
fn fahrenheit_to_celsius_init(_configuration: ModuleConfiguration) -> bool {
logger::log(Level::Info, "temperature-converter", "Init invoked"); // one-time module init
true
}
#[map_operator(init = "fahrenheit_to_celsius_init")]
fn fahrenheit_to_celsius(input: DataModel) -> Result<DataModel, Error> {
let DataModel::Message(mut result) = input else {
return Err(Error {
message: "Unexpected input type".to_string(),
});
};
let payload = &result.payload.read(); // payload bytes from inbound message
if let Ok(data_str) = std::str::from_utf8(payload) {
if let Ok(mut data) = serde_json::from_str::<Value>(data_str) {
if let Some(temp) = data["temperature"]["value"].as_f64() {
let fahrenheit = (temp * 9.0 / 5.0) + 32.0; // Celsius -> Fahrenheit
data["temperature"] = json!({
"value_fahrenheit": fahrenheit,
"original_celsius": temp
});
if let Ok(output_str) = serde_json::to_string(&data) {
// Replace payload with owned bytes so the host receives the updated JSON
result.payload = BufferOrBytes::Bytes(output_str.into_bytes());
}
}
}
}
Ok(DataModel::Message(result))
}
Módulo de construção
Escolha entre compilações de desenvolvimento local ou compilações em contêineres com base em seu fluxo de trabalho de desenvolvimento e requisitos de ambiente.
Construção local
Construa diretamente em sua máquina de desenvolvimento para iteração mais rápida durante o desenvolvimento e quando precisar de controle total sobre o ambiente de compilação.
# Build WASM module
cargo build --release --target wasm32-wasip2 # target required for Azure IoT Operations WASM components
# Find your module
ls target/wasm32-wasip2/release/*.wasm
Compilação do Docker
Crie usando ambientes em contêineres com todas as dependências e esquemas pré-configurados. Essas imagens do Docker fornecem compilações consistentes em diferentes ambientes e são ideais para pipelines de CI/CD.
O repositório de exemplos Azure IoT Operations mantém o construtor Rust Docker e inclui todas as dependências necessárias. Para obter documentação detalhada, consulte Rust Docker builder usage.
# Build release version (optimized for production)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter
# Build debug version (includes debugging symbols and less optimization)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter --build-mode debug
Opções de compilação do Docker:
-
--app-name: Deve corresponder ao nome da sua caixa Rust deCargo.toml -
--build-mode: Escolharelease(padrão) para compilações otimizadas oudebugpara compilações de desenvolvimento com símbolos
Mais exemplos
Para obter exemplos abrangentes, consulte os exemplos de Rust no repositório de amostras. As implementações completas incluem:
- Operadores de mapas: Lógica de transformação e conversão de dados
- Operadores de filtro: processamento e validação condicional de dados
- Operadores de filial: roteamento de vários caminhos com base no conteúdo de dados
- Acumular operadores: Agregação com janela de tempo e processamento estatístico
- Operadores de atraso: controle de processamento baseado no tempo
Os exemplos demonstram implementações de trabalho que mostram a estrutura completa para cada tipo de operador, incluindo o tratamento adequado de erros e padrões de registro.
Referência do SDK e APIs
O WASM Rust SDK fornece ferramentas de desenvolvimento abrangentes:
Macros do operador
use wasm_graph_sdk::macros::{map_operator, filter_operator, branch_operator};
use wasm_graph_sdk::{DataModel, HybridLogicalClock};
// Map operator - transforms each data item
#[map_operator(init = "my_init_function")]
fn my_map(input: DataModel) -> Result<DataModel, Error> {
// Transform logic here
}
// Filter operator - allows/rejects data based on predicate
#[filter_operator(init = "my_init_function")]
fn my_filter(input: DataModel) -> Result<bool, Error> {
// Return true to pass data through, false to filter out
}
// Branch operator - routes data to different arms
#[branch_operator(init = "my_init_function")]
fn my_branch(input: DataModel, timestamp: HybridLogicalClock) -> Result<bool, Error> {
// Return true for "True" arm, false for "False" arm
}
Parâmetros de configuração do módulo
Seus operadores WASM podem receber parâmetros de configuração de tempo de execução através da ModuleConfiguration struct passada para a init função. Define-se estes parâmetros na definição do grafo, que permite personalizar o tempo de execução sem ter de reconstruir módulos.
use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::ModuleConfiguration;
fn my_operator_init(configuration: ModuleConfiguration) -> bool {
// Access required parameters
if let Some(threshold_param) = configuration.parameters.get("temperature_threshold") {
let threshold: f64 = threshold_param.parse().unwrap_or(25.0);
logger::log(Level::Info, "my-operator", &format!("Using threshold: {}", threshold));
}
// Access optional parameters with defaults
let unit = configuration.parameters
.get("output_unit")
.map(|s| s.as_str())
.unwrap_or("celsius");
logger::log(Level::Info, "my-operator", &format!("Output unit: {}", unit));
true
}
Para obter informações detalhadas sobre como definir parâmetros de configuração em definições de gráficos, consulte Parâmetros de configuração do módulo.
APIs de Hospedagem
Use o SDK para trabalhar com serviços distribuídos:
Armazenamento de estado para dados persistentes:
use wasm_graph_sdk::state_store;
// Set value
state_store::set(key.as_bytes(), value.as_bytes(), None, None, options)?;
// Get value
let response = state_store::get(key.as_bytes(), None)?;
// Delete key
state_store::del(key.as_bytes(), None, None)?;
Registo estruturado:
use wasm_graph_sdk::logger::{self, Level};
logger::log(Level::Info, "my-operator", "Processing started");
logger::log(Level::Error, "my-operator", &format!("Error: {}", error));
Métricas compatíveis com OpenTelemetry:
use wasm_graph_sdk::metrics;
// Increment counter
metrics::add_to_counter("requests_total", 1.0, Some(labels))?;
// Record histogram value
metrics::record_to_histogram("processing_duration", duration_ms, Some(labels))?;
Inferência ONNX com WASM
Para incorporar e executar pequenos modelos ONNX dentro de seus módulos para inferência em banda, consulte Executar inferência ONNX em gráficos de fluxo de dados WebAssembly. Esse artigo abrange modelos de pacotes com módulos, permitindo a funcionalidade wasi-nn nos gráficos definidos, bem como as limitações associadas.
Tipos de interface WebAssembly (WIT)
Todos os operadores implementam interfaces padronizadas definidas usando WebAssembly Interface Types (WIT). O WIT fornece definições de interface agnósticas de linguagem que garantem a compatibilidade entre os módulos WASM e o tempo de execução do host.
Pode encontrar os esquemas completos WIT para Azure IoT Operations no repositório de exemplos. Estes esquemas definem todas as interfaces, tipos e estruturas de dados com que trabalha ao desenvolver módulos WASM.
Modelo de dados e interfaces
Todos os operadores WASM trabalham com modelos de dados padronizados definidos usando WebAssembly Interface Types (WIT):
Modelo de dados principal
// Core timestamp structure using hybrid logical clock
record timestamp {
timestamp: timespec, // Physical time (seconds + nanoseconds)
node-id: buffer-or-string, // Logical node identifier
}
// Union type supporting multiple data formats
variant data-model {
buffer-or-bytes(buffer-or-bytes), // Raw byte data
message(message), // Structured messages with metadata
snapshot(snapshot), // Video/image frames with timestamps
}
// Structured message format
record message {
timestamp: timestamp,
content_type: buffer-or-string,
payload: message-payload,
}
Definições de interface WIT
Cada tipo de operador implementa uma interface WIT específica:
// Core operator interfaces
interface map {
use types.{data-model};
process: func(message: data-model) -> result<data-model, error>;
}
interface filter {
use types.{data-model};
process: func(message: data-model) -> result<bool, error>;
}
interface branch {
use types.{data-model, hybrid-logical-clock};
process: func(timestamp: hybrid-logical-clock, message: data-model) -> result<bool, error>;
}
interface accumulate {
use types.{data-model};
process: func(staged: data-model, message: list<data-model>) -> result<data-model, error>;
}
Definições de gráficos e integração WASM
As definições de grafos mostram como os seus módulos WASM se ligam aos fluxos de trabalho de processamento. Eles especificam as operações, conexões e parâmetros que criam pipelines completos de processamento de dados.
Para obter informações abrangentes sobre como criar e configurar definições de gráficos, incluindo exemplos detalhados de fluxos de trabalho simples e complexos, consulte Configurar definições de gráfico WebAssembly para gráficos de fluxo de dados.
Principais tópicos abordados no guia de definições de gráficos:
- Estrutura de definição de gráfico: Entendendo o esquema YAML e os componentes necessários
- Exemplo de gráfico simples: pipeline básico de conversão de temperatura de três estágios
- Exemplo de gráfico complexo: processamento de vários sensores com ramificação e agregação
- Parâmetros de configuração do módulo: Personalização do tempo de execução dos operadores WASM
- Implantação do Registro: empacotando e armazenando definições de gráficos como artefatos OCI
Próximos passos
- Veja exemplos completos e padrões avançados no repositório de exemplos WASM do Azure IoT Operations .
- Saiba como implantar seus módulos em Usar WebAssembly com gráficos de fluxo de dados.
- Configure seus pontos de extremidade de fluxo de dados em Configurar pontos de extremidade de fluxo de dados.