Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Les kits SDK Durable Task fournissent une bibliothèque cliente légère pour le planificateur de tâches durables. Dans ce guide de démarrage rapide, vous allez apprendre à créer des orchestrations qui utilisent le modèle d’application fan-out/fan-in pour effectuer un traitement parallèle.
Important
Actuellement, les kits SDK Durable Task ne sont pas disponibles pour JavaScript et PowerShell.
Important
Actuellement, les kits SDK Durable Task ne sont pas disponibles pour JavaScript et PowerShell.
- Configurez et exécutez l’émulateur Durable Task Scheduler pour le développement local.
- Exécutez les projets Worker et client.
- Passez en revue l’état et l’historique de l’orchestration via le tableau de bord planificateur de tâches durables.
Conditions préalables
Avant de commencer :
- Vérifiez que vous disposez du Kit de développement logiciel (SDK) .NET 8 ou version ultérieure.
- Installez Docker pour exécuter l’émulateur.
- Clonez le dépôt GitHub du Planificateur de tâches durables pour utiliser l’exemple de démarrage rapide.
- Vérifiez que vous disposez de Python 3.9+ ou version ultérieure.
- Installez Docker pour exécuter l’émulateur.
- Clonez le dépôt GitHub du Planificateur de tâches durables pour utiliser l’exemple de démarrage rapide.
- Vérifiez que vous disposez de Java 8 ou 11.
- Installez Docker pour exécuter l’émulateur.
- Clonez le dépôt GitHub du Planificateur de tâches durables pour utiliser l’exemple de démarrage rapide.
Configurer l’émulateur Durable Task Scheduler
Le code de l’application recherche une ressource de planificateur et de hub de tâches déployée. Si aucun n’est trouvé, le code revient à l’émulateur. L’émulateur simule un planificateur et un hub de tâches dans un conteneur Docker, ce qui le rend idéal pour le développement local requis dans ce guide de démarrage rapide.
À partir du
Azure-Samples/Durable-Task-Schedulerrépertoire racine, accédez à l’exemple de répertoire du Kit de développement logiciel (SDK) .NET.cd samples/durable-task-sdks/dotnet/FanOutFanInExtrayez l’image Docker de l’émulateur.
docker pull mcr.microsoft.com/dts/dts-emulator:latestExécutez l’émulateur. Le conteneur peut prendre quelques secondes pour être prêt.
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
Étant donné que l’exemple de code utilise automatiquement les paramètres de l’émulateur par défaut, vous n’avez pas besoin de définir de variables d’environnement. Les paramètres de l’émulateur par défaut pour ce guide de démarrage rapide sont les suivants :
- Point de terminaison :
http://localhost:8080 - Hub de tâches :
default
À partir du
Azure-Samples/Durable-Task-Schedulerrépertoire racine, accédez à l’exemple de répertoire du Kit de développement logiciel (SDK) Python.cd samples/durable-task-sdks/python/fan-out-fan-inExtrayez l’image Docker de l’émulateur.
docker pull mcr.microsoft.com/dts/dts-emulator:latestExécutez l’émulateur. Le conteneur peut prendre quelques secondes pour être prêt.
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
Étant donné que l’exemple de code utilise automatiquement les paramètres de l’émulateur par défaut, vous n’avez pas besoin de définir de variables d’environnement. Les paramètres de l’émulateur par défaut pour ce guide de démarrage rapide sont les suivants :
- Point de terminaison :
http://localhost:8080 - Hub de tâches :
default
À partir du
Azure-Samples/Durable-Task-Schedulerrépertoire racine, accédez à l’exemple de répertoire du Kit de développement logiciel (SDK) Java.cd samples/durable-task-sdks/java/fan-out-fan-inExtrayez l’image Docker de l’émulateur.
docker pull mcr.microsoft.com/dts/dts-emulator:latestExécutez l’émulateur. Le conteneur peut prendre quelques secondes pour être prêt.
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
Étant donné que l’exemple de code utilise automatiquement les paramètres de l’émulateur par défaut, vous n’avez pas besoin de définir de variables d’environnement. Les paramètres de l’émulateur par défaut pour ce guide de démarrage rapide sont les suivants :
- Point de terminaison :
http://localhost:8080 - Hub de tâches :
default
Exécuter le démarrage rapide
À partir du
FanOutFanInrépertoire, accédez auWorkerrépertoire pour générer et exécuter le worker.cd Worker dotnet build dotnet runDans un terminal distinct, à partir du
FanOutFanInrépertoire, accédez auClientrépertoire pour générer et exécuter le client.cd Client dotnet build dotnet run
Présentation de la sortie
Lorsque vous exécutez cet exemple, vous recevez la sortie des processus Worker et client. Décompressez ce qui s’est passé dans le code lorsque vous avez exécuté le projet.
Productivité des travailleurs
La sortie du Worker affiche :
- Inscription de l’orchestrateur et des activités
- Entrées de journal lorsque chaque activité est appelée
- Traitement parallèle de plusieurs éléments de travail
- Agrégation finale des résultats
Sortie du client
La sortie du client affiche :
- L'orchestration commence par une liste d'éléments de travail
- Identifiant unique d'instance d'orchestration
- Résultats agrégés finaux, montrant chaque élément de travail et son résultat correspondant
- Nombre total d’éléments traités
Exemple de sortie
Starting Fan-Out Fan-In Pattern - Parallel Processing Client
Using local emulator with no authentication
Starting parallel processing orchestration with 5 work items
Work items: ["Task1","Task2","Task3","LongerTask4","VeryLongTask5"]
Started orchestration with ID: 7f8e9a6b-1c2d-3e4f-5a6b-7c8d9e0f1a2b
Waiting for orchestration to complete...
Orchestration completed with status: Completed
Processing results:
Work item: Task1, Result: 5
Work item: Task2, Result: 5
Work item: Task3, Result: 5
Work item: LongerTask4, Result: 11
Work item: VeryLongTask5, Result: 13
Total items processed: 5
Activez un environnement virtuel Python.
Installez les packages requis.
pip install -r requirements.txtDémarrez le Worker.
python worker.pySortie attendue
Vous pouvez voir la sortie indiquant que le Worker a démarré et attend des éléments de travail.
Starting Fan Out/Fan In pattern worker... Using taskhub: default Using endpoint: http://localhost:8080 Starting gRPC worker that connects to http://localhost:8080 Successfully connected to http://localhost:8080. Waiting for work items...Dans un nouveau terminal, activez l’environnement virtuel et exécutez le client.
Vous pouvez fournir le nombre d’éléments de travail en tant qu’argument. Si aucun argument n’est fourni, l’exemple exécute 10 éléments par défaut.
python client.py [number_of_items]
Présentation de la sortie
Lorsque vous exécutez cet exemple, vous recevez la sortie des processus Worker et client. Décompressez ce qui s’est passé dans le code lorsque vous avez exécuté le projet.
Productivité des travailleurs
La sortie du Worker affiche :
- Inscription de l’orchestrateur et des activités.
- Messages d’état lors du traitement de chaque élément de travail en parallèle, montrant qu’ils s’exécutent simultanément.
- Retards aléatoires pour chaque élément de travail (entre 0,5 et 2 secondes) pour simuler des temps de traitement variables.
- Message final montrant l’agrégation des résultats.
Sortie du client
La sortie du client affiche :
- Orchestration commençant par le nombre spécifié d’éléments de travail.
- Identifiant unique de l'instance d'orchestration.
- Résultat agrégé final, qui inclut :
- Nombre total d’éléments traités
- La somme de tous les résultats (le résultat de chaque élément est le carré de sa valeur)
- Moyenne de tous les résultats
Exemple de sortie
Starting fan out/fan in orchestration with 10 items
Waiting for 10 parallel tasks to complete
Orchestrator yielded with 10 task(s) and 0 event(s) outstanding.
Processing work item: 1
Processing work item: 2
Processing work item: 10
Processing work item: 9
Processing work item: 8
Processing work item: 7
Processing work item: 6
Processing work item: 5
Processing work item: 4
Processing work item: 3
Orchestrator yielded with 9 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 8 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 7 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 6 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 5 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 4 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 3 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 2 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
All parallel tasks completed, aggregating results
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
Aggregating results from 10 items
Orchestration completed with status: COMPLETED
À partir du fan-out-fan-in répertoire, générez et exécutez l’application à l’aide de Gradle.
./gradlew runFanOutFanInPattern
Conseil / Astuce
Si vous recevez le message zsh: permission denied: ./gradlewd’erreur, essayez d’exécuter chmod +x gradlew avant d’exécuter l’application.
Présentation de la sortie
Lorsque vous exécutez cet exemple, vous recevez une sortie qui affiche :
- Inscription de l’orchestrateur et des activités.
- Messages d’état lors du traitement de chaque élément de travail en parallèle, montrant qu’ils s’exécutent simultanément.
- Retards aléatoires pour chaque élément de travail (entre 0,5 et 2 secondes) pour simuler des temps de traitement variables.
- Message final montrant l’agrégation des résultats.
Décompressez ce qui s’est passé dans le code lorsque vous avez exécuté le projet.
Exemple de sortie
Starting a Gradle Daemon (subsequent builds will be faster)
> Task :runFanOutFanInPattern
Durable Task worker is connecting to sidecar at localhost:8080.
Started new orchestration instance
Orchestration completed: [Name: 'FanOutFanIn_WordCount', ID: '<id-number>', RuntimeStatus: COMPLETED, CreatedAt: 2025-04-25T15:24:47.170Z, LastUpdatedAt: 2025-04-25T15:24:47.287Z, Input: '["Hello, world!","The quick brown fox jumps over t...', Output: '60']
Output: 60
Maintenant que vous avez exécuté le projet localement, vous pouvez maintenant apprendre à déployer sur Azure hébergé dans Azure Container Apps.
Afficher l’état et l’historique de l’orchestration
Vous pouvez afficher l’état et l’historique de l’orchestration via le tableau de bord planificateur de tâches durables. Par défaut, le tableau de bord s’exécute sur le port 8082.
- Accédez à http://localhost:8082 dans votre navigateur web.
- Cliquez sur le hub de tâches par défaut . L’instance d’orchestration que vous avez créée se trouve dans la liste.
- Cliquez sur l’ID d’instance d’orchestration pour afficher les détails de l’exécution, notamment :
- Exécution parallèle de plusieurs tâches d’activité
- Étape d’agrégation fan-in
- Entrée et sortie à chaque étape
- Temps nécessaire pour chaque étape
Présentation de la structure de code
Projet du travailleur
Pour illustrer le modèle fan-out/fan-in, l’orchestration du projet Worker crée des tâches d’activité parallèles et attend que tout se termine. Orchestrateur :
- Prend une liste d’éléments de travail comme entrée.
- Effectue un fan-out en créant une tâche distincte pour chaque élément de travail à l’aide de
ProcessWorkItemActivity. - Exécute toutes les tâches en parallèle.
- Attend que toutes les tâches se terminent à l’aide de
Task.WhenAll. - Effectue un fan-in en agrégeant tous les résultats individuels à l’aide de
AggregateResultsActivity. - Retourne le résultat agrégé final au client.
Le projet Worker contient :
- ParallelProcessingOrchestration.cs : définit les fonctions d’orchestrateur et d’activité dans un seul fichier.
- Program.cs : configure l’hôte worker avec une gestion appropriée des chaînes de connexion.
ParallelProcessingOrchestration.cs
En utilisant le processus du fan-out/fan-in, l’orchestration crée des tâches d’activité parallèles et attend qu’elles soient toutes terminées.
public override async Task<Dictionary<string, int>> RunAsync(TaskOrchestrationContext context, List<string> workItems)
{
// Step 1: Fan-out by creating a task for each work item in parallel
List<Task<Dictionary<string, int>>> processingTasks = new List<Task<Dictionary<string, int>>>();
foreach (string workItem in workItems)
{
// Create a task for each work item (fan-out)
Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
nameof(ProcessWorkItemActivity), workItem);
processingTasks.Add(task);
}
// Step 2: Wait for all parallel tasks to complete
Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);
// Step 3: Fan-in by aggregating all results
Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
nameof(AggregateResultsActivity), results);
return aggregatedResults;
}
Chaque activité est implémentée en tant que classe distincte décorée avec l’attribut [DurableTask] .
[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
// Implementation processes a single work item
}
[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
// Implementation aggregates individual results
}
Program.cs
Le travailleur utilise Microsoft.Extensions.Hosting pour gérer correctement le cycle de vie.
using Microsoft.Extensions.Hosting;
//..
builder.Services.AddDurableTaskWorker()
.AddTasks(registry =>
{
registry.AddOrchestrator<ParallelProcessingOrchestration>();
registry.AddActivity<ProcessWorkItemActivity>();
registry.AddActivity<AggregateResultsActivity>();
})
.UseDurableTaskScheduler(connectionString);
Projet client
Projet client :
- Utilise la même logique de chaîne de connexion que le travailleur.
- Crée une liste d’éléments de travail à traiter en parallèle.
- Planifie une instance d’orchestration avec la liste comme entrée.
- Attend que l’orchestration se termine et affiche les résultats agrégés.
- Utilise
WaitForInstanceCompletionAsyncpour un sondage efficace.
List<string> workItems = new List<string>
{
"Task1",
"Task2",
"Task3",
"LongerTask4",
"VeryLongTask5"
};
// Schedule the orchestration with the work items
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
"ParallelProcessingOrchestration",
workItems);
// Wait for completion
var instance = await client.WaitForInstanceCompletionAsync(
instanceId,
getInputsAndOutputs: true,
cts.Token);
worker.py
Pour illustrer le modèle fan-out/fan-in, l’orchestration du projet Worker crée des tâches d’activité parallèles et attend que tout se termine. Orchestrateur :
- Reçoit une liste d’éléments de travail en entrée.
- Il effectue un « fan-out » en créant des tâches parallèles pour chaque élément de travail (en appelant
process_work_itempour chacun d’entre eux). - Il attend que toutes les tâches se terminent en utilisant
task.when_all. - Il effectue ensuite un « fan-in » en agrégeant les résultats avec l’activité
aggregate_results. - Le résultat agrégé final est retourné au client.
En utilisant le processus du fan-out/fan-in, l’orchestration crée des tâches d’activité parallèles et attend qu’elles soient toutes terminées.
# Orchestrator function
def fan_out_fan_in_orchestrator(ctx, work_items: list) -> dict:
logger.info(f"Starting fan out/fan in orchestration with {len(work_items)} items")
# Fan out: Create a task for each work item
parallel_tasks = []
for item in work_items:
parallel_tasks.append(ctx.call_activity("process_work_item", input=item))
# Wait for all tasks to complete
logger.info(f"Waiting for {len(parallel_tasks)} parallel tasks to complete")
results = yield task.when_all(parallel_tasks)
# Fan in: Aggregate all the results
logger.info("All parallel tasks completed, aggregating results")
final_result = yield ctx.call_activity("aggregate_results", input=results)
return final_result
client.py
Projet client :
- Utilise la même logique de chaîne de connexion que le travailleur.
- Crée une liste d’éléments de travail à traiter en parallèle.
- Planifie une instance d’orchestration avec la liste comme entrée.
- Attend que l’orchestration se termine et affiche les résultats agrégés.
- Utilise
wait_for_orchestration_completionpour un sondage efficace.
# Generate work items (default 10 items if not specified)
count = int(sys.argv[1]) if len(sys.argv) > 1 else 10
work_items = list(range(1, count + 1))
logger.info(f"Starting new fan out/fan in orchestration with {count} work items")
# Schedule a new orchestration instance
instance_id = client.schedule_new_orchestration(
"fan_out_fan_in_orchestrator",
input=work_items
)
logger.info(f"Started orchestration with ID = {instance_id}")
# Wait for orchestration to complete
logger.info("Waiting for orchestration to complete...")
result = client.wait_for_orchestration_completion(
instance_id,
timeout=60
)
Pour illustrer le modèle fan-out/fan-in, l’orchestration du projet FanOutFanInPattern crée des tâches d’activité parallèles et attend qu’elles soient toutes terminées. Orchestrateur :
- Prend une liste d’éléments de travail comme entrée.
- Effectue un fan-out en créant une tâche distincte pour chaque élément de travail à l’aide de ``.
- Exécute toutes les tâches en parallèle.
- Attend que toutes les tâches se terminent en utilisant ``.
- Effectue un fan-in en agrégeant tous les résultats individuels à l’aide de ``.
- Retourne le résultat agrégé final au client.
Le projet contient :
-
Worker
DurableTaskSchedulerWorkerExtensions: définit les fonctions d’orchestrateur et d’activité. -
Client
DurableTaskSchedulerClientExtension: configure l’hôte Worker avec une gestion appropriée des chaînes de connexion.
Travailleur
En utilisant le processus du fan-out/fan-in, l’orchestration crée des tâches d’activité parallèles et attend qu’elles soient toutes terminées.
DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString)
.addOrchestration(new TaskOrchestrationFactory() {
@Override
public String getName() { return "FanOutFanIn_WordCount"; }
@Override
public TaskOrchestration create() {
return ctx -> {
List<?> inputs = ctx.getInput(List.class);
List<Task<Integer>> tasks = inputs.stream()
.map(input -> ctx.callActivity("CountWords", input.toString(), Integer.class))
.collect(Collectors.toList());
List<Integer> allWordCountResults = ctx.allOf(tasks).await();
int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum();
ctx.complete(totalWordCount);
};
}
})
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "CountWords"; }
@Override
public TaskActivity create() {
return ctx -> {
String input = ctx.getInput(String.class);
StringTokenizer tokenizer = new StringTokenizer(input);
return tokenizer.countTokens();
};
}
})
.build();
// Start the worker
worker.start();
Client
Projet client :
- Utilise la même logique de chaîne de connexion que le travailleur.
- Crée une liste d’éléments de travail à traiter en parallèle.
- Planifie une instance d’orchestration avec la liste comme entrée.
- Attend que l’orchestration se termine et affiche les résultats agrégés.
- Utilise
waitForInstanceCompletionpour un sondage efficace.
DurableTaskClient client = DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString).build();
// The input is an arbitrary list of strings.
List<String> listOfStrings = Arrays.asList(
"Hello, world!",
"The quick brown fox jumps over the lazy dog.",
"If a tree falls in the forest and there is no one there to hear it, does it make a sound?",
"The greatest glory in living lies not in never falling, but in rising every time we fall.",
"Always remember that you are absolutely unique. Just like everyone else.");
// Schedule an orchestration which will reliably count the number of words in all the given sentences.
String instanceId = client.scheduleNewOrchestrationInstance(
"FanOutFanIn_WordCount",
new NewOrchestrationInstanceOptions().setInput(listOfStrings));
logger.info("Started new orchestration instance: {}", instanceId);
// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
instanceId,
Duration.ofSeconds(30),
true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(int.class));
Étapes suivantes
Maintenant que vous avez exécuté l’exemple localement à l’aide de l’émulateur Planificateur de tâches durables, essayez de créer un planificateur et une ressource de hub de tâches et de déployer sur Azure Container Apps.