Freigeben über


Entwickeln von WebAssembly-Modulen (WASM) und Diagrammdefinitionen für Datenflussdiagramme

In diesem Artikel erfahren Sie, wie Sie benutzerdefinierte WebAssembly-Module (WASM) und Diagrammdefinitionen für Azure IoT Einsatz-Datenflussdiagramme entwickeln. Erstellen Sie Module in Rust oder Python, um benutzerdefinierte Verarbeitungslogik zu implementieren. Definieren Sie Diagrammkonfigurationen, die angeben, wie Ihre Module eine Verbindung mit vollständigen Verarbeitungsworkflows herstellen.

Von Bedeutung

Datenflussdiagramme unterstützen zur Zeit nur MQTT-, Kafka- und OpenTelemetry-Endpunkte. Andere Endpunkttypen wie Azure Data Lake, Microsoft Fabric OneLake, Azure Data Explorer und lokaler Speicher werden nicht unterstützt. Weitere Informationen finden Sie unter Bekannte Probleme.

Informationen zum Entwickeln von WASM-Modulen mithilfe der VS Code-Erweiterung finden Sie unter Build WASM-Module mit VS Code-Erweiterung.

Weitere Informationen zu Graphen und WASM in Azure IoT Operations finden Sie unter:

Überblick

Azure IoT Einsatz-Datenflussdiagramme verarbeiten Streamingdaten über konfigurierbare Operatoren, die als WebAssembly-Module implementiert werden. Jeder Operator verarbeitet mit Zeitstempel versehene Daten und behält gleichzeitig die zeitliche Anordnung bei. Dadurch werden Echtzeitanalysen mit deterministischen Ergebnissen ermöglicht.

Hauptvorteile

  • Echtzeitverarbeitung: Verarbeiten von Streamingdaten mit konsistenter niedriger Latenz
  • Ereigniszeitsemantik: Verarbeiten von Daten basierend darauf, wann Ereignisse aufgetreten sind, nicht basierend auf ihrem Verarbeitungszeitpunkt
  • Fehlertoleranz: Integrierte Unterstützung für die Behandlung von Fehlern und Sicherstellung der Datenkonsistenz
  • Skalierbarkeit: Verteilen der Verarbeitung über mehrere Knoten hinweg bei gleichzeitiger Aufrechterhaltung von Auftragsgarantien
  • Unterstützung für mehrere Sprachen: Entwickeln in Rust oder Python mit konsistenten Schnittstellen

Architekturumgebung

Datenflussdiagramme basieren auf dem Berechnungsmodell Timely Dataflow, das aus dem Naiad-Projekt von Microsoft Research stammt. Dieser Ansatz stellt Folgendes sicher:

  • Deterministische Verarbeitung: Die gleiche Eingabe erzeugt immer die gleiche Ausgabe.
  • Statusnachverfolgung: Das System weiß, wann Berechnungen abgeschlossen sind.
  • Verteilte Koordination: Mehrere Verarbeitungsknoten bleiben synchronisiert.

Warum timely dataflow verwenden?

Bei herkömmlichen Datenstromverarbeitungssysteme sind verschiedene Herausforderungen zu bewältigen. Unsortierte Daten bedeuten, dass Ereignisse später als erwartet eingehen können. Teilergebnisse machen es schwierig zu ermitteln, wann Berechnungen abgeschlossen sind. Koordinationsprobleme treten beim Synchronisieren der verteilten Verarbeitung auf.

Timely Dataflow löst diese Probleme durch:

Zeitstempel und Statusnachverfolgung

Jedes Datenelement hat einen Zeitstempel, der seine logische Zeit darstellt. Das System verfolgt den Status durch Zeitstempel nach und ermöglicht mehrere wichtige Funktionen:

  • Deterministische Verarbeitung: Die gleiche Eingabe erzeugt immer die gleiche Ausgabe.
  • Exactly-Once-Semantik: Keine duplizierte oder fehlende Verarbeitung
  • Wasserzeichen: Wissen, wann keine weiteren Daten für einen bestimmten Zeitraum mehr eingehen

Hybride logische Uhr

Der Zeitstempelmechanismus nutzt einen hybriden Ansatz:

pub struct HybridLogicalClock {
    pub physical_time: u64,  // Wall-clock time when event occurred
    pub logical_time: u64,   // Logical ordering for events at same physical time
}

Der Ansatz für die hybride logische Uhr stellt mehrere Funktionen sicher:

  • Kausale Reihenfolge: Effekte folgen Ursachen.
  • Statusgarantien: Das System weiß, wann die Verarbeitung abgeschlossen ist.
  • Verteilte Koordination: Mehrere Knoten bleiben synchronisiert.

Grundlegendes zu Operatoren und Modulen

Es ist wichtig, die Unterschiede zwischen Operatoren und Modulen zu kennen, da dies für die WASM-Entwicklung von wesentlicher Bedeutung ist:

Betriebspersonal

Operatoren sind die grundlegenden Verarbeitungseinheiten, die auf Timely Dataflow-Operatoren basieren. Jeder Operatortyp dient einem bestimmten Zweck:

  • Karte: Transformieren jedes Datenelements (z. B. Konvertieren von Temperatureinheiten)
  • Filter: Zulassen, dass nur bestimmte Datenelemente basierend auf Bedingungen durchlaufen werden (z. B. das Entfernen ungültiger Lesewerte)
  • Branch: Routen Sie Daten auf der Grundlage von Bedingungen auf verschiedene Pfade (wie z. B. die Trennung von Temperatur- und Luftfeuchtigkeitsdaten)
  • Akkumulation: Sammeln und Aggregieren von Daten innerhalb von Zeitfenstern (z. B. Berechnen statistischer Zusammenfassungen)
  • Verketten: Zusammenführen mehrerer Datenströme bei gleichzeitiger Beibehaltung der zeitlichen Reihenfolge
  • Verzögerung: Steuerung des Timings durch Vorverschiebung von Zeitstempeln

Module

Module sind die Implementierung der Operatorlogik als WASM-Code. Ein einzelnes Modul kann mehrere Operatortypen implementieren. Ein Temperaturmodul kann z. B. Folgendes bereitstellen:

  • map-Operator für die Einheitenkonvertierung
  • filter-Operator für die Schwellenwertüberprüfung
  • branch-Operator für Routingentscheidungen
  • accumulate-Operator für statistische Aggregation

Die Beziehung

Die Beziehung zwischen Diagrammdefinitionen, Modulen und Operatoren folgt einem bestimmten Muster:

Graph Definition → References Module → Provides Operator → Processes Data
     ↓                    ↓               ↓              ↓
"temperature:1.0.0" → temperature.wasm → map function → °F to °C

Diese Trennung ermöglicht Folgendes:

  • Modulwiederverwendung: Bereitstellen des gleichen WASM-Moduls in verschiedenen Diagrammkonfigurationen
  • Unabhängige Versionsverwaltung: Aktualisieren von Diagrammdefinitionen ohne Neuerstellung von Modulen
  • Dynamische Konfiguration: Übergeben verschiedener Parameter an dasselbe Modul für unterschiedliche Verhaltensweisen

Voraussetzungen

Wählen Sie Ihre Entwicklungssprache aus, und richten Sie die erforderlichen Tools ein:

  • Rust-Toolkette bietet cargo, rustcund die Standardbibliothek, die zum Kompilieren von Operatoren benötigt wird. Installieren mit:

    curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
    
  • WASM wasm32-wasip2 Ziel, das zum Erstellen von Azure IoT Operations WASM-Komponenten erforderlich ist. Hinzufügen durch:

    rustup target add wasm32-wasip2
    
  • Build-Tools stellen Dienstprogramme bereit, die von den Buildern und der KI verwendet werden, um WASM-Artefakte zu validieren und zu packen. Installieren mit:

    cargo install wasm-tools --version '=1.201.0' --locked
    

Konfigurieren der Entwicklungsumgebung

Sie können über eine benutzerdefinierte Microsoft Azure DevOps-Registrierung auf das WASM Rust SDK zugreifen. Statt Umgebungsvariablen zu verwenden, konfigurieren Sie den Zugriff mit einer Arbeitsbereichskonfigurationsdatei:

# .cargo/config.toml (at your workspace root)
[registries]
aio-wg = { index = "sparse+https://pkgs.dev.azure.com/azure-iot-sdks/iot-operations/_packaging/preview/Cargo/index/" }

[net]
git-fetch-with-cli = true

Dieses Setup spiegelt das Beispiellayout bei samples/wasm/.cargo/config.toml und behält Registrierungseinstellungen in der Versionssteuerung bei.

Projekt erstellen

Erstellen Sie zunächst ein neues Projektverzeichnis für Ihr Operatormodul. Die Projektstruktur hängt von der ausgewählten Sprache ab.

cargo new --lib temperature-converter
cd temperature-converter

Konfigurieren von „Cargo.toml“

Bearbeiten Sie die Cargo.toml Datei, um Abhängigkeiten für das WASM SDK und andere Bibliotheken einzuschließen:

[package]
name = "temperature-converter"
version = "0.1.0"
edition = "2021"

[dependencies]
# WebAssembly Interface Types (WIT) code generation
wit-bindgen = "0.22"

# Azure IoT Operations WASM SDK - provides operator macros and host APIs
wasm_graph_sdk = { version = "=1.1.3", registry = "aio-wg" }

# JSON serialization/deserialization for data processing
serde = { version = "1", default-features = false, features = ["derive"] }
serde_json = { version = "1", default-features = false, features = ["alloc"] }

[lib]
# Required for WASM module compilation
crate-type = ["cdylib"]

Hinweise zu Versionen und Registrierung:

  • Die SDK-Version (=1.1.3) stimmt mit den aktuellen Beispielen überein; wenn Sie sie angeheftet lassen, vermeiden Sie fehlerhafte Änderungen.
  • registry = "aio-wg" entspricht dem Registriereintrag, der in .cargo/config.toml definiert ist.

Die wichtigsten Abhängigkeiten wurden erläutert:

  • wit-bindgen: Generiert Rust-Bindungen aus WebAssembly Interface Types (WIT)-Definitionen, damit Ihr Code mit der WASM-Laufzeit kommunizieren kann.
  • wasm_graph_sdk: Das Azure IoT Operations SDK stellt Operatormakros bereit, wie z.B. #[map_operator], und #[filter_operator] Host-APIs für Protokollierung, Metriken und Zustandsverwaltung.
  • serde + serde_json: JSON-Verarbeitungsbibliotheken zum Analysieren und Generieren von Datennutzlasten. default-features = false optimiert für WASM-Größenbeschränkungen.
  • crate-type = ["cdylib"]: Kompiliert die Rust-Bibliothek als C-kompatible dynamische Bibliothek, die für die WASM-Modulgenerierung erforderlich ist.

Erstellen eines einfachen Moduls

Erstellen Sie ein einfaches Modul, das die Temperatur von Celsius in Fahrenheit umwandelt. In diesem Beispiel wird die grundlegende Struktur- und Verarbeitungslogik für Rust- und Python-Implementierungen veranschaulicht.

use serde_json::{json, Value};

use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::macros::map_operator;

fn fahrenheit_to_celsius_init(_configuration: ModuleConfiguration) -> bool {
    logger::log(Level::Info, "temperature-converter", "Init invoked"); // one-time module init
    true
}

#[map_operator(init = "fahrenheit_to_celsius_init")]
fn fahrenheit_to_celsius(input: DataModel) -> Result<DataModel, Error> {
    let DataModel::Message(mut result) = input else {
        return Err(Error {
            message: "Unexpected input type".to_string(),
        });
    };

    let payload = &result.payload.read(); // payload bytes from inbound message
    if let Ok(data_str) = std::str::from_utf8(payload) {
        if let Ok(mut data) = serde_json::from_str::<Value>(data_str) {
            if let Some(temp) = data["temperature"]["value"].as_f64() {
                let fahrenheit = (temp * 9.0 / 5.0) + 32.0; // Celsius -> Fahrenheit
                data["temperature"] = json!({
                    "value_fahrenheit": fahrenheit,
                    "original_celsius": temp
                });

                if let Ok(output_str) = serde_json::to_string(&data) {
                    // Replace payload with owned bytes so the host receives the updated JSON
                    result.payload = BufferOrBytes::Bytes(output_str.into_bytes());
                }
            }
        }
    }

    Ok(DataModel::Message(result))
}

Erstellungsmodul

Wählen Sie basierend auf ihren Entwicklungsworkflow- und Umgebungsanforderungen zwischen lokalen Entwicklungsbuilds oder containerisierten Builds aus.

Lokaler Build

Führen Sie die Erstellung direkt auf Ihrem Entwicklungscomputer durch, um die schnellste Iteration während der Entwicklung und für den Zeitpunkt zu ermöglichen, zu dem Sie die vollständige Kontrolle über die Buildumgebung benötigen.

# Build WASM module
cargo build --release --target wasm32-wasip2  # target required for Azure IoT Operations WASM components

# Find your module  
ls target/wasm32-wasip2/release/*.wasm

Docker-Build

Erstellen Sie mithilfe von containerisierten Umgebungen mit allen vorkonfigurierten Abhängigkeiten und Schemas. Diese Docker-Images bieten konsistente Builds in verschiedenen Umgebungen und eignen sich ideal für CI/CD-Pipelines.

Das Repository für Azure IoT Operations-Beispiele verwaltet den Rust Docker Builder und enthält alle erforderlichen Abhängigkeiten. Ausführliche Dokumentation finden Sie in der Verwendung des Rust Docker Builder.

# Build release version (optimized for production)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter

# Build debug version (includes debugging symbols and less optimization)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter --build-mode debug

Docker-Buildoptionen:

  • --app-name: Muss Ihrem Rust-Cratenamen aus Cargo.toml entsprechen.
  • --build-mode: Wählen Sie release (Standard) für optimierte Builds oder debug für Entwicklungsbuilds mit Symbolen aus.

Weitere Beispiele

Umfassende Beispiele finden Sie in den Rust-Beispielen im Beispiel-Repository. Zu den vollständigen Implementierungen gehören folgende:

  • map-Operatoren: Datentransformation und Konvertierungslogik
  • filter-Operatoren: Bedingte Datenverarbeitung und Validierung
  • brach-Operatoren: Routing mit mehreren Pfaden basierend auf Dateninhalten
  • accumulate-Operatoren: Zeitfensteraggregation und statistische Verarbeitung
  • delay-Operatoren: Zeitbasierte Verarbeitungssteuerung

Die Beispiele veranschaulichen Arbeitsimplementierungen, die die vollständige Struktur für jeden Operatortyp darstellen, einschließlich der richtigen Fehlerbehandlung und Protokollierungsmuster.

SDK-Referenz und APIs

Das WASM Rust SDK bietet umfassende Entwicklungstools:

Operatormakros

use wasm_graph_sdk::macros::{map_operator, filter_operator, branch_operator};
use wasm_graph_sdk::{DataModel, HybridLogicalClock};

// Map operator - transforms each data item
#[map_operator(init = "my_init_function")]
fn my_map(input: DataModel) -> Result<DataModel, Error> {
    // Transform logic here
}

// Filter operator - allows/rejects data based on predicate  
#[filter_operator(init = "my_init_function")]
fn my_filter(input: DataModel) -> Result<bool, Error> {
    // Return true to pass data through, false to filter out
}

// Branch operator - routes data to different arms
#[branch_operator(init = "my_init_function")]
fn my_branch(input: DataModel, timestamp: HybridLogicalClock) -> Result<bool, Error> {
    // Return true for "True" arm, false for "False" arm
}

Modulkonfigurationsparameter

Ihre WASM-Operatoren können Laufzeitkonfigurationsparameter über die ModuleConfiguration an die init Funktion übergebene Struktur empfangen. Sie definieren diese Parameter in der Diagrammdefinition, mit der Sie die Laufzeit anpassen können, ohne Module neu zu erstellen.

use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::ModuleConfiguration;

fn my_operator_init(configuration: ModuleConfiguration) -> bool {
    // Access required parameters
    if let Some(threshold_param) = configuration.parameters.get("temperature_threshold") {
        let threshold: f64 = threshold_param.parse().unwrap_or(25.0);
        logger::log(Level::Info, "my-operator", &format!("Using threshold: {}", threshold));
    }
    
    // Access optional parameters with defaults
    let unit = configuration.parameters
        .get("output_unit")
        .map(|s| s.as_str())
        .unwrap_or("celsius");
    
    logger::log(Level::Info, "my-operator", &format!("Output unit: {}", unit));
    true
}

Ausführliche Informationen zum Definieren von Konfigurationsparametern in Diagrammdefinitionen finden Sie unter Modulkonfigurationsparameter.

Hosten von APIs

Verwenden Sie das SDK, um mit verteilten Diensten zu arbeiten:

Zustandsspeicher für persistente Daten:

use wasm_graph_sdk::state_store;

// Set value
state_store::set(key.as_bytes(), value.as_bytes(), None, None, options)?;

// Get value  
let response = state_store::get(key.as_bytes(), None)?;

// Delete key
state_store::del(key.as_bytes(), None, None)?;

Strukturierte Protokollierung:

use wasm_graph_sdk::logger::{self, Level};

logger::log(Level::Info, "my-operator", "Processing started");
logger::log(Level::Error, "my-operator", &format!("Error: {}", error));

Mit OpenTelemetry kompatible Metriken:

use wasm_graph_sdk::metrics;

// Increment counter
metrics::add_to_counter("requests_total", 1.0, Some(labels))?;

// Record histogram value
metrics::record_to_histogram("processing_duration", duration_ms, Some(labels))?;

ONNX-Inferenz mit WASM

Informationen zum Einbetten und Ausführen kleiner ONNX-Modelle in Ihre Module zur In-Band-Inference finden Sie unter Ausführen von ONNX-Inference in WebAssembly-Datenflussdiagrammen. In diesem Artikel werden Verpackungsmodelle mit Modulen behandelt, die Aktivierung des WASI-NN-Features in Graphdefinitionen beschrieben und Einschränkungen aufgezeigt.

WIT (WebAssembly Interface Types, WebAssembly-Schnittstellentypen)

Alle Operatoren implementieren standardisierte Schnittstellen, die mithilfe von WIT (WebAssembly Interface Types, WebAssembly-Schnittstellentypen) definiert werden. WIT stellt sprachunabhängige Schnittstellendefinitionen bereit, die die Kompatibilität zwischen WASM-Modulen und der Hostlaufzeit sicherstellen.

Sie finden die vollständigen WIT-Schemas für Azure IoT Operations im Beispiel-Repository. Diese Schemas definieren alle Schnittstellen, Typen und Datenstrukturen, mit die Sie beim Entwickeln von WASM-Modulen arbeiten.

Datenmodell und Schnittstellen

Alle WASM-Operatoren arbeiten mit standardisierten Datenmodellen, die mithilfe von WebAssembly Interface Types (WIT) definiert sind:

Kerndatenmodell

// Core timestamp structure using hybrid logical clock
record timestamp {
    timestamp: timespec,     // Physical time (seconds + nanoseconds)
    node-id: buffer-or-string,  // Logical node identifier
}

// Union type supporting multiple data formats
variant data-model {
    buffer-or-bytes(buffer-or-bytes),    // Raw byte data
    message(message),                    // Structured messages with metadata
    snapshot(snapshot),                  // Video/image frames with timestamps
}

// Structured message format
record message {
    timestamp: timestamp,
    content_type: buffer-or-string,
    payload: message-payload,
}

WIT-Schnittstellendefinitionen

Jeder Operatortyp implementiert eine bestimmte WIT-Schnittstelle:

// Core operator interfaces
interface map {
    use types.{data-model};
    process: func(message: data-model) -> result<data-model, error>;
}

interface filter {
    use types.{data-model};
    process: func(message: data-model) -> result<bool, error>;
}

interface branch {
    use types.{data-model, hybrid-logical-clock};
    process: func(timestamp: hybrid-logical-clock, message: data-model) -> result<bool, error>;
}

interface accumulate {
    use types.{data-model};
    process: func(staged: data-model, message: list<data-model>) -> result<data-model, error>;
}

Graphdefinitionen und WASM-Integration

Diagrammdefinitionen zeigen, wie Ihre WASM-Module eine Verbindung mit Verarbeitungsworkflows herstellen. Sie geben die Vorgänge, Verbindungen und Parameter an, die vollständige Datenverarbeitungspipelines erstellen.

Umfassende Informationen zum Erstellen und Konfigurieren von Diagrammdefinitionen, einschließlich detaillierter Beispiele für einfache und komplexe Workflows, finden Sie unter Konfigurieren von WebAssembly-Diagrammdefinitionen für Datenflussdiagramme.

Wichtige Themen, die im Diagrammdefinitionshandbuch behandelt werden:

  • Diagrammdefinitionsstruktur: Grundlegendes zum YAML-Schema und erforderlichen Komponenten
  • Einfaches Diagrammbeispiel: Grundlegende dreistufige Temperaturkonvertierungspipeline
  • Komplexes Diagrammbeispiel: Multisensorverarbeitung mit Verzweigung und Aggregation
  • Modulkonfigurationsparameter: Laufzeitanpassung von WASM-Operatoren
  • Bereitstellung von Registrierungen: Verpacken und Speichern von Graphdefinitionen als OCI-Artefakte

Nächste Schritte