Freigeben über


Schnellstart: Hosten einer Durable Task SDK-App in Azure-Container-Apps

Von Bedeutung

Derzeit sind die sdKs für dauerhafte Aufgaben für JavaScript und PowerShell nicht verfügbar.

Von Bedeutung

Derzeit sind die sdKs für dauerhafte Aufgaben für JavaScript und PowerShell nicht verfügbar.

In dieser Schnellstartanleitung wird Folgendes vermittelt:

  • Richten Sie den Emulator "Durable Task Scheduler" für die lokale Entwicklung ein und führen Sie diesen aus.
  • Führen Sie die Arbeiter- und Clientprojekte aus.
  • Überprüfen Sie die Azure-Container-Apps-Protokolle.
  • Überprüfen Sie den Status und die Historie der Orchestrierung über das Dashboard des Durable Task Schedulers.

Voraussetzungen

Bevor Sie beginnen:

Vorbereiten des Projekts

Navigieren Sie in einem neuen Terminalfenster aus dem Azure-Samples/Durable-Task-Scheduler Verzeichnis in das Beispielverzeichnis.

cd /samples/durable-task-sdks/dotnet/FunctionChaining
cd /samples/durable-task-sdks/python/function-chaining
cd /samples/durable-task-sdks/java/function-chaining

Bereitstellen mithilfe der Azure Developer CLI

  1. Führen Sie azd up aus, um die Infrastruktur bereitzustellen und die Anwendung mit einem einzigen Befehl in Azure Container Apps bereitzustellen.

    azd up
    
  2. Wenn Sie im Terminal dazu aufgefordert werden, geben Sie die folgenden Parameter ein.

    Parameter BESCHREIBUNG
    Umgebungsname Präfix für die Ressourcengruppe, die für alle Azure-Ressourcen erstellt wurde.
    Azure-Standort Der Azure-Speicherort für Ihre Ressourcen.
    Azure-Abonnement Das Azure-Abonnement für Ihre Ressourcen.

    Dieser Vorgang nimmt einige Zeit in Anspruch. Nach Abschluss des azd up Befehls zeigt die CLI-Ausgabe zwei Azure-Portal-Links an, um den Bereitstellungsfortschritt zu überwachen. Die Ausgabe veranschaulicht außerdem, wie azd up:

    • Erstellt und konfiguriert alle erforderlichen Azure-Ressourcen mithilfe der bereitgestellten Bicep-Dateien im ./infra Verzeichnis mitazd provision. Nachdem Sie die Azure Developer CLI bereitgestellt haben, können Sie über das Azure-Portal auf diese Ressourcen zugreifen. Zu den Dateien, die die Azure-Ressourcen bereitstellen, gehören:
      • main.parameters.json
      • main.bicep
      • Ein app Ressourcenverzeichnis, das nach Funktionalität organisiert ist
      • Eine core Referenzbibliothek, die die von der azd Vorlage verwendeten Bicep-Module enthält
    • Stellt den Code mithilfe von azd deploy bereit

    Erwartete Ausgabe

    Packaging services (azd package)
    
    (✓) Done: Packaging service client
    - Image Hash: {IMAGE_HASH}
    - Target Image: {TARGET_IMAGE}
    
    
    (✓) Done: Packaging service worker
    - Image Hash: {IMAGE_HASH}
    - Target Image: {TARGET_IMAGE}
    
    
    Provisioning Azure resources (azd provision)
    Provisioning Azure resources can take some time.
    
    Subscription: SUBSCRIPTION_NAME (SUBSCRIPTION_ID)
    Location: West US 2
    
     You can view detailed progress in the Azure Portal:
     https://portal.azure.com/#view/HubsExtension/DeploymentDetailsBlade/~/overview/id/%2Fsubscriptions%SUBSCRIPTION_ID%2Fproviders%2FMicrosoft.Resources%2Fdeployments%2FCONTAINER_APP_ENVIRONMENT
    
     (✓) Done: Resource group: GENERATED_RESOURCE_GROUP (1.385s)
     (✓) Done: Container Apps Environment: GENERATED_CONTAINER_APP_ENVIRONMENT (54.125s)
     (✓) Done: Container Registry: GENERATED_REGISTRY (1m27.747s)
     (✓) Done: Container App: SAMPLE_CLIENT_APP (21.39s)
     (✓) Done: Container App: SAMPLE_WORKER_APP (24.136s)   
    
    Deploying services (azd deploy)
    
     (✓) Done: Deploying service client
     - Endpoint: https://SAMPLE_CLIENT_APP.westus2.azurecontainerapps.io/
    
     (✓) Done: Deploying service worker
     - Endpoint: https://SAMPLE_WORKER_APP.westus2.azurecontainerapps.io/
    
    
    SUCCESS: Your up workflow to provision and deploy to Azure completed in 10 minutes 34 seconds.   
    

Erfolgreiche Bereitstellung bestätigen

Überprüfen Sie im Azure-Portal, ob die Orchestrierungen erfolgreich ausgeführt werden.

  1. Kopieren Sie den Ressourcengruppennamen aus der Terminalausgabe.

  2. Melden Sie sich beim Azure-Portal an, und suchen Sie nach diesem Ressourcengruppennamen.

  3. Klicken Sie auf der Übersichtsseite der Ressourcengruppe auf die Clientcontainer-App-Ressource.

  4. Wählen Sie Überwachung>Protokolldatenstrom aus.

  1. Vergewissern Sie sich, dass der Clientcontainer die Funktionskettenaufgaben protokolliert.

    Screenshot des Protokolldatenstroms des Clientcontainers im Azure-Portal.

  2. Navigieren Sie zurück zur Ressourcengruppenseite, um den worker Container auszuwählen.

  3. Wählen Sie Überwachung>Protokolldatenstrom aus.

  4. Vergewissern Sie sich, dass der Workercontainer die Aufgaben zur Funktionsverkettung protokolliert.

    Screenshot des Protokolldatenstroms des Arbeitscontainers im Azure-Portal.

  1. Vergewissern Sie sich, dass die Beispielcontainer-App die Funktionskettenaufgaben protokolliert.

    Screenshot des Protokolldatenstroms der Java-Beispiel-App im Azure-Portal.

Grundlegendes zum Code

Clientprojekt

Das Clientprojekt:

  • Verwendet dieselbe Verbindungszeichenfolgenlogik wie der Worker.
  • Implementiert einen sequenziellen Orchestrierungsplaner, der:
    • Plant 20 Orchestrierungsinstanzen, eine nach der anderen.
    • Wartet 5 Sekunden zwischen der Planung der einzelnen Orchestrierungen.
    • Verfolgt alle Orchestrierungsinstanzen in einer Liste nach.
    • Wartet vor Beendigung darauf, dass alle Orchestrierungen abgeschlossen sind.
  • Verwendet die Standardprotokollierung zum Anzeigen des Fortschritts und der Ergebnisse
// Schedule 20 orchestrations sequentially
for (int i = 0; i < TotalOrchestrations; i++)
{
    // Create a unique instance ID
    string instanceName = $"{name}_{i+1}";

    // Schedule the orchestration
    string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
        "GreetingOrchestration", 
        instanceName);

    // Wait 5 seconds before scheduling the next one
    await Task.Delay(TimeSpan.FromSeconds(IntervalSeconds));
}

// Wait for all orchestrations to complete
foreach (string id in allInstanceIds)
{
    OrchestrationMetadata instance = await client.WaitForInstanceCompletionAsync(
        id, getInputsAndOutputs: false, CancellationToken.None);
}

Arbeiterprojekt

Das Worker-Projekt enthält:

  • GreetingOrchestration.cs: Definiert die Orchestrator- und Aktivitätsfunktionen in einer einzelnen Datei.
  • Program.cs: Richtet den Workerhost mit entsprechender Behandlung von Verbindungszeichenfolgen ein.

Orchestrierungsimplementierung

Die Orchestrierung ruft jede Aktivität mithilfe der Standardmethode CallActivityAsync direkt in Sequenz auf:

public override async Task<string> RunAsync(TaskOrchestrationContext context, string name)
{
    // Step 1: Say hello to the person
    string greeting = await context.CallActivityAsync<string>(nameof(SayHelloActivity), name);

    // Step 2: Process the greeting
    string processedGreeting = await context.CallActivityAsync<string>(nameof(ProcessGreetingActivity), greeting);

    // Step 3: Finalize the response
    string finalResponse = await context.CallActivityAsync<string>(nameof(FinalizeResponseActivity), processedGreeting);

    return finalResponse;
}

Jede Aktivität wird als separate Klasse implementiert, die mit dem [DurableTask] Attribut versehen ist:

[DurableTask]
public class SayHelloActivity : TaskActivity<string, string>
{
    // Implementation details
}

Der Mitarbeiter verwendet Microsoft.Extensions.Hosting für die ordnungsgemäße Lebenszyklusverwaltung:

var builder = Host.CreateApplicationBuilder();
builder.Services.AddDurableTaskWorker()
    .AddTasks(registry => {
        registry.AddAllGeneratedTasks();
    })
    .UseDurableTaskScheduler(connectionString);
var host = builder.Build();
await host.StartAsync();

Kunde

Das Clientprojekt:

  • Verwendet dieselbe Verbindungszeichenfolgenlogik wie der Worker.
  • Implementiert einen sequenziellen Orchestrierungsplaner, der:
    • Plant 20 Orchestrierungsinstanzen, eine nach der anderen.
    • Wartet 5 Sekunden zwischen der Planung der einzelnen Orchestrierungen.
    • Verfolgt alle Orchestrierungsinstanzen in einer Liste nach.
    • Wartet vor Beendigung darauf, dass alle Orchestrierungen abgeschlossen sind.
  • Verwendet die Standardprotokollierung zum Anzeigen des Fortschritts und der Ergebnisse
# Schedule all orchestrations first
instance_ids = []
for i in range(TOTAL_ORCHESTRATIONS):
    try:
        # Create a unique instance name
        instance_name = f"{name}_{i+1}"
        logger.info(f"Scheduling orchestration #{i+1} ({instance_name})")

        # Schedule the orchestration
        instance_id = client.schedule_new_orchestration(
            "function_chaining_orchestrator",
            input=instance_name
        )

        instance_ids.append(instance_id)
        logger.info(f"Orchestration #{i+1} scheduled with ID: {instance_id}")

        # Wait before scheduling next orchestration (except for the last one)
        if i < TOTAL_ORCHESTRATIONS - 1:
            logger.info(f"Waiting {INTERVAL_SECONDS} seconds before scheduling next orchestration...")
        await asyncio.sleep(INTERVAL_SECONDS)
# ...
# Wait for all orchestrations to complete
for idx, instance_id in enumerate(instance_ids):
    try:
        logger.info(f"Waiting for orchestration {idx+1}/{len(instance_ids)} (ID: {instance_id})...")
        result = client.wait_for_orchestration_completion(
            instance_id,
            timeout=120
        )

Arbeiter

Orchestrierungsimplementierung

Die Orchestrierung ruft jede Aktivität mithilfe der Standardfunktion call_activity direkt in Sequenz auf:

# Orchestrator function
def function_chaining_orchestrator(ctx, name: str) -> str:
    """Orchestrator that demonstrates function chaining pattern."""
    logger.info(f"Starting function chaining orchestration for {name}")

    # Call first activity - passing input directly without named parameter
    greeting = yield ctx.call_activity('say_hello', input=name)

    # Call second activity with the result from first activity
    processed_greeting = yield ctx.call_activity('process_greeting', input=greeting)

    # Call third activity with the result from second activity
    final_response = yield ctx.call_activity('finalize_response', input=processed_greeting)

    return final_response

Jede Aktivität wird als separate Funktion implementiert:

# Activity functions
def say_hello(ctx, name: str) -> str:
    """First activity that greets the user."""
    logger.info(f"Activity say_hello called with name: {name}")
    return f"Hello {name}!"

def process_greeting(ctx, greeting: str) -> str:
    """Second activity that processes the greeting."""
    logger.info(f"Activity process_greeting called with greeting: {greeting}")
    return f"{greeting} How are you today?"

def finalize_response(ctx, response: str) -> str:
    """Third activity that finalizes the response."""
    logger.info(f"Activity finalize_response called with response: {response}")
    return f"{response} I hope you're doing well!"

Der Mitarbeiter verwendet DurableTaskSchedulerWorker für die ordnungsgemäße Lebenszyklusverwaltung:

with DurableTaskSchedulerWorker(
    host_address=host_address, 
    secure_channel=endpoint != "http://localhost:8080",
    taskhub=taskhub_name, 
    token_credential=credential
) as worker:

    # Register activities and orchestrators
    worker.add_activity(say_hello)
    worker.add_activity(process_greeting)
    worker.add_activity(finalize_response)
    worker.add_orchestrator(function_chaining_orchestrator)

    # Start the worker (without awaiting)
    worker.start()

Die Beispielcontainer-App enthält sowohl den Worker- als auch den Clientcode.

Kunde

Der Clientcode:

  • Verwendet dieselbe Verbindungszeichenfolgenlogik wie der Worker.
  • Implementiert einen sequenziellen Orchestrierungsplaner, der:
    • Plant 20 Orchestrierungsinstanzen, eine nach der anderen.
    • Wartet 5 Sekunden zwischen der Planung der einzelnen Orchestrierungen.
    • Verfolgt alle Orchestrierungsinstanzen in einer Liste nach.
    • Wartet vor Beendigung darauf, dass alle Orchestrierungen abgeschlossen sind.
  • Verwendet die Standardprotokollierung zum Anzeigen des Fortschritts und der Ergebnisse
// Create client using Azure-managed extensions
DurableTaskClient client = (credential != null 
    ? DurableTaskSchedulerClientExtensions.createClientBuilder(endpoint, taskHubName, credential)
    : DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString)).build();

// Start a new instance of the registered "ActivityChaining" orchestration
String instanceId = client.scheduleNewOrchestrationInstance(
        "ActivityChaining",
        new NewOrchestrationInstanceOptions().setInput("Hello, world!"));
logger.info("Started new orchestration instance: {}", instanceId);

// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
        instanceId,
        Duration.ofSeconds(30),
        true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(String.class))

Arbeiter

Die Orchestrierung ruft jede Aktivität mithilfe der Standardmethode callActivity direkt in Sequenz auf:

DurableTaskGrpcWorker worker = (credential != null 
    ? DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(endpoint, taskHubName, credential)
    : DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString))
    .addOrchestration(new TaskOrchestrationFactory() {
        @Override
        public String getName() { return "ActivityChaining"; }

        @Override
        public TaskOrchestration create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                String x = ctx.callActivity("Reverse", input, String.class).await();
                String y = ctx.callActivity("Capitalize", x, String.class).await();
                String z = ctx.callActivity("ReplaceWhitespace", y, String.class).await();
                ctx.complete(z);
            };
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "Reverse"; }

        @Override
        public TaskActivity create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                StringBuilder builder = new StringBuilder(input);
                builder.reverse();
                return builder.toString();
            };
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "Capitalize"; }

        @Override
        public TaskActivity create() {
            return ctx -> ctx.getInput(String.class).toUpperCase();
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "ReplaceWhitespace"; }

        @Override
        public TaskActivity create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                return input.trim().replaceAll("\\s", "-");
            };
        }
    })
    .build();

// Start the worker
worker.start();

Nächste Schritte