Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
In diesem Artikel erfahren Sie, wie Sie benutzerdefinierte WebAssembly-Module (WASM) und Diagrammdefinitionen für Azure IoT Einsatz-Datenflussdiagramme entwickeln. Erstellen Sie Module in Rust oder Python, um benutzerdefinierte Verarbeitungslogik zu implementieren. Definieren Sie Diagrammkonfigurationen, die angeben, wie Ihre Module eine Verbindung mit vollständigen Verarbeitungsworkflows herstellen.
Von Bedeutung
Datenflussdiagramme unterstützen zur Zeit nur MQTT-, Kafka- und OpenTelemetry-Endpunkte. Andere Endpunkttypen wie Azure Data Lake, Microsoft Fabric OneLake, Azure Data Explorer und lokaler Speicher werden nicht unterstützt. Weitere Informationen finden Sie unter Bekannte Probleme.
Informationen zum Entwickeln von WASM-Modulen mithilfe der VS Code-Erweiterung finden Sie unter Build WASM-Module mit VS Code-Erweiterung.
Weitere Informationen zu Graphen und WASM in Azure IoT Operations finden Sie unter:
- Verwenden eines Datenflussdiagramms mit WebAssembly-Modulen
- Transformieren eingehender Daten mit WebAssembly-Modulen
Überblick
Azure IoT Einsatz-Datenflussdiagramme verarbeiten Streamingdaten über konfigurierbare Operatoren, die als WebAssembly-Module implementiert werden. Jeder Operator verarbeitet mit Zeitstempel versehene Daten und behält gleichzeitig die zeitliche Anordnung bei. Dadurch werden Echtzeitanalysen mit deterministischen Ergebnissen ermöglicht.
Hauptvorteile
- Echtzeitverarbeitung: Verarbeiten von Streamingdaten mit konsistenter niedriger Latenz
- Ereigniszeitsemantik: Verarbeiten von Daten basierend darauf, wann Ereignisse aufgetreten sind, nicht basierend auf ihrem Verarbeitungszeitpunkt
- Fehlertoleranz: Integrierte Unterstützung für die Behandlung von Fehlern und Sicherstellung der Datenkonsistenz
- Skalierbarkeit: Verteilen der Verarbeitung über mehrere Knoten hinweg bei gleichzeitiger Aufrechterhaltung von Auftragsgarantien
- Unterstützung für mehrere Sprachen: Entwickeln in Rust oder Python mit konsistenten Schnittstellen
Architekturumgebung
Datenflussdiagramme basieren auf dem Berechnungsmodell Timely Dataflow, das aus dem Naiad-Projekt von Microsoft Research stammt. Dieser Ansatz stellt Folgendes sicher:
- Deterministische Verarbeitung: Die gleiche Eingabe erzeugt immer die gleiche Ausgabe.
- Statusnachverfolgung: Das System weiß, wann Berechnungen abgeschlossen sind.
- Verteilte Koordination: Mehrere Verarbeitungsknoten bleiben synchronisiert.
Warum timely dataflow verwenden?
Bei herkömmlichen Datenstromverarbeitungssysteme sind verschiedene Herausforderungen zu bewältigen. Unsortierte Daten bedeuten, dass Ereignisse später als erwartet eingehen können. Teilergebnisse machen es schwierig zu ermitteln, wann Berechnungen abgeschlossen sind. Koordinationsprobleme treten beim Synchronisieren der verteilten Verarbeitung auf.
Timely Dataflow löst diese Probleme durch:
Zeitstempel und Statusnachverfolgung
Jedes Datenelement hat einen Zeitstempel, der seine logische Zeit darstellt. Das System verfolgt den Status durch Zeitstempel nach und ermöglicht mehrere wichtige Funktionen:
- Deterministische Verarbeitung: Die gleiche Eingabe erzeugt immer die gleiche Ausgabe.
- Exactly-Once-Semantik: Keine duplizierte oder fehlende Verarbeitung
- Wasserzeichen: Wissen, wann keine weiteren Daten für einen bestimmten Zeitraum mehr eingehen
Hybride logische Uhr
Der Zeitstempelmechanismus nutzt einen hybriden Ansatz:
pub struct HybridLogicalClock {
pub physical_time: u64, // Wall-clock time when event occurred
pub logical_time: u64, // Logical ordering for events at same physical time
}
Der Ansatz für die hybride logische Uhr stellt mehrere Funktionen sicher:
- Kausale Reihenfolge: Effekte folgen Ursachen.
- Statusgarantien: Das System weiß, wann die Verarbeitung abgeschlossen ist.
- Verteilte Koordination: Mehrere Knoten bleiben synchronisiert.
Grundlegendes zu Operatoren und Modulen
Es ist wichtig, die Unterschiede zwischen Operatoren und Modulen zu kennen, da dies für die WASM-Entwicklung von wesentlicher Bedeutung ist:
Betriebspersonal
Operatoren sind die grundlegenden Verarbeitungseinheiten, die auf Timely Dataflow-Operatoren basieren. Jeder Operatortyp dient einem bestimmten Zweck:
- Karte: Transformieren jedes Datenelements (z. B. Konvertieren von Temperatureinheiten)
- Filter: Zulassen, dass nur bestimmte Datenelemente basierend auf Bedingungen durchlaufen werden (z. B. das Entfernen ungültiger Lesewerte)
- Branch: Routen Sie Daten auf der Grundlage von Bedingungen auf verschiedene Pfade (wie z. B. die Trennung von Temperatur- und Luftfeuchtigkeitsdaten)
- Akkumulation: Sammeln und Aggregieren von Daten innerhalb von Zeitfenstern (z. B. Berechnen statistischer Zusammenfassungen)
- Verketten: Zusammenführen mehrerer Datenströme bei gleichzeitiger Beibehaltung der zeitlichen Reihenfolge
- Verzögerung: Steuerung des Timings durch Vorverschiebung von Zeitstempeln
Module
Module sind die Implementierung der Operatorlogik als WASM-Code. Ein einzelnes Modul kann mehrere Operatortypen implementieren. Ein Temperaturmodul kann z. B. Folgendes bereitstellen:
- map-Operator für die Einheitenkonvertierung
- filter-Operator für die Schwellenwertüberprüfung
- branch-Operator für Routingentscheidungen
- accumulate-Operator für statistische Aggregation
Die Beziehung
Die Beziehung zwischen Diagrammdefinitionen, Modulen und Operatoren folgt einem bestimmten Muster:
Graph Definition → References Module → Provides Operator → Processes Data
↓ ↓ ↓ ↓
"temperature:1.0.0" → temperature.wasm → map function → °F to °C
Diese Trennung ermöglicht Folgendes:
- Modulwiederverwendung: Bereitstellen des gleichen WASM-Moduls in verschiedenen Diagrammkonfigurationen
- Unabhängige Versionsverwaltung: Aktualisieren von Diagrammdefinitionen ohne Neuerstellung von Modulen
- Dynamische Konfiguration: Übergeben verschiedener Parameter an dasselbe Modul für unterschiedliche Verhaltensweisen
Voraussetzungen
Wählen Sie Ihre Entwicklungssprache aus, und richten Sie die erforderlichen Tools ein:
Rust-Toolkette bietet
cargo,rustcund die Standardbibliothek, die zum Kompilieren von Operatoren benötigt wird. Installieren mit:curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -yWASM
wasm32-wasip2Ziel, das zum Erstellen von Azure IoT Operations WASM-Komponenten erforderlich ist. Hinzufügen durch:rustup target add wasm32-wasip2Build-Tools stellen Dienstprogramme bereit, die von den Buildern und der KI verwendet werden, um WASM-Artefakte zu validieren und zu packen. Installieren mit:
cargo install wasm-tools --version '=1.201.0' --locked
Konfigurieren der Entwicklungsumgebung
Sie können über eine benutzerdefinierte Microsoft Azure DevOps-Registrierung auf das WASM Rust SDK zugreifen. Statt Umgebungsvariablen zu verwenden, konfigurieren Sie den Zugriff mit einer Arbeitsbereichskonfigurationsdatei:
# .cargo/config.toml (at your workspace root)
[registries]
aio-wg = { index = "sparse+https://pkgs.dev.azure.com/azure-iot-sdks/iot-operations/_packaging/preview/Cargo/index/" }
[net]
git-fetch-with-cli = true
Dieses Setup spiegelt das Beispiellayout bei samples/wasm/.cargo/config.toml und behält Registrierungseinstellungen in der Versionssteuerung bei.
Projekt erstellen
Erstellen Sie zunächst ein neues Projektverzeichnis für Ihr Operatormodul. Die Projektstruktur hängt von der ausgewählten Sprache ab.
cargo new --lib temperature-converter
cd temperature-converter
Konfigurieren von „Cargo.toml“
Bearbeiten Sie die Cargo.toml Datei, um Abhängigkeiten für das WASM SDK und andere Bibliotheken einzuschließen:
[package]
name = "temperature-converter"
version = "0.1.0"
edition = "2021"
[dependencies]
# WebAssembly Interface Types (WIT) code generation
wit-bindgen = "0.22"
# Azure IoT Operations WASM SDK - provides operator macros and host APIs
wasm_graph_sdk = { version = "=1.1.3", registry = "aio-wg" }
# JSON serialization/deserialization for data processing
serde = { version = "1", default-features = false, features = ["derive"] }
serde_json = { version = "1", default-features = false, features = ["alloc"] }
[lib]
# Required for WASM module compilation
crate-type = ["cdylib"]
Hinweise zu Versionen und Registrierung:
- Die SDK-Version (
=1.1.3) stimmt mit den aktuellen Beispielen überein; wenn Sie sie angeheftet lassen, vermeiden Sie fehlerhafte Änderungen. -
registry = "aio-wg"entspricht dem Registriereintrag, der in.cargo/config.tomldefiniert ist.
Die wichtigsten Abhängigkeiten wurden erläutert:
-
wit-bindgen: Generiert Rust-Bindungen aus WebAssembly Interface Types (WIT)-Definitionen, damit Ihr Code mit der WASM-Laufzeit kommunizieren kann. -
wasm_graph_sdk: Das Azure IoT Operations SDK stellt Operatormakros bereit, wie z.B.#[map_operator], und#[filter_operator]Host-APIs für Protokollierung, Metriken und Zustandsverwaltung. -
serde+serde_json: JSON-Verarbeitungsbibliotheken zum Analysieren und Generieren von Datennutzlasten.default-features = falseoptimiert für WASM-Größenbeschränkungen. -
crate-type = ["cdylib"]: Kompiliert die Rust-Bibliothek als C-kompatible dynamische Bibliothek, die für die WASM-Modulgenerierung erforderlich ist.
Erstellen eines einfachen Moduls
Erstellen Sie ein einfaches Modul, das die Temperatur von Celsius in Fahrenheit umwandelt. In diesem Beispiel wird die grundlegende Struktur- und Verarbeitungslogik für Rust- und Python-Implementierungen veranschaulicht.
use serde_json::{json, Value};
use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::macros::map_operator;
fn fahrenheit_to_celsius_init(_configuration: ModuleConfiguration) -> bool {
logger::log(Level::Info, "temperature-converter", "Init invoked"); // one-time module init
true
}
#[map_operator(init = "fahrenheit_to_celsius_init")]
fn fahrenheit_to_celsius(input: DataModel) -> Result<DataModel, Error> {
let DataModel::Message(mut result) = input else {
return Err(Error {
message: "Unexpected input type".to_string(),
});
};
let payload = &result.payload.read(); // payload bytes from inbound message
if let Ok(data_str) = std::str::from_utf8(payload) {
if let Ok(mut data) = serde_json::from_str::<Value>(data_str) {
if let Some(temp) = data["temperature"]["value"].as_f64() {
let fahrenheit = (temp * 9.0 / 5.0) + 32.0; // Celsius -> Fahrenheit
data["temperature"] = json!({
"value_fahrenheit": fahrenheit,
"original_celsius": temp
});
if let Ok(output_str) = serde_json::to_string(&data) {
// Replace payload with owned bytes so the host receives the updated JSON
result.payload = BufferOrBytes::Bytes(output_str.into_bytes());
}
}
}
}
Ok(DataModel::Message(result))
}
Erstellungsmodul
Wählen Sie basierend auf ihren Entwicklungsworkflow- und Umgebungsanforderungen zwischen lokalen Entwicklungsbuilds oder containerisierten Builds aus.
Lokaler Build
Führen Sie die Erstellung direkt auf Ihrem Entwicklungscomputer durch, um die schnellste Iteration während der Entwicklung und für den Zeitpunkt zu ermöglichen, zu dem Sie die vollständige Kontrolle über die Buildumgebung benötigen.
# Build WASM module
cargo build --release --target wasm32-wasip2 # target required for Azure IoT Operations WASM components
# Find your module
ls target/wasm32-wasip2/release/*.wasm
Docker-Build
Erstellen Sie mithilfe von containerisierten Umgebungen mit allen vorkonfigurierten Abhängigkeiten und Schemas. Diese Docker-Images bieten konsistente Builds in verschiedenen Umgebungen und eignen sich ideal für CI/CD-Pipelines.
Das Repository für Azure IoT Operations-Beispiele verwaltet den Rust Docker Builder und enthält alle erforderlichen Abhängigkeiten. Ausführliche Dokumentation finden Sie in der Verwendung des Rust Docker Builder.
# Build release version (optimized for production)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter
# Build debug version (includes debugging symbols and less optimization)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter --build-mode debug
Docker-Buildoptionen:
-
--app-name: Muss Ihrem Rust-Cratenamen ausCargo.tomlentsprechen. -
--build-mode: Wählen Sierelease(Standard) für optimierte Builds oderdebugfür Entwicklungsbuilds mit Symbolen aus.
Weitere Beispiele
Umfassende Beispiele finden Sie in den Rust-Beispielen im Beispiel-Repository. Zu den vollständigen Implementierungen gehören folgende:
- map-Operatoren: Datentransformation und Konvertierungslogik
- filter-Operatoren: Bedingte Datenverarbeitung und Validierung
- brach-Operatoren: Routing mit mehreren Pfaden basierend auf Dateninhalten
- accumulate-Operatoren: Zeitfensteraggregation und statistische Verarbeitung
- delay-Operatoren: Zeitbasierte Verarbeitungssteuerung
Die Beispiele veranschaulichen Arbeitsimplementierungen, die die vollständige Struktur für jeden Operatortyp darstellen, einschließlich der richtigen Fehlerbehandlung und Protokollierungsmuster.
SDK-Referenz und APIs
Das WASM Rust SDK bietet umfassende Entwicklungstools:
Operatormakros
use wasm_graph_sdk::macros::{map_operator, filter_operator, branch_operator};
use wasm_graph_sdk::{DataModel, HybridLogicalClock};
// Map operator - transforms each data item
#[map_operator(init = "my_init_function")]
fn my_map(input: DataModel) -> Result<DataModel, Error> {
// Transform logic here
}
// Filter operator - allows/rejects data based on predicate
#[filter_operator(init = "my_init_function")]
fn my_filter(input: DataModel) -> Result<bool, Error> {
// Return true to pass data through, false to filter out
}
// Branch operator - routes data to different arms
#[branch_operator(init = "my_init_function")]
fn my_branch(input: DataModel, timestamp: HybridLogicalClock) -> Result<bool, Error> {
// Return true for "True" arm, false for "False" arm
}
Modulkonfigurationsparameter
Ihre WASM-Operatoren können Laufzeitkonfigurationsparameter über die ModuleConfiguration an die init Funktion übergebene Struktur empfangen. Sie definieren diese Parameter in der Diagrammdefinition, mit der Sie die Laufzeit anpassen können, ohne Module neu zu erstellen.
use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::ModuleConfiguration;
fn my_operator_init(configuration: ModuleConfiguration) -> bool {
// Access required parameters
if let Some(threshold_param) = configuration.parameters.get("temperature_threshold") {
let threshold: f64 = threshold_param.parse().unwrap_or(25.0);
logger::log(Level::Info, "my-operator", &format!("Using threshold: {}", threshold));
}
// Access optional parameters with defaults
let unit = configuration.parameters
.get("output_unit")
.map(|s| s.as_str())
.unwrap_or("celsius");
logger::log(Level::Info, "my-operator", &format!("Output unit: {}", unit));
true
}
Ausführliche Informationen zum Definieren von Konfigurationsparametern in Diagrammdefinitionen finden Sie unter Modulkonfigurationsparameter.
Hosten von APIs
Verwenden Sie das SDK, um mit verteilten Diensten zu arbeiten:
Zustandsspeicher für persistente Daten:
use wasm_graph_sdk::state_store;
// Set value
state_store::set(key.as_bytes(), value.as_bytes(), None, None, options)?;
// Get value
let response = state_store::get(key.as_bytes(), None)?;
// Delete key
state_store::del(key.as_bytes(), None, None)?;
Strukturierte Protokollierung:
use wasm_graph_sdk::logger::{self, Level};
logger::log(Level::Info, "my-operator", "Processing started");
logger::log(Level::Error, "my-operator", &format!("Error: {}", error));
Mit OpenTelemetry kompatible Metriken:
use wasm_graph_sdk::metrics;
// Increment counter
metrics::add_to_counter("requests_total", 1.0, Some(labels))?;
// Record histogram value
metrics::record_to_histogram("processing_duration", duration_ms, Some(labels))?;
ONNX-Inferenz mit WASM
Informationen zum Einbetten und Ausführen kleiner ONNX-Modelle in Ihre Module zur In-Band-Inference finden Sie unter Ausführen von ONNX-Inference in WebAssembly-Datenflussdiagrammen. In diesem Artikel werden Verpackungsmodelle mit Modulen behandelt, die Aktivierung des WASI-NN-Features in Graphdefinitionen beschrieben und Einschränkungen aufgezeigt.
WIT (WebAssembly Interface Types, WebAssembly-Schnittstellentypen)
Alle Operatoren implementieren standardisierte Schnittstellen, die mithilfe von WIT (WebAssembly Interface Types, WebAssembly-Schnittstellentypen) definiert werden. WIT stellt sprachunabhängige Schnittstellendefinitionen bereit, die die Kompatibilität zwischen WASM-Modulen und der Hostlaufzeit sicherstellen.
Sie finden die vollständigen WIT-Schemas für Azure IoT Operations im Beispiel-Repository. Diese Schemas definieren alle Schnittstellen, Typen und Datenstrukturen, mit die Sie beim Entwickeln von WASM-Modulen arbeiten.
Datenmodell und Schnittstellen
Alle WASM-Operatoren arbeiten mit standardisierten Datenmodellen, die mithilfe von WebAssembly Interface Types (WIT) definiert sind:
Kerndatenmodell
// Core timestamp structure using hybrid logical clock
record timestamp {
timestamp: timespec, // Physical time (seconds + nanoseconds)
node-id: buffer-or-string, // Logical node identifier
}
// Union type supporting multiple data formats
variant data-model {
buffer-or-bytes(buffer-or-bytes), // Raw byte data
message(message), // Structured messages with metadata
snapshot(snapshot), // Video/image frames with timestamps
}
// Structured message format
record message {
timestamp: timestamp,
content_type: buffer-or-string,
payload: message-payload,
}
WIT-Schnittstellendefinitionen
Jeder Operatortyp implementiert eine bestimmte WIT-Schnittstelle:
// Core operator interfaces
interface map {
use types.{data-model};
process: func(message: data-model) -> result<data-model, error>;
}
interface filter {
use types.{data-model};
process: func(message: data-model) -> result<bool, error>;
}
interface branch {
use types.{data-model, hybrid-logical-clock};
process: func(timestamp: hybrid-logical-clock, message: data-model) -> result<bool, error>;
}
interface accumulate {
use types.{data-model};
process: func(staged: data-model, message: list<data-model>) -> result<data-model, error>;
}
Graphdefinitionen und WASM-Integration
Diagrammdefinitionen zeigen, wie Ihre WASM-Module eine Verbindung mit Verarbeitungsworkflows herstellen. Sie geben die Vorgänge, Verbindungen und Parameter an, die vollständige Datenverarbeitungspipelines erstellen.
Umfassende Informationen zum Erstellen und Konfigurieren von Diagrammdefinitionen, einschließlich detaillierter Beispiele für einfache und komplexe Workflows, finden Sie unter Konfigurieren von WebAssembly-Diagrammdefinitionen für Datenflussdiagramme.
Wichtige Themen, die im Diagrammdefinitionshandbuch behandelt werden:
- Diagrammdefinitionsstruktur: Grundlegendes zum YAML-Schema und erforderlichen Komponenten
- Einfaches Diagrammbeispiel: Grundlegende dreistufige Temperaturkonvertierungspipeline
- Komplexes Diagrammbeispiel: Multisensorverarbeitung mit Verzweigung und Aggregation
- Modulkonfigurationsparameter: Laufzeitanpassung von WASM-Operatoren
- Bereitstellung von Registrierungen: Verpacken und Speichern von Graphdefinitionen als OCI-Artefakte
Nächste Schritte
- Sehen Sie sich vollständige Beispiele und erweiterte Muster im Repository mit Azure IoT Einsatz-WASM-Beispielen an.
- Unter Verwenden von WebAssembly mit Datenflussdiagrammen erfahren Sie, wie Sie Ihre Module bereitstellen.
- Konfigurieren Sie Ihre Datenflussendpunkte gemäß der Anweisungen unter Konfigurieren von Datenflussendpunkten.