Udostępnij przez


Tworzenie modułów WebAssembly (WASM) i definicji grafów dla wykresów przepływu danych

W tym artykule pokazano, jak opracowywać niestandardowe moduły webAssembly (WASM) i definicje grafów dla wykresów przepływu danych operacji usługi Azure IoT. Tworzenie modułów w języku Rust lub Python w celu zaimplementowania logiki przetwarzania niestandardowego. Zdefiniuj konfiguracje grafu, które określają sposób łączenia modułów z kompletnymi przepływami pracy przetwarzania.

Ważne

Wykresy przepływu danych obsługują obecnie tylko punkty końcowe MQTT, Kafka i OpenTelemetry. Inne typy punktów końcowych, takie jak Azure Data Lake, Microsoft Fabric OneLake, Azure Data Explorer i magazyn lokalny, nie są obsługiwane. Aby uzyskać więcej informacji, zobacz Znane problemy.

Aby dowiedzieć się, jak opracowywać moduły WASM przy użyciu rozszerzenia programu VS Code, zobacz Build WASM modules with VS Code extension (Kompilowanie modułów WASM z rozszerzeniem programu VS Code).

Aby dowiedzieć się więcej na temat grafów i funkcji WASM w operacjach usługi Azure IoT, zobacz:

Przegląd

Wykresy przepływu danych operacji usługi Azure IoT przetwarzają dane przesyłane strumieniowo za pomocą konfigurowalnych operatorów zaimplementowanych jako moduły zestawu WebAssembly. Każdy operator przetwarza dane ze znacznikami czasu przy zachowaniu kolejności czasowej, umożliwiając analizę w czasie rzeczywistym przy użyciu wyników deterministycznych.

Najważniejsze korzyści

  • Przetwarzanie w czasie rzeczywistym: Obsługa danych przesyłanych strumieniowo z spójnym małym opóźnieniem
  • Semantyka czasu zdarzenia: przetwarzanie danych na podstawie czasu wystąpienia zdarzeń, a nie podczas ich przetwarzania
  • Odporność na uszkodzenia: wbudowana obsługa błędów i zapewnianie spójności danych
  • Skalowalność: dystrybuuj przetwarzanie między wieloma węzłami przy zachowaniu gwarancji zamówień
  • Obsługa wielu języków: programowanie w języku Rust lub Python przy użyciu spójnych interfejsów

Podstawy architektury

Wykresy przepływu danych opierają się na modelu obliczeniowym Przepływ danych czasowy , który pochodzi z projektu Naiad firmy Microsoft Research. Takie podejście zapewnia:

  • Przetwarzanie deterministyczne: te same dane wejściowe zawsze generują te same dane wyjściowe
  • Śledzenie postępu: system wie, kiedy obliczenia są ukończone
  • Koordynacja rozproszona: synchronizacja wielu węzłów przetwarzania

Dlaczego warto używać terminowego przepływu danych?

Tradycyjne systemy przetwarzania strumieniowego mają kilka wyzwań. Dane poza kolejnością oznaczają, że zdarzenia mogą pojawić się później niż oczekiwano. Częściowe wyniki sprawiają, że trudno jest wiedzieć, kiedy obliczenia zakończą się. Problemy z koordynacją występują podczas synchronizowania rozproszonego przetwarzania.

Terminowy przepływ danych rozwiązuje te problemy, wykonując następujące czynności:

Znaczniki czasu i śledzenie postępu

Każdy element danych zawiera znacznik czasu reprezentujący jego czas logiczny. System śledzi postęp przez znaczniki czasu, włączając kilka kluczowych możliwości:

  • Przetwarzanie deterministyczne: te same dane wejściowe zawsze generują te same dane wyjściowe
  • Dokładnie raz semantyka: brak zduplikowanego lub pominiętego przetwarzania
  • Znaki wodne: Dowiedz się, kiedy nie będą już dostarczane żadne dane przez dany czas

Zegar logiczny hybrydowy

Mechanizm znacznika czasu używa podejścia hybrydowego:

pub struct HybridLogicalClock {
    pub physical_time: u64,  // Wall-clock time when event occurred
    pub logical_time: u64,   // Logical ordering for events at same physical time
}

Hybrydowe podejście zegara logicznego zapewnia kilka możliwości:

  • Kolejność przyczynowa: efekty są zgodne z przyczynami
  • Gwarancje postępu: system wie, kiedy przetwarzanie zostało zakończone
  • Koordynacja rozproszona: Synchronizacja wielu węzłów jest zsynchronizowana

Omówienie operatorów i modułów

Zrozumienie rozróżnienia między operatorami i modułami jest niezbędne w przypadku opracowywania programu WASM:

Operatorów

Operatory są podstawowymi jednostkami przetwarzania opartymi na operatorach przepływu danych timely. Każdy typ operatora służy do określonego celu:

  • Mapa: Przekształcanie każdego elementu danych (na przykład konwertowanie jednostek temperatury)
  • Filtr: zezwalaj na przekazywanie tylko niektórych elementów danych na podstawie warunków (takich jak usuwanie nieprawidłowych odczytów)
  • Gałąź: przekierowywanie danych do różnych ścieżek na podstawie warunków (takich jak oddzielanie danych temperatury i wilgotności)
  • Gromadzenie: zbieranie i agregowanie danych w oknach czasowych (takich jak obliczenia podsumowań statystycznych)
  • Łączenie: scalanie wielu strumieni danych przy zachowaniu kolejności czasowej
  • Opóźnienie: Kontrola czasu przez przesuwanie znaczników czasu

Modules

Moduły to implementacja logiki operatora jako kod WASM. Pojedynczy moduł może implementować wiele typów operatorów. Na przykład moduł temperatury może zapewnić następujące elementy:

  • Operator mapy na potrzeby konwersji jednostek
  • Operator filtru na potrzeby sprawdzania progów
  • Operator gałęzi do podejmowania decyzji dotyczących routingu
  • Operator skumulowany dla agregacji statystycznej

Relacja

Relacja między definicjami, modułami i operatorami grafu jest zgodna z określonym wzorcem:

Graph Definition → References Module → Provides Operator → Processes Data
     ↓                    ↓               ↓              ↓
"temperature:1.0.0" → temperature.wasm → map function → °F to °C

To rozdzielenie umożliwia:

  • Ponowne użycie modułu: wdrażanie tego samego modułu WASM w różnych konfiguracjach grafu
  • Niezależne przechowywanie wersji: aktualizowanie definicji grafu bez ponownego kompilowania modułów
  • Konfiguracja dynamiczna: przekazywanie różnych parametrów do tego samego modułu w przypadku różnych zachowań

Wymagania wstępne

Wybierz język programowania i skonfiguruj wymagane narzędzia:

  • Łańcuch narzędzi Rust udostępnia cargo, rustc oraz bibliotekę standardową wymaganą do kompilowania operatorów. Zainstaluj przy użyciu:

    curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
    
  • Element docelowy WASM wasm32-wasip2 wymagany do kompilowania składników WASM operacji usługi Azure IoT. Dodaj za pomocą:

    rustup target add wasm32-wasip2
    
  • Narzędzia kompilacji udostępniają funkcjonalności używane przez buildery i systemy CI do weryfikacji i pakietowania artefaktów WASM. Zainstaluj przy użyciu:

    cargo install wasm-tools --version '=1.201.0' --locked
    

Konfigurowanie środowiska programowania

Dostęp do zestawu SDK WASM Rust można uzyskać za pomocą niestandardowego rejestru DevOps platformy Microsoft Azure. Zamiast używać zmiennych środowiskowych, skonfiguruj dostęp za pomocą pliku konfiguracji obszaru roboczego:

# .cargo/config.toml (at your workspace root)
[registries]
aio-wg = { index = "sparse+https://pkgs.dev.azure.com/azure-iot-sdks/iot-operations/_packaging/preview/Cargo/index/" }

[net]
git-fetch-with-cli = true

Ta konfiguracja odzwierciedla przykładowy układ pod adresem samples/wasm/.cargo/config.toml i przechowuje ustawienia rejestru w kontroli wersji.

Tworzenie projektu

Zacznij od utworzenia nowego katalogu projektu dla modułu operatora. Struktura projektu zależy od wybranego języka.

cargo new --lib temperature-converter
cd temperature-converter

Konfigurowanie pliku Cargo.toml

Edytuj plik, Cargo.toml aby uwzględnić zależności dla zestawu SDK WASM i innych bibliotek:

[package]
name = "temperature-converter"
version = "0.1.0"
edition = "2021"

[dependencies]
# WebAssembly Interface Types (WIT) code generation
wit-bindgen = "0.22"

# Azure IoT Operations WASM SDK - provides operator macros and host APIs
wasm_graph_sdk = { version = "=1.1.3", registry = "aio-wg" }

# JSON serialization/deserialization for data processing
serde = { version = "1", default-features = false, features = ["derive"] }
serde_json = { version = "1", default-features = false, features = ["alloc"] }

[lib]
# Required for WASM module compilation
crate-type = ["cdylib"]

Uwagi dotyczące wersji i rejestru:

  • Wersja zestawu SDK (=1.1.3) jest zgodna z aktualnymi przykładami; utrzymywanie jej w stałej wersji pozwala uniknąć zmian powodujących niezgodność.
  • registry = "aio-wg" pasuje do wpisu rejestru zdefiniowanego w pliku .cargo/config.toml.

Objaśniono kluczowe zależności:

  • wit-bindgen: Generuje wiązania Rust z definicji WebAssembly Interface Types (WIT), umożliwiając kodowi interakcję ze środowiskiem uruchomieniowym WASM.
  • wasm_graph_sdk: Zestaw SDK operacji usługi Azure IoT udostępnia makra operatorów, takie jak #[map_operator] i #[filter_operator], oraz interfejsy API hosta na potrzeby rejestrowania, metryk i zarządzania stanem.
  • serde + serde_json: biblioteki przetwarzania JSON do analizowania i generowania ładunków danych. default-features = false optymalizuje pod kątem ograniczeń rozmiaru WASM.
  • crate-type = ["cdylib"]: kompiluje bibliotekę Rust jako bibliotekę dynamiczną zgodną z językiem C, która jest wymagana do generowania modułu WASM.

Tworzenie prostego modułu

Utwórz prosty moduł, który konwertuje temperaturę z stopni Celsjusza na Fahrenheit. W tym przykładzie przedstawiono podstawową strukturę i logikę przetwarzania dla implementacji języka Rust i Python.

use serde_json::{json, Value};

use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::macros::map_operator;

fn fahrenheit_to_celsius_init(_configuration: ModuleConfiguration) -> bool {
    logger::log(Level::Info, "temperature-converter", "Init invoked"); // one-time module init
    true
}

#[map_operator(init = "fahrenheit_to_celsius_init")]
fn fahrenheit_to_celsius(input: DataModel) -> Result<DataModel, Error> {
    let DataModel::Message(mut result) = input else {
        return Err(Error {
            message: "Unexpected input type".to_string(),
        });
    };

    let payload = &result.payload.read(); // payload bytes from inbound message
    if let Ok(data_str) = std::str::from_utf8(payload) {
        if let Ok(mut data) = serde_json::from_str::<Value>(data_str) {
            if let Some(temp) = data["temperature"]["value"].as_f64() {
                let fahrenheit = (temp * 9.0 / 5.0) + 32.0; // Celsius -> Fahrenheit
                data["temperature"] = json!({
                    "value_fahrenheit": fahrenheit,
                    "original_celsius": temp
                });

                if let Ok(output_str) = serde_json::to_string(&data) {
                    // Replace payload with owned bytes so the host receives the updated JSON
                    result.payload = BufferOrBytes::Bytes(output_str.into_bytes());
                }
            }
        }
    }

    Ok(DataModel::Message(result))
}

Moduł kompilacji

Wybierz między lokalnymi kompilacjami deweloperów lub konteneryzowanymi kompilacjami na podstawie wymagań dotyczących przepływu pracy programowania i środowiska.

Kompilacja lokalna

Kompiluj bezpośrednio na maszynie deweloperów w celu uzyskania najszybszej iteracji podczas programowania i gdy potrzebujesz pełnej kontroli nad środowiskiem kompilacji.

# Build WASM module
cargo build --release --target wasm32-wasip2  # target required for Azure IoT Operations WASM components

# Find your module  
ls target/wasm32-wasip2/release/*.wasm

Kompilacja platformy Docker

Kompiluj przy użyciu środowisk konteneryzowanych ze wszystkimi zależnościami i wstępnie skonfigurowanymi schematami. Te obrazy platformy Docker zapewniają spójne kompilacje w różnych środowiskach i są idealne dla potoków ciągłej integracji/ciągłego wdrażania.

Repozytorium przykładów operacji usługi Azure IoT obsługuje konstruktora Rust Docker i zawiera wszystkie niezbędne zależności. Aby uzyskać szczegółową dokumentację, zobacz Rust Docker builder usage (Użycie narzędzia Rust Docker Builder).

# Build release version (optimized for production)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter

# Build debug version (includes debugging symbols and less optimization)
docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name temperature-converter --build-mode debug

Opcje kompilacji platformy Docker:

  • --app-name: Musi być zgodna z nazwą skrzyni rdzy z Cargo.toml
  • --build-mode: wybierz release (ustawienie domyślne) dla zoptymalizowanych kompilacji lub debug kompilacji deweloperskich z symbolami

Więcej przykładów

Aby zapoznać się z kompleksowymi przykładami, zobacz przykłady rust w repozytorium przykładów. Kompletne implementacje obejmują:

  • Operatory mapy: przekształcanie i logika konwersji danych
  • Operatory filtrów: warunkowe przetwarzanie i walidacja danych
  • Operatory gałęzi: routing wielościeżkowy na podstawie zawartości danych
  • Operatory kumulowania: agregacja okien czasowych i przetwarzanie statystyczne
  • Operatory opóźnień: kontrolka przetwarzania na podstawie czasu

W przykładach pokazano działające implementacje, które pokazują pełną strukturę dla każdego typu operatora, w tym odpowiednie wzorce obsługi błędów i rejestrowania.

Dokumentacja zestawu SDK i interfejsy API

Zestaw SDK WASM Rust udostępnia kompleksowe narzędzia programistyczne:

Makra operatorów

use wasm_graph_sdk::macros::{map_operator, filter_operator, branch_operator};
use wasm_graph_sdk::{DataModel, HybridLogicalClock};

// Map operator - transforms each data item
#[map_operator(init = "my_init_function")]
fn my_map(input: DataModel) -> Result<DataModel, Error> {
    // Transform logic here
}

// Filter operator - allows/rejects data based on predicate  
#[filter_operator(init = "my_init_function")]
fn my_filter(input: DataModel) -> Result<bool, Error> {
    // Return true to pass data through, false to filter out
}

// Branch operator - routes data to different arms
#[branch_operator(init = "my_init_function")]
fn my_branch(input: DataModel, timestamp: HybridLogicalClock) -> Result<bool, Error> {
    // Return true for "True" arm, false for "False" arm
}

Parametry konfiguracji modułu

Operatorzy WASM mogą odbierać parametry konfiguracji środowiska uruchomieniowego za pośrednictwem ModuleConfiguration struktury przekazanej init do funkcji. Te parametry definiuje się w definicji grafu, co umożliwia dostosowanie środowiska uruchomieniowego bez ponownego kompilowania modułów.

use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::ModuleConfiguration;

fn my_operator_init(configuration: ModuleConfiguration) -> bool {
    // Access required parameters
    if let Some(threshold_param) = configuration.parameters.get("temperature_threshold") {
        let threshold: f64 = threshold_param.parse().unwrap_or(25.0);
        logger::log(Level::Info, "my-operator", &format!("Using threshold: {}", threshold));
    }
    
    // Access optional parameters with defaults
    let unit = configuration.parameters
        .get("output_unit")
        .map(|s| s.as_str())
        .unwrap_or("celsius");
    
    logger::log(Level::Info, "my-operator", &format!("Output unit: {}", unit));
    true
}

Aby uzyskać szczegółowe informacje na temat definiowania parametrów konfiguracji w definicjach grafu, zobacz Parametry konfiguracji modułu.

Interfejsy API hosta

Użyj zestawu SDK do pracy z usługami rozproszonymi:

Magazyn stanów dla danych trwałych:

use wasm_graph_sdk::state_store;

// Set value
state_store::set(key.as_bytes(), value.as_bytes(), None, None, options)?;

// Get value  
let response = state_store::get(key.as_bytes(), None)?;

// Delete key
state_store::del(key.as_bytes(), None, None)?;

Rejestrowanie strukturalne:

use wasm_graph_sdk::logger::{self, Level};

logger::log(Level::Info, "my-operator", "Processing started");
logger::log(Level::Error, "my-operator", &format!("Error: {}", error));

Metryki zgodne z technologią OpenTelemetry:

use wasm_graph_sdk::metrics;

// Increment counter
metrics::add_to_counter("requests_total", 1.0, Some(labels))?;

// Record histogram value
metrics::record_to_histogram("processing_duration", duration_ms, Some(labels))?;

Wnioskowanie ONNX za pomocą WASM

Aby osadzić i uruchamiać małe modele ONNX wewnątrz modułów na potrzeby wnioskowania wewnątrz pasmowego, zobacz Run ONNX inference in WebAssembly data flow graphs (Uruchamianie wnioskowania ONNX w wykresach przepływu danych zestawu WebAssembly). W tym artykule opisano tworzenie pakietów modeli z modułami, włączanie funkcji wasi-nn w definicjach grafów i ograniczenia.

Typy interfejsów zestawu WebAssembly (WIT)

Wszystkie operatory implementują standardowe interfejsy zdefiniowane przy użyciu typów interfejsów zestawu WebAssembly (WIT). Funkcja WIT udostępnia niezależne od języka definicje interfejsów, które zapewniają zgodność między modułami WASM a środowiskiem uruchomieniowym hosta.

Pełne schematy funkcji WIT dla operacji usługi Azure IoT można znaleźć w repozytorium przykładów. Te schematy definiują wszystkie interfejsy, typy i struktury danych, z których pracujesz podczas tworzenia modułów WASM.

Model danych i interfejsy

Wszystkie operatory WASM współpracują ze standardowymi modelami danych zdefiniowanymi przy użyciu typów interfejsów zestawu WebAssembly (WIT):

Podstawowy model danych

// Core timestamp structure using hybrid logical clock
record timestamp {
    timestamp: timespec,     // Physical time (seconds + nanoseconds)
    node-id: buffer-or-string,  // Logical node identifier
}

// Union type supporting multiple data formats
variant data-model {
    buffer-or-bytes(buffer-or-bytes),    // Raw byte data
    message(message),                    // Structured messages with metadata
    snapshot(snapshot),                  // Video/image frames with timestamps
}

// Structured message format
record message {
    timestamp: timestamp,
    content_type: buffer-or-string,
    payload: message-payload,
}

Definicje interfejsu WIT

Każdy typ operatora implementuje określony interfejs funkcji WIT:

// Core operator interfaces
interface map {
    use types.{data-model};
    process: func(message: data-model) -> result<data-model, error>;
}

interface filter {
    use types.{data-model};
    process: func(message: data-model) -> result<bool, error>;
}

interface branch {
    use types.{data-model, hybrid-logical-clock};
    process: func(timestamp: hybrid-logical-clock, message: data-model) -> result<bool, error>;
}

interface accumulate {
    use types.{data-model};
    process: func(staged: data-model, message: list<data-model>) -> result<data-model, error>;
}

Definicje grafu i integracja z usługą WASM

Definicje wykresów pokazują, jak moduły WASM łączą się z przepływami pracy przetwarzania. Określają operacje, połączenia i parametry, które tworzą kompletne potoki przetwarzania danych.

Aby uzyskać kompleksowe informacje na temat tworzenia i konfigurowania definicji grafów, w tym szczegółowych przykładów prostych i złożonych przepływów pracy, zobacz Konfigurowanie definicji grafu zestawu WebAssembly dla grafów przepływu danych.

Najważniejsze tematy omówione w przewodniku po definicjach grafu:

  • Struktura definicji grafu: opis schematu YAML i wymaganych składników
  • Prosty przykład wykresu: Podstawowy potok konwersji temperatury trzyetapowej
  • Przykład złożonych grafów: Przetwarzanie wielu czujników z rozgałęzianiem i agregacją
  • Parametry konfiguracji modułu: dostosowywanie środowiska uruchomieniowego operatorów WASM
  • Wdrażanie rejestru: pakowanie i przechowywanie definicji grafu jako artefaktów OCI

Dalsze kroki