Udostępnij przez


Szybki start: tworzenie aplikacji przy użyciu trwałych zestawów SDK do zadań i narzędzia Durable Task Scheduler

Zestawy SDK Durable Task udostępniają uproszczoną bibliotekę klienta dla harmonogramu Durable Task. W tym szybkim startcie dowiesz się, jak tworzyć orkiestracje korzystające ze wzorca aplikacji fan-out/fan-in do wykonywania przetwarzania równoległego.

Ważne

Obecnie zestawy SDK Durable Task nie są dostępne dla języków JavaScript i PowerShell.

Ważne

Obecnie zestawy SDK Durable Task nie są dostępne dla języków JavaScript i PowerShell.

  • Skonfiguruj i uruchom emulator narzędzia Durable Task Scheduler na potrzeby programowania lokalnego.
  • Uruchom projekty pracowników i klientów.
  • Przejrzyj stan orkiestracji i historię za pomocą pulpitu nawigacyjnego narzędzia Durable Task Scheduler.

Wymagania wstępne

Przed rozpoczęciem:

Konfigurowanie emulatora harmonogramu zadań Durable Task Scheduler

Kod aplikacji poszukuje wdrożonego harmonogramu oraz zasobów centrum zadań. Jeśli żaden z nich nie zostanie znaleziony, kod wróci do emulatora. Emulator symuluje centrum harmonogramu i zadań w kontenerze platformy Docker, co czyni go idealnym rozwiązaniem dla lokalnego programowania wymaganego w tym przewodniku Szybki start.

  1. Azure-Samples/Durable-Task-Scheduler W katalogu głównym przejdź do przykładowego katalogu zestawu SDK platformy .NET.

    cd samples/durable-task-sdks/dotnet/FanOutFanIn
    
  2. Pobierz obraz Dockera dla emulatora.

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. Uruchom emulator. Przygotowanie kontenera może potrwać kilka sekund.

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

Ponieważ przykładowy kod automatycznie używa domyślnych ustawień emulatora, nie trzeba ustawiać żadnych zmiennych środowiskowych. Domyślne ustawienia emulatora dla przewodnika "Szybki start" to:

  • Punkt końcowy: http://localhost:8080
  • Centrum zadań: default
  1. Azure-Samples/Durable-Task-Scheduler W katalogu głównym przejdź do przykładowego katalogu zestawu SDK języka Python.

    cd samples/durable-task-sdks/python/fan-out-fan-in
    
  2. Pobierz obraz Dockera dla emulatora.

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. Uruchom emulator. Przygotowanie kontenera może potrwać kilka sekund.

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

Ponieważ przykładowy kod automatycznie używa domyślnych ustawień emulatora, nie trzeba ustawiać żadnych zmiennych środowiskowych. Domyślne ustawienia emulatora dla przewodnika "Szybki start" to:

  • Punkt końcowy: http://localhost:8080
  • Centrum zadań: default
  1. Z katalogu głównego Azure-Samples/Durable-Task-Scheduler, przejdź do przykładowego katalogu zestawu Java SDK.

    cd samples/durable-task-sdks/java/fan-out-fan-in
    
  2. Pobierz obraz Dockera dla emulatora.

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. Uruchom emulator. Przygotowanie kontenera może potrwać kilka sekund.

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

Ponieważ przykładowy kod automatycznie używa domyślnych ustawień emulatora, nie trzeba ustawiać żadnych zmiennych środowiskowych. Domyślne ustawienia emulatora dla przewodnika "Szybki start" to:

  • Punkt końcowy: http://localhost:8080
  • Centrum zadań: default

Uruchom szybki start

  1. Przejdź z katalogu FanOutFanIn do katalogu Worker, aby zbudować i uruchomić proces roboczy.

    cd Worker
    dotnet build
    dotnet run
    
  2. W osobnym terminalu przejdź z katalogu FanOutFanIn do katalogu Client, aby zbudować i uruchomić klienta.

    cd Client
    dotnet build
    dotnet run
    

Informacje o danych wyjściowych

Po uruchomieniu tego przykładu dane wyjściowe są odbierane zarówno z procesów roboczych, jak i klienckich. Rozpakuj zawartość kodu po uruchomieniu projektu.

Dane wyjściowe procesu roboczego

Dane wyjściowe pracownika pokazują:

  • Rejestracja orkiestratora i działań
  • Wpisy dziennika, gdy każde działanie jest wywoływane
  • Równoległe przetwarzanie wielu elementów roboczych
  • Końcowa agregacja wyników

Dane wyjściowe klienta

Dane wyjściowe klienta pokazują:

  • Orkiestracja rozpoczynająca się od listy elementów roboczych
  • Unikatowy identyfikator instancji orkiestracji
  • Końcowe zagregowane wyniki pokazujące każdy element roboczy i odpowiadający mu wynik
  • Łączna liczba przetworzonych elementów

Przykładowe dane wyjściowe

Starting Fan-Out Fan-In Pattern - Parallel Processing Client
Using local emulator with no authentication
Starting parallel processing orchestration with 5 work items
Work items: ["Task1","Task2","Task3","LongerTask4","VeryLongTask5"]
Started orchestration with ID: 7f8e9a6b-1c2d-3e4f-5a6b-7c8d9e0f1a2b
Waiting for orchestration to complete...
Orchestration completed with status: Completed
Processing results:
Work item: Task1, Result: 5
Work item: Task2, Result: 5
Work item: Task3, Result: 5
Work item: LongerTask4, Result: 11
Work item: VeryLongTask5, Result: 13
Total items processed: 5
  1. Aktywowanie środowiska wirtualnego języka Python.

    python -m venv venv
    /venv/Scripts/activate
    
  2. Zainstaluj wymagane pakiety.

    pip install -r requirements.txt
    
  3. Uruchom proces roboczy.

    python worker.py
    

    Oczekiwane dane wyjściowe

    Możesz zobaczyć dane wyjściowe wskazujące, że pracownik został uruchomiony i czeka na zadania.

    Starting Fan Out/Fan In pattern worker...
    Using taskhub: default
    Using endpoint: http://localhost:8080
    Starting gRPC worker that connects to http://localhost:8080
    Successfully connected to http://localhost:8080. Waiting for work items...
    
  4. W nowym terminalu aktywuj środowisko wirtualne i uruchom klienta.

    venv/Scripts/activate
    python client.py
    

    Liczbę elementów roboczych można podać jako argument. Jeśli argument nie zostanie podany, przykład domyślnie uruchamia 10 elementów.

    python client.py [number_of_items]
    

Informacje o danych wyjściowych

Po uruchomieniu tego przykładu dane wyjściowe są odbierane zarówno z procesów roboczych, jak i klienckich. Rozpakuj zawartość kodu po uruchomieniu projektu.

Dane wyjściowe procesu roboczego

Dane wyjściowe pracownika pokazują:

  • Rejestracja orkiestratora i działań.
  • Komunikaty o stanie podczas przetwarzania każdego elementu roboczego równolegle, pokazujące, że są wykonywane współbieżnie.
  • Losowe opóźnienia dla każdego elementu roboczego (od 0,5 do 2 sekund) w celu symulowania różnych czasów przetwarzania.
  • Końcowy komunikat przedstawiający agregację wyników.

Dane wyjściowe klienta

Dane wyjściowe klienta pokazują:

  • Aranżacja rozpoczynająca się od określonej liczby elementów roboczych.
  • Unikalny identyfikator wystąpienia orkiestracji.
  • Końcowy zagregowany wynik, który obejmuje:
    • Łączna liczba przetworzonych elementów
    • Suma wszystkich wyników (każdy wynik elementu jest kwadratem jego wartości)
    • Średnia wszystkich wyników

Przykładowe dane wyjściowe

Starting fan out/fan in orchestration with 10 items
Waiting for 10 parallel tasks to complete
Orchestrator yielded with 10 task(s) and 0 event(s) outstanding.
Processing work item: 1
Processing work item: 2
Processing work item: 10
Processing work item: 9
Processing work item: 8
Processing work item: 7
Processing work item: 6
Processing work item: 5
Processing work item: 4
Processing work item: 3
Orchestrator yielded with 9 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 8 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 7 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 6 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 5 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 4 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 3 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 2 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
All parallel tasks completed, aggregating results
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
Aggregating results from 10 items
Orchestration completed with status: COMPLETED

Z katalogu fan-out-fan-in zbuduj i uruchom aplikację przy użyciu Gradle.

./gradlew runFanOutFanInPattern

Wskazówka

Jeśli zostanie wyświetlony komunikat zsh: permission denied: ./gradlew o błędzie, spróbuj uruchomić chmod +x gradlew przed uruchomieniem aplikacji.

Informacje o danych wyjściowych

Po uruchomieniu tego przykładu są wyświetlane dane wyjściowe:

  • Rejestracja orkiestratora i działań.
  • Komunikaty o stanie podczas przetwarzania każdego elementu roboczego równolegle, pokazujące, że są wykonywane współbieżnie.
  • Losowe opóźnienia dla każdego elementu roboczego (od 0,5 do 2 sekund) w celu symulowania różnych czasów przetwarzania.
  • Końcowy komunikat przedstawiający agregację wyników.

Rozpakuj zawartość kodu po uruchomieniu projektu.

Przykładowe dane wyjściowe

Starting a Gradle Daemon (subsequent builds will be faster)

> Task :runFanOutFanInPattern
Durable Task worker is connecting to sidecar at localhost:8080.
Started new orchestration instance
Orchestration completed: [Name: 'FanOutFanIn_WordCount', ID: '<id-number>', RuntimeStatus: COMPLETED, CreatedAt: 2025-04-25T15:24:47.170Z, LastUpdatedAt: 2025-04-25T15:24:47.287Z, Input: '["Hello, world!","The quick brown fox jumps over t...', Output: '60']
Output: 60

Teraz, po uruchomieniu projektu lokalnie, możesz teraz dowiedzieć się, jak wdrożyć na platformie Azure hostowanej w usłudze Azure Container Apps.

Wyświetlanie stanu orkiestracji i historii

Stan i historia orkiestracji można wyświetlić za pomocą pulpitu nawigacyjnego narzędzia Durable Task Scheduler. Domyślnie pulpit nawigacyjny działa na porcie 8082.

  1. Przejdź do http://localhost:8082 w swojej przeglądarce internetowej.
  2. Kliknij domyślne centrum zadań. Utworzone wystąpienie orkiestracji znajduje się na liście.
  3. Kliknij ID wystąpienia orkiestracji, aby zobaczyć szczegóły wykonania, które obejmują:
    • Równoległe wykonywanie wielu zadań działania
    • Krok agregacji wentylatora
    • Dane wejściowe i wyjściowe w każdym kroku
    • Czas potrzebny na każdy krok

Zrzut ekranu przedstawiający szczegóły wystąpienia orkiestracji dla przykładu .NET.

Zrzut ekranu przedstawiający szczegóły wystąpienia orkiestracji dla przykładu w Pythonie.

Zrzut ekranu przedstawiający detale wystąpienia orkiestracji dla przykładu Java.

Opis struktury kodu

Projekt pracownika

Aby zademonstrować wzorzec fan-out/fan-in, orkiestracja projektu roboczego tworzy równoległe zadania operacyjne i czeka na ukończenie wszystkich zadań. Orkiestrator:

  1. Pobiera listę elementów roboczych jako dane wejściowe.
  2. Rozdziela zadania przez utworzenie oddzielnego zadania dla każdego elementu roboczego przy użyciu polecenia ProcessWorkItemActivity.
  3. Wykonuje wszystkie zadania równolegle.
  4. Czeka na ukończenie wszystkich zadań za pomocą Task.WhenAll.
  5. Grupowanie wszystkich indywidualnych wyników przy użyciu AggregateResultsActivity.
  6. Zwraca końcowy zagregowany wynik do klienta.

Projekt roboczy zawiera:

  • ParallelProcessingOrchestration.cs: definiuje funkcje orkiestratora i działania w jednym pliku.
  • Program.cs: konfiguruje hosta procesu roboczego z odpowiednią obsługą parametrów połączenia.

ParallelProcessingOrchestration.cs

Przy użyciu fan-out/fan-in orkiestracja tworzy równoległe zadania działań i czeka na ukończenie wszystkich zadań.

public override async Task<Dictionary<string, int>> RunAsync(TaskOrchestrationContext context, List<string> workItems)
{
    // Step 1: Fan-out by creating a task for each work item in parallel
    List<Task<Dictionary<string, int>>> processingTasks = new List<Task<Dictionary<string, int>>>();

    foreach (string workItem in workItems)
    {
        // Create a task for each work item (fan-out)
        Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
            nameof(ProcessWorkItemActivity), workItem);
        processingTasks.Add(task);
    }

    // Step 2: Wait for all parallel tasks to complete
    Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);

    // Step 3: Fan-in by aggregating all results
    Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
        nameof(AggregateResultsActivity), results);

    return aggregatedResults;
}

Każde działanie jest implementowane jako oddzielna klasa ozdobiona atrybutem [DurableTask] .

[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
    // Implementation processes a single work item
}

[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
    // Implementation aggregates individual results
}

Program.cs

Pracownik używa Microsoft.Extensions.Hosting do odpowiedniego zarządzania swoim cyklem życia.

using Microsoft.Extensions.Hosting;
//..

builder.Services.AddDurableTaskWorker()
    .AddTasks(registry =>
    {
        registry.AddOrchestrator<ParallelProcessingOrchestration>();
        registry.AddActivity<ProcessWorkItemActivity>();
        registry.AddActivity<AggregateResultsActivity>();
    })
    .UseDurableTaskScheduler(connectionString);

Projekt klienta

Projekt klienta:

  • Używa tej samej logiki parametrów połączenia co pracownik.
  • Tworzy listę elementów roboczych, które mają być przetwarzane równolegle.
  • Planuje instancję orkiestracji z listą jako wejściem.
  • Czeka na ukończenie aranżacji i wyświetla zagregowane wyniki.
  • Używa WaitForInstanceCompletionAsync do wydajnego sondowania.
List<string> workItems = new List<string>
{
    "Task1",
    "Task2",
    "Task3",
    "LongerTask4",
    "VeryLongTask5"
};

// Schedule the orchestration with the work items
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
    "ParallelProcessingOrchestration", 
    workItems);

// Wait for completion
var instance = await client.WaitForInstanceCompletionAsync(
    instanceId,
    getInputsAndOutputs: true,
    cts.Token);

worker.py

Aby zademonstrować wzorzec fan-out/fan-in, orkiestracja projektu roboczego tworzy równoległe zadania operacyjne i czeka na ukończenie wszystkich zadań. Orkiestrator:

  1. Odbiera listę elementów roboczych jako dane wejściowe.
  2. Rozdziela się, tworząc zadania równoległe dla każdego elementu roboczego (wywołując process_work_item dla każdego z nich).
  3. Oczekuje na ukończenie wszystkich zadań przy użyciu task.when_all.
  4. Następnie zbiega się, łącząc wyniki z działaniem aggregate_results.
  5. Końcowy zagregowany wynik jest zwracany do klienta.

Przy użyciu fan-out/fan-in orkiestracja tworzy równoległe zadania działań i czeka na ukończenie wszystkich zadań.

# Orchestrator function
def fan_out_fan_in_orchestrator(ctx, work_items: list) -> dict:
    logger.info(f"Starting fan out/fan in orchestration with {len(work_items)} items")

    # Fan out: Create a task for each work item
    parallel_tasks = []
    for item in work_items:
        parallel_tasks.append(ctx.call_activity("process_work_item", input=item))

    # Wait for all tasks to complete
    logger.info(f"Waiting for {len(parallel_tasks)} parallel tasks to complete")
    results = yield task.when_all(parallel_tasks)

    # Fan in: Aggregate all the results
    logger.info("All parallel tasks completed, aggregating results")
    final_result = yield ctx.call_activity("aggregate_results", input=results)

    return final_result

client.py

Projekt klienta:

  • Używa tej samej logiki parametrów połączenia co pracownik.
  • Tworzy listę elementów roboczych, które mają być przetwarzane równolegle.
  • Planuje instancję orkiestracji z listą jako wejściem.
  • Czeka na ukończenie aranżacji i wyświetla zagregowane wyniki.
  • Używa wait_for_orchestration_completion do wydajnego sondowania.
# Generate work items (default 10 items if not specified)
count = int(sys.argv[1]) if len(sys.argv) > 1 else 10
work_items = list(range(1, count + 1))

logger.info(f"Starting new fan out/fan in orchestration with {count} work items")

# Schedule a new orchestration instance
instance_id = client.schedule_new_orchestration(
    "fan_out_fan_in_orchestrator", 
    input=work_items
)

logger.info(f"Started orchestration with ID = {instance_id}")

# Wait for orchestration to complete
logger.info("Waiting for orchestration to complete...")
result = client.wait_for_orchestration_completion(
    instance_id,
    timeout=60
)

Aby zademonstrować wzorzec fan-out/fan-in, FanOutFanInPattern orkiestracja projektu tworzy równoległe zadania dotyczące działań i czeka na ukończenie wszystkich. Orkiestrator:

  1. Pobiera listę elementów roboczych jako dane wejściowe.
  2. Fani, tworząc oddzielne zadanie dla każdego elementu roboczego przy użyciu elementu "".
  3. Wykonuje wszystkie zadania równolegle.
  4. Czeka na ukończenie wszystkich zadań używając "".
  5. Agregacja wyników indywidualnych dokonywana jest przez fanów przy użyciu ''.
  6. Zwraca końcowy zagregowany wynik do klienta.

Projekt zawiera:

  • DurableTaskSchedulerWorkerExtensions worker: definiuje funkcje orkiestratora i działania.
  • DurableTaskSchedulerClientExtension klient: Konfiguruje hosta roboczego z odpowiednią obsługą ciągów połączenia.

Robotnik

Przy użyciu fan-out/fan-in orkiestracja tworzy równoległe zadania działań i czeka na ukończenie wszystkich zadań.

DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString)
    .addOrchestration(new TaskOrchestrationFactory() {
        @Override
        public String getName() { return "FanOutFanIn_WordCount"; }

        @Override
        public TaskOrchestration create() {
            return ctx -> {
                List<?> inputs = ctx.getInput(List.class);
                List<Task<Integer>> tasks = inputs.stream()
                        .map(input -> ctx.callActivity("CountWords", input.toString(), Integer.class))
                        .collect(Collectors.toList());
                List<Integer> allWordCountResults = ctx.allOf(tasks).await();
                int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum();
                ctx.complete(totalWordCount);
            };
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "CountWords"; }

        @Override
        public TaskActivity create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                StringTokenizer tokenizer = new StringTokenizer(input);
                return tokenizer.countTokens();
            };
        }
    })
    .build();

// Start the worker
worker.start();

Klient

Projekt klienta:

  • Używa tej samej logiki parametrów połączenia co pracownik.
  • Tworzy listę elementów roboczych, które mają być przetwarzane równolegle.
  • Planuje instancję orkiestracji z listą jako wejściem.
  • Czeka na ukończenie aranżacji i wyświetla zagregowane wyniki.
  • Używa waitForInstanceCompletion do wydajnego sondowania.
DurableTaskClient client = DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString).build();

// The input is an arbitrary list of strings.
List<String> listOfStrings = Arrays.asList(
        "Hello, world!",
        "The quick brown fox jumps over the lazy dog.",
        "If a tree falls in the forest and there is no one there to hear it, does it make a sound?",
        "The greatest glory in living lies not in never falling, but in rising every time we fall.",
        "Always remember that you are absolutely unique. Just like everyone else.");

// Schedule an orchestration which will reliably count the number of words in all the given sentences.
String instanceId = client.scheduleNewOrchestrationInstance(
        "FanOutFanIn_WordCount",
        new NewOrchestrationInstanceOptions().setInput(listOfStrings));
logger.info("Started new orchestration instance: {}", instanceId);

// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
        instanceId,
        Duration.ofSeconds(30),
        true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(int.class));

Dalsze kroki

Teraz, po uruchomieniu przykładu lokalnie przy użyciu emulatora narzędzia Durable Task Scheduler, spróbuj utworzyć zasób harmonogramu i centrum zadań oraz wdrożyć go w usłudze Azure Container Apps.