Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
W tym artykule pokazano, jak opracowywać niestandardowe moduły webAssembly (WASM) i definicje grafów dla wykresów przepływu danych operacji usługi Azure IoT. Tworzenie modułów w języku Rust lub Python w celu zaimplementowania logiki przetwarzania niestandardowego. Zdefiniuj konfiguracje grafu, które określają sposób łączenia modułów z kompletnymi przepływami pracy przetwarzania.
Ważne
Wykresy przepływu danych obsługują obecnie tylko punkty końcowe MQTT, Kafka i OpenTelemetry. Inne typy punktów końcowych, takie jak Azure Data Lake, Microsoft Fabric OneLake, Azure Data Explorer i magazyn lokalny, nie są obsługiwane. Aby uzyskać więcej informacji, zobacz Znane problemy.
Aby dowiedzieć się, jak opracowywać moduły WASM przy użyciu rozszerzenia programu VS Code, zobacz Build WASM modules with VS Code extension (Kompilowanie modułów WASM z rozszerzeniem programu VS Code).
Aby dowiedzieć się więcej na temat grafów i funkcji WASM w operacjach usługi Azure IoT, zobacz:
- Używanie grafu przepływu danych z modułami WebAssembly
- Przekształcanie danych przychodzących za pomocą modułów WebAssembly
Przegląd
Wykresy przepływu danych operacji usługi Azure IoT przetwarzają dane przesyłane strumieniowo za pomocą konfigurowalnych operatorów zaimplementowanych jako moduły zestawu WebAssembly. Każdy operator przetwarza dane ze znacznikami czasu przy zachowaniu kolejności czasowej, umożliwiając analizę w czasie rzeczywistym przy użyciu wyników deterministycznych.
Najważniejsze korzyści
- Przetwarzanie w czasie rzeczywistym: Obsługa danych przesyłanych strumieniowo z spójnym małym opóźnieniem
- Semantyka czasu zdarzenia: przetwarzanie danych na podstawie czasu wystąpienia zdarzeń, a nie podczas ich przetwarzania
- Odporność na uszkodzenia: wbudowana obsługa błędów i zapewnianie spójności danych
- Skalowalność: dystrybuuj przetwarzanie między wieloma węzłami przy zachowaniu gwarancji zamówień
- Obsługa wielu języków: programowanie w języku Rust lub Python przy użyciu spójnych interfejsów
Podstawy architektury
Wykresy przepływu danych opierają się na modelu obliczeniowym Przepływ danych czasowy , który pochodzi z projektu Naiad firmy Microsoft Research. Takie podejście zapewnia:
- Przetwarzanie deterministyczne: te same dane wejściowe zawsze generują te same dane wyjściowe
- Śledzenie postępu: system wie, kiedy obliczenia są ukończone
- Koordynacja rozproszona: synchronizacja wielu węzłów przetwarzania
Dlaczego warto używać terminowego przepływu danych?
Tradycyjne systemy przetwarzania strumieniowego mają kilka wyzwań. Dane poza kolejnością oznaczają, że zdarzenia mogą pojawić się później niż oczekiwano. Częściowe wyniki sprawiają, że trudno jest wiedzieć, kiedy obliczenia zakończą się. Problemy z koordynacją występują podczas synchronizowania rozproszonego przetwarzania.
Terminowy przepływ danych rozwiązuje te problemy, wykonując następujące czynności:
Znaczniki czasu i śledzenie postępu
Każdy element danych zawiera znacznik czasu reprezentujący jego czas logiczny. System śledzi postęp przez znaczniki czasu, włączając kilka kluczowych możliwości:
- Przetwarzanie deterministyczne: te same dane wejściowe zawsze generują te same dane wyjściowe
- Dokładnie raz semantyka: brak zduplikowanego lub pominiętego przetwarzania
- Znaki wodne: Dowiedz się, kiedy nie będą już dostarczane żadne dane przez dany czas
Zegar logiczny hybrydowy
Mechanizm znacznika czasu używa podejścia hybrydowego:
pub struct HybridLogicalClock {
pub physical_time: u64, // Wall-clock time when event occurred
pub logical_time: u64, // Logical ordering for events at same physical time
}
Hybrydowe podejście zegara logicznego zapewnia kilka możliwości:
- Kolejność przyczynowa: efekty są zgodne z przyczynami
- Gwarancje postępu: system wie, kiedy przetwarzanie zostało zakończone
- Koordynacja rozproszona: Synchronizacja wielu węzłów jest zsynchronizowana
Omówienie operatorów i modułów
Zrozumienie rozróżnienia między operatorami i modułami jest niezbędne w przypadku opracowywania programu WASM:
Operatorów
Operatory są podstawowymi jednostkami przetwarzania opartymi na operatorach przepływu danych timely. Każdy typ operatora służy do określonego celu:
- Mapa: Przekształcanie każdego elementu danych (na przykład konwertowanie jednostek temperatury)
- Filtr: zezwalaj na przekazywanie tylko niektórych elementów danych na podstawie warunków (takich jak usuwanie nieprawidłowych odczytów)
- Gałąź: przekierowywanie danych do różnych ścieżek na podstawie warunków (takich jak oddzielanie danych temperatury i wilgotności)
- Gromadzenie: zbieranie i agregowanie danych w oknach czasowych (takich jak obliczenia podsumowań statystycznych)
- Łączenie: scalanie wielu strumieni danych przy zachowaniu kolejności czasowej
- Opóźnienie: Kontrola czasu przez przesuwanie znaczników czasu
Modules
Moduły to implementacja logiki operatora jako kod WASM. Pojedynczy moduł może implementować wiele typów operatorów. Na przykład moduł temperatury może zapewnić następujące elementy:
- Operator mapy na potrzeby konwersji jednostek
- Operator filtru na potrzeby sprawdzania progów
- Operator gałęzi do podejmowania decyzji dotyczących routingu
- Operator skumulowany dla agregacji statystycznej
Relacja
Relacja między definicjami, modułami i operatorami grafu jest zgodna z określonym wzorcem:
Graph Definition → References Module → Provides Operator → Processes Data
↓ ↓ ↓ ↓
"temperature:1.0.0" → temperature.wasm → map function → °F to °C
To rozdzielenie umożliwia:
- Ponowne użycie modułu: wdrażanie tego samego modułu WASM w różnych konfiguracjach grafu
- Niezależne przechowywanie wersji: aktualizowanie definicji grafu bez ponownego kompilowania modułów
- Konfiguracja dynamiczna: przekazywanie różnych parametrów do tego samego modułu w przypadku różnych zachowań
Wymagania wstępne
Wybierz język programowania i skonfiguruj wymagane narzędzia:
Łańcuch narzędzi Rust udostępnia
cargo,rustcoraz bibliotekę standardową wymaganą do kompilowania operatorów. Zainstaluj przy użyciu:curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -yElement docelowy WASM
wasm32-wasip2wymagany do kompilowania składników WASM operacji usługi Azure IoT. Dodaj za pomocą:rustup target add wasm32-wasip2Narzędzia kompilacji udostępniają funkcjonalności używane przez buildery i systemy CI do weryfikacji i pakietowania artefaktów WASM. Zainstaluj przy użyciu:
cargo install wasm-tools --version '=1.201.0' --locked
Konfigurowanie środowiska programowania
Dostęp do zestawu SDK WASM Rust można uzyskać za pomocą niestandardowego rejestru DevOps platformy Microsoft Azure. Zamiast używać zmiennych środowiskowych, skonfiguruj dostęp za pomocą pliku konfiguracji obszaru roboczego:
# .cargo/config.toml (at your workspace root)
[registries]
aio-wg = { index = "sparse+https://pkgs.dev.azure.com/azure-iot-sdks/iot-operations/_packaging/preview/Cargo/index/" }
[net]
git-fetch-with-cli = true
Ta konfiguracja odzwierciedla przykładowy układ pod adresem samples/wasm/.cargo/config.toml i przechowuje ustawienia rejestru w kontroli wersji.
Tworzenie projektu
Zacznij od utworzenia nowego katalogu projektu dla modułu operatora. Struktura projektu zależy od wybranego języka.
cargo new --lib temperature-converter
cd temperature-converter
Konfigurowanie pliku Cargo.toml
Edytuj plik, Cargo.toml aby uwzględnić zależności dla zestawu SDK WASM i innych bibliotek:
[package]
name = "temperature-converter"
version = "0.1.0"
edition = "2021"
[dependencies]
# WebAssembly Interface Types (WIT) code generation
wit-bindgen = "0.22"
# Azure IoT Operations WASM SDK - provides operator macros and host APIs
wasm_graph_sdk = { version = "=1.1.3", registry = "aio-wg" }
# JSON serialization/deserialization for data processing
serde = { version = "1", default-features = false, features = ["derive"] }
serde_json = { version = "1", default-features = false, features = ["alloc"] }
[lib]
# Required for WASM module compilation
crate-type = ["cdylib"]
Uwagi dotyczące wersji i rejestru:
- Wersja zestawu SDK (
=1.1.3) jest zgodna z aktualnymi przykładami; utrzymywanie jej w stałej wersji pozwala uniknąć zmian powodujących niezgodność. -
registry = "aio-wg"pasuje do wpisu rejestru zdefiniowanego w pliku.cargo/config.toml.
Objaśniono kluczowe zależności:
-
wit-bindgen: Generuje wiązania Rust z definicji WebAssembly Interface Types (WIT), umożliwiając kodowi interakcję ze środowiskiem uruchomieniowym WASM. -
wasm_graph_sdk: Zestaw SDK operacji usługi Azure IoT udostępnia makra operatorów, takie jak#[map_operator]i#[filter_operator], oraz interfejsy API hosta na potrzeby rejestrowania, metryk i zarządzania stanem. -
serde+serde_json: biblioteki przetwarzania JSON do analizowania i generowania ładunków danych.default-features = falseoptymalizuje pod kątem ograniczeń rozmiaru WASM. -
crate-type = ["cdylib"]: kompiluje bibliotekę Rust jako bibliotekę dynamiczną zgodną z językiem C, która jest wymagana do generowania modułu WASM.
Tworzenie prostego modułu
Utwórz prosty moduł, który konwertuje temperaturę z stopni Celsjusza na Fahrenheit. W tym przykładzie przedstawiono podstawową strukturę i logikę przetwarzania dla implementacji języka Rust i Python.
use serde_json::{json, Value};
use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::macros::map_operator;
fn fahrenheit_to_celsius_init(_configuration: ModuleConfiguration) -> bool {
logger::log(Level::Info, "temperature-converter", "Init invoked"); // one-time module init
true
}
#[map_operator(init = "fahrenheit_to_celsius_init")]
fn fahrenheit_to_celsius(input: DataModel) -> Result<DataModel, Error> {
let DataModel::Message(mut result) = input else {
return Err(Error {
message: "Unexpected input type".to_string(),
});
};
let payload = &result.payload.read(); // payload bytes from inbound message
if let Ok(data_str) = std::str::from_utf8(payload) {
if let Ok(mut data) = serde_json::from_str::<Value>(data_str) {
if let Some(temp) = data["temperature"]["value"].as_f64() {
let fahrenheit = (temp * 9.0 / 5.0) + 32.0; // Celsius -> Fahrenheit
data["temperature"] = json!({
"value_fahrenheit": fahrenheit,
"original_celsius": temp
});
if let Ok(output_str) = serde_json::to_string(&data) {
// Replace payload with owned bytes so the host receives the updated JSON
result.payload = BufferOrBytes::Bytes(output_str.into_bytes());
}
}
}
}
Ok(DataModel::Message(result))
}
Moduł kompilacji
Wybierz między lokalnymi kompilacjami deweloperów lub konteneryzowanymi kompilacjami na podstawie wymagań dotyczących przepływu pracy programowania i środowiska.
Kompilacja lokalna
Kompiluj bezpośrednio na maszynie deweloperów w celu uzyskania najszybszej iteracji podczas programowania i gdy potrzebujesz pełnej kontroli nad środowiskiem kompilacji.
# Build WASM module
cargo build --release --target wasm32-wasip2 # target required for Azure IoT Operations WASM components
# Find your module
ls target/wasm32-wasip2/release/*.wasm
Kompilacja platformy Docker
Kompiluj przy użyciu środowisk konteneryzowanych ze wszystkimi zależnościami i wstępnie skonfigurowanymi schematami. Te obrazy platformy Docker zapewniają spójne kompilacje w różnych środowiskach i są idealne dla potoków ciągłej integracji/ciągłego wdrażania.
Repozytorium przykładów operacji usługi Azure IoT obsługuje konstruktora Rust Docker i zawiera wszystkie niezbędne zależności. Aby uzyskać szczegółową dokumentację, zobacz Rust Docker builder usage (Użycie narzędzia Rust Docker Builder).
# Build release version (optimized for production)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter
# Build debug version (includes debugging symbols and less optimization)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter --build-mode debug
Opcje kompilacji platformy Docker:
-
--app-name: Musi być zgodna z nazwą skrzyni rdzy zCargo.toml -
--build-mode: wybierzrelease(ustawienie domyślne) dla zoptymalizowanych kompilacji lubdebugkompilacji deweloperskich z symbolami
Więcej przykładów
Aby zapoznać się z kompleksowymi przykładami, zobacz przykłady rust w repozytorium przykładów. Kompletne implementacje obejmują:
- Operatory mapy: przekształcanie i logika konwersji danych
- Operatory filtrów: warunkowe przetwarzanie i walidacja danych
- Operatory gałęzi: routing wielościeżkowy na podstawie zawartości danych
- Operatory kumulowania: agregacja okien czasowych i przetwarzanie statystyczne
- Operatory opóźnień: kontrolka przetwarzania na podstawie czasu
W przykładach pokazano działające implementacje, które pokazują pełną strukturę dla każdego typu operatora, w tym odpowiednie wzorce obsługi błędów i rejestrowania.
Dokumentacja zestawu SDK i interfejsy API
Zestaw SDK WASM Rust udostępnia kompleksowe narzędzia programistyczne:
Makra operatorów
use wasm_graph_sdk::macros::{map_operator, filter_operator, branch_operator};
use wasm_graph_sdk::{DataModel, HybridLogicalClock};
// Map operator - transforms each data item
#[map_operator(init = "my_init_function")]
fn my_map(input: DataModel) -> Result<DataModel, Error> {
// Transform logic here
}
// Filter operator - allows/rejects data based on predicate
#[filter_operator(init = "my_init_function")]
fn my_filter(input: DataModel) -> Result<bool, Error> {
// Return true to pass data through, false to filter out
}
// Branch operator - routes data to different arms
#[branch_operator(init = "my_init_function")]
fn my_branch(input: DataModel, timestamp: HybridLogicalClock) -> Result<bool, Error> {
// Return true for "True" arm, false for "False" arm
}
Parametry konfiguracji modułu
Operatorzy WASM mogą odbierać parametry konfiguracji środowiska uruchomieniowego za pośrednictwem ModuleConfiguration struktury przekazanej init do funkcji. Te parametry definiuje się w definicji grafu, co umożliwia dostosowanie środowiska uruchomieniowego bez ponownego kompilowania modułów.
use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::ModuleConfiguration;
fn my_operator_init(configuration: ModuleConfiguration) -> bool {
// Access required parameters
if let Some(threshold_param) = configuration.parameters.get("temperature_threshold") {
let threshold: f64 = threshold_param.parse().unwrap_or(25.0);
logger::log(Level::Info, "my-operator", &format!("Using threshold: {}", threshold));
}
// Access optional parameters with defaults
let unit = configuration.parameters
.get("output_unit")
.map(|s| s.as_str())
.unwrap_or("celsius");
logger::log(Level::Info, "my-operator", &format!("Output unit: {}", unit));
true
}
Aby uzyskać szczegółowe informacje na temat definiowania parametrów konfiguracji w definicjach grafu, zobacz Parametry konfiguracji modułu.
Interfejsy API hosta
Użyj zestawu SDK do pracy z usługami rozproszonymi:
Magazyn stanów dla danych trwałych:
use wasm_graph_sdk::state_store;
// Set value
state_store::set(key.as_bytes(), value.as_bytes(), None, None, options)?;
// Get value
let response = state_store::get(key.as_bytes(), None)?;
// Delete key
state_store::del(key.as_bytes(), None, None)?;
Rejestrowanie strukturalne:
use wasm_graph_sdk::logger::{self, Level};
logger::log(Level::Info, "my-operator", "Processing started");
logger::log(Level::Error, "my-operator", &format!("Error: {}", error));
Metryki zgodne z technologią OpenTelemetry:
use wasm_graph_sdk::metrics;
// Increment counter
metrics::add_to_counter("requests_total", 1.0, Some(labels))?;
// Record histogram value
metrics::record_to_histogram("processing_duration", duration_ms, Some(labels))?;
Wnioskowanie ONNX za pomocą WASM
Aby osadzić i uruchamiać małe modele ONNX wewnątrz modułów na potrzeby wnioskowania wewnątrz pasmowego, zobacz Run ONNX inference in WebAssembly data flow graphs (Uruchamianie wnioskowania ONNX w wykresach przepływu danych zestawu WebAssembly). W tym artykule opisano tworzenie pakietów modeli z modułami, włączanie funkcji wasi-nn w definicjach grafów i ograniczenia.
Typy interfejsów zestawu WebAssembly (WIT)
Wszystkie operatory implementują standardowe interfejsy zdefiniowane przy użyciu typów interfejsów zestawu WebAssembly (WIT). Funkcja WIT udostępnia niezależne od języka definicje interfejsów, które zapewniają zgodność między modułami WASM a środowiskiem uruchomieniowym hosta.
Pełne schematy funkcji WIT dla operacji usługi Azure IoT można znaleźć w repozytorium przykładów. Te schematy definiują wszystkie interfejsy, typy i struktury danych, z których pracujesz podczas tworzenia modułów WASM.
Model danych i interfejsy
Wszystkie operatory WASM współpracują ze standardowymi modelami danych zdefiniowanymi przy użyciu typów interfejsów zestawu WebAssembly (WIT):
Podstawowy model danych
// Core timestamp structure using hybrid logical clock
record timestamp {
timestamp: timespec, // Physical time (seconds + nanoseconds)
node-id: buffer-or-string, // Logical node identifier
}
// Union type supporting multiple data formats
variant data-model {
buffer-or-bytes(buffer-or-bytes), // Raw byte data
message(message), // Structured messages with metadata
snapshot(snapshot), // Video/image frames with timestamps
}
// Structured message format
record message {
timestamp: timestamp,
content_type: buffer-or-string,
payload: message-payload,
}
Definicje interfejsu WIT
Każdy typ operatora implementuje określony interfejs funkcji WIT:
// Core operator interfaces
interface map {
use types.{data-model};
process: func(message: data-model) -> result<data-model, error>;
}
interface filter {
use types.{data-model};
process: func(message: data-model) -> result<bool, error>;
}
interface branch {
use types.{data-model, hybrid-logical-clock};
process: func(timestamp: hybrid-logical-clock, message: data-model) -> result<bool, error>;
}
interface accumulate {
use types.{data-model};
process: func(staged: data-model, message: list<data-model>) -> result<data-model, error>;
}
Definicje grafu i integracja z usługą WASM
Definicje wykresów pokazują, jak moduły WASM łączą się z przepływami pracy przetwarzania. Określają operacje, połączenia i parametry, które tworzą kompletne potoki przetwarzania danych.
Aby uzyskać kompleksowe informacje na temat tworzenia i konfigurowania definicji grafów, w tym szczegółowych przykładów prostych i złożonych przepływów pracy, zobacz Konfigurowanie definicji grafu zestawu WebAssembly dla grafów przepływu danych.
Najważniejsze tematy omówione w przewodniku po definicjach grafu:
- Struktura definicji grafu: opis schematu YAML i wymaganych składników
- Prosty przykład wykresu: Podstawowy potok konwersji temperatury trzyetapowej
- Przykład złożonych grafów: Przetwarzanie wielu czujników z rozgałęzianiem i agregacją
- Parametry konfiguracji modułu: dostosowywanie środowiska uruchomieniowego operatorów WASM
- Wdrażanie rejestru: pakowanie i przechowywanie definicji grafu jako artefaktów OCI
Dalsze kroki
- Zobacz pełne przykłady i zaawansowane wzorce w repozytorium przykładów WASM operacji usługi Azure IoT .
- Dowiedz się, jak wdrożyć moduły w artykule Używanie zestawu WebAssembly z grafami przepływu danych.
- Skonfiguruj punkty końcowe przepływu danych w temacie Konfigurowanie punktów końcowych przepływu danych.