Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Os SDKs de Tarefas Duráveis fornecem uma biblioteca de cliente leve para o Agendador de Tarefas Duráveis. Neste guia de início rápido, você aprenderá a criar orquestrações que usam o padrão de aplicativo fan-out/fan-in para executar processamento paralelo.
Importante
Atualmente, os SDKs de Tarefa Durável não estão disponíveis para JavaScript e PowerShell.
Importante
Atualmente, os SDKs de Tarefa Durável não estão disponíveis para JavaScript e PowerShell.
- Configure e execute o emulador do Durable Task Scheduler para desenvolvimento local.
- Execute os projetos de trabalhador e cliente.
- Analise o status e o histórico da orquestração por meio do painel do Agendador de Tarefas Duráveis.
Pré-requisitos
Antes de começar:
- Certifique-se de ter o .NET 8 SDK ou posterior.
- Instale o Docker para executar o emulador.
- Clone o repositório GitHub do Durable Task Scheduler para usar o exemplo de início rápido.
- Certifique-se de ter Python 3.9+ ou posterior.
- Instale o Docker para executar o emulador.
- Clone o repositório GitHub do Durable Task Scheduler para usar o exemplo de início rápido.
- Certifique-se de ter Java 8 ou 11.
- Instale o Docker para executar o emulador.
- Clone o repositório GitHub do Durable Task Scheduler para usar o exemplo de início rápido.
Configurar o emulador do Durable Task Scheduler
O código do aplicativo procura um agendador implantado e um recurso de hub de tarefas. Se nenhum for encontrado, o código retornará ao emulador. O emulador simula um agendador e um hub de tarefas em um contêiner do Docker, tornando-o ideal para o desenvolvimento local necessário neste início rápido.
Azure-Samples/Durable-Task-SchedulerNo diretório raiz, navegue até o diretório de exemplo do SDK do .NET.cd samples/durable-task-sdks/dotnet/FanOutFanInPuxe a imagem do Docker para o emulador.
docker pull mcr.microsoft.com/dts/dts-emulator:latestExecute o emulador. O recipiente pode demorar alguns segundos a ficar pronto.
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
Como o código de exemplo usa automaticamente as configurações padrão do emulador, você não precisa definir nenhuma variável de ambiente. As configurações padrão do emulador para este início rápido são:
- Ponto final:
http://localhost:8080 - Hub de tarefas:
default
Azure-Samples/Durable-Task-SchedulerNo diretório raiz, navegue até o diretório de exemplo do Python SDK.cd samples/durable-task-sdks/python/fan-out-fan-inPuxe a imagem do Docker para o emulador.
docker pull mcr.microsoft.com/dts/dts-emulator:latestExecute o emulador. O recipiente pode demorar alguns segundos a ficar pronto.
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
Como o código de exemplo usa automaticamente as configurações padrão do emulador, você não precisa definir nenhuma variável de ambiente. As configurações padrão do emulador para este início rápido são:
- Ponto final:
http://localhost:8080 - Hub de tarefas:
default
Azure-Samples/Durable-Task-SchedulerNo diretório raiz, navegue até o diretório de exemplo do Java SDK.cd samples/durable-task-sdks/java/fan-out-fan-inPuxe a imagem do Docker para o emulador.
docker pull mcr.microsoft.com/dts/dts-emulator:latestExecute o emulador. O recipiente pode demorar alguns segundos a ficar pronto.
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
Como o código de exemplo usa automaticamente as configurações padrão do emulador, você não precisa definir nenhuma variável de ambiente. As configurações padrão do emulador para este início rápido são:
- Ponto final:
http://localhost:8080 - Hub de tarefas:
default
Executar o arranque rápido
A partir do
FanOutFanIndiretório, navegue até oWorkerdiretório para compilar e executar o trabalhador.cd Worker dotnet build dotnet runEm um terminal separado, a partir do
FanOutFanIndiretório, navegue até oClientdiretório para construir e executar o cliente.cd Client dotnet build dotnet run
Compreender os resultados
Ao executar este exemplo, recebe-se a saída tanto dos processos de trabalho quanto dos de cliente. Descompacte o que aconteceu no código quando você executou o projeto.
Produção do trabalhador
O resultado do trabalho mostra:
- Registo do orquestrador e atividades
- Entradas de log quando cada atividade é chamada
- Processamento paralelo de vários itens de trabalho
- Agregação final dos resultados
Saída do cliente
A saída do cliente mostra:
- A orquestração inicia-se com uma lista de itens de trabalho
- O ID único da instância de orquestração
- Os resultados finais agregados, mostrando cada item de trabalho e seu resultado correspondente
- Contagem total de itens processados
Exemplo de saída
Starting Fan-Out Fan-In Pattern - Parallel Processing Client
Using local emulator with no authentication
Starting parallel processing orchestration with 5 work items
Work items: ["Task1","Task2","Task3","LongerTask4","VeryLongTask5"]
Started orchestration with ID: 7f8e9a6b-1c2d-3e4f-5a6b-7c8d9e0f1a2b
Waiting for orchestration to complete...
Orchestration completed with status: Completed
Processing results:
Work item: Task1, Result: 5
Work item: Task2, Result: 5
Work item: Task3, Result: 5
Work item: LongerTask4, Result: 11
Work item: VeryLongTask5, Result: 13
Total items processed: 5
Ative um ambiente virtual Python.
python -m venv venv /venv/Scripts/activateInstale os pacotes necessários.
pip install -r requirements.txtInicie o trabalhador.
python worker.pyResultados esperados
Você pode ver a saída indicando que o trabalhador iniciou e está aguardando itens de trabalho.
Starting Fan Out/Fan In pattern worker... Using taskhub: default Using endpoint: http://localhost:8080 Starting gRPC worker that connects to http://localhost:8080 Successfully connected to http://localhost:8080. Waiting for work items...Em um novo terminal, ative o ambiente virtual e execute o cliente.
venv/Scripts/activate python client.pyVocê pode fornecer o número de itens de trabalho como um argumento. Se nenhum argumento for fornecido, o exemplo executará 10 itens por padrão.
python client.py [number_of_items]
Compreender os resultados
Ao executar este exemplo, recebe-se a saída tanto dos processos de trabalho quanto dos de cliente. Descompacte o que aconteceu no código quando você executou o projeto.
Produção do trabalhador
O resultado do trabalho mostra:
- Registo do orquestrador e das atividades.
- Mensagens de status ao processar cada item de trabalho em paralelo, mostrando que eles estão sendo executados simultaneamente.
- Atrasos aleatórios para cada item de trabalho (entre 0,5 e 2 segundos) para simular tempos de processamento variáveis.
- Uma mensagem final mostrando a agregação dos resultados.
Saída do cliente
A saída do cliente mostra:
- A orquestração começa com o número de itens de trabalho especificado.
- O ID exclusivo da instância de orquestração.
- O resultado final agregado, que inclui:
- O número total de itens processados
- A soma de todos os resultados (onde o resultado de cada item é calculado como o quadrado do valor do próprio item)
- A média de todos os resultados
Exemplo de saída
Starting fan out/fan in orchestration with 10 items
Waiting for 10 parallel tasks to complete
Orchestrator yielded with 10 task(s) and 0 event(s) outstanding.
Processing work item: 1
Processing work item: 2
Processing work item: 10
Processing work item: 9
Processing work item: 8
Processing work item: 7
Processing work item: 6
Processing work item: 5
Processing work item: 4
Processing work item: 3
Orchestrator yielded with 9 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 8 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 7 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 6 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 5 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 4 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 3 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 2 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
All parallel tasks completed, aggregating results
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
Aggregating results from 10 items
Orchestration completed with status: COMPLETED
A partir do diretório, compile e execute o aplicativo usando o fan-out-fan-in Gradle.
./gradlew runFanOutFanInPattern
Sugestão
Se você receber a mensagem zsh: permission denied: ./gradlewde erro , tente executar chmod +x gradlew antes de executar o aplicativo.
Compreender os resultados
Ao executar este exemplo, você recebe uma saída que mostra:
- Registo do orquestrador e das atividades.
- Mensagens de status ao processar cada item de trabalho em paralelo, mostrando que eles estão sendo executados simultaneamente.
- Atrasos aleatórios para cada item de trabalho (entre 0,5 e 2 segundos) para simular tempos de processamento variáveis.
- Uma mensagem final mostrando a agregação dos resultados.
Descompacte o que aconteceu no código quando você executou o projeto.
Exemplo de saída
Starting a Gradle Daemon (subsequent builds will be faster)
> Task :runFanOutFanInPattern
Durable Task worker is connecting to sidecar at localhost:8080.
Started new orchestration instance
Orchestration completed: [Name: 'FanOutFanIn_WordCount', ID: '<id-number>', RuntimeStatus: COMPLETED, CreatedAt: 2025-04-25T15:24:47.170Z, LastUpdatedAt: 2025-04-25T15:24:47.287Z, Input: '["Hello, world!","The quick brown fox jumps over t...', Output: '60']
Output: 60
Depois de executar o projeto localmente, pode aprender como implantar aplicações no Azure usando Aplicativos de Contêiner do Azure.
Ver estado e histórico da orquestração
Você pode visualizar o status e o histórico da orquestração por meio do painel do Agendador de Tarefas Duráveis. Por padrão, o painel é executado na porta 8082.
- Navegue no http://localhost:8082 no seu navegador da Web.
- Clique no hub de tarefas padrão . A instância de orquestração que você criou está na lista.
- Clique no ID da instância de orquestração para visualizar os detalhes da execução, que incluem:
- A execução paralela de tarefas de múltiplas atividades
- A etapa de agregação fan-in
- A entrada e saída em cada etapa
- O tempo necessário para cada etapa
Noções básicas sobre a estrutura do código
O projeto do trabalhador
Para demonstrar o padrão de fan-out/fan-in, a orquestração do projeto de trabalho cria tarefas de atividade paralelas e espera que todas sejam concluídas. O orquestrador:
- Usa uma lista de itens de trabalho como entrada.
- Expanda-se criando uma tarefa separada para cada item de trabalho usando
ProcessWorkItemActivity. - Executa todas as tarefas em paralelo.
- Aguarda que todas as tarefas sejam concluídas usando
Task.WhenAll. - Fãs agregando todos os resultados individuais usando
AggregateResultsActivity. - Devolve o resultado final agregado ao cliente.
O projeto de trabalho contém:
- ParallelProcessingOrchestration.cs: Define o orquestrador e as funções de atividade em um único arquivo.
- Program.cs: Configura o host de trabalho com manipulação adequada de cadeia de conexão.
ParallelProcessingOrchestration.cs
Usando o método fan-out/fan-in, a orquestração cria tarefas paralelas e aguarda a conclusão de todas elas.
public override async Task<Dictionary<string, int>> RunAsync(TaskOrchestrationContext context, List<string> workItems)
{
// Step 1: Fan-out by creating a task for each work item in parallel
List<Task<Dictionary<string, int>>> processingTasks = new List<Task<Dictionary<string, int>>>();
foreach (string workItem in workItems)
{
// Create a task for each work item (fan-out)
Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
nameof(ProcessWorkItemActivity), workItem);
processingTasks.Add(task);
}
// Step 2: Wait for all parallel tasks to complete
Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);
// Step 3: Fan-in by aggregating all results
Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
nameof(AggregateResultsActivity), results);
return aggregatedResults;
}
Cada atividade é implementada como uma classe separada decorada com o [DurableTask] atributo.
[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
// Implementation processes a single work item
}
[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
// Implementation aggregates individual results
}
Program.cs
O trabalhador usa Microsoft.Extensions.Hosting para o gerenciamento adequado do ciclo de vida.
using Microsoft.Extensions.Hosting;
//..
builder.Services.AddDurableTaskWorker()
.AddTasks(registry =>
{
registry.AddOrchestrator<ParallelProcessingOrchestration>();
registry.AddActivity<ProcessWorkItemActivity>();
registry.AddActivity<AggregateResultsActivity>();
})
.UseDurableTaskScheduler(connectionString);
O projeto do cliente
O projeto do cliente:
- Usa a mesma lógica de cadeia de conexão que o trabalhador.
- Cria uma lista de itens de trabalho a serem processados em paralelo.
- Programa uma instância de orquestração com a lista como entrada.
- Aguarda a conclusão da orquestração e exibe os resultados agregados.
- Utiliza
WaitForInstanceCompletionAsyncpara interrogação eficiente.
List<string> workItems = new List<string>
{
"Task1",
"Task2",
"Task3",
"LongerTask4",
"VeryLongTask5"
};
// Schedule the orchestration with the work items
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
"ParallelProcessingOrchestration",
workItems);
// Wait for completion
var instance = await client.WaitForInstanceCompletionAsync(
instanceId,
getInputsAndOutputs: true,
cts.Token);
worker.py
Para demonstrar o padrão de fan-out/fan-in, a orquestração do projeto de trabalho cria tarefas de atividade paralelas e espera que todas sejam concluídas. O orquestrador:
- Recebe uma lista de itens de trabalho como entrada.
- Ele "ventila" criando tarefas paralelas para cada item de trabalho (chamando
process_work_itempara cada um). - Ele aguarda que todas as tarefas sejam concluídas usando
task.when_all. - Em seguida, "consolida" agregando os resultados com a atividade
aggregate_results. - O resultado final agregado é devolvido ao cliente.
Usando o método fan-out/fan-in, a orquestração cria tarefas paralelas e aguarda a conclusão de todas elas.
# Orchestrator function
def fan_out_fan_in_orchestrator(ctx, work_items: list) -> dict:
logger.info(f"Starting fan out/fan in orchestration with {len(work_items)} items")
# Fan out: Create a task for each work item
parallel_tasks = []
for item in work_items:
parallel_tasks.append(ctx.call_activity("process_work_item", input=item))
# Wait for all tasks to complete
logger.info(f"Waiting for {len(parallel_tasks)} parallel tasks to complete")
results = yield task.when_all(parallel_tasks)
# Fan in: Aggregate all the results
logger.info("All parallel tasks completed, aggregating results")
final_result = yield ctx.call_activity("aggregate_results", input=results)
return final_result
client.py
O projeto do cliente:
- Usa a mesma lógica de cadeia de conexão que o trabalhador.
- Cria uma lista de itens de trabalho a serem processados em paralelo.
- Programa uma instância de orquestração com a lista como entrada.
- Aguarda a conclusão da orquestração e exibe os resultados agregados.
- Utiliza
wait_for_orchestration_completionpara interrogação eficiente.
# Generate work items (default 10 items if not specified)
count = int(sys.argv[1]) if len(sys.argv) > 1 else 10
work_items = list(range(1, count + 1))
logger.info(f"Starting new fan out/fan in orchestration with {count} work items")
# Schedule a new orchestration instance
instance_id = client.schedule_new_orchestration(
"fan_out_fan_in_orchestrator",
input=work_items
)
logger.info(f"Started orchestration with ID = {instance_id}")
# Wait for orchestration to complete
logger.info("Waiting for orchestration to complete...")
result = client.wait_for_orchestration_completion(
instance_id,
timeout=60
)
Para demonstrar o padrão de fan-out/fan-in, a orquestração do FanOutFanInPattern projeto cria tarefas paralelas e aguarda a conclusão de todas. O orquestrador:
- Usa uma lista de itens de trabalho como entrada.
- Expande criando uma tarefa separada para cada item de trabalho usando ``.
- Executa todas as tarefas em paralelo.
- Aguarda que todas as tarefas sejam concluídas usando ''.
- Fãs ao agregar todos os resultados individuais usando ``.
- Devolve o resultado final agregado ao cliente.
O projeto contém:
-
DurableTaskSchedulerWorkerExtensionstrabalhador: Define as funções do orquestrador e das atividades. -
DurableTaskSchedulerClientExtensionclient: Configurar o host de trabalho com gestão adequada da cadeia de conexão.
Trabalhador
Usando o método fan-out/fan-in, a orquestração cria tarefas paralelas e aguarda a conclusão de todas elas.
DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString)
.addOrchestration(new TaskOrchestrationFactory() {
@Override
public String getName() { return "FanOutFanIn_WordCount"; }
@Override
public TaskOrchestration create() {
return ctx -> {
List<?> inputs = ctx.getInput(List.class);
List<Task<Integer>> tasks = inputs.stream()
.map(input -> ctx.callActivity("CountWords", input.toString(), Integer.class))
.collect(Collectors.toList());
List<Integer> allWordCountResults = ctx.allOf(tasks).await();
int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum();
ctx.complete(totalWordCount);
};
}
})
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "CountWords"; }
@Override
public TaskActivity create() {
return ctx -> {
String input = ctx.getInput(String.class);
StringTokenizer tokenizer = new StringTokenizer(input);
return tokenizer.countTokens();
};
}
})
.build();
// Start the worker
worker.start();
Cliente
O projeto do cliente:
- Usa a mesma lógica de cadeia de conexão que o trabalhador.
- Cria uma lista de itens de trabalho a serem processados em paralelo.
- Programa uma instância de orquestração com a lista como entrada.
- Aguarda a conclusão da orquestração e exibe os resultados agregados.
- Utiliza
waitForInstanceCompletionpara interrogação eficiente.
DurableTaskClient client = DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString).build();
// The input is an arbitrary list of strings.
List<String> listOfStrings = Arrays.asList(
"Hello, world!",
"The quick brown fox jumps over the lazy dog.",
"If a tree falls in the forest and there is no one there to hear it, does it make a sound?",
"The greatest glory in living lies not in never falling, but in rising every time we fall.",
"Always remember that you are absolutely unique. Just like everyone else.");
// Schedule an orchestration which will reliably count the number of words in all the given sentences.
String instanceId = client.scheduleNewOrchestrationInstance(
"FanOutFanIn_WordCount",
new NewOrchestrationInstanceOptions().setInput(listOfStrings));
logger.info("Started new orchestration instance: {}", instanceId);
// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
instanceId,
Duration.ofSeconds(30),
true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(int.class));
Próximos passos
Agora que você executou o exemplo localmente usando o emulador do Agendador de Tarefas Duráveis, tente criar um recurso de agendador e hub de tarefas e implantá-lo nos Aplicativos de Contêiner do Azure.