Freigeben über


Schnellstart: Erstellen Sie eine App mit Durable Task SDKs und Durable Task Scheduler

Die SDKs für dauerhafte Aufgaben bieten eine einfache Clientbibliothek für den Durable Task Scheduler. In dieser Schnellstartanleitung erfahren Sie, wie Sie Orchestrierungen erstellen, die das Aus-/Fan-In-Anwendungsmuster verwenden, um parallele Verarbeitung durchzuführen.

Von Bedeutung

Derzeit sind die sdKs für dauerhafte Aufgaben für JavaScript und PowerShell nicht verfügbar.

Von Bedeutung

Derzeit sind die sdKs für dauerhafte Aufgaben für JavaScript und PowerShell nicht verfügbar.

  • Richten Sie den Emulator "Durable Task Scheduler" für die lokale Entwicklung ein und führen Sie diesen aus.
  • Führen Sie die Arbeiter- und Clientprojekte aus.
  • Überprüfen Sie den Status und die Historie der Orchestrierung über das Dashboard des Durable Task Schedulers.

Voraussetzungen

Bevor Sie beginnen:

Einrichten des Emulators "Durable Task Scheduler"

Der Anwendungscode sucht nach einer bereitgestellten Scheduler- und Task Hub-Ressource. Werden diese nicht gefunden, greift der Code auf den Emulator zurück. Der Emulator simuliert einen Planer und einen Aufgabenhub in einem Docker-Container, sodass er ideal für die lokale Entwicklung geeignet ist, die in dieser Schnellstartanleitung erforderlich ist.

  1. Navigieren Sie im Azure-Samples/Durable-Task-Scheduler Stammverzeichnis zum .NET SDK-Beispielverzeichnis.

    cd samples/durable-task-sdks/dotnet/FanOutFanIn
    
  2. Pullen Sie das Docker-Image für den Emulator:

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. Führen Sie den Emulator aus. Der Container kann einige Sekunden dauern, bis er bereit ist.

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

Da der Beispielcode automatisch die Standardeinstellungen des Emulators verwendet, müssen Sie keine Umgebungsvariablen festlegen. Die Standard-Emulatoreinstellungen für diese Schnellstartanleitung sind:

  • Endpunkt: http://localhost:8080
  • Aufgabenhub: default
  1. Navigieren Sie im Azure-Samples/Durable-Task-Scheduler Stammverzeichnis zum Python SDK-Beispielverzeichnis.

    cd samples/durable-task-sdks/python/fan-out-fan-in
    
  2. Pullen Sie das Docker-Image für den Emulator:

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. Führen Sie den Emulator aus. Der Container kann einige Sekunden dauern, bis er bereit ist.

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

Da der Beispielcode automatisch die Standardeinstellungen des Emulators verwendet, müssen Sie keine Umgebungsvariablen festlegen. Die Standard-Emulatoreinstellungen für diese Schnellstartanleitung sind:

  • Endpunkt: http://localhost:8080
  • Aufgabenhub: default
  1. Navigieren Sie im Azure-Samples/Durable-Task-Scheduler Stammverzeichnis zum Java SDK-Beispielverzeichnis.

    cd samples/durable-task-sdks/java/fan-out-fan-in
    
  2. Pullen Sie das Docker-Image für den Emulator:

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. Führen Sie den Emulator aus. Der Container kann einige Sekunden dauern, bis er bereit ist.

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

Da der Beispielcode automatisch die Standardeinstellungen des Emulators verwendet, müssen Sie keine Umgebungsvariablen festlegen. Die Standard-Emulatoreinstellungen für diese Schnellstartanleitung sind:

  • Endpunkt: http://localhost:8080
  • Aufgabenhub: default

Den Schnellstart ausführen

  1. Navigieren Sie im Verzeichnis FanOutFanIn zum Verzeichnis Worker, um den Worker zu erstellen und auszuführen.

    cd Worker
    dotnet build
    dotnet run
    
  2. Navigieren Sie in einem separaten Terminal vom FanOutFanIn Verzeichnis zum Client Verzeichnis, um den Client zu erstellen und auszuführen.

    cd Client
    dotnet build
    dotnet run
    

Grundlegendes zur Ausgabe

Wenn Sie dieses Beispiel ausführen, erhalten Sie die Ausgabe von den Worker- und von den Clientprozessen. Entpacken Sie, was im Code passiert ist, wenn Sie das Projekt ausgeführt haben.

Arbeitsleistung

In der Workerausgabe wird Folgendes angezeigt:

  • Registrierung des Orchestrators und der Aktivitäten
  • Protokollierung beim Aufruf jeder Aktivität
  • Parallele Verarbeitung mehrerer Arbeitsaufgaben
  • Endgültige Aggregation der Ergebnisse

Clientausgabe

Die Clientausgabe zeigt Folgendes an:

  • Die Orchestrierung beginnt mit einer Liste von Arbeitselementen.
  • Die eindeutige Orchestrierungsinstanz-ID
  • Die endgültigen aggregierten Ergebnisse mit den einzelnen Arbeitsaufgaben und dem entsprechenden Ergebnis
  • Gesamtanzahl der verarbeiteten Elemente

Beispielausgabe

Starting Fan-Out Fan-In Pattern - Parallel Processing Client
Using local emulator with no authentication
Starting parallel processing orchestration with 5 work items
Work items: ["Task1","Task2","Task3","LongerTask4","VeryLongTask5"]
Started orchestration with ID: 7f8e9a6b-1c2d-3e4f-5a6b-7c8d9e0f1a2b
Waiting for orchestration to complete...
Orchestration completed with status: Completed
Processing results:
Work item: Task1, Result: 5
Work item: Task2, Result: 5
Work item: Task3, Result: 5
Work item: LongerTask4, Result: 11
Work item: VeryLongTask5, Result: 13
Total items processed: 5
  1. Aktivieren sie eine virtuelle Python-Umgebung.

    python -m venv venv
    /venv/Scripts/activate
    
  2. Installieren Sie die erforderlichen Pakete.

    pip install -r requirements.txt
    
  3. Starten Sie den Arbeiter.

    python worker.py
    

    Erwartete Ausgabe

    Die Ausgabe zeigt, dass der Worker gestartet wurde und auf Arbeitselemente wartet.

    Starting Fan Out/Fan In pattern worker...
    Using taskhub: default
    Using endpoint: http://localhost:8080
    Starting gRPC worker that connects to http://localhost:8080
    Successfully connected to http://localhost:8080. Waiting for work items...
    
  4. Aktivieren Sie in einem neuen Terminal die virtuelle Umgebung, und führen Sie den Client aus.

    venv/Scripts/activate
    python client.py
    

    Sie können die Anzahl der Arbeitsaufgaben als Argument angeben. Wenn kein Argument angegeben wird, führt das Beispiel standardmäßig 10 Elemente aus.

    python client.py [number_of_items]
    

Grundlegendes zur Ausgabe

Wenn Sie dieses Beispiel ausführen, erhalten Sie die Ausgabe von den Worker- und von den Clientprozessen. Entpacken Sie, was im Code passiert ist, wenn Sie das Projekt ausgeführt haben.

Arbeitsleistung

In der Workerausgabe wird Folgendes angezeigt:

  • Registrierung des Orchestrators und der Aktivitäten.
  • Statusmeldungen beim parallelen Verarbeiten jeder Arbeitsaufgabe, die anzeigt, dass sie gleichzeitig ausgeführt werden.
  • Zufällige Verzögerungen für jede Arbeitsaufgabe (zwischen 0,5 und 2 Sekunden), um unterschiedliche Verarbeitungszeiten zu simulieren.
  • Eine letzte Meldung mit der Aggregation der Ergebnisse.

Clientausgabe

Die Clientausgabe zeigt Folgendes an:

  • Die Orchestrierung beginnt mit der angegebenen Anzahl von Arbeitselementen.
  • Die eindeutige Orchestrierungsinstanz-ID.
  • Das endgültige aggregierte Ergebnis, das Folgendes umfasst:
    • Die Gesamtanzahl der verarbeiteten Elemente
    • Die Summe aller Ergebnisse (jedes Elementergebnis ist das Quadrat seines Werts)
    • Der Mittelwert aller Ergebnisse

Beispielausgabe

Starting fan out/fan in orchestration with 10 items
Waiting for 10 parallel tasks to complete
Orchestrator yielded with 10 task(s) and 0 event(s) outstanding.
Processing work item: 1
Processing work item: 2
Processing work item: 10
Processing work item: 9
Processing work item: 8
Processing work item: 7
Processing work item: 6
Processing work item: 5
Processing work item: 4
Processing work item: 3
Orchestrator yielded with 9 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 8 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 7 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 6 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 5 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 4 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 3 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 2 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
All parallel tasks completed, aggregating results
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
Aggregating results from 10 items
Orchestration completed with status: COMPLETED

Erstellen Sie aus dem fan-out-fan-in Verzeichnis die Anwendung mit Gradle, und führen Sie sie aus.

./gradlew runFanOutFanInPattern

Tipp

Wenn die Fehlermeldung zsh: permission denied: ./gradlew angezeigt wird, versuchen Sie, chmod +x gradlew auszuführen, bevor Sie die Anwendung starten.

Grundlegendes zur Ausgabe

Wenn Sie dieses Beispiel ausführen, erhalten Sie eine Ausgabe, die Folgendes zeigt:

  • Registrierung des Orchestrators und der Aktivitäten.
  • Statusmeldungen beim parallelen Verarbeiten jeder Arbeitsaufgabe, die anzeigt, dass sie gleichzeitig ausgeführt werden.
  • Zufällige Verzögerungen für jede Arbeitsaufgabe (zwischen 0,5 und 2 Sekunden), um unterschiedliche Verarbeitungszeiten zu simulieren.
  • Eine letzte Meldung mit der Aggregation der Ergebnisse.

Entpacken Sie, was im Code passiert ist, wenn Sie das Projekt ausgeführt haben.

Beispielausgabe

Starting a Gradle Daemon (subsequent builds will be faster)

> Task :runFanOutFanInPattern
Durable Task worker is connecting to sidecar at localhost:8080.
Started new orchestration instance
Orchestration completed: [Name: 'FanOutFanIn_WordCount', ID: '<id-number>', RuntimeStatus: COMPLETED, CreatedAt: 2025-04-25T15:24:47.170Z, LastUpdatedAt: 2025-04-25T15:24:47.287Z, Input: '["Hello, world!","The quick brown fox jumps over t...', Output: '60']
Output: 60

Sie haben das Projekt lokal ausgeführt und können sich nun damit vertraut machen, wie Sie die Bereitstellung in einer in Azure Container Apps gehosteten Azure-Instanz ausführen.

Orchestrierungsstatus und Verlauf anzeigen

Sie können den Orchestrierungsstatus und den Verlauf über das Dashboard "Durable Task Scheduler" anzeigen. Standardmäßig wird das Dashboard auf Port 8082 ausgeführt.

  1. Navigieren Sie in Ihrem Webbrowser zu http://localhost:8082.
  2. Klicken Sie auf den Standardaufgabenhub . Die von Ihnen erstellte Orchestrierungsinstanz befindet sich in der Liste.
  3. Klicken Sie auf die Orchestrierungsinstanz-ID, um die Ausführungsdetails anzuzeigen, einschließlich:
    • Parallele Ausführung mehrerer Aktivitätsaufgaben
    • Der Aggregationsschritt für das Auffächern nach innen
    • Eingabe und Ausgabe in jedem Schritt
    • Die Für jeden Schritt benötigte Zeit

Screenshot mit Details zur Orchestrierungsinstanz des .NET-Beispiels.

Screenshot der Details der Orchestrierungsinstanz für das Python-Beispiel.

Screenshot der Details der Orchestrierungsinstanz für das Java-Beispiel.

Grundlegendes zur Codestruktur

Das Arbeiterprojekt

Zum Veranschaulichen des MustersAuffächern nach außen/Auffächern nach innen erstellt die Workerprojektorchestrierung parallele Aktivitätsaufgaben und wartet auf den Abschluss aller dieser Aufgaben. Der Orchestrator:

  1. Verwendet eine Liste der Arbeitsaufgaben als Eingabe.
  2. Fächert nach außen auf, indem für jedes Arbeitselement mithilfe von ProcessWorkItemActivity eine separate Aufgabe erstellt wird.
  3. Führt alle Aufgaben parallel aus.
  4. Wartet, bis alle Aufgaben mit der Verwendung Task.WhenAllabgeschlossen werden.
  5. Fächert durch Aggregieren aller einzelnen Ergebnisse mithilfe von AggregateResultsActivity nach innen auf.
  6. Gibt das endgültige aggregierte Ergebnis an den Client zurück.

Das Workerprojekt enthält Folgendes:

  • ParallelProcessingOrchestration.cs: Definiert die Orchestrator- und Aktivitätsfunktionen in einer einzelnen Datei.
  • Program.cs: Richtet den Workerhost mit entsprechender Behandlung von Verbindungszeichenfolgen ein.

ParallelProcessingOrchestration.cs

Mithilfe von Auffächern nach außen/Auffächern nach innen erstellt die Orchestrierung parallele Aktivitätsaufgaben und wartet, bis alle abgeschlossen sind.

public override async Task<Dictionary<string, int>> RunAsync(TaskOrchestrationContext context, List<string> workItems)
{
    // Step 1: Fan-out by creating a task for each work item in parallel
    List<Task<Dictionary<string, int>>> processingTasks = new List<Task<Dictionary<string, int>>>();

    foreach (string workItem in workItems)
    {
        // Create a task for each work item (fan-out)
        Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
            nameof(ProcessWorkItemActivity), workItem);
        processingTasks.Add(task);
    }

    // Step 2: Wait for all parallel tasks to complete
    Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);

    // Step 3: Fan-in by aggregating all results
    Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
        nameof(AggregateResultsActivity), results);

    return aggregatedResults;
}

Jede Aktivität wird als separate Klasse implementiert, die mit dem [DurableTask] Attribut versehen ist.

[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
    // Implementation processes a single work item
}

[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
    // Implementation aggregates individual results
}

Program.cs

Der Worker verwendet Microsoft.Extensions.Hosting für die ordnungsgemäße Lebenszyklusverwaltung.

using Microsoft.Extensions.Hosting;
//..

builder.Services.AddDurableTaskWorker()
    .AddTasks(registry =>
    {
        registry.AddOrchestrator<ParallelProcessingOrchestration>();
        registry.AddActivity<ProcessWorkItemActivity>();
        registry.AddActivity<AggregateResultsActivity>();
    })
    .UseDurableTaskScheduler(connectionString);

Das Clientprojekt

Das Clientprojekt:

  • Verwendet dieselbe Verbindungszeichenfolgenlogik wie der Worker.
  • Erstellt eine Liste der Arbeitsaufgaben, die parallel verarbeitet werden sollen.
  • Plant eine Orchestrierungsinstanz mit der Liste als Eingabe.
  • Wartet, bis die Orchestrierung abgeschlossen ist, und zeigt die aggregierten Ergebnisse an.
  • Verwendet WaitForInstanceCompletionAsync für den effizienten Abruf.
List<string> workItems = new List<string>
{
    "Task1",
    "Task2",
    "Task3",
    "LongerTask4",
    "VeryLongTask5"
};

// Schedule the orchestration with the work items
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
    "ParallelProcessingOrchestration", 
    workItems);

// Wait for completion
var instance = await client.WaitForInstanceCompletionAsync(
    instanceId,
    getInputsAndOutputs: true,
    cts.Token);

worker.py

Zum Veranschaulichen des MustersAuffächern nach außen/Auffächern nach innen erstellt die Workerprojektorchestrierung parallele Aktivitätsaufgaben und wartet auf den Abschluss aller dieser Aufgaben. Der Orchestrator:

  1. Empfängt eine Liste der Arbeitsaufgaben als Eingabe.
  2. Er fächert nach außen auf, indem parallele Aufgaben für jedes Arbeitselement erstellt werden (durch Aufrufen von process_work_item für die einzelnen Arbeitselemente).
  3. Es wartet, bis alle Aufgaben mit der Verwendung task.when_allabgeschlossen wurden.
  4. Anschließend fächert er nach innen auf, in dem die Ergebnisse mit der aggregate_results-Aktivität aggregiert werden.
  5. Das endgültige aggregierte Ergebnis wird an den Client zurückgegeben.

Mithilfe von Auffächern nach außen/Auffächern nach innen erstellt die Orchestrierung parallele Aktivitätsaufgaben und wartet, bis alle abgeschlossen sind.

# Orchestrator function
def fan_out_fan_in_orchestrator(ctx, work_items: list) -> dict:
    logger.info(f"Starting fan out/fan in orchestration with {len(work_items)} items")

    # Fan out: Create a task for each work item
    parallel_tasks = []
    for item in work_items:
        parallel_tasks.append(ctx.call_activity("process_work_item", input=item))

    # Wait for all tasks to complete
    logger.info(f"Waiting for {len(parallel_tasks)} parallel tasks to complete")
    results = yield task.when_all(parallel_tasks)

    # Fan in: Aggregate all the results
    logger.info("All parallel tasks completed, aggregating results")
    final_result = yield ctx.call_activity("aggregate_results", input=results)

    return final_result

client.py

Das Clientprojekt:

  • Verwendet dieselbe Verbindungszeichenfolgenlogik wie der Worker.
  • Erstellt eine Liste der Arbeitsaufgaben, die parallel verarbeitet werden sollen.
  • Plant eine Orchestrierungsinstanz mit der Liste als Eingabe.
  • Wartet, bis die Orchestrierung abgeschlossen ist, und zeigt die aggregierten Ergebnisse an.
  • Verwendet wait_for_orchestration_completion für den effizienten Abruf.
# Generate work items (default 10 items if not specified)
count = int(sys.argv[1]) if len(sys.argv) > 1 else 10
work_items = list(range(1, count + 1))

logger.info(f"Starting new fan out/fan in orchestration with {count} work items")

# Schedule a new orchestration instance
instance_id = client.schedule_new_orchestration(
    "fan_out_fan_in_orchestrator", 
    input=work_items
)

logger.info(f"Started orchestration with ID = {instance_id}")

# Wait for orchestration to complete
logger.info("Waiting for orchestration to complete...")
result = client.wait_for_orchestration_completion(
    instance_id,
    timeout=60
)

Zum Veranschaulichen des MustersAuffächern nach außen/Auffächern nach innen erstellt die Projektorchestrierung FanOutFanInPattern parallele Aktivitätsaufgaben und wartet auf den Abschluss aller dieser Aufgaben. Der Orchestrator:

  1. Verwendet eine Liste der Arbeitsaufgaben als Eingabe.
  2. Fächert nach außen auf, indem für jedes Arbeitselement mithilfe von `` eine separate Aufgabe erstellt wird.
  3. Führt alle Aufgaben parallel aus.
  4. Wartet, bis alle Aufgaben mithilfe von `` abgeschlossen wurden.
  5. Fächert durch Aggregieren aller einzelnen Ergebnisse mithilfe von `` nach innen auf.
  6. Gibt das endgültige aggregierte Ergebnis an den Client zurück.

Das Projekt enthält:

  • DurableTaskSchedulerWorkerExtensions-Worker: Definiert den Orchestrator und Aktivitätsfunktionen.
  • DurableTaskSchedulerClientExtension-Client: Richtet den Workerhost mit entsprechender Behandlung von Verbindungszeichenfolgen ein.

Arbeiter

Mithilfe von Auffächern nach außen/Auffächern nach innen erstellt die Orchestrierung parallele Aktivitätsaufgaben und wartet, bis alle abgeschlossen sind.

DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString)
    .addOrchestration(new TaskOrchestrationFactory() {
        @Override
        public String getName() { return "FanOutFanIn_WordCount"; }

        @Override
        public TaskOrchestration create() {
            return ctx -> {
                List<?> inputs = ctx.getInput(List.class);
                List<Task<Integer>> tasks = inputs.stream()
                        .map(input -> ctx.callActivity("CountWords", input.toString(), Integer.class))
                        .collect(Collectors.toList());
                List<Integer> allWordCountResults = ctx.allOf(tasks).await();
                int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum();
                ctx.complete(totalWordCount);
            };
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "CountWords"; }

        @Override
        public TaskActivity create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                StringTokenizer tokenizer = new StringTokenizer(input);
                return tokenizer.countTokens();
            };
        }
    })
    .build();

// Start the worker
worker.start();

Kunde

Das Clientprojekt:

  • Verwendet dieselbe Verbindungszeichenfolgenlogik wie der Worker.
  • Erstellt eine Liste der Arbeitsaufgaben, die parallel verarbeitet werden sollen.
  • Plant eine Orchestrierungsinstanz mit der Liste als Eingabe.
  • Wartet, bis die Orchestrierung abgeschlossen ist, und zeigt die aggregierten Ergebnisse an.
  • Verwendet waitForInstanceCompletion für den effizienten Abruf.
DurableTaskClient client = DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString).build();

// The input is an arbitrary list of strings.
List<String> listOfStrings = Arrays.asList(
        "Hello, world!",
        "The quick brown fox jumps over the lazy dog.",
        "If a tree falls in the forest and there is no one there to hear it, does it make a sound?",
        "The greatest glory in living lies not in never falling, but in rising every time we fall.",
        "Always remember that you are absolutely unique. Just like everyone else.");

// Schedule an orchestration which will reliably count the number of words in all the given sentences.
String instanceId = client.scheduleNewOrchestrationInstance(
        "FanOutFanIn_WordCount",
        new NewOrchestrationInstanceOptions().setInput(listOfStrings));
logger.info("Started new orchestration instance: {}", instanceId);

// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
        instanceId,
        Duration.ofSeconds(30),
        true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(int.class));

Nächste Schritte

Nachdem Sie das Beispiel lokal mithilfe des Emulators für dauerhafte Aufgabenplanung ausgeführt haben, versuchen Sie, eine Scheduler- und Task Hub-Ressource zu erstellen und in Azure-Container-Apps bereitzustellen.