Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Zestawy SDK Durable Task udostępniają uproszczoną bibliotekę klienta dla harmonogramu Durable Task. W tym szybkim startcie dowiesz się, jak tworzyć orkiestracje korzystające ze wzorca aplikacji fan-out/fan-in do wykonywania przetwarzania równoległego.
Ważne
Obecnie zestawy SDK Durable Task nie są dostępne dla języków JavaScript i PowerShell.
Ważne
Obecnie zestawy SDK Durable Task nie są dostępne dla języków JavaScript i PowerShell.
- Skonfiguruj i uruchom emulator narzędzia Durable Task Scheduler na potrzeby programowania lokalnego.
- Uruchom projekty pracowników i klientów.
- Przejrzyj stan orkiestracji i historię za pomocą pulpitu nawigacyjnego narzędzia Durable Task Scheduler.
Wymagania wstępne
Przed rozpoczęciem:
- Upewnij się, że masz zestaw .NET 8 SDK lub nowszy.
- Zainstaluj platformę Docker na potrzeby uruchamiania emulatora.
- Sklonuj repozytorium GitHub Durable Task Scheduler , aby użyć przykładu szybkiego startu.
- Upewnij się, że masz środowisko Python w wersji 3.9 lub nowszej.
- Zainstaluj platformę Docker na potrzeby uruchamiania emulatora.
- Sklonuj repozytorium GitHub Durable Task Scheduler , aby użyć przykładu szybkiego startu.
- Upewnij się, że masz środowisko Java 8 lub 11.
- Zainstaluj platformę Docker na potrzeby uruchamiania emulatora.
- Sklonuj repozytorium GitHub Durable Task Scheduler , aby użyć przykładu szybkiego startu.
Konfigurowanie emulatora harmonogramu zadań Durable Task Scheduler
Kod aplikacji poszukuje wdrożonego harmonogramu oraz zasobów centrum zadań. Jeśli żaden z nich nie zostanie znaleziony, kod wróci do emulatora. Emulator symuluje centrum harmonogramu i zadań w kontenerze platformy Docker, co czyni go idealnym rozwiązaniem dla lokalnego programowania wymaganego w tym przewodniku Szybki start.
Azure-Samples/Durable-Task-SchedulerW katalogu głównym przejdź do przykładowego katalogu zestawu SDK platformy .NET.cd samples/durable-task-sdks/dotnet/FanOutFanInPobierz obraz Dockera dla emulatora.
docker pull mcr.microsoft.com/dts/dts-emulator:latestUruchom emulator. Przygotowanie kontenera może potrwać kilka sekund.
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
Ponieważ przykładowy kod automatycznie używa domyślnych ustawień emulatora, nie trzeba ustawiać żadnych zmiennych środowiskowych. Domyślne ustawienia emulatora dla przewodnika "Szybki start" to:
- Punkt końcowy:
http://localhost:8080 - Centrum zadań:
default
Azure-Samples/Durable-Task-SchedulerW katalogu głównym przejdź do przykładowego katalogu zestawu SDK języka Python.cd samples/durable-task-sdks/python/fan-out-fan-inPobierz obraz Dockera dla emulatora.
docker pull mcr.microsoft.com/dts/dts-emulator:latestUruchom emulator. Przygotowanie kontenera może potrwać kilka sekund.
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
Ponieważ przykładowy kod automatycznie używa domyślnych ustawień emulatora, nie trzeba ustawiać żadnych zmiennych środowiskowych. Domyślne ustawienia emulatora dla przewodnika "Szybki start" to:
- Punkt końcowy:
http://localhost:8080 - Centrum zadań:
default
Z katalogu głównego
Azure-Samples/Durable-Task-Scheduler, przejdź do przykładowego katalogu zestawu Java SDK.cd samples/durable-task-sdks/java/fan-out-fan-inPobierz obraz Dockera dla emulatora.
docker pull mcr.microsoft.com/dts/dts-emulator:latestUruchom emulator. Przygotowanie kontenera może potrwać kilka sekund.
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
Ponieważ przykładowy kod automatycznie używa domyślnych ustawień emulatora, nie trzeba ustawiać żadnych zmiennych środowiskowych. Domyślne ustawienia emulatora dla przewodnika "Szybki start" to:
- Punkt końcowy:
http://localhost:8080 - Centrum zadań:
default
Uruchom szybki start
Przejdź z katalogu
FanOutFanIndo kataloguWorker, aby zbudować i uruchomić proces roboczy.cd Worker dotnet build dotnet runW osobnym terminalu przejdź z katalogu
FanOutFanIndo kataloguClient, aby zbudować i uruchomić klienta.cd Client dotnet build dotnet run
Informacje o danych wyjściowych
Po uruchomieniu tego przykładu dane wyjściowe są odbierane zarówno z procesów roboczych, jak i klienckich. Rozpakuj zawartość kodu po uruchomieniu projektu.
Dane wyjściowe procesu roboczego
Dane wyjściowe pracownika pokazują:
- Rejestracja orkiestratora i działań
- Wpisy dziennika, gdy każde działanie jest wywoływane
- Równoległe przetwarzanie wielu elementów roboczych
- Końcowa agregacja wyników
Dane wyjściowe klienta
Dane wyjściowe klienta pokazują:
- Orkiestracja rozpoczynająca się od listy elementów roboczych
- Unikatowy identyfikator instancji orkiestracji
- Końcowe zagregowane wyniki pokazujące każdy element roboczy i odpowiadający mu wynik
- Łączna liczba przetworzonych elementów
Przykładowe dane wyjściowe
Starting Fan-Out Fan-In Pattern - Parallel Processing Client
Using local emulator with no authentication
Starting parallel processing orchestration with 5 work items
Work items: ["Task1","Task2","Task3","LongerTask4","VeryLongTask5"]
Started orchestration with ID: 7f8e9a6b-1c2d-3e4f-5a6b-7c8d9e0f1a2b
Waiting for orchestration to complete...
Orchestration completed with status: Completed
Processing results:
Work item: Task1, Result: 5
Work item: Task2, Result: 5
Work item: Task3, Result: 5
Work item: LongerTask4, Result: 11
Work item: VeryLongTask5, Result: 13
Total items processed: 5
Aktywowanie środowiska wirtualnego języka Python.
Zainstaluj wymagane pakiety.
pip install -r requirements.txtUruchom proces roboczy.
python worker.pyOczekiwane dane wyjściowe
Możesz zobaczyć dane wyjściowe wskazujące, że pracownik został uruchomiony i czeka na zadania.
Starting Fan Out/Fan In pattern worker... Using taskhub: default Using endpoint: http://localhost:8080 Starting gRPC worker that connects to http://localhost:8080 Successfully connected to http://localhost:8080. Waiting for work items...W nowym terminalu aktywuj środowisko wirtualne i uruchom klienta.
Liczbę elementów roboczych można podać jako argument. Jeśli argument nie zostanie podany, przykład domyślnie uruchamia 10 elementów.
python client.py [number_of_items]
Informacje o danych wyjściowych
Po uruchomieniu tego przykładu dane wyjściowe są odbierane zarówno z procesów roboczych, jak i klienckich. Rozpakuj zawartość kodu po uruchomieniu projektu.
Dane wyjściowe procesu roboczego
Dane wyjściowe pracownika pokazują:
- Rejestracja orkiestratora i działań.
- Komunikaty o stanie podczas przetwarzania każdego elementu roboczego równolegle, pokazujące, że są wykonywane współbieżnie.
- Losowe opóźnienia dla każdego elementu roboczego (od 0,5 do 2 sekund) w celu symulowania różnych czasów przetwarzania.
- Końcowy komunikat przedstawiający agregację wyników.
Dane wyjściowe klienta
Dane wyjściowe klienta pokazują:
- Aranżacja rozpoczynająca się od określonej liczby elementów roboczych.
- Unikalny identyfikator wystąpienia orkiestracji.
- Końcowy zagregowany wynik, który obejmuje:
- Łączna liczba przetworzonych elementów
- Suma wszystkich wyników (każdy wynik elementu jest kwadratem jego wartości)
- Średnia wszystkich wyników
Przykładowe dane wyjściowe
Starting fan out/fan in orchestration with 10 items
Waiting for 10 parallel tasks to complete
Orchestrator yielded with 10 task(s) and 0 event(s) outstanding.
Processing work item: 1
Processing work item: 2
Processing work item: 10
Processing work item: 9
Processing work item: 8
Processing work item: 7
Processing work item: 6
Processing work item: 5
Processing work item: 4
Processing work item: 3
Orchestrator yielded with 9 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 8 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 7 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 6 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 5 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 4 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 3 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 2 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
All parallel tasks completed, aggregating results
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
Aggregating results from 10 items
Orchestration completed with status: COMPLETED
Z katalogu fan-out-fan-in zbuduj i uruchom aplikację przy użyciu Gradle.
./gradlew runFanOutFanInPattern
Wskazówka
Jeśli zostanie wyświetlony komunikat zsh: permission denied: ./gradlew o błędzie, spróbuj uruchomić chmod +x gradlew przed uruchomieniem aplikacji.
Informacje o danych wyjściowych
Po uruchomieniu tego przykładu są wyświetlane dane wyjściowe:
- Rejestracja orkiestratora i działań.
- Komunikaty o stanie podczas przetwarzania każdego elementu roboczego równolegle, pokazujące, że są wykonywane współbieżnie.
- Losowe opóźnienia dla każdego elementu roboczego (od 0,5 do 2 sekund) w celu symulowania różnych czasów przetwarzania.
- Końcowy komunikat przedstawiający agregację wyników.
Rozpakuj zawartość kodu po uruchomieniu projektu.
Przykładowe dane wyjściowe
Starting a Gradle Daemon (subsequent builds will be faster)
> Task :runFanOutFanInPattern
Durable Task worker is connecting to sidecar at localhost:8080.
Started new orchestration instance
Orchestration completed: [Name: 'FanOutFanIn_WordCount', ID: '<id-number>', RuntimeStatus: COMPLETED, CreatedAt: 2025-04-25T15:24:47.170Z, LastUpdatedAt: 2025-04-25T15:24:47.287Z, Input: '["Hello, world!","The quick brown fox jumps over t...', Output: '60']
Output: 60
Teraz, po uruchomieniu projektu lokalnie, możesz teraz dowiedzieć się, jak wdrożyć na platformie Azure hostowanej w usłudze Azure Container Apps.
Wyświetlanie stanu orkiestracji i historii
Stan i historia orkiestracji można wyświetlić za pomocą pulpitu nawigacyjnego narzędzia Durable Task Scheduler. Domyślnie pulpit nawigacyjny działa na porcie 8082.
- Przejdź do http://localhost:8082 w swojej przeglądarce internetowej.
- Kliknij domyślne centrum zadań. Utworzone wystąpienie orkiestracji znajduje się na liście.
- Kliknij ID wystąpienia orkiestracji, aby zobaczyć szczegóły wykonania, które obejmują:
- Równoległe wykonywanie wielu zadań działania
- Krok agregacji wentylatora
- Dane wejściowe i wyjściowe w każdym kroku
- Czas potrzebny na każdy krok
Opis struktury kodu
Projekt pracownika
Aby zademonstrować wzorzec fan-out/fan-in, orkiestracja projektu roboczego tworzy równoległe zadania operacyjne i czeka na ukończenie wszystkich zadań. Orkiestrator:
- Pobiera listę elementów roboczych jako dane wejściowe.
- Rozdziela zadania przez utworzenie oddzielnego zadania dla każdego elementu roboczego przy użyciu polecenia
ProcessWorkItemActivity. - Wykonuje wszystkie zadania równolegle.
- Czeka na ukończenie wszystkich zadań za pomocą
Task.WhenAll. - Grupowanie wszystkich indywidualnych wyników przy użyciu
AggregateResultsActivity. - Zwraca końcowy zagregowany wynik do klienta.
Projekt roboczy zawiera:
- ParallelProcessingOrchestration.cs: definiuje funkcje orkiestratora i działania w jednym pliku.
- Program.cs: konfiguruje hosta procesu roboczego z odpowiednią obsługą parametrów połączenia.
ParallelProcessingOrchestration.cs
Przy użyciu fan-out/fan-in orkiestracja tworzy równoległe zadania działań i czeka na ukończenie wszystkich zadań.
public override async Task<Dictionary<string, int>> RunAsync(TaskOrchestrationContext context, List<string> workItems)
{
// Step 1: Fan-out by creating a task for each work item in parallel
List<Task<Dictionary<string, int>>> processingTasks = new List<Task<Dictionary<string, int>>>();
foreach (string workItem in workItems)
{
// Create a task for each work item (fan-out)
Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
nameof(ProcessWorkItemActivity), workItem);
processingTasks.Add(task);
}
// Step 2: Wait for all parallel tasks to complete
Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);
// Step 3: Fan-in by aggregating all results
Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
nameof(AggregateResultsActivity), results);
return aggregatedResults;
}
Każde działanie jest implementowane jako oddzielna klasa ozdobiona atrybutem [DurableTask] .
[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
// Implementation processes a single work item
}
[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
// Implementation aggregates individual results
}
Program.cs
Pracownik używa Microsoft.Extensions.Hosting do odpowiedniego zarządzania swoim cyklem życia.
using Microsoft.Extensions.Hosting;
//..
builder.Services.AddDurableTaskWorker()
.AddTasks(registry =>
{
registry.AddOrchestrator<ParallelProcessingOrchestration>();
registry.AddActivity<ProcessWorkItemActivity>();
registry.AddActivity<AggregateResultsActivity>();
})
.UseDurableTaskScheduler(connectionString);
Projekt klienta
Projekt klienta:
- Używa tej samej logiki parametrów połączenia co pracownik.
- Tworzy listę elementów roboczych, które mają być przetwarzane równolegle.
- Planuje instancję orkiestracji z listą jako wejściem.
- Czeka na ukończenie aranżacji i wyświetla zagregowane wyniki.
- Używa
WaitForInstanceCompletionAsyncdo wydajnego sondowania.
List<string> workItems = new List<string>
{
"Task1",
"Task2",
"Task3",
"LongerTask4",
"VeryLongTask5"
};
// Schedule the orchestration with the work items
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
"ParallelProcessingOrchestration",
workItems);
// Wait for completion
var instance = await client.WaitForInstanceCompletionAsync(
instanceId,
getInputsAndOutputs: true,
cts.Token);
worker.py
Aby zademonstrować wzorzec fan-out/fan-in, orkiestracja projektu roboczego tworzy równoległe zadania operacyjne i czeka na ukończenie wszystkich zadań. Orkiestrator:
- Odbiera listę elementów roboczych jako dane wejściowe.
- Rozdziela się, tworząc zadania równoległe dla każdego elementu roboczego (wywołując
process_work_itemdla każdego z nich). - Oczekuje na ukończenie wszystkich zadań przy użyciu
task.when_all. - Następnie zbiega się, łącząc wyniki z działaniem
aggregate_results. - Końcowy zagregowany wynik jest zwracany do klienta.
Przy użyciu fan-out/fan-in orkiestracja tworzy równoległe zadania działań i czeka na ukończenie wszystkich zadań.
# Orchestrator function
def fan_out_fan_in_orchestrator(ctx, work_items: list) -> dict:
logger.info(f"Starting fan out/fan in orchestration with {len(work_items)} items")
# Fan out: Create a task for each work item
parallel_tasks = []
for item in work_items:
parallel_tasks.append(ctx.call_activity("process_work_item", input=item))
# Wait for all tasks to complete
logger.info(f"Waiting for {len(parallel_tasks)} parallel tasks to complete")
results = yield task.when_all(parallel_tasks)
# Fan in: Aggregate all the results
logger.info("All parallel tasks completed, aggregating results")
final_result = yield ctx.call_activity("aggregate_results", input=results)
return final_result
client.py
Projekt klienta:
- Używa tej samej logiki parametrów połączenia co pracownik.
- Tworzy listę elementów roboczych, które mają być przetwarzane równolegle.
- Planuje instancję orkiestracji z listą jako wejściem.
- Czeka na ukończenie aranżacji i wyświetla zagregowane wyniki.
- Używa
wait_for_orchestration_completiondo wydajnego sondowania.
# Generate work items (default 10 items if not specified)
count = int(sys.argv[1]) if len(sys.argv) > 1 else 10
work_items = list(range(1, count + 1))
logger.info(f"Starting new fan out/fan in orchestration with {count} work items")
# Schedule a new orchestration instance
instance_id = client.schedule_new_orchestration(
"fan_out_fan_in_orchestrator",
input=work_items
)
logger.info(f"Started orchestration with ID = {instance_id}")
# Wait for orchestration to complete
logger.info("Waiting for orchestration to complete...")
result = client.wait_for_orchestration_completion(
instance_id,
timeout=60
)
Aby zademonstrować wzorzec fan-out/fan-in, FanOutFanInPattern orkiestracja projektu tworzy równoległe zadania dotyczące działań i czeka na ukończenie wszystkich. Orkiestrator:
- Pobiera listę elementów roboczych jako dane wejściowe.
- Fani, tworząc oddzielne zadanie dla każdego elementu roboczego przy użyciu elementu "".
- Wykonuje wszystkie zadania równolegle.
- Czeka na ukończenie wszystkich zadań używając "".
- Agregacja wyników indywidualnych dokonywana jest przez fanów przy użyciu ''.
- Zwraca końcowy zagregowany wynik do klienta.
Projekt zawiera:
-
DurableTaskSchedulerWorkerExtensionsworker: definiuje funkcje orkiestratora i działania. -
DurableTaskSchedulerClientExtensionklient: Konfiguruje hosta roboczego z odpowiednią obsługą ciągów połączenia.
Robotnik
Przy użyciu fan-out/fan-in orkiestracja tworzy równoległe zadania działań i czeka na ukończenie wszystkich zadań.
DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString)
.addOrchestration(new TaskOrchestrationFactory() {
@Override
public String getName() { return "FanOutFanIn_WordCount"; }
@Override
public TaskOrchestration create() {
return ctx -> {
List<?> inputs = ctx.getInput(List.class);
List<Task<Integer>> tasks = inputs.stream()
.map(input -> ctx.callActivity("CountWords", input.toString(), Integer.class))
.collect(Collectors.toList());
List<Integer> allWordCountResults = ctx.allOf(tasks).await();
int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum();
ctx.complete(totalWordCount);
};
}
})
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "CountWords"; }
@Override
public TaskActivity create() {
return ctx -> {
String input = ctx.getInput(String.class);
StringTokenizer tokenizer = new StringTokenizer(input);
return tokenizer.countTokens();
};
}
})
.build();
// Start the worker
worker.start();
Klient
Projekt klienta:
- Używa tej samej logiki parametrów połączenia co pracownik.
- Tworzy listę elementów roboczych, które mają być przetwarzane równolegle.
- Planuje instancję orkiestracji z listą jako wejściem.
- Czeka na ukończenie aranżacji i wyświetla zagregowane wyniki.
- Używa
waitForInstanceCompletiondo wydajnego sondowania.
DurableTaskClient client = DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString).build();
// The input is an arbitrary list of strings.
List<String> listOfStrings = Arrays.asList(
"Hello, world!",
"The quick brown fox jumps over the lazy dog.",
"If a tree falls in the forest and there is no one there to hear it, does it make a sound?",
"The greatest glory in living lies not in never falling, but in rising every time we fall.",
"Always remember that you are absolutely unique. Just like everyone else.");
// Schedule an orchestration which will reliably count the number of words in all the given sentences.
String instanceId = client.scheduleNewOrchestrationInstance(
"FanOutFanIn_WordCount",
new NewOrchestrationInstanceOptions().setInput(listOfStrings));
logger.info("Started new orchestration instance: {}", instanceId);
// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
instanceId,
Duration.ofSeconds(30),
true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(int.class));
Dalsze kroki
Teraz, po uruchomieniu przykładu lokalnie przy użyciu emulatora narzędzia Durable Task Scheduler, spróbuj utworzyć zasób harmonogramu i centrum zadań oraz wdrożyć go w usłudze Azure Container Apps.