Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Von Bedeutung
Derzeit sind die sdKs für dauerhafte Aufgaben für JavaScript und PowerShell nicht verfügbar.
Von Bedeutung
Derzeit sind die sdKs für dauerhafte Aufgaben für JavaScript und PowerShell nicht verfügbar.
In dieser Schnellstartanleitung wird Folgendes vermittelt:
- Richten Sie den Emulator "Durable Task Scheduler" für die lokale Entwicklung ein und führen Sie diesen aus.
- Führen Sie die Arbeiter- und Clientprojekte aus.
- Überprüfen Sie die Azure-Container-Apps-Protokolle.
- Überprüfen Sie den Status und die Historie der Orchestrierung über das Dashboard des Durable Task Schedulers.
Voraussetzungen
Bevor Sie beginnen:
- Stellen Sie sicher, dass Sie über .NET 8 SDK oder höher verfügen.
- Installieren Sie Docker zum Ausführen des Emulators.
- Installieren Sie die Azure Developer CLI
- Klonen Sie das GitHub-Repository "Durable Task Scheduler ", um das Schnellstartbeispiel zu verwenden.
- Stellen Sie sicher, dass Sie Python 3.9+ oder höher haben.
- Installieren Sie Docker zum Ausführen des Emulators.
- Installieren Sie die Azure Developer CLI
- Klonen Sie das GitHub-Repository "Durable Task Scheduler ", um das Schnellstartbeispiel zu verwenden.
- Stellen Sie sicher, dass Sie Über Java 8 oder 11 verfügen.
- Installieren Sie Docker zum Ausführen des Emulators.
- Installieren Sie die Azure Developer CLI
- Klonen Sie das GitHub-Repository "Durable Task Scheduler ", um das Schnellstartbeispiel zu verwenden.
Vorbereiten des Projekts
Navigieren Sie in einem neuen Terminalfenster aus dem Azure-Samples/Durable-Task-Scheduler Verzeichnis in das Beispielverzeichnis.
cd /samples/durable-task-sdks/dotnet/FunctionChaining
cd /samples/durable-task-sdks/python/function-chaining
cd /samples/durable-task-sdks/java/function-chaining
Bereitstellen mithilfe der Azure Developer CLI
Führen Sie
azd upaus, um die Infrastruktur bereitzustellen und die Anwendung mit einem einzigen Befehl in Azure Container Apps bereitzustellen.azd upWenn Sie im Terminal dazu aufgefordert werden, geben Sie die folgenden Parameter ein.
Parameter BESCHREIBUNG Umgebungsname Präfix für die Ressourcengruppe, die für alle Azure-Ressourcen erstellt wurde. Azure-Standort Der Azure-Speicherort für Ihre Ressourcen. Azure-Abonnement Das Azure-Abonnement für Ihre Ressourcen. Dieser Vorgang nimmt einige Zeit in Anspruch. Nach Abschluss des
azd upBefehls zeigt die CLI-Ausgabe zwei Azure-Portal-Links an, um den Bereitstellungsfortschritt zu überwachen. Die Ausgabe veranschaulicht außerdem, wieazd up:- Erstellt und konfiguriert alle erforderlichen Azure-Ressourcen mithilfe der bereitgestellten Bicep-Dateien im
./infraVerzeichnis mitazd provision. Nachdem Sie die Azure Developer CLI bereitgestellt haben, können Sie über das Azure-Portal auf diese Ressourcen zugreifen. Zu den Dateien, die die Azure-Ressourcen bereitstellen, gehören:main.parameters.jsonmain.bicep- Ein
appRessourcenverzeichnis, das nach Funktionalität organisiert ist - Eine
coreReferenzbibliothek, die die von derazdVorlage verwendeten Bicep-Module enthält
- Stellt den Code mithilfe von
azd deploybereit
Erwartete Ausgabe
Packaging services (azd package) (✓) Done: Packaging service client - Image Hash: {IMAGE_HASH} - Target Image: {TARGET_IMAGE} (✓) Done: Packaging service worker - Image Hash: {IMAGE_HASH} - Target Image: {TARGET_IMAGE} Provisioning Azure resources (azd provision) Provisioning Azure resources can take some time. Subscription: SUBSCRIPTION_NAME (SUBSCRIPTION_ID) Location: West US 2 You can view detailed progress in the Azure Portal: https://portal.azure.com/#view/HubsExtension/DeploymentDetailsBlade/~/overview/id/%2Fsubscriptions%SUBSCRIPTION_ID%2Fproviders%2FMicrosoft.Resources%2Fdeployments%2FCONTAINER_APP_ENVIRONMENT (✓) Done: Resource group: GENERATED_RESOURCE_GROUP (1.385s) (✓) Done: Container Apps Environment: GENERATED_CONTAINER_APP_ENVIRONMENT (54.125s) (✓) Done: Container Registry: GENERATED_REGISTRY (1m27.747s) (✓) Done: Container App: SAMPLE_CLIENT_APP (21.39s) (✓) Done: Container App: SAMPLE_WORKER_APP (24.136s) Deploying services (azd deploy) (✓) Done: Deploying service client - Endpoint: https://SAMPLE_CLIENT_APP.westus2.azurecontainerapps.io/ (✓) Done: Deploying service worker - Endpoint: https://SAMPLE_WORKER_APP.westus2.azurecontainerapps.io/ SUCCESS: Your up workflow to provision and deploy to Azure completed in 10 minutes 34 seconds.- Erstellt und konfiguriert alle erforderlichen Azure-Ressourcen mithilfe der bereitgestellten Bicep-Dateien im
Erfolgreiche Bereitstellung bestätigen
Überprüfen Sie im Azure-Portal, ob die Orchestrierungen erfolgreich ausgeführt werden.
Kopieren Sie den Ressourcengruppennamen aus der Terminalausgabe.
Melden Sie sich beim Azure-Portal an, und suchen Sie nach diesem Ressourcengruppennamen.
Klicken Sie auf der Übersichtsseite der Ressourcengruppe auf die Clientcontainer-App-Ressource.
Wählen Sie Überwachung>Protokolldatenstrom aus.
Vergewissern Sie sich, dass der Clientcontainer die Funktionskettenaufgaben protokolliert.
Navigieren Sie zurück zur Ressourcengruppenseite, um den
workerContainer auszuwählen.Wählen Sie Überwachung>Protokolldatenstrom aus.
Vergewissern Sie sich, dass der Workercontainer die Aufgaben zur Funktionsverkettung protokolliert.
Vergewissern Sie sich, dass die Beispielcontainer-App die Funktionskettenaufgaben protokolliert.
Grundlegendes zum Code
Clientprojekt
Das Clientprojekt:
- Verwendet dieselbe Verbindungszeichenfolgenlogik wie der Worker.
- Implementiert einen sequenziellen Orchestrierungsplaner, der:
- Plant 20 Orchestrierungsinstanzen, eine nach der anderen.
- Wartet 5 Sekunden zwischen der Planung der einzelnen Orchestrierungen.
- Verfolgt alle Orchestrierungsinstanzen in einer Liste nach.
- Wartet vor Beendigung darauf, dass alle Orchestrierungen abgeschlossen sind.
- Verwendet die Standardprotokollierung zum Anzeigen des Fortschritts und der Ergebnisse
// Schedule 20 orchestrations sequentially
for (int i = 0; i < TotalOrchestrations; i++)
{
// Create a unique instance ID
string instanceName = $"{name}_{i+1}";
// Schedule the orchestration
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
"GreetingOrchestration",
instanceName);
// Wait 5 seconds before scheduling the next one
await Task.Delay(TimeSpan.FromSeconds(IntervalSeconds));
}
// Wait for all orchestrations to complete
foreach (string id in allInstanceIds)
{
OrchestrationMetadata instance = await client.WaitForInstanceCompletionAsync(
id, getInputsAndOutputs: false, CancellationToken.None);
}
Arbeiterprojekt
Das Worker-Projekt enthält:
- GreetingOrchestration.cs: Definiert die Orchestrator- und Aktivitätsfunktionen in einer einzelnen Datei.
- Program.cs: Richtet den Workerhost mit entsprechender Behandlung von Verbindungszeichenfolgen ein.
Orchestrierungsimplementierung
Die Orchestrierung ruft jede Aktivität mithilfe der Standardmethode CallActivityAsync direkt in Sequenz auf:
public override async Task<string> RunAsync(TaskOrchestrationContext context, string name)
{
// Step 1: Say hello to the person
string greeting = await context.CallActivityAsync<string>(nameof(SayHelloActivity), name);
// Step 2: Process the greeting
string processedGreeting = await context.CallActivityAsync<string>(nameof(ProcessGreetingActivity), greeting);
// Step 3: Finalize the response
string finalResponse = await context.CallActivityAsync<string>(nameof(FinalizeResponseActivity), processedGreeting);
return finalResponse;
}
Jede Aktivität wird als separate Klasse implementiert, die mit dem [DurableTask] Attribut versehen ist:
[DurableTask]
public class SayHelloActivity : TaskActivity<string, string>
{
// Implementation details
}
Der Mitarbeiter verwendet Microsoft.Extensions.Hosting für die ordnungsgemäße Lebenszyklusverwaltung:
var builder = Host.CreateApplicationBuilder();
builder.Services.AddDurableTaskWorker()
.AddTasks(registry => {
registry.AddAllGeneratedTasks();
})
.UseDurableTaskScheduler(connectionString);
var host = builder.Build();
await host.StartAsync();
Kunde
Das Clientprojekt:
- Verwendet dieselbe Verbindungszeichenfolgenlogik wie der Worker.
- Implementiert einen sequenziellen Orchestrierungsplaner, der:
- Plant 20 Orchestrierungsinstanzen, eine nach der anderen.
- Wartet 5 Sekunden zwischen der Planung der einzelnen Orchestrierungen.
- Verfolgt alle Orchestrierungsinstanzen in einer Liste nach.
- Wartet vor Beendigung darauf, dass alle Orchestrierungen abgeschlossen sind.
- Verwendet die Standardprotokollierung zum Anzeigen des Fortschritts und der Ergebnisse
# Schedule all orchestrations first
instance_ids = []
for i in range(TOTAL_ORCHESTRATIONS):
try:
# Create a unique instance name
instance_name = f"{name}_{i+1}"
logger.info(f"Scheduling orchestration #{i+1} ({instance_name})")
# Schedule the orchestration
instance_id = client.schedule_new_orchestration(
"function_chaining_orchestrator",
input=instance_name
)
instance_ids.append(instance_id)
logger.info(f"Orchestration #{i+1} scheduled with ID: {instance_id}")
# Wait before scheduling next orchestration (except for the last one)
if i < TOTAL_ORCHESTRATIONS - 1:
logger.info(f"Waiting {INTERVAL_SECONDS} seconds before scheduling next orchestration...")
await asyncio.sleep(INTERVAL_SECONDS)
# ...
# Wait for all orchestrations to complete
for idx, instance_id in enumerate(instance_ids):
try:
logger.info(f"Waiting for orchestration {idx+1}/{len(instance_ids)} (ID: {instance_id})...")
result = client.wait_for_orchestration_completion(
instance_id,
timeout=120
)
Arbeiter
Orchestrierungsimplementierung
Die Orchestrierung ruft jede Aktivität mithilfe der Standardfunktion call_activity direkt in Sequenz auf:
# Orchestrator function
def function_chaining_orchestrator(ctx, name: str) -> str:
"""Orchestrator that demonstrates function chaining pattern."""
logger.info(f"Starting function chaining orchestration for {name}")
# Call first activity - passing input directly without named parameter
greeting = yield ctx.call_activity('say_hello', input=name)
# Call second activity with the result from first activity
processed_greeting = yield ctx.call_activity('process_greeting', input=greeting)
# Call third activity with the result from second activity
final_response = yield ctx.call_activity('finalize_response', input=processed_greeting)
return final_response
Jede Aktivität wird als separate Funktion implementiert:
# Activity functions
def say_hello(ctx, name: str) -> str:
"""First activity that greets the user."""
logger.info(f"Activity say_hello called with name: {name}")
return f"Hello {name}!"
def process_greeting(ctx, greeting: str) -> str:
"""Second activity that processes the greeting."""
logger.info(f"Activity process_greeting called with greeting: {greeting}")
return f"{greeting} How are you today?"
def finalize_response(ctx, response: str) -> str:
"""Third activity that finalizes the response."""
logger.info(f"Activity finalize_response called with response: {response}")
return f"{response} I hope you're doing well!"
Der Mitarbeiter verwendet DurableTaskSchedulerWorker für die ordnungsgemäße Lebenszyklusverwaltung:
with DurableTaskSchedulerWorker(
host_address=host_address,
secure_channel=endpoint != "http://localhost:8080",
taskhub=taskhub_name,
token_credential=credential
) as worker:
# Register activities and orchestrators
worker.add_activity(say_hello)
worker.add_activity(process_greeting)
worker.add_activity(finalize_response)
worker.add_orchestrator(function_chaining_orchestrator)
# Start the worker (without awaiting)
worker.start()
Die Beispielcontainer-App enthält sowohl den Worker- als auch den Clientcode.
Kunde
Der Clientcode:
- Verwendet dieselbe Verbindungszeichenfolgenlogik wie der Worker.
- Implementiert einen sequenziellen Orchestrierungsplaner, der:
- Plant 20 Orchestrierungsinstanzen, eine nach der anderen.
- Wartet 5 Sekunden zwischen der Planung der einzelnen Orchestrierungen.
- Verfolgt alle Orchestrierungsinstanzen in einer Liste nach.
- Wartet vor Beendigung darauf, dass alle Orchestrierungen abgeschlossen sind.
- Verwendet die Standardprotokollierung zum Anzeigen des Fortschritts und der Ergebnisse
// Create client using Azure-managed extensions
DurableTaskClient client = (credential != null
? DurableTaskSchedulerClientExtensions.createClientBuilder(endpoint, taskHubName, credential)
: DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString)).build();
// Start a new instance of the registered "ActivityChaining" orchestration
String instanceId = client.scheduleNewOrchestrationInstance(
"ActivityChaining",
new NewOrchestrationInstanceOptions().setInput("Hello, world!"));
logger.info("Started new orchestration instance: {}", instanceId);
// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
instanceId,
Duration.ofSeconds(30),
true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(String.class))
Arbeiter
Die Orchestrierung ruft jede Aktivität mithilfe der Standardmethode callActivity direkt in Sequenz auf:
DurableTaskGrpcWorker worker = (credential != null
? DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(endpoint, taskHubName, credential)
: DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString))
.addOrchestration(new TaskOrchestrationFactory() {
@Override
public String getName() { return "ActivityChaining"; }
@Override
public TaskOrchestration create() {
return ctx -> {
String input = ctx.getInput(String.class);
String x = ctx.callActivity("Reverse", input, String.class).await();
String y = ctx.callActivity("Capitalize", x, String.class).await();
String z = ctx.callActivity("ReplaceWhitespace", y, String.class).await();
ctx.complete(z);
};
}
})
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "Reverse"; }
@Override
public TaskActivity create() {
return ctx -> {
String input = ctx.getInput(String.class);
StringBuilder builder = new StringBuilder(input);
builder.reverse();
return builder.toString();
};
}
})
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "Capitalize"; }
@Override
public TaskActivity create() {
return ctx -> ctx.getInput(String.class).toUpperCase();
}
})
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "ReplaceWhitespace"; }
@Override
public TaskActivity create() {
return ctx -> {
String input = ctx.getInput(String.class);
return input.trim().replaceAll("\\s", "-");
};
}
})
.build();
// Start the worker
worker.start();