Udostępnij przez


Szybki start: hostowanie aplikacji zestawu Durable Task SDK w usłudze Azure Container Apps

Ważne

Obecnie zestawy SDK Durable Task nie są dostępne w językach JavaScript i PowerShell.

Ważne

Obecnie zestawy SDK Durable Task nie są dostępne w językach JavaScript i PowerShell.

W tym przewodniku Szybki start zawarto informacje na temat wykonywania następujących czynności:

  • Skonfiguruj i uruchom emulator narzędzia Durable Task Scheduler na potrzeby programowania lokalnego.
  • Uruchom projekty pracowników i klientów.
  • Sprawdź dzienniki usługi Azure Container Apps.
  • Przejrzyj stan orkiestracji i historię za pomocą pulpitu nawigacyjnego narzędzia Durable Task Scheduler.

Wymagania wstępne

Przed rozpoczęciem:

Przygotowywanie projektu

W nowym oknie terminalu przejdź z katalogu Azure-Samples/Durable-Task-Scheduler do przykładowego katalogu.

cd /samples/durable-task-sdks/dotnet/FunctionChaining
cd /samples/durable-task-sdks/python/function-chaining
cd /samples/durable-task-sdks/java/function-chaining

Wdrażanie przy użyciu interfejsu wiersza polecenia dla deweloperów platformy Azure

  1. Uruchom polecenie azd up , aby aprowizować infrastrukturę i wdrożyć aplikację w usłudze Azure Container Apps w jednym poleceniu.

    azd up
    
  2. Po wyświetleniu monitu w terminalu podaj następujące parametry.

    Parametr Opis
    Nazwa środowiska Prefiks grupy zasobów utworzony do przechowywania wszystkich zasobów platformy Azure.
    Lokalizacja platformy Azure Lokalizacja Azure dla Twoich zasobów.
    Subskrypcja platformy Azure Subskrypcja platformy Azure dla twoich zasobów.

    Ukończenie tego procesu może zająć trochę czasu. Po zakończeniu działania polecenia dane wyjściowe CLI wyświetlają dwa linki do portalu Azure, które służą do monitorowania postępu wdrażania. Dane wyjściowe pokazują również, jak azd up:

    • Tworzy i konfiguruje wszystkie niezbędne zasoby platformy Azure za pośrednictwem udostępnionych plików Bicep w ./infra katalogu przy użyciu polecenia azd provision. Po aprowizacji za pomocą interfejsu wiersza polecenia dla deweloperów platformy Azure możesz uzyskać dostęp do tych zasobów za pośrednictwem witryny Azure Portal. Pliki do aprowizacji zasobów platformy Azure obejmują:
      • main.parameters.json
      • main.bicep
      • app Katalog zasobów uporządkowany według funkcji
      • Biblioteka referencyjna core, która zawiera moduły Bicep używane w szablonie azd
    • Wdraża kod przy użyciu polecenia azd deploy

    Oczekiwane dane wyjściowe

    Packaging services (azd package)
    
    (✓) Done: Packaging service client
    - Image Hash: {IMAGE_HASH}
    - Target Image: {TARGET_IMAGE}
    
    
    (✓) Done: Packaging service worker
    - Image Hash: {IMAGE_HASH}
    - Target Image: {TARGET_IMAGE}
    
    
    Provisioning Azure resources (azd provision)
    Provisioning Azure resources can take some time.
    
    Subscription: SUBSCRIPTION_NAME (SUBSCRIPTION_ID)
    Location: West US 2
    
     You can view detailed progress in the Azure Portal:
     https://portal.azure.com/#view/HubsExtension/DeploymentDetailsBlade/~/overview/id/%2Fsubscriptions%SUBSCRIPTION_ID%2Fproviders%2FMicrosoft.Resources%2Fdeployments%2FCONTAINER_APP_ENVIRONMENT
    
     (✓) Done: Resource group: GENERATED_RESOURCE_GROUP (1.385s)
     (✓) Done: Container Apps Environment: GENERATED_CONTAINER_APP_ENVIRONMENT (54.125s)
     (✓) Done: Container Registry: GENERATED_REGISTRY (1m27.747s)
     (✓) Done: Container App: SAMPLE_CLIENT_APP (21.39s)
     (✓) Done: Container App: SAMPLE_WORKER_APP (24.136s)   
    
    Deploying services (azd deploy)
    
     (✓) Done: Deploying service client
     - Endpoint: https://SAMPLE_CLIENT_APP.westus2.azurecontainerapps.io/
    
     (✓) Done: Deploying service worker
     - Endpoint: https://SAMPLE_WORKER_APP.westus2.azurecontainerapps.io/
    
    
    SUCCESS: Your up workflow to provision and deploy to Azure completed in 10 minutes 34 seconds.   
    

Potwierdzanie pomyślnego wdrożenia

W portalu Azure sprawdź, czy orkiestracje działają pomyślnie.

  1. Skopiuj nazwę grupy zasobów z danych wyjściowych terminalu.

  2. Zaloguj się do witryny Azure Portal i wyszukaj nazwę tej grupy zasobów.

  3. Na stronie przeglądu grupy zasobów kliknij zasób aplikacji kontenera dla klienta.

  4. Wybierz pozycję Monitorowanie>Strumienia dziennika.

  1. Upewnij się, że kontener klienta rejestruje zadania łańcucha funkcji.

    Zrzut ekranu przedstawiający strumień dziennika kontenera klienta w witrynie Azure Portal.

  2. Wróć do strony grupy zasobów, aby wybrać worker kontener.

  3. Wybierz pozycję Monitorowanie>Strumienia dziennika.

  4. Upewnij się, że kontener procesu roboczego rejestruje zadania łańcucha funkcji.

    Zrzut ekranu przedstawiający strumień logów kontenera roboczego w portalu Azure.

  1. Upewnij się, że przykładowa aplikacja kontenera rejestruje zadania łańcucha funkcji.

    Zrzut ekranu przedstawiający strumień dziennika przykładowej aplikacji Java w witrynie Azure Portal.

Omówienie kodu

Projekt klienta

Projekt klienta:

  • Używa tej samej logiki parametrów połączenia co proces roboczy
  • Implementuje harmonogram aranżacji sekwencyjnej, który:
    • Planuje 20 wystąpień orkiestracji, jedno po drugim
    • Czeka 5 sekund między planowaniem każdej aranżacji
    • Śledzi wszystkie wystąpienia orkiestracji na liście
    • Czeka na ukończenie wszystkich aranżacji przed zakończeniem
  • Używa standardowego rejestrowania do pokazywania postępu i wyników
// Schedule 20 orchestrations sequentially
for (int i = 0; i < TotalOrchestrations; i++)
{
    // Create a unique instance ID
    string instanceName = $"{name}_{i+1}";

    // Schedule the orchestration
    string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
        "GreetingOrchestration", 
        instanceName);

    // Wait 5 seconds before scheduling the next one
    await Task.Delay(TimeSpan.FromSeconds(IntervalSeconds));
}

// Wait for all orchestrations to complete
foreach (string id in allInstanceIds)
{
    OrchestrationMetadata instance = await client.WaitForInstanceCompletionAsync(
        id, getInputsAndOutputs: false, CancellationToken.None);
}

Projekt pracowniczy

Projekt Worker zawiera:

  • GreetingOrchestration.cs: definiuje funkcje orkiestratora i działania w jednym pliku
  • Program.cs: Konfiguruje hosta procesu roboczego z odpowiednią obsługą parametrów połączenia

Implementacja orkiestracji

Orkiestracja bezpośrednio wywołuje każde działanie w sekwencji przy użyciu standardowej CallActivityAsync metody:

public override async Task<string> RunAsync(TaskOrchestrationContext context, string name)
{
    // Step 1: Say hello to the person
    string greeting = await context.CallActivityAsync<string>(nameof(SayHelloActivity), name);

    // Step 2: Process the greeting
    string processedGreeting = await context.CallActivityAsync<string>(nameof(ProcessGreetingActivity), greeting);

    // Step 3: Finalize the response
    string finalResponse = await context.CallActivityAsync<string>(nameof(FinalizeResponseActivity), processedGreeting);

    return finalResponse;
}

Każde działanie jest implementowane jako oddzielna klasa ozdobiona atrybutem [DurableTask] :

[DurableTask]
public class SayHelloActivity : TaskActivity<string, string>
{
    // Implementation details
}

Pracownik używa Microsoft.Extensions.Hosting do prawidłowego zarządzania cyklem życia.

var builder = Host.CreateApplicationBuilder();
builder.Services.AddDurableTaskWorker()
    .AddTasks(registry => {
        registry.AddAllGeneratedTasks();
    })
    .UseDurableTaskScheduler(connectionString);
var host = builder.Build();
await host.StartAsync();

Klient

Projekt klienta:

  • Używa tej samej logiki parametrów połączenia co proces roboczy
  • Implementuje harmonogram aranżacji sekwencyjnej, który:
    • Planuje 20 wystąpień orkiestracji, jedno po drugim
    • Czeka 5 sekund między planowaniem każdej aranżacji
    • Śledzi wszystkie wystąpienia orkiestracji na liście
    • Czeka na ukończenie wszystkich aranżacji przed zakończeniem
  • Używa standardowego rejestrowania do pokazywania postępu i wyników
# Schedule all orchestrations first
instance_ids = []
for i in range(TOTAL_ORCHESTRATIONS):
    try:
        # Create a unique instance name
        instance_name = f"{name}_{i+1}"
        logger.info(f"Scheduling orchestration #{i+1} ({instance_name})")

        # Schedule the orchestration
        instance_id = client.schedule_new_orchestration(
            "function_chaining_orchestrator",
            input=instance_name
        )

        instance_ids.append(instance_id)
        logger.info(f"Orchestration #{i+1} scheduled with ID: {instance_id}")

        # Wait before scheduling next orchestration (except for the last one)
        if i < TOTAL_ORCHESTRATIONS - 1:
            logger.info(f"Waiting {INTERVAL_SECONDS} seconds before scheduling next orchestration...")
        await asyncio.sleep(INTERVAL_SECONDS)
# ...
# Wait for all orchestrations to complete
for idx, instance_id in enumerate(instance_ids):
    try:
        logger.info(f"Waiting for orchestration {idx+1}/{len(instance_ids)} (ID: {instance_id})...")
        result = client.wait_for_orchestration_completion(
            instance_id,
            timeout=120
        )

Robotnik

Implementacja orkiestracji

Orkiestracja bezpośrednio wywołuje każde działanie w sekwencji przy użyciu standardowej call_activity funkcji:

# Orchestrator function
def function_chaining_orchestrator(ctx, name: str) -> str:
    """Orchestrator that demonstrates function chaining pattern."""
    logger.info(f"Starting function chaining orchestration for {name}")

    # Call first activity - passing input directly without named parameter
    greeting = yield ctx.call_activity('say_hello', input=name)

    # Call second activity with the result from first activity
    processed_greeting = yield ctx.call_activity('process_greeting', input=greeting)

    # Call third activity with the result from second activity
    final_response = yield ctx.call_activity('finalize_response', input=processed_greeting)

    return final_response

Każde działanie jest implementowane jako osobna funkcja:

# Activity functions
def say_hello(ctx, name: str) -> str:
    """First activity that greets the user."""
    logger.info(f"Activity say_hello called with name: {name}")
    return f"Hello {name}!"

def process_greeting(ctx, greeting: str) -> str:
    """Second activity that processes the greeting."""
    logger.info(f"Activity process_greeting called with greeting: {greeting}")
    return f"{greeting} How are you today?"

def finalize_response(ctx, response: str) -> str:
    """Third activity that finalizes the response."""
    logger.info(f"Activity finalize_response called with response: {response}")
    return f"{response} I hope you're doing well!"

Pracownik używa DurableTaskSchedulerWorker do prawidłowego zarządzania cyklem życia.

with DurableTaskSchedulerWorker(
    host_address=host_address, 
    secure_channel=endpoint != "http://localhost:8080",
    taskhub=taskhub_name, 
    token_credential=credential
) as worker:

    # Register activities and orchestrators
    worker.add_activity(say_hello)
    worker.add_activity(process_greeting)
    worker.add_activity(finalize_response)
    worker.add_orchestrator(function_chaining_orchestrator)

    # Start the worker (without awaiting)
    worker.start()

Przykładowa aplikacja kontenera zawiera zarówno kod zadania roboczego, jak i kod klienta.

Klient

Kod klienta:

  • Używa tej samej logiki parametrów połączenia co proces roboczy
  • Implementuje harmonogram aranżacji sekwencyjnej, który:
    • Planuje 20 wystąpień orkiestracji, jedno po drugim
    • Czeka 5 sekund między planowaniem każdej aranżacji
    • Śledzi wszystkie wystąpienia orkiestracji na liście
    • Czeka na ukończenie wszystkich aranżacji przed zakończeniem
  • Używa standardowego rejestrowania do pokazywania postępu i wyników
// Create client using Azure-managed extensions
DurableTaskClient client = (credential != null 
    ? DurableTaskSchedulerClientExtensions.createClientBuilder(endpoint, taskHubName, credential)
    : DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString)).build();

// Start a new instance of the registered "ActivityChaining" orchestration
String instanceId = client.scheduleNewOrchestrationInstance(
        "ActivityChaining",
        new NewOrchestrationInstanceOptions().setInput("Hello, world!"));
logger.info("Started new orchestration instance: {}", instanceId);

// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
        instanceId,
        Duration.ofSeconds(30),
        true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(String.class))

Robotnik

Orkiestracja bezpośrednio wywołuje każde działanie w sekwencji przy użyciu standardowej callActivity metody:

DurableTaskGrpcWorker worker = (credential != null 
    ? DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(endpoint, taskHubName, credential)
    : DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString))
    .addOrchestration(new TaskOrchestrationFactory() {
        @Override
        public String getName() { return "ActivityChaining"; }

        @Override
        public TaskOrchestration create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                String x = ctx.callActivity("Reverse", input, String.class).await();
                String y = ctx.callActivity("Capitalize", x, String.class).await();
                String z = ctx.callActivity("ReplaceWhitespace", y, String.class).await();
                ctx.complete(z);
            };
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "Reverse"; }

        @Override
        public TaskActivity create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                StringBuilder builder = new StringBuilder(input);
                builder.reverse();
                return builder.toString();
            };
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "Capitalize"; }

        @Override
        public TaskActivity create() {
            return ctx -> ctx.getInput(String.class).toUpperCase();
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "ReplaceWhitespace"; }

        @Override
        public TaskActivity create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                return input.trim().replaceAll("\\s", "-");
            };
        }
    })
    .build();

// Start the worker
worker.start();

Dalsze kroki