Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Importante
Atualmente, os SDKs de Tarefa Durável não estão disponíveis para JavaScript e PowerShell.
Importante
Atualmente, os SDKs de Tarefa Durável não estão disponíveis para JavaScript e PowerShell.
Neste início rápido, vai aprender a:
- Configure e execute o emulador do Durable Task Scheduler para desenvolvimento local.
- Execute os projetos de trabalhador e cliente.
- Verifique os registos dos Apps de Contêiner do Azure.
- Analise o status e o histórico da orquestração por meio do painel do Agendador de Tarefas Duráveis.
Pré-requisitos
Antes de começar:
- Certifique-se de ter o .NET 8 SDK ou posterior.
- Instale o Docker para executar o emulador.
- Instalar a CLI do Azure Developer
- Clone o repositório GitHub do Durable Task Scheduler para usar o exemplo de início rápido.
- Certifique-se de ter Python 3.9+ ou posterior.
- Instale o Docker para executar o emulador.
- Instalar a CLI do Azure Developer
- Clone o repositório GitHub do Durable Task Scheduler para usar o exemplo de início rápido.
- Certifique-se de ter Java 8 ou 11.
- Instale o Docker para executar o emulador.
- Instalar a CLI do Azure Developer
- Clone o repositório GitHub do Durable Task Scheduler para usar o exemplo de início rápido.
Preparar o projeto
Em uma nova janela do terminal, a partir do Azure-Samples/Durable-Task-Scheduler diretório, navegue até o diretório de exemplo.
cd /samples/durable-task-sdks/dotnet/FunctionChaining
cd /samples/durable-task-sdks/python/function-chaining
cd /samples/durable-task-sdks/java/function-chaining
Implantar usando a CLI do Azure Developer
Execute
azd uppara provisionar a infraestrutura e implantar o aplicativo nos Aplicativos de Contêiner do Azure em um único comando.azd upQuando solicitado no terminal, forneça os seguintes parâmetros.
Parâmetro Descrição Nome do ambiente Prefixo para o grupo de recursos criado para armazenar todos os recursos do Azure. Localização do Azure Localização do Azure para os seus recursos. Subscrição do Azure A subscrição do Azure para os seus recursos. Este processo pode levar algum tempo para ser concluído. À medida que o
azd upcomando é concluído, a saída da CLI exibe dois links do portal do Azure para monitorar o progresso da implantação. A saída também demonstra comoazd up:- Cria e configura todos os recursos necessários do Azure através dos arquivos Bicep fornecidos no diretório
./infrausandoazd provision. Depois de provisionado pela CLI do Desenvolvedor do Azure, você pode acessar esses recursos por meio do portal do Azure. Os arquivos que provisionam os recursos do Azure incluem:main.parameters.jsonmain.bicep- Um
appdiretório de recursos organizado por funcionalidade - Uma
corebiblioteca de referência que contém os módulos Bicep usados peloazdmodelo
- Implanta o código usando
azd deploy
Resultado esperado
Packaging services (azd package) (✓) Done: Packaging service client - Image Hash: {IMAGE_HASH} - Target Image: {TARGET_IMAGE} (✓) Done: Packaging service worker - Image Hash: {IMAGE_HASH} - Target Image: {TARGET_IMAGE} Provisioning Azure resources (azd provision) Provisioning Azure resources can take some time. Subscription: SUBSCRIPTION_NAME (SUBSCRIPTION_ID) Location: West US 2 You can view detailed progress in the Azure Portal: https://portal.azure.com/#view/HubsExtension/DeploymentDetailsBlade/~/overview/id/%2Fsubscriptions%SUBSCRIPTION_ID%2Fproviders%2FMicrosoft.Resources%2Fdeployments%2FCONTAINER_APP_ENVIRONMENT (✓) Done: Resource group: GENERATED_RESOURCE_GROUP (1.385s) (✓) Done: Container Apps Environment: GENERATED_CONTAINER_APP_ENVIRONMENT (54.125s) (✓) Done: Container Registry: GENERATED_REGISTRY (1m27.747s) (✓) Done: Container App: SAMPLE_CLIENT_APP (21.39s) (✓) Done: Container App: SAMPLE_WORKER_APP (24.136s) Deploying services (azd deploy) (✓) Done: Deploying service client - Endpoint: https://SAMPLE_CLIENT_APP.westus2.azurecontainerapps.io/ (✓) Done: Deploying service worker - Endpoint: https://SAMPLE_WORKER_APP.westus2.azurecontainerapps.io/ SUCCESS: Your up workflow to provision and deploy to Azure completed in 10 minutes 34 seconds.- Cria e configura todos os recursos necessários do Azure através dos arquivos Bicep fornecidos no diretório
Confirmar a implantação bem-sucedida
No portal do Azure, verifique se as orquestrações estão sendo executadas com êxito.
Copie o nome do grupo de recursos da saída do terminal.
Entre no portal do Azure e procure esse nome de grupo de recursos.
Na página de visão geral do grupo de recursos, clique no recurso do aplicativo de contêiner cliente.
Selecione Monitoramento>do fluxo de log.
Confirme se o contêiner do cliente está registrando as tarefas de encadeamento de funções.
Navegue de volta para a página do grupo de recursos para selecionar o
workercontêiner.Selecione Monitoramento>do fluxo de log.
Confirme se o contêiner de trabalho está registrando as tarefas de encadeamento de funções.
Confirme se o aplicativo de contêiner de exemplo está registrando as tarefas de encadeamento de funções.
Compreender o código
Projeto do cliente
O projeto do Cliente:
- Usa a mesma lógica de cadeia de conexão que o trabalhador
- Implementa um agendador de orquestração sequencial que:
- Programa 20 instâncias de orquestração, uma de cada vez
- Espera 5 segundos entre o agendamento de cada orquestração
- Rastreia todas as instâncias de orquestração em uma lista
- Aguarda que todas as orquestrações sejam concluídas antes de sair
- Usa o log padrão para mostrar o progresso e os resultados
// Schedule 20 orchestrations sequentially
for (int i = 0; i < TotalOrchestrations; i++)
{
// Create a unique instance ID
string instanceName = $"{name}_{i+1}";
// Schedule the orchestration
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
"GreetingOrchestration",
instanceName);
// Wait 5 seconds before scheduling the next one
await Task.Delay(TimeSpan.FromSeconds(IntervalSeconds));
}
// Wait for all orchestrations to complete
foreach (string id in allInstanceIds)
{
OrchestrationMetadata instance = await client.WaitForInstanceCompletionAsync(
id, getInputsAndOutputs: false, CancellationToken.None);
}
Projeto Trabalhador
O projeto Worker contém:
- GreetingOrchestration.cs: Define o orquestrador e as funções de atividade em um único arquivo
- Program.cs: Configura o host de trabalho com manipulação adequada da cadeia de conexão
Implementação de orquestração
A orquestração chama diretamente cada atividade em sequência usando o método padrão CallActivityAsync :
public override async Task<string> RunAsync(TaskOrchestrationContext context, string name)
{
// Step 1: Say hello to the person
string greeting = await context.CallActivityAsync<string>(nameof(SayHelloActivity), name);
// Step 2: Process the greeting
string processedGreeting = await context.CallActivityAsync<string>(nameof(ProcessGreetingActivity), greeting);
// Step 3: Finalize the response
string finalResponse = await context.CallActivityAsync<string>(nameof(FinalizeResponseActivity), processedGreeting);
return finalResponse;
}
Cada atividade é implementada como uma classe separada decorada com o [DurableTask] atributo:
[DurableTask]
public class SayHelloActivity : TaskActivity<string, string>
{
// Implementation details
}
O trabalhador usa Microsoft.Extensions.Hosting para o gerenciamento adequado do ciclo de vida:
var builder = Host.CreateApplicationBuilder();
builder.Services.AddDurableTaskWorker()
.AddTasks(registry => {
registry.AddAllGeneratedTasks();
})
.UseDurableTaskScheduler(connectionString);
var host = builder.Build();
await host.StartAsync();
Cliente
O projeto do Cliente:
- Usa a mesma lógica de cadeia de conexão que o trabalhador
- Implementa um agendador de orquestração sequencial que:
- Programa 20 instâncias de orquestração, uma de cada vez
- Espera 5 segundos entre o agendamento de cada orquestração
- Rastreia todas as instâncias de orquestração em uma lista
- Aguarda que todas as orquestrações sejam concluídas antes de sair
- Usa o log padrão para mostrar o progresso e os resultados
# Schedule all orchestrations first
instance_ids = []
for i in range(TOTAL_ORCHESTRATIONS):
try:
# Create a unique instance name
instance_name = f"{name}_{i+1}"
logger.info(f"Scheduling orchestration #{i+1} ({instance_name})")
# Schedule the orchestration
instance_id = client.schedule_new_orchestration(
"function_chaining_orchestrator",
input=instance_name
)
instance_ids.append(instance_id)
logger.info(f"Orchestration #{i+1} scheduled with ID: {instance_id}")
# Wait before scheduling next orchestration (except for the last one)
if i < TOTAL_ORCHESTRATIONS - 1:
logger.info(f"Waiting {INTERVAL_SECONDS} seconds before scheduling next orchestration...")
await asyncio.sleep(INTERVAL_SECONDS)
# ...
# Wait for all orchestrations to complete
for idx, instance_id in enumerate(instance_ids):
try:
logger.info(f"Waiting for orchestration {idx+1}/{len(instance_ids)} (ID: {instance_id})...")
result = client.wait_for_orchestration_completion(
instance_id,
timeout=120
)
Trabalhador
Implementação de orquestração
A orquestração chama diretamente cada atividade em sequência usando a função padrão call_activity :
# Orchestrator function
def function_chaining_orchestrator(ctx, name: str) -> str:
"""Orchestrator that demonstrates function chaining pattern."""
logger.info(f"Starting function chaining orchestration for {name}")
# Call first activity - passing input directly without named parameter
greeting = yield ctx.call_activity('say_hello', input=name)
# Call second activity with the result from first activity
processed_greeting = yield ctx.call_activity('process_greeting', input=greeting)
# Call third activity with the result from second activity
final_response = yield ctx.call_activity('finalize_response', input=processed_greeting)
return final_response
Cada atividade é implementada como uma função separada:
# Activity functions
def say_hello(ctx, name: str) -> str:
"""First activity that greets the user."""
logger.info(f"Activity say_hello called with name: {name}")
return f"Hello {name}!"
def process_greeting(ctx, greeting: str) -> str:
"""Second activity that processes the greeting."""
logger.info(f"Activity process_greeting called with greeting: {greeting}")
return f"{greeting} How are you today?"
def finalize_response(ctx, response: str) -> str:
"""Third activity that finalizes the response."""
logger.info(f"Activity finalize_response called with response: {response}")
return f"{response} I hope you're doing well!"
O trabalhador usa DurableTaskSchedulerWorker para o gerenciamento adequado do ciclo de vida:
with DurableTaskSchedulerWorker(
host_address=host_address,
secure_channel=endpoint != "http://localhost:8080",
taskhub=taskhub_name,
token_credential=credential
) as worker:
# Register activities and orchestrators
worker.add_activity(say_hello)
worker.add_activity(process_greeting)
worker.add_activity(finalize_response)
worker.add_orchestrator(function_chaining_orchestrator)
# Start the worker (without awaiting)
worker.start()
A aplicação de contêiner de exemplo contém tanto o código de trabalhador como o de cliente.
Cliente
O código do cliente:
- Usa a mesma lógica de cadeia de conexão que o trabalhador
- Implementa um agendador de orquestração sequencial que:
- Programa 20 instâncias de orquestração, uma de cada vez
- Espera 5 segundos entre o agendamento de cada orquestração
- Rastreia todas as instâncias de orquestração em uma lista
- Aguarda que todas as orquestrações sejam concluídas antes de sair
- Usa o log padrão para mostrar o progresso e os resultados
// Create client using Azure-managed extensions
DurableTaskClient client = (credential != null
? DurableTaskSchedulerClientExtensions.createClientBuilder(endpoint, taskHubName, credential)
: DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString)).build();
// Start a new instance of the registered "ActivityChaining" orchestration
String instanceId = client.scheduleNewOrchestrationInstance(
"ActivityChaining",
new NewOrchestrationInstanceOptions().setInput("Hello, world!"));
logger.info("Started new orchestration instance: {}", instanceId);
// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
instanceId,
Duration.ofSeconds(30),
true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(String.class))
Trabalhador
A orquestração chama diretamente cada atividade em sequência usando o método padrão callActivity :
DurableTaskGrpcWorker worker = (credential != null
? DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(endpoint, taskHubName, credential)
: DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString))
.addOrchestration(new TaskOrchestrationFactory() {
@Override
public String getName() { return "ActivityChaining"; }
@Override
public TaskOrchestration create() {
return ctx -> {
String input = ctx.getInput(String.class);
String x = ctx.callActivity("Reverse", input, String.class).await();
String y = ctx.callActivity("Capitalize", x, String.class).await();
String z = ctx.callActivity("ReplaceWhitespace", y, String.class).await();
ctx.complete(z);
};
}
})
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "Reverse"; }
@Override
public TaskActivity create() {
return ctx -> {
String input = ctx.getInput(String.class);
StringBuilder builder = new StringBuilder(input);
builder.reverse();
return builder.toString();
};
}
})
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "Capitalize"; }
@Override
public TaskActivity create() {
return ctx -> ctx.getInput(String.class).toUpperCase();
}
})
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "ReplaceWhitespace"; }
@Override
public TaskActivity create() {
return ctx -> {
String input = ctx.getInput(String.class);
return input.trim().replaceAll("\\s", "-");
};
}
})
.build();
// Start the worker
worker.start();