Partilhar via


Quickstart: Crie um aplicativo com SDKs de Tarefas Duráveis e Agendador de Tarefas Duráveis

Os SDKs de Tarefas Duráveis fornecem uma biblioteca de cliente leve para o Agendador de Tarefas Duráveis. Neste guia de início rápido, você aprenderá a criar orquestrações que usam o padrão de aplicativo fan-out/fan-in para executar processamento paralelo.

Importante

Atualmente, os SDKs de Tarefa Durável não estão disponíveis para JavaScript e PowerShell.

Importante

Atualmente, os SDKs de Tarefa Durável não estão disponíveis para JavaScript e PowerShell.

  • Configure e execute o emulador do Durable Task Scheduler para desenvolvimento local.
  • Execute os projetos de trabalhador e cliente.
  • Analise o status e o histórico da orquestração por meio do painel do Agendador de Tarefas Duráveis.

Pré-requisitos

Antes de começar:

Configurar o emulador do Durable Task Scheduler

O código do aplicativo procura um agendador implantado e um recurso de hub de tarefas. Se nenhum for encontrado, o código retornará ao emulador. O emulador simula um agendador e um hub de tarefas em um contêiner do Docker, tornando-o ideal para o desenvolvimento local necessário neste início rápido.

  1. Azure-Samples/Durable-Task-Scheduler No diretório raiz, navegue até o diretório de exemplo do SDK do .NET.

    cd samples/durable-task-sdks/dotnet/FanOutFanIn
    
  2. Puxe a imagem do Docker para o emulador.

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. Execute o emulador. O recipiente pode demorar alguns segundos a ficar pronto.

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

Como o código de exemplo usa automaticamente as configurações padrão do emulador, você não precisa definir nenhuma variável de ambiente. As configurações padrão do emulador para este início rápido são:

  • Ponto final: http://localhost:8080
  • Hub de tarefas: default
  1. Azure-Samples/Durable-Task-Scheduler No diretório raiz, navegue até o diretório de exemplo do Python SDK.

    cd samples/durable-task-sdks/python/fan-out-fan-in
    
  2. Puxe a imagem do Docker para o emulador.

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. Execute o emulador. O recipiente pode demorar alguns segundos a ficar pronto.

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

Como o código de exemplo usa automaticamente as configurações padrão do emulador, você não precisa definir nenhuma variável de ambiente. As configurações padrão do emulador para este início rápido são:

  • Ponto final: http://localhost:8080
  • Hub de tarefas: default
  1. Azure-Samples/Durable-Task-Scheduler No diretório raiz, navegue até o diretório de exemplo do Java SDK.

    cd samples/durable-task-sdks/java/fan-out-fan-in
    
  2. Puxe a imagem do Docker para o emulador.

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. Execute o emulador. O recipiente pode demorar alguns segundos a ficar pronto.

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

Como o código de exemplo usa automaticamente as configurações padrão do emulador, você não precisa definir nenhuma variável de ambiente. As configurações padrão do emulador para este início rápido são:

  • Ponto final: http://localhost:8080
  • Hub de tarefas: default

Executar o arranque rápido

  1. A partir do FanOutFanIn diretório, navegue até o Worker diretório para compilar e executar o trabalhador.

    cd Worker
    dotnet build
    dotnet run
    
  2. Em um terminal separado, a partir do FanOutFanIn diretório, navegue até o Client diretório para construir e executar o cliente.

    cd Client
    dotnet build
    dotnet run
    

Compreender os resultados

Ao executar este exemplo, recebe-se a saída tanto dos processos de trabalho quanto dos de cliente. Descompacte o que aconteceu no código quando você executou o projeto.

Produção do trabalhador

O resultado do trabalho mostra:

  • Registo do orquestrador e atividades
  • Entradas de log quando cada atividade é chamada
  • Processamento paralelo de vários itens de trabalho
  • Agregação final dos resultados

Saída do cliente

A saída do cliente mostra:

  • A orquestração inicia-se com uma lista de itens de trabalho
  • O ID único da instância de orquestração
  • Os resultados finais agregados, mostrando cada item de trabalho e seu resultado correspondente
  • Contagem total de itens processados

Exemplo de saída

Starting Fan-Out Fan-In Pattern - Parallel Processing Client
Using local emulator with no authentication
Starting parallel processing orchestration with 5 work items
Work items: ["Task1","Task2","Task3","LongerTask4","VeryLongTask5"]
Started orchestration with ID: 7f8e9a6b-1c2d-3e4f-5a6b-7c8d9e0f1a2b
Waiting for orchestration to complete...
Orchestration completed with status: Completed
Processing results:
Work item: Task1, Result: 5
Work item: Task2, Result: 5
Work item: Task3, Result: 5
Work item: LongerTask4, Result: 11
Work item: VeryLongTask5, Result: 13
Total items processed: 5
  1. Ative um ambiente virtual Python.

    python -m venv venv
    /venv/Scripts/activate
    
  2. Instale os pacotes necessários.

    pip install -r requirements.txt
    
  3. Inicie o trabalhador.

    python worker.py
    

    Resultados esperados

    Você pode ver a saída indicando que o trabalhador iniciou e está aguardando itens de trabalho.

    Starting Fan Out/Fan In pattern worker...
    Using taskhub: default
    Using endpoint: http://localhost:8080
    Starting gRPC worker that connects to http://localhost:8080
    Successfully connected to http://localhost:8080. Waiting for work items...
    
  4. Em um novo terminal, ative o ambiente virtual e execute o cliente.

    venv/Scripts/activate
    python client.py
    

    Você pode fornecer o número de itens de trabalho como um argumento. Se nenhum argumento for fornecido, o exemplo executará 10 itens por padrão.

    python client.py [number_of_items]
    

Compreender os resultados

Ao executar este exemplo, recebe-se a saída tanto dos processos de trabalho quanto dos de cliente. Descompacte o que aconteceu no código quando você executou o projeto.

Produção do trabalhador

O resultado do trabalho mostra:

  • Registo do orquestrador e das atividades.
  • Mensagens de status ao processar cada item de trabalho em paralelo, mostrando que eles estão sendo executados simultaneamente.
  • Atrasos aleatórios para cada item de trabalho (entre 0,5 e 2 segundos) para simular tempos de processamento variáveis.
  • Uma mensagem final mostrando a agregação dos resultados.

Saída do cliente

A saída do cliente mostra:

  • A orquestração começa com o número de itens de trabalho especificado.
  • O ID exclusivo da instância de orquestração.
  • O resultado final agregado, que inclui:
    • O número total de itens processados
    • A soma de todos os resultados (onde o resultado de cada item é calculado como o quadrado do valor do próprio item)
    • A média de todos os resultados

Exemplo de saída

Starting fan out/fan in orchestration with 10 items
Waiting for 10 parallel tasks to complete
Orchestrator yielded with 10 task(s) and 0 event(s) outstanding.
Processing work item: 1
Processing work item: 2
Processing work item: 10
Processing work item: 9
Processing work item: 8
Processing work item: 7
Processing work item: 6
Processing work item: 5
Processing work item: 4
Processing work item: 3
Orchestrator yielded with 9 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 8 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 7 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 6 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 5 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 4 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 3 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 2 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
All parallel tasks completed, aggregating results
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
Aggregating results from 10 items
Orchestration completed with status: COMPLETED

A partir do diretório, compile e execute o aplicativo usando o fan-out-fan-in Gradle.

./gradlew runFanOutFanInPattern

Sugestão

Se você receber a mensagem zsh: permission denied: ./gradlewde erro , tente executar chmod +x gradlew antes de executar o aplicativo.

Compreender os resultados

Ao executar este exemplo, você recebe uma saída que mostra:

  • Registo do orquestrador e das atividades.
  • Mensagens de status ao processar cada item de trabalho em paralelo, mostrando que eles estão sendo executados simultaneamente.
  • Atrasos aleatórios para cada item de trabalho (entre 0,5 e 2 segundos) para simular tempos de processamento variáveis.
  • Uma mensagem final mostrando a agregação dos resultados.

Descompacte o que aconteceu no código quando você executou o projeto.

Exemplo de saída

Starting a Gradle Daemon (subsequent builds will be faster)

> Task :runFanOutFanInPattern
Durable Task worker is connecting to sidecar at localhost:8080.
Started new orchestration instance
Orchestration completed: [Name: 'FanOutFanIn_WordCount', ID: '<id-number>', RuntimeStatus: COMPLETED, CreatedAt: 2025-04-25T15:24:47.170Z, LastUpdatedAt: 2025-04-25T15:24:47.287Z, Input: '["Hello, world!","The quick brown fox jumps over t...', Output: '60']
Output: 60

Depois de executar o projeto localmente, pode aprender como implantar aplicações no Azure usando Aplicativos de Contêiner do Azure.

Ver estado e histórico da orquestração

Você pode visualizar o status e o histórico da orquestração por meio do painel do Agendador de Tarefas Duráveis. Por padrão, o painel é executado na porta 8082.

  1. Navegue no http://localhost:8082 no seu navegador da Web.
  2. Clique no hub de tarefas padrão . A instância de orquestração que você criou está na lista.
  3. Clique no ID da instância de orquestração para visualizar os detalhes da execução, que incluem:
    • A execução paralela de tarefas de múltiplas atividades
    • A etapa de agregação fan-in
    • A entrada e saída em cada etapa
    • O tempo necessário para cada etapa

Captura de tela mostrando os detalhes da instância de orquestração para o exemplo .NET.

Captura de tela mostrando os detalhes da instância de orquestração para o exemplo Python.

Captura de tela mostrando os detalhes da instância de orquestração para a amostra Java.

Noções básicas sobre a estrutura do código

O projeto do trabalhador

Para demonstrar o padrão de fan-out/fan-in, a orquestração do projeto de trabalho cria tarefas de atividade paralelas e espera que todas sejam concluídas. O orquestrador:

  1. Usa uma lista de itens de trabalho como entrada.
  2. Expanda-se criando uma tarefa separada para cada item de trabalho usando ProcessWorkItemActivity.
  3. Executa todas as tarefas em paralelo.
  4. Aguarda que todas as tarefas sejam concluídas usando Task.WhenAll.
  5. Fãs agregando todos os resultados individuais usando AggregateResultsActivity.
  6. Devolve o resultado final agregado ao cliente.

O projeto de trabalho contém:

  • ParallelProcessingOrchestration.cs: Define o orquestrador e as funções de atividade em um único arquivo.
  • Program.cs: Configura o host de trabalho com manipulação adequada de cadeia de conexão.

ParallelProcessingOrchestration.cs

Usando o método fan-out/fan-in, a orquestração cria tarefas paralelas e aguarda a conclusão de todas elas.

public override async Task<Dictionary<string, int>> RunAsync(TaskOrchestrationContext context, List<string> workItems)
{
    // Step 1: Fan-out by creating a task for each work item in parallel
    List<Task<Dictionary<string, int>>> processingTasks = new List<Task<Dictionary<string, int>>>();

    foreach (string workItem in workItems)
    {
        // Create a task for each work item (fan-out)
        Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
            nameof(ProcessWorkItemActivity), workItem);
        processingTasks.Add(task);
    }

    // Step 2: Wait for all parallel tasks to complete
    Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);

    // Step 3: Fan-in by aggregating all results
    Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
        nameof(AggregateResultsActivity), results);

    return aggregatedResults;
}

Cada atividade é implementada como uma classe separada decorada com o [DurableTask] atributo.

[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
    // Implementation processes a single work item
}

[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
    // Implementation aggregates individual results
}

Program.cs

O trabalhador usa Microsoft.Extensions.Hosting para o gerenciamento adequado do ciclo de vida.

using Microsoft.Extensions.Hosting;
//..

builder.Services.AddDurableTaskWorker()
    .AddTasks(registry =>
    {
        registry.AddOrchestrator<ParallelProcessingOrchestration>();
        registry.AddActivity<ProcessWorkItemActivity>();
        registry.AddActivity<AggregateResultsActivity>();
    })
    .UseDurableTaskScheduler(connectionString);

O projeto do cliente

O projeto do cliente:

  • Usa a mesma lógica de cadeia de conexão que o trabalhador.
  • Cria uma lista de itens de trabalho a serem processados em paralelo.
  • Programa uma instância de orquestração com a lista como entrada.
  • Aguarda a conclusão da orquestração e exibe os resultados agregados.
  • Utiliza WaitForInstanceCompletionAsync para interrogação eficiente.
List<string> workItems = new List<string>
{
    "Task1",
    "Task2",
    "Task3",
    "LongerTask4",
    "VeryLongTask5"
};

// Schedule the orchestration with the work items
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
    "ParallelProcessingOrchestration", 
    workItems);

// Wait for completion
var instance = await client.WaitForInstanceCompletionAsync(
    instanceId,
    getInputsAndOutputs: true,
    cts.Token);

worker.py

Para demonstrar o padrão de fan-out/fan-in, a orquestração do projeto de trabalho cria tarefas de atividade paralelas e espera que todas sejam concluídas. O orquestrador:

  1. Recebe uma lista de itens de trabalho como entrada.
  2. Ele "ventila" criando tarefas paralelas para cada item de trabalho (chamando process_work_item para cada um).
  3. Ele aguarda que todas as tarefas sejam concluídas usando task.when_all.
  4. Em seguida, "consolida" agregando os resultados com a atividade aggregate_results.
  5. O resultado final agregado é devolvido ao cliente.

Usando o método fan-out/fan-in, a orquestração cria tarefas paralelas e aguarda a conclusão de todas elas.

# Orchestrator function
def fan_out_fan_in_orchestrator(ctx, work_items: list) -> dict:
    logger.info(f"Starting fan out/fan in orchestration with {len(work_items)} items")

    # Fan out: Create a task for each work item
    parallel_tasks = []
    for item in work_items:
        parallel_tasks.append(ctx.call_activity("process_work_item", input=item))

    # Wait for all tasks to complete
    logger.info(f"Waiting for {len(parallel_tasks)} parallel tasks to complete")
    results = yield task.when_all(parallel_tasks)

    # Fan in: Aggregate all the results
    logger.info("All parallel tasks completed, aggregating results")
    final_result = yield ctx.call_activity("aggregate_results", input=results)

    return final_result

client.py

O projeto do cliente:

  • Usa a mesma lógica de cadeia de conexão que o trabalhador.
  • Cria uma lista de itens de trabalho a serem processados em paralelo.
  • Programa uma instância de orquestração com a lista como entrada.
  • Aguarda a conclusão da orquestração e exibe os resultados agregados.
  • Utiliza wait_for_orchestration_completion para interrogação eficiente.
# Generate work items (default 10 items if not specified)
count = int(sys.argv[1]) if len(sys.argv) > 1 else 10
work_items = list(range(1, count + 1))

logger.info(f"Starting new fan out/fan in orchestration with {count} work items")

# Schedule a new orchestration instance
instance_id = client.schedule_new_orchestration(
    "fan_out_fan_in_orchestrator", 
    input=work_items
)

logger.info(f"Started orchestration with ID = {instance_id}")

# Wait for orchestration to complete
logger.info("Waiting for orchestration to complete...")
result = client.wait_for_orchestration_completion(
    instance_id,
    timeout=60
)

Para demonstrar o padrão de fan-out/fan-in, a orquestração do FanOutFanInPattern projeto cria tarefas paralelas e aguarda a conclusão de todas. O orquestrador:

  1. Usa uma lista de itens de trabalho como entrada.
  2. Expande criando uma tarefa separada para cada item de trabalho usando ``.
  3. Executa todas as tarefas em paralelo.
  4. Aguarda que todas as tarefas sejam concluídas usando ''.
  5. Fãs ao agregar todos os resultados individuais usando ``.
  6. Devolve o resultado final agregado ao cliente.

O projeto contém:

  • DurableTaskSchedulerWorkerExtensions trabalhador: Define as funções do orquestrador e das atividades.
  • DurableTaskSchedulerClientExtension client: Configurar o host de trabalho com gestão adequada da cadeia de conexão.

Trabalhador

Usando o método fan-out/fan-in, a orquestração cria tarefas paralelas e aguarda a conclusão de todas elas.

DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString)
    .addOrchestration(new TaskOrchestrationFactory() {
        @Override
        public String getName() { return "FanOutFanIn_WordCount"; }

        @Override
        public TaskOrchestration create() {
            return ctx -> {
                List<?> inputs = ctx.getInput(List.class);
                List<Task<Integer>> tasks = inputs.stream()
                        .map(input -> ctx.callActivity("CountWords", input.toString(), Integer.class))
                        .collect(Collectors.toList());
                List<Integer> allWordCountResults = ctx.allOf(tasks).await();
                int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum();
                ctx.complete(totalWordCount);
            };
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "CountWords"; }

        @Override
        public TaskActivity create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                StringTokenizer tokenizer = new StringTokenizer(input);
                return tokenizer.countTokens();
            };
        }
    })
    .build();

// Start the worker
worker.start();

Cliente

O projeto do cliente:

  • Usa a mesma lógica de cadeia de conexão que o trabalhador.
  • Cria uma lista de itens de trabalho a serem processados em paralelo.
  • Programa uma instância de orquestração com a lista como entrada.
  • Aguarda a conclusão da orquestração e exibe os resultados agregados.
  • Utiliza waitForInstanceCompletion para interrogação eficiente.
DurableTaskClient client = DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString).build();

// The input is an arbitrary list of strings.
List<String> listOfStrings = Arrays.asList(
        "Hello, world!",
        "The quick brown fox jumps over the lazy dog.",
        "If a tree falls in the forest and there is no one there to hear it, does it make a sound?",
        "The greatest glory in living lies not in never falling, but in rising every time we fall.",
        "Always remember that you are absolutely unique. Just like everyone else.");

// Schedule an orchestration which will reliably count the number of words in all the given sentences.
String instanceId = client.scheduleNewOrchestrationInstance(
        "FanOutFanIn_WordCount",
        new NewOrchestrationInstanceOptions().setInput(listOfStrings));
logger.info("Started new orchestration instance: {}", instanceId);

// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
        instanceId,
        Duration.ofSeconds(30),
        true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(int.class));

Próximos passos

Agora que você executou o exemplo localmente usando o emulador do Agendador de Tarefas Duráveis, tente criar um recurso de agendador e hub de tarefas e implantá-lo nos Aplicativos de Contêiner do Azure.