Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Die SDKs für dauerhafte Aufgaben bieten eine einfache Clientbibliothek für den Durable Task Scheduler. In dieser Schnellstartanleitung erfahren Sie, wie Sie Orchestrierungen erstellen, die das Aus-/Fan-In-Anwendungsmuster verwenden, um parallele Verarbeitung durchzuführen.
Von Bedeutung
Derzeit sind die sdKs für dauerhafte Aufgaben für JavaScript und PowerShell nicht verfügbar.
Von Bedeutung
Derzeit sind die sdKs für dauerhafte Aufgaben für JavaScript und PowerShell nicht verfügbar.
- Richten Sie den Emulator "Durable Task Scheduler" für die lokale Entwicklung ein und führen Sie diesen aus.
- Führen Sie die Arbeiter- und Clientprojekte aus.
- Überprüfen Sie den Status und die Historie der Orchestrierung über das Dashboard des Durable Task Schedulers.
Voraussetzungen
Bevor Sie beginnen:
- Stellen Sie sicher, dass Sie über .NET 8 SDK oder höher verfügen.
- Installieren Sie Docker zum Ausführen des Emulators.
- Klonen Sie das GitHub-Repository "Durable Task Scheduler ", um das Schnellstartbeispiel zu verwenden.
- Stellen Sie sicher, dass Sie Python 3.9+ oder höher haben.
- Installieren Sie Docker zum Ausführen des Emulators.
- Klonen Sie das GitHub-Repository "Durable Task Scheduler ", um das Schnellstartbeispiel zu verwenden.
- Stellen Sie sicher, dass Sie Über Java 8 oder 11 verfügen.
- Installieren Sie Docker zum Ausführen des Emulators.
- Klonen Sie das GitHub-Repository "Durable Task Scheduler ", um das Schnellstartbeispiel zu verwenden.
Einrichten des Emulators "Durable Task Scheduler"
Der Anwendungscode sucht nach einer bereitgestellten Scheduler- und Task Hub-Ressource. Werden diese nicht gefunden, greift der Code auf den Emulator zurück. Der Emulator simuliert einen Planer und einen Aufgabenhub in einem Docker-Container, sodass er ideal für die lokale Entwicklung geeignet ist, die in dieser Schnellstartanleitung erforderlich ist.
Navigieren Sie im
Azure-Samples/Durable-Task-SchedulerStammverzeichnis zum .NET SDK-Beispielverzeichnis.cd samples/durable-task-sdks/dotnet/FanOutFanInPullen Sie das Docker-Image für den Emulator:
docker pull mcr.microsoft.com/dts/dts-emulator:latestFühren Sie den Emulator aus. Der Container kann einige Sekunden dauern, bis er bereit ist.
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
Da der Beispielcode automatisch die Standardeinstellungen des Emulators verwendet, müssen Sie keine Umgebungsvariablen festlegen. Die Standard-Emulatoreinstellungen für diese Schnellstartanleitung sind:
- Endpunkt:
http://localhost:8080 - Aufgabenhub:
default
Navigieren Sie im
Azure-Samples/Durable-Task-SchedulerStammverzeichnis zum Python SDK-Beispielverzeichnis.cd samples/durable-task-sdks/python/fan-out-fan-inPullen Sie das Docker-Image für den Emulator:
docker pull mcr.microsoft.com/dts/dts-emulator:latestFühren Sie den Emulator aus. Der Container kann einige Sekunden dauern, bis er bereit ist.
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
Da der Beispielcode automatisch die Standardeinstellungen des Emulators verwendet, müssen Sie keine Umgebungsvariablen festlegen. Die Standard-Emulatoreinstellungen für diese Schnellstartanleitung sind:
- Endpunkt:
http://localhost:8080 - Aufgabenhub:
default
Navigieren Sie im
Azure-Samples/Durable-Task-SchedulerStammverzeichnis zum Java SDK-Beispielverzeichnis.cd samples/durable-task-sdks/java/fan-out-fan-inPullen Sie das Docker-Image für den Emulator:
docker pull mcr.microsoft.com/dts/dts-emulator:latestFühren Sie den Emulator aus. Der Container kann einige Sekunden dauern, bis er bereit ist.
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
Da der Beispielcode automatisch die Standardeinstellungen des Emulators verwendet, müssen Sie keine Umgebungsvariablen festlegen. Die Standard-Emulatoreinstellungen für diese Schnellstartanleitung sind:
- Endpunkt:
http://localhost:8080 - Aufgabenhub:
default
Den Schnellstart ausführen
Navigieren Sie im Verzeichnis
FanOutFanInzum VerzeichnisWorker, um den Worker zu erstellen und auszuführen.cd Worker dotnet build dotnet runNavigieren Sie in einem separaten Terminal vom
FanOutFanInVerzeichnis zumClientVerzeichnis, um den Client zu erstellen und auszuführen.cd Client dotnet build dotnet run
Grundlegendes zur Ausgabe
Wenn Sie dieses Beispiel ausführen, erhalten Sie die Ausgabe von den Worker- und von den Clientprozessen. Entpacken Sie, was im Code passiert ist, wenn Sie das Projekt ausgeführt haben.
Arbeitsleistung
In der Workerausgabe wird Folgendes angezeigt:
- Registrierung des Orchestrators und der Aktivitäten
- Protokollierung beim Aufruf jeder Aktivität
- Parallele Verarbeitung mehrerer Arbeitsaufgaben
- Endgültige Aggregation der Ergebnisse
Clientausgabe
Die Clientausgabe zeigt Folgendes an:
- Die Orchestrierung beginnt mit einer Liste von Arbeitselementen.
- Die eindeutige Orchestrierungsinstanz-ID
- Die endgültigen aggregierten Ergebnisse mit den einzelnen Arbeitsaufgaben und dem entsprechenden Ergebnis
- Gesamtanzahl der verarbeiteten Elemente
Beispielausgabe
Starting Fan-Out Fan-In Pattern - Parallel Processing Client
Using local emulator with no authentication
Starting parallel processing orchestration with 5 work items
Work items: ["Task1","Task2","Task3","LongerTask4","VeryLongTask5"]
Started orchestration with ID: 7f8e9a6b-1c2d-3e4f-5a6b-7c8d9e0f1a2b
Waiting for orchestration to complete...
Orchestration completed with status: Completed
Processing results:
Work item: Task1, Result: 5
Work item: Task2, Result: 5
Work item: Task3, Result: 5
Work item: LongerTask4, Result: 11
Work item: VeryLongTask5, Result: 13
Total items processed: 5
Aktivieren sie eine virtuelle Python-Umgebung.
Installieren Sie die erforderlichen Pakete.
pip install -r requirements.txtStarten Sie den Arbeiter.
python worker.pyErwartete Ausgabe
Die Ausgabe zeigt, dass der Worker gestartet wurde und auf Arbeitselemente wartet.
Starting Fan Out/Fan In pattern worker... Using taskhub: default Using endpoint: http://localhost:8080 Starting gRPC worker that connects to http://localhost:8080 Successfully connected to http://localhost:8080. Waiting for work items...Aktivieren Sie in einem neuen Terminal die virtuelle Umgebung, und führen Sie den Client aus.
Sie können die Anzahl der Arbeitsaufgaben als Argument angeben. Wenn kein Argument angegeben wird, führt das Beispiel standardmäßig 10 Elemente aus.
python client.py [number_of_items]
Grundlegendes zur Ausgabe
Wenn Sie dieses Beispiel ausführen, erhalten Sie die Ausgabe von den Worker- und von den Clientprozessen. Entpacken Sie, was im Code passiert ist, wenn Sie das Projekt ausgeführt haben.
Arbeitsleistung
In der Workerausgabe wird Folgendes angezeigt:
- Registrierung des Orchestrators und der Aktivitäten.
- Statusmeldungen beim parallelen Verarbeiten jeder Arbeitsaufgabe, die anzeigt, dass sie gleichzeitig ausgeführt werden.
- Zufällige Verzögerungen für jede Arbeitsaufgabe (zwischen 0,5 und 2 Sekunden), um unterschiedliche Verarbeitungszeiten zu simulieren.
- Eine letzte Meldung mit der Aggregation der Ergebnisse.
Clientausgabe
Die Clientausgabe zeigt Folgendes an:
- Die Orchestrierung beginnt mit der angegebenen Anzahl von Arbeitselementen.
- Die eindeutige Orchestrierungsinstanz-ID.
- Das endgültige aggregierte Ergebnis, das Folgendes umfasst:
- Die Gesamtanzahl der verarbeiteten Elemente
- Die Summe aller Ergebnisse (jedes Elementergebnis ist das Quadrat seines Werts)
- Der Mittelwert aller Ergebnisse
Beispielausgabe
Starting fan out/fan in orchestration with 10 items
Waiting for 10 parallel tasks to complete
Orchestrator yielded with 10 task(s) and 0 event(s) outstanding.
Processing work item: 1
Processing work item: 2
Processing work item: 10
Processing work item: 9
Processing work item: 8
Processing work item: 7
Processing work item: 6
Processing work item: 5
Processing work item: 4
Processing work item: 3
Orchestrator yielded with 9 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 8 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 7 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 6 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 5 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 4 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 3 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 2 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
All parallel tasks completed, aggregating results
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
Aggregating results from 10 items
Orchestration completed with status: COMPLETED
Erstellen Sie aus dem fan-out-fan-in Verzeichnis die Anwendung mit Gradle, und führen Sie sie aus.
./gradlew runFanOutFanInPattern
Tipp
Wenn die Fehlermeldung zsh: permission denied: ./gradlew angezeigt wird, versuchen Sie, chmod +x gradlew auszuführen, bevor Sie die Anwendung starten.
Grundlegendes zur Ausgabe
Wenn Sie dieses Beispiel ausführen, erhalten Sie eine Ausgabe, die Folgendes zeigt:
- Registrierung des Orchestrators und der Aktivitäten.
- Statusmeldungen beim parallelen Verarbeiten jeder Arbeitsaufgabe, die anzeigt, dass sie gleichzeitig ausgeführt werden.
- Zufällige Verzögerungen für jede Arbeitsaufgabe (zwischen 0,5 und 2 Sekunden), um unterschiedliche Verarbeitungszeiten zu simulieren.
- Eine letzte Meldung mit der Aggregation der Ergebnisse.
Entpacken Sie, was im Code passiert ist, wenn Sie das Projekt ausgeführt haben.
Beispielausgabe
Starting a Gradle Daemon (subsequent builds will be faster)
> Task :runFanOutFanInPattern
Durable Task worker is connecting to sidecar at localhost:8080.
Started new orchestration instance
Orchestration completed: [Name: 'FanOutFanIn_WordCount', ID: '<id-number>', RuntimeStatus: COMPLETED, CreatedAt: 2025-04-25T15:24:47.170Z, LastUpdatedAt: 2025-04-25T15:24:47.287Z, Input: '["Hello, world!","The quick brown fox jumps over t...', Output: '60']
Output: 60
Sie haben das Projekt lokal ausgeführt und können sich nun damit vertraut machen, wie Sie die Bereitstellung in einer in Azure Container Apps gehosteten Azure-Instanz ausführen.
Orchestrierungsstatus und Verlauf anzeigen
Sie können den Orchestrierungsstatus und den Verlauf über das Dashboard "Durable Task Scheduler" anzeigen. Standardmäßig wird das Dashboard auf Port 8082 ausgeführt.
- Navigieren Sie in Ihrem Webbrowser zu http://localhost:8082.
- Klicken Sie auf den Standardaufgabenhub . Die von Ihnen erstellte Orchestrierungsinstanz befindet sich in der Liste.
- Klicken Sie auf die Orchestrierungsinstanz-ID, um die Ausführungsdetails anzuzeigen, einschließlich:
- Parallele Ausführung mehrerer Aktivitätsaufgaben
- Der Aggregationsschritt für das Auffächern nach innen
- Eingabe und Ausgabe in jedem Schritt
- Die Für jeden Schritt benötigte Zeit
Grundlegendes zur Codestruktur
Das Arbeiterprojekt
Zum Veranschaulichen des MustersAuffächern nach außen/Auffächern nach innen erstellt die Workerprojektorchestrierung parallele Aktivitätsaufgaben und wartet auf den Abschluss aller dieser Aufgaben. Der Orchestrator:
- Verwendet eine Liste der Arbeitsaufgaben als Eingabe.
- Fächert nach außen auf, indem für jedes Arbeitselement mithilfe von
ProcessWorkItemActivityeine separate Aufgabe erstellt wird. - Führt alle Aufgaben parallel aus.
- Wartet, bis alle Aufgaben mit der Verwendung
Task.WhenAllabgeschlossen werden. - Fächert durch Aggregieren aller einzelnen Ergebnisse mithilfe von
AggregateResultsActivitynach innen auf. - Gibt das endgültige aggregierte Ergebnis an den Client zurück.
Das Workerprojekt enthält Folgendes:
- ParallelProcessingOrchestration.cs: Definiert die Orchestrator- und Aktivitätsfunktionen in einer einzelnen Datei.
- Program.cs: Richtet den Workerhost mit entsprechender Behandlung von Verbindungszeichenfolgen ein.
ParallelProcessingOrchestration.cs
Mithilfe von Auffächern nach außen/Auffächern nach innen erstellt die Orchestrierung parallele Aktivitätsaufgaben und wartet, bis alle abgeschlossen sind.
public override async Task<Dictionary<string, int>> RunAsync(TaskOrchestrationContext context, List<string> workItems)
{
// Step 1: Fan-out by creating a task for each work item in parallel
List<Task<Dictionary<string, int>>> processingTasks = new List<Task<Dictionary<string, int>>>();
foreach (string workItem in workItems)
{
// Create a task for each work item (fan-out)
Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
nameof(ProcessWorkItemActivity), workItem);
processingTasks.Add(task);
}
// Step 2: Wait for all parallel tasks to complete
Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);
// Step 3: Fan-in by aggregating all results
Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
nameof(AggregateResultsActivity), results);
return aggregatedResults;
}
Jede Aktivität wird als separate Klasse implementiert, die mit dem [DurableTask] Attribut versehen ist.
[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
// Implementation processes a single work item
}
[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
// Implementation aggregates individual results
}
Program.cs
Der Worker verwendet Microsoft.Extensions.Hosting für die ordnungsgemäße Lebenszyklusverwaltung.
using Microsoft.Extensions.Hosting;
//..
builder.Services.AddDurableTaskWorker()
.AddTasks(registry =>
{
registry.AddOrchestrator<ParallelProcessingOrchestration>();
registry.AddActivity<ProcessWorkItemActivity>();
registry.AddActivity<AggregateResultsActivity>();
})
.UseDurableTaskScheduler(connectionString);
Das Clientprojekt
Das Clientprojekt:
- Verwendet dieselbe Verbindungszeichenfolgenlogik wie der Worker.
- Erstellt eine Liste der Arbeitsaufgaben, die parallel verarbeitet werden sollen.
- Plant eine Orchestrierungsinstanz mit der Liste als Eingabe.
- Wartet, bis die Orchestrierung abgeschlossen ist, und zeigt die aggregierten Ergebnisse an.
- Verwendet
WaitForInstanceCompletionAsyncfür den effizienten Abruf.
List<string> workItems = new List<string>
{
"Task1",
"Task2",
"Task3",
"LongerTask4",
"VeryLongTask5"
};
// Schedule the orchestration with the work items
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
"ParallelProcessingOrchestration",
workItems);
// Wait for completion
var instance = await client.WaitForInstanceCompletionAsync(
instanceId,
getInputsAndOutputs: true,
cts.Token);
worker.py
Zum Veranschaulichen des MustersAuffächern nach außen/Auffächern nach innen erstellt die Workerprojektorchestrierung parallele Aktivitätsaufgaben und wartet auf den Abschluss aller dieser Aufgaben. Der Orchestrator:
- Empfängt eine Liste der Arbeitsaufgaben als Eingabe.
- Er fächert nach außen auf, indem parallele Aufgaben für jedes Arbeitselement erstellt werden (durch Aufrufen von
process_work_itemfür die einzelnen Arbeitselemente). - Es wartet, bis alle Aufgaben mit der Verwendung
task.when_allabgeschlossen wurden. - Anschließend fächert er nach innen auf, in dem die Ergebnisse mit der
aggregate_results-Aktivität aggregiert werden. - Das endgültige aggregierte Ergebnis wird an den Client zurückgegeben.
Mithilfe von Auffächern nach außen/Auffächern nach innen erstellt die Orchestrierung parallele Aktivitätsaufgaben und wartet, bis alle abgeschlossen sind.
# Orchestrator function
def fan_out_fan_in_orchestrator(ctx, work_items: list) -> dict:
logger.info(f"Starting fan out/fan in orchestration with {len(work_items)} items")
# Fan out: Create a task for each work item
parallel_tasks = []
for item in work_items:
parallel_tasks.append(ctx.call_activity("process_work_item", input=item))
# Wait for all tasks to complete
logger.info(f"Waiting for {len(parallel_tasks)} parallel tasks to complete")
results = yield task.when_all(parallel_tasks)
# Fan in: Aggregate all the results
logger.info("All parallel tasks completed, aggregating results")
final_result = yield ctx.call_activity("aggregate_results", input=results)
return final_result
client.py
Das Clientprojekt:
- Verwendet dieselbe Verbindungszeichenfolgenlogik wie der Worker.
- Erstellt eine Liste der Arbeitsaufgaben, die parallel verarbeitet werden sollen.
- Plant eine Orchestrierungsinstanz mit der Liste als Eingabe.
- Wartet, bis die Orchestrierung abgeschlossen ist, und zeigt die aggregierten Ergebnisse an.
- Verwendet
wait_for_orchestration_completionfür den effizienten Abruf.
# Generate work items (default 10 items if not specified)
count = int(sys.argv[1]) if len(sys.argv) > 1 else 10
work_items = list(range(1, count + 1))
logger.info(f"Starting new fan out/fan in orchestration with {count} work items")
# Schedule a new orchestration instance
instance_id = client.schedule_new_orchestration(
"fan_out_fan_in_orchestrator",
input=work_items
)
logger.info(f"Started orchestration with ID = {instance_id}")
# Wait for orchestration to complete
logger.info("Waiting for orchestration to complete...")
result = client.wait_for_orchestration_completion(
instance_id,
timeout=60
)
Zum Veranschaulichen des MustersAuffächern nach außen/Auffächern nach innen erstellt die Projektorchestrierung FanOutFanInPattern parallele Aktivitätsaufgaben und wartet auf den Abschluss aller dieser Aufgaben. Der Orchestrator:
- Verwendet eine Liste der Arbeitsaufgaben als Eingabe.
- Fächert nach außen auf, indem für jedes Arbeitselement mithilfe von `` eine separate Aufgabe erstellt wird.
- Führt alle Aufgaben parallel aus.
- Wartet, bis alle Aufgaben mithilfe von `` abgeschlossen wurden.
- Fächert durch Aggregieren aller einzelnen Ergebnisse mithilfe von `` nach innen auf.
- Gibt das endgültige aggregierte Ergebnis an den Client zurück.
Das Projekt enthält:
-
DurableTaskSchedulerWorkerExtensions-Worker: Definiert den Orchestrator und Aktivitätsfunktionen. -
DurableTaskSchedulerClientExtension-Client: Richtet den Workerhost mit entsprechender Behandlung von Verbindungszeichenfolgen ein.
Arbeiter
Mithilfe von Auffächern nach außen/Auffächern nach innen erstellt die Orchestrierung parallele Aktivitätsaufgaben und wartet, bis alle abgeschlossen sind.
DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString)
.addOrchestration(new TaskOrchestrationFactory() {
@Override
public String getName() { return "FanOutFanIn_WordCount"; }
@Override
public TaskOrchestration create() {
return ctx -> {
List<?> inputs = ctx.getInput(List.class);
List<Task<Integer>> tasks = inputs.stream()
.map(input -> ctx.callActivity("CountWords", input.toString(), Integer.class))
.collect(Collectors.toList());
List<Integer> allWordCountResults = ctx.allOf(tasks).await();
int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum();
ctx.complete(totalWordCount);
};
}
})
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "CountWords"; }
@Override
public TaskActivity create() {
return ctx -> {
String input = ctx.getInput(String.class);
StringTokenizer tokenizer = new StringTokenizer(input);
return tokenizer.countTokens();
};
}
})
.build();
// Start the worker
worker.start();
Kunde
Das Clientprojekt:
- Verwendet dieselbe Verbindungszeichenfolgenlogik wie der Worker.
- Erstellt eine Liste der Arbeitsaufgaben, die parallel verarbeitet werden sollen.
- Plant eine Orchestrierungsinstanz mit der Liste als Eingabe.
- Wartet, bis die Orchestrierung abgeschlossen ist, und zeigt die aggregierten Ergebnisse an.
- Verwendet
waitForInstanceCompletionfür den effizienten Abruf.
DurableTaskClient client = DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString).build();
// The input is an arbitrary list of strings.
List<String> listOfStrings = Arrays.asList(
"Hello, world!",
"The quick brown fox jumps over the lazy dog.",
"If a tree falls in the forest and there is no one there to hear it, does it make a sound?",
"The greatest glory in living lies not in never falling, but in rising every time we fall.",
"Always remember that you are absolutely unique. Just like everyone else.");
// Schedule an orchestration which will reliably count the number of words in all the given sentences.
String instanceId = client.scheduleNewOrchestrationInstance(
"FanOutFanIn_WordCount",
new NewOrchestrationInstanceOptions().setInput(listOfStrings));
logger.info("Started new orchestration instance: {}", instanceId);
// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
instanceId,
Duration.ofSeconds(30),
true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(int.class));
Nächste Schritte
Nachdem Sie das Beispiel lokal mithilfe des Emulators für dauerhafte Aufgabenplanung ausgeführt haben, versuchen Sie, eine Scheduler- und Task Hub-Ressource zu erstellen und in Azure-Container-Apps bereitzustellen.