Compartilhar via


Início Rápido: Criar um aplicativo com SDKs de Tarefas Duráveis e Agendador de Tarefas Duráveis

Os SDKs de Tarefa Durável fornecem uma biblioteca de clientes leve para o Agendador de Tarefas Duráveis. Neste início rápido, você aprenderá a criar orquestrações que usam o padrão de aplicativo fan-out/fan-in para executar o processamento paralelo.

Importante

Atualmente, os SDKs de Tarefa Durável não estão disponíveis para JavaScript e PowerShell.

Importante

Atualmente, os SDKs de Tarefa Durável não estão disponíveis para JavaScript e PowerShell.

  • Configure e execute o emulador do Agendador de Tarefas Duráveis para desenvolvimento local.
  • Execute os projetos de trabalho e cliente.
  • Examine o status e o histórico de orquestração por meio do painel do Agendador de Tarefas Duráveis.

Pré-requisitos

Antes de começar:

Configurar o emulador do Agendador de Tarefas Duráveis

O código do aplicativo procura um agendador implantado e um recurso do hub de tarefas. Se nenhum for encontrado, o código retornará ao emulador. O emulador simula um agendador e um hub de tarefas em um contêiner do Docker, tornando-o ideal para o desenvolvimento local necessário neste início rápido.

  1. Azure-Samples/Durable-Task-Scheduler No diretório raiz, navegue até o diretório de exemplo do SDK do .NET.

    cd samples/durable-task-sdks/dotnet/FanOutFanIn
    
  2. Efetue pull da imagem do Docker para o emulador.

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. Executar o emulador. O contêiner pode levar alguns segundos para estar pronto.

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

Como o código de exemplo usa automaticamente as configurações do emulador padrão, você não precisa definir nenhuma variável de ambiente. As configurações padrão do emulador para este início rápido são:

  • Ponto de extremidade: http://localhost:8080
  • Hub de tarefas: default
  1. Azure-Samples/Durable-Task-Scheduler No diretório raiz, navegue até o diretório de exemplo do SDK do Python.

    cd samples/durable-task-sdks/python/fan-out-fan-in
    
  2. Efetue pull da imagem do Docker para o emulador.

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. Executar o emulador. O contêiner pode levar alguns segundos para estar pronto.

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

Como o código de exemplo usa automaticamente as configurações do emulador padrão, você não precisa definir nenhuma variável de ambiente. As configurações padrão do emulador para este início rápido são:

  • Ponto de extremidade: http://localhost:8080
  • Hub de tarefas: default
  1. Azure-Samples/Durable-Task-Scheduler No diretório raiz, navegue até o diretório de exemplo do SDK do Java.

    cd samples/durable-task-sdks/java/fan-out-fan-in
    
  2. Efetue pull da imagem do Docker para o emulador.

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. Executar o emulador. O contêiner pode levar alguns segundos para estar pronto.

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

Como o código de exemplo usa automaticamente as configurações do emulador padrão, você não precisa definir nenhuma variável de ambiente. As configurações padrão do emulador para este início rápido são:

  • Ponto de extremidade: http://localhost:8080
  • Hub de tarefas: default

Executar o início rápido

  1. No diretório FanOutFanIn, navegue até o diretório Worker para criar e executar o trabalho.

    cd Worker
    dotnet build
    dotnet run
    
  2. Em um terminal separado, do FanOutFanIn diretório, navegue até o Client diretório para compilar e executar o cliente.

    cd Client
    dotnet build
    dotnet run
    

Como entender a saída

Ao executar este exemplo, você recebe a saída dos processos de trabalho e cliente. Desempacotar o que aconteceu no código quando você executou o projeto.

Saída do trabalho

A saída do trabalho mostra:

  • Registro do orquestrador e das atividades
  • Entradas de log quando cada atividade é chamada
  • Processamento paralelo de vários itens de trabalho
  • Agregação final de resultados

Saída do cliente

A saída do cliente mostra:

  • A orquestração começa com uma lista de itens de trabalho
  • A ID da instância de orquestração exclusiva
  • Os resultados agregados finais, mostrando cada item de trabalho e seu resultado correspondente
  • Contagem total de itens processados

Saída de exemplo

Starting Fan-Out Fan-In Pattern - Parallel Processing Client
Using local emulator with no authentication
Starting parallel processing orchestration with 5 work items
Work items: ["Task1","Task2","Task3","LongerTask4","VeryLongTask5"]
Started orchestration with ID: 7f8e9a6b-1c2d-3e4f-5a6b-7c8d9e0f1a2b
Waiting for orchestration to complete...
Orchestration completed with status: Completed
Processing results:
Work item: Task1, Result: 5
Work item: Task2, Result: 5
Work item: Task3, Result: 5
Work item: LongerTask4, Result: 11
Work item: VeryLongTask5, Result: 13
Total items processed: 5
  1. Ativar um ambiente virtual do Python.

    python -m venv venv
    /venv/Scripts/activate
    
  2. Instale os pacotes necessários.

    pip install -r requirements.txt
    
  3. Inicie o trabalho.

    python worker.py
    

    Saída esperada

    Você pode ver a saída indicando que o trabalho foi iniciado e está aguardando itens de trabalho.

    Starting Fan Out/Fan In pattern worker...
    Using taskhub: default
    Using endpoint: http://localhost:8080
    Starting gRPC worker that connects to http://localhost:8080
    Successfully connected to http://localhost:8080. Waiting for work items...
    
  4. Em um novo terminal, ative o ambiente virtual e execute o cliente.

    venv/Scripts/activate
    python client.py
    

    Você pode fornecer o número de itens de trabalho como um argumento. Se nenhum argumento for fornecido, o exemplo executará 10 itens por padrão.

    python client.py [number_of_items]
    

Como entender a saída

Ao executar este exemplo, você recebe a saída dos processos de trabalho e cliente. Desempacotar o que aconteceu no código quando você executou o projeto.

Saída do trabalho

A saída do trabalho mostra:

  • Registro do orquestrador e das atividades.
  • Mensagens de status ao processar cada item de trabalho em paralelo, mostrando que elas estão sendo executadas simultaneamente.
  • Atrasos aleatórios para cada item de trabalho (entre 0,5 e 2 segundos) para simular diferentes tempos de processamento.
  • Uma mensagem final mostrando a agregação de resultados.

Saída do cliente

A saída do cliente mostra:

  • A orquestração começando com o número especificado de itens de trabalho.
  • A ID da instância de orquestração exclusiva.
  • O resultado agregado final, que inclui:
    • O número total de itens processados
    • A soma de todos os resultados (o resultado de cada item é o quadrado de seu valor)
    • A média de todos os resultados

Saída de exemplo

Starting fan out/fan in orchestration with 10 items
Waiting for 10 parallel tasks to complete
Orchestrator yielded with 10 task(s) and 0 event(s) outstanding.
Processing work item: 1
Processing work item: 2
Processing work item: 10
Processing work item: 9
Processing work item: 8
Processing work item: 7
Processing work item: 6
Processing work item: 5
Processing work item: 4
Processing work item: 3
Orchestrator yielded with 9 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 8 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 7 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 6 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 5 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 4 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 3 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 2 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
All parallel tasks completed, aggregating results
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
Aggregating results from 10 items
Orchestration completed with status: COMPLETED

fan-out-fan-in No diretório, compile e execute o aplicativo usando o Gradle.

./gradlew runFanOutFanInPattern

Dica

Se você receber a mensagem zsh: permission denied: ./gradlewde erro, tente executar chmod +x gradlew antes de executar o aplicativo.

Como entender a saída

Ao executar este exemplo, você recebe uma saída que mostra:

  • Registro do orquestrador e das atividades.
  • Mensagens de status ao processar cada item de trabalho em paralelo, mostrando que elas estão sendo executadas simultaneamente.
  • Atrasos aleatórios para cada item de trabalho (entre 0,5 e 2 segundos) para simular diferentes tempos de processamento.
  • Uma mensagem final mostrando a agregação de resultados.

Desempacotar o que aconteceu no código quando você executou o projeto.

Saída de exemplo

Starting a Gradle Daemon (subsequent builds will be faster)

> Task :runFanOutFanInPattern
Durable Task worker is connecting to sidecar at localhost:8080.
Started new orchestration instance
Orchestration completed: [Name: 'FanOutFanIn_WordCount', ID: '<id-number>', RuntimeStatus: COMPLETED, CreatedAt: 2025-04-25T15:24:47.170Z, LastUpdatedAt: 2025-04-25T15:24:47.287Z, Input: '["Hello, world!","The quick brown fox jumps over t...', Output: '60']
Output: 60

Agora que você executou o projeto localmente, pode aprender a implantar no Azure usando os Aplicativos de Contêiner do Azure.

Exibir o status e o histórico de orquestração

Você pode exibir o status e o histórico de orquestração por meio do painel do Agendador de Tarefas Duráveis. Por padrão, o painel é executado na porta 8082.

  1. Navegue até http://localhost:8082 no seu navegador da Web.
  2. Clique no hub de tarefas padrão . A instância de orquestração que você criou está na lista.
  3. Clique na ID da instância de orquestração para exibir os detalhes da execução, que incluem:
    • A execução paralela de várias tarefas de atividade
    • A etapa de agregação de fan-in
    • A entrada e a saída em cada etapa
    • O tempo levado para cada etapa

Captura de tela mostrando os detalhes da instância de orquestração para o exemplo do .NET.

Captura de tela mostrando os detalhes da instância de orquestração para o exemplo do Python.

Captura de tela mostrando os detalhes da instância de orquestração para o exemplo java.

Noções básicas sobre a estrutura de código

O projeto do trabalhador

Para demonstrar o padrão fan-out/fan-in, a orquestração do projeto de trabalho cria tarefas de atividade paralela e aguarda que todos sejam concluídos. O orquestrador:

  1. Usa uma lista de itens de trabalho como entrada.
  2. Esvai criando uma tarefa separada para cada item de trabalho usando ProcessWorkItemActivity.
  3. Executa todas as tarefas em paralelo.
  4. Aguarda que todas as tarefas sejam concluídas usando Task.WhenAll.
  5. Fãs agregando todos os resultados individuais usando AggregateResultsActivity.
  6. Retorna o resultado agregado final para o cliente.

O projeto de trabalho contém:

  • ParallelProcessingOrchestration.cs: define o orquestrador e as funções de atividade em um único arquivo.
  • Program.cs: configura o host de trabalho com o tratamento adequado da cadeia de conexão.

ParallelProcessingOrchestration.cs

Usando fan-out/fan-in, a orquestração cria tarefas de atividade paralela e aguarda que todos sejam concluídos.

public override async Task<Dictionary<string, int>> RunAsync(TaskOrchestrationContext context, List<string> workItems)
{
    // Step 1: Fan-out by creating a task for each work item in parallel
    List<Task<Dictionary<string, int>>> processingTasks = new List<Task<Dictionary<string, int>>>();

    foreach (string workItem in workItems)
    {
        // Create a task for each work item (fan-out)
        Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
            nameof(ProcessWorkItemActivity), workItem);
        processingTasks.Add(task);
    }

    // Step 2: Wait for all parallel tasks to complete
    Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);

    // Step 3: Fan-in by aggregating all results
    Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
        nameof(AggregateResultsActivity), results);

    return aggregatedResults;
}

Cada atividade é implementada como uma classe separada decorada com o [DurableTask] atributo.

[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
    // Implementation processes a single work item
}

[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
    // Implementation aggregates individual results
}

Program.cs

O trabalhador usa Microsoft.Extensions.Hosting para o gerenciamento adequado do ciclo de vida.

using Microsoft.Extensions.Hosting;
//..

builder.Services.AddDurableTaskWorker()
    .AddTasks(registry =>
    {
        registry.AddOrchestrator<ParallelProcessingOrchestration>();
        registry.AddActivity<ProcessWorkItemActivity>();
        registry.AddActivity<AggregateResultsActivity>();
    })
    .UseDurableTaskScheduler(connectionString);

O projeto do cliente

O projeto cliente:

  • Usa a mesma lógica de cadeia de conexão que o trabalho.
  • Cria uma lista de itens de trabalho a serem processados em paralelo.
  • Agenda uma instância de orquestração com a lista como entrada.
  • Aguarda a conclusão da orquestração e exibe os resultados agregados.
  • Usa WaitForInstanceCompletionAsync para sondagem eficiente.
List<string> workItems = new List<string>
{
    "Task1",
    "Task2",
    "Task3",
    "LongerTask4",
    "VeryLongTask5"
};

// Schedule the orchestration with the work items
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
    "ParallelProcessingOrchestration", 
    workItems);

// Wait for completion
var instance = await client.WaitForInstanceCompletionAsync(
    instanceId,
    getInputsAndOutputs: true,
    cts.Token);

worker.py

Para demonstrar o padrão fan-out/fan-in, a orquestração do projeto de trabalho cria tarefas de atividade paralela e aguarda que todos sejam concluídos. O orquestrador:

  1. Recebe uma lista de itens de trabalho como entrada.
  2. Ele realiza "fan out" criando tarefas paralelas para cada item de trabalho (chamando process_work_item para cada um deles).
  3. Ele aguarda que todas as tarefas sejam concluídas usando task.when_all.
  4. Em seguida, ele realiza "fan in" agregando os resultados com a atividade aggregate_results.
  5. O resultado agregado final é retornado ao cliente.

Usando fan-out/fan-in, a orquestração cria tarefas de atividade paralela e aguarda que todos sejam concluídos.

# Orchestrator function
def fan_out_fan_in_orchestrator(ctx, work_items: list) -> dict:
    logger.info(f"Starting fan out/fan in orchestration with {len(work_items)} items")

    # Fan out: Create a task for each work item
    parallel_tasks = []
    for item in work_items:
        parallel_tasks.append(ctx.call_activity("process_work_item", input=item))

    # Wait for all tasks to complete
    logger.info(f"Waiting for {len(parallel_tasks)} parallel tasks to complete")
    results = yield task.when_all(parallel_tasks)

    # Fan in: Aggregate all the results
    logger.info("All parallel tasks completed, aggregating results")
    final_result = yield ctx.call_activity("aggregate_results", input=results)

    return final_result

client.py

O projeto cliente:

  • Usa a mesma lógica de cadeia de conexão que o trabalho.
  • Cria uma lista de itens de trabalho a serem processados em paralelo.
  • Agenda uma instância de orquestração com a lista como entrada.
  • Aguarda a conclusão da orquestração e exibe os resultados agregados.
  • Usa wait_for_orchestration_completion para sondagem eficiente.
# Generate work items (default 10 items if not specified)
count = int(sys.argv[1]) if len(sys.argv) > 1 else 10
work_items = list(range(1, count + 1))

logger.info(f"Starting new fan out/fan in orchestration with {count} work items")

# Schedule a new orchestration instance
instance_id = client.schedule_new_orchestration(
    "fan_out_fan_in_orchestrator", 
    input=work_items
)

logger.info(f"Started orchestration with ID = {instance_id}")

# Wait for orchestration to complete
logger.info("Waiting for orchestration to complete...")
result = client.wait_for_orchestration_completion(
    instance_id,
    timeout=60
)

Para demonstrar o padrão fan-out/fan-in, a orquestração do projeto FanOutFanInPattern cria tarefas de atividade paralela e aguarda que todos sejam concluídos. O orquestrador:

  1. Usa uma lista de itens de trabalho como entrada.
  2. Realiza fan out criando uma tarefa separada para cada item de trabalho usando ``.
  3. Executa todas as tarefas em paralelo.
  4. Aguarda a conclusão de todas as tarefas usando ``.
  5. Realiza fan in agregando todos os resultados individuais usando ``.
  6. Retorna o resultado agregado final para o cliente.

O projeto contém:

  • DurableTaskSchedulerWorkerExtensions trabalho: define o orquestrador e as funções de atividade.
  • DurableTaskSchedulerClientExtension cliente: configura o host de trabalho com o tratamento adequado da cadeia de conexão.

Trabalhador

Usando fan-out/fan-in, a orquestração cria tarefas de atividade paralela e aguarda que todos sejam concluídos.

DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString)
    .addOrchestration(new TaskOrchestrationFactory() {
        @Override
        public String getName() { return "FanOutFanIn_WordCount"; }

        @Override
        public TaskOrchestration create() {
            return ctx -> {
                List<?> inputs = ctx.getInput(List.class);
                List<Task<Integer>> tasks = inputs.stream()
                        .map(input -> ctx.callActivity("CountWords", input.toString(), Integer.class))
                        .collect(Collectors.toList());
                List<Integer> allWordCountResults = ctx.allOf(tasks).await();
                int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum();
                ctx.complete(totalWordCount);
            };
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "CountWords"; }

        @Override
        public TaskActivity create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                StringTokenizer tokenizer = new StringTokenizer(input);
                return tokenizer.countTokens();
            };
        }
    })
    .build();

// Start the worker
worker.start();

Cliente

O projeto cliente:

  • Usa a mesma lógica de cadeia de conexão que o trabalho.
  • Cria uma lista de itens de trabalho a serem processados em paralelo.
  • Agenda uma instância de orquestração com a lista como entrada.
  • Aguarda a conclusão da orquestração e exibe os resultados agregados.
  • Usa waitForInstanceCompletion para sondagem eficiente.
DurableTaskClient client = DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString).build();

// The input is an arbitrary list of strings.
List<String> listOfStrings = Arrays.asList(
        "Hello, world!",
        "The quick brown fox jumps over the lazy dog.",
        "If a tree falls in the forest and there is no one there to hear it, does it make a sound?",
        "The greatest glory in living lies not in never falling, but in rising every time we fall.",
        "Always remember that you are absolutely unique. Just like everyone else.");

// Schedule an orchestration which will reliably count the number of words in all the given sentences.
String instanceId = client.scheduleNewOrchestrationInstance(
        "FanOutFanIn_WordCount",
        new NewOrchestrationInstanceOptions().setInput(listOfStrings));
logger.info("Started new orchestration instance: {}", instanceId);

// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
        instanceId,
        Duration.ofSeconds(30),
        true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(int.class));

Próximas etapas

Agora que você executou o exemplo localmente usando o emulador do Agendador de Tarefas Duráveis, tente criar um recurso de agendador e hub de tarefas e implantar nos Aplicativos de Contêiner do Azure.