Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
In diesem Lernprogramm erfahren Sie, wie Sie einen Workflow mit Verzweigungslogik mithilfe von Agent Framework erstellen. Die Verzweigungslogik ermöglicht Es Ihrem Workflow, Entscheidungen auf der Grundlage bestimmter Bedingungen zu treffen, wodurch komplexeres und dynamischeres Verhalten ermöglicht wird.
Bedingte Kanten
Bedingte Kanten ermöglichen es Ihrem Workflow, Routingentscheidungen basierend auf dem Inhalt oder den Eigenschaften von Nachrichten zu treffen, die über den Workflow fließen. Dies ermöglicht die dynamische Verzweigung, bei der verschiedene Ausführungspfade basierend auf Laufzeitbedingungen verwendet werden.
Was Sie erstellen werden
Sie erstellen einen E-Mail-Verarbeitungsworkflow, der das bedingte Routing veranschaulicht:
- Ein Spamerkennungs-Agent, der eingehende E-Mails analysiert und strukturierte JSON zurückgibt.
- Bedingte Kanten, die E-Mails basierend auf der Klassifizierung an verschiedene Handler weiterleiten.
- Ein legitimer E-Mail-Handler, der professionelle Antworten erstellt.
- Ein Spamhandler, der verdächtige E-Mails kennzeichnet.
- Gemeinsame Zustandsverwaltung zum Speichern von E-Mail-Daten zwischen Workflowschritten.
Behandelte Konzepte
Voraussetzungen
- .NET 8.0 SDK oder höher.
- Azure OpenAI-Dienstendpunkt und -Bereitstellung konfiguriert.
- Azure CLI installiert und authentifiziert (für die Azure-Anmeldeinformationsauthentifizierung).
- Grundlegendes Verständnis der C#- und asynchronen Programmierung.
- Eine neue Konsolenanwendung.
Installieren von NuGet-Paketen
Installieren Sie zunächst die erforderlichen Pakete für Ihr .NET-Projekt:
dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease
Definieren von Datenmodellen
Definieren Sie zunächst die Datenstrukturen, die durch Ihren Workflow fließen:
using System.Text.Json.Serialization;
/// <summary>
/// Represents the result of spam detection.
/// </summary>
public sealed class DetectionResult
{
[JsonPropertyName("is_spam")]
public bool IsSpam { get; set; }
[JsonPropertyName("reason")]
public string Reason { get; set; } = string.Empty;
// Email ID is generated by the executor, not the agent
[JsonIgnore]
public string EmailId { get; set; } = string.Empty;
}
/// <summary>
/// Represents an email.
/// </summary>
internal sealed class Email
{
[JsonPropertyName("email_id")]
public string EmailId { get; set; } = string.Empty;
[JsonPropertyName("email_content")]
public string EmailContent { get; set; } = string.Empty;
}
/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
[JsonPropertyName("response")]
public string Response { get; set; } = string.Empty;
}
/// <summary>
/// Constants for shared state scopes.
/// </summary>
internal static class EmailStateConstants
{
public const string EmailStateScope = "EmailState";
}
Erstellen von Bedingungsfunktionen
Die Bedingungsfunktion wertet das Spamerkennungsergebnis aus, um zu bestimmen, welchen Pfad der Workflow übernehmen soll:
/// <summary>
/// Creates a condition for routing messages based on the expected spam detection result.
/// </summary>
/// <param name="expectedResult">The expected spam detection result</param>
/// <returns>A function that evaluates whether a message meets the expected result</returns>
private static Func<object?, bool> GetCondition(bool expectedResult) =>
detectionResult => detectionResult is DetectionResult result && result.IsSpam == expectedResult;
Diese Bedingungsfunktion:
- Verwendet einen
bool expectedResultParameter (true für Spam, false für Nichtspam) - Gibt eine Funktion zurück, die als Randbedingung verwendet werden kann
- Überprüft sicher, ob die Nachricht ein
DetectionResultist und dieIsSpam-Eigenschaft vergleicht.
KI-Agenten erstellen
Richten Sie die KI-Agents ein, die die Spamerkennung und E-Mail-Unterstützung behandeln:
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;
/// <summary>
/// Creates a spam detection agent.
/// </summary>
/// <returns>A ChatClientAgent configured for spam detection</returns>
private static ChatClientAgent GetSpamDetectionAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema(AIJsonUtilities.CreateJsonSchema(typeof(DetectionResult)))
}
});
/// <summary>
/// Creates an email assistant agent.
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft professional responses to emails.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema(AIJsonUtilities.CreateJsonSchema(typeof(EmailResponse)))
}
});
Implementieren von Executors
Erstellen Sie die Workflowausführer, die unterschiedliche Phasen der E-Mail-Verarbeitung behandeln:
using Microsoft.Agents.AI.Workflows;
using System.Text.Json;
/// <summary>
/// Executor that detects spam using an AI agent.
/// </summary>
internal sealed class SpamDetectionExecutor : Executor<ChatMessage, DetectionResult>
{
private readonly AIAgent _spamDetectionAgent;
public SpamDetectionExecutor(AIAgent spamDetectionAgent) : base("SpamDetectionExecutor")
{
this._spamDetectionAgent = spamDetectionAgent;
}
public override async ValueTask<DetectionResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Generate a random email ID and store the email content to shared state
var newEmail = new Email
{
EmailId = Guid.NewGuid().ToString("N"),
EmailContent = message.Text
};
await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent for spam detection
var response = await this._spamDetectionAgent.RunAsync(message);
var detectionResult = JsonSerializer.Deserialize<DetectionResult>(response.Text);
detectionResult!.EmailId = newEmail.EmailId;
return detectionResult;
}
}
/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<DetectionResult, EmailResponse>
{
private readonly AIAgent _emailAssistantAgent;
public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
{
this._emailAssistantAgent = emailAssistantAgent;
}
public override async ValueTask<EmailResponse> HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.IsSpam)
{
throw new ArgumentException("This executor should only handle non-spam messages.");
}
// Retrieve the email content from shared state
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope)
?? throw new InvalidOperationException("Email not found.");
// Invoke the agent to draft a response
var response = await this._emailAssistantAgent.RunAsync(email.EmailContent);
var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);
return emailResponse!;
}
}
/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed class SendEmailExecutor : Executor<EmailResponse>
{
public SendEmailExecutor() : base("SendEmailExecutor") { }
public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
await context.YieldOutputAsync($"Email sent: {message.Response}");
}
/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed class HandleSpamExecutor : Executor<DetectionResult>
{
public HandleSpamExecutor() : base("HandleSpamExecutor") { }
public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.IsSpam)
{
await context.YieldOutputAsync($"Email marked as spam: {message.Reason}");
}
else
{
throw new ArgumentException("This executor should only handle spam messages.");
}
}
}
Den Workflow mit bedingten Kanten erstellen
Erstellen Sie nun das Hauptprogramm, das den Workflow erstellt und ausführt:
using Microsoft.Extensions.AI;
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT")
?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
.GetChatClient(deploymentName).AsIChatClient();
// Create agents
AIAgent spamDetectionAgent = GetSpamDetectionAgent(chatClient);
AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);
// Create executors
var spamDetectionExecutor = new SpamDetectionExecutor(spamDetectionAgent);
var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
var sendEmailExecutor = new SendEmailExecutor();
var handleSpamExecutor = new HandleSpamExecutor();
// Build the workflow with conditional edges
var workflow = new WorkflowBuilder(spamDetectionExecutor)
// Non-spam path: route to email assistant when IsSpam = false
.AddEdge(spamDetectionExecutor, emailAssistantExecutor, condition: GetCondition(expectedResult: false))
.AddEdge(emailAssistantExecutor, sendEmailExecutor)
// Spam path: route to spam handler when IsSpam = true
.AddEdge(spamDetectionExecutor, handleSpamExecutor, condition: GetCondition(expectedResult: true))
.WithOutputFrom(handleSpamExecutor, sendEmailExecutor)
.Build();
// Execute the workflow with sample spam email
string emailContent = "Congratulations! You've won $1,000,000! Click here to claim your prize now!";
StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, emailContent));
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent outputEvent)
{
Console.WriteLine($"{outputEvent}");
}
}
}
}
Funktionsweise
Workfloweintrag: Der Workflow beginnt mit
spamDetectionExecutordem Empfangen einesChatMessage.Spamanalyse: Der Spamerkennungs-Agent analysiert die E-Mail und gibt eine strukturierte
DetectionResultmitIsSpamundReasonEigenschaften zurück.Bedingtes Routing: Basierend auf dem
IsSpamWert:-
Bei Spam (
IsSpam = true): Leitet anHandleSpamExecutorweiter mitGetCondition(true) -
Falls legitim (
IsSpam = false): Routen zuEmailAssistantExecutormitGetCondition(false)
-
Bei Spam (
Antwortgenerierung: Für legitime E-Mails erstellt der E-Mail-Assistent eine professionelle Antwort.
Endgültige Ausgabe: Der Workflow liefert entweder eine Spambenachrichtigung oder sendet die entwurfsierte E-Mail-Antwort.
Wichtige Merkmale bedingter Kanten
Type-Safe Bedingungen: Die
GetConditionMethode erstellt wiederverwendbare Bedingungsfunktionen, die Inhalte von Nachrichten sicher auswerten.Mehrere Pfade: Ein einzelner Executor kann mehrere ausgehende Kanten mit unterschiedlichen Bedingungen aufweisen, wodurch komplexe Verzweigungslogik ermöglicht wird.
Freigegebener Status: E-Mail-Daten werden über die Bereichsstatusverwaltung hinweg beibehalten, sodass nachgeschaltete Executoren auf ursprüngliche Inhalte zugreifen können.
Fehlerbehandlung: Executors überprüfen ihre Eingaben und lösen aussagekräftige Ausnahmen aus, wenn unerwartete Nachrichtentypen empfangen werden.
Saubere Architektur: Jeder Executor hat eine einzige Verantwortung, wodurch der Workflow verwaltet und getestet werden kann.
Ausführen des Beispiels
Wenn Sie diesen Workflow mit der Beispielspam-E-Mail ausführen:
Email marked as spam: This email contains common spam indicators including monetary prizes, urgency tactics, and suspicious links that are typical of phishing attempts.
Versuchen Sie, den E-Mail-Inhalt in etwas Legitimes zu ändern:
string emailContent = "Hi, I wanted to follow up on our meeting yesterday and get your thoughts on the project proposal.";
Der Workflow leitet stattdessen an den E-Mail-Assistenten weiter und generiert stattdessen eine professionelle Antwort.
Dieses bedingte Routingmuster bildet die Grundlage für die Erstellung komplexer Workflows, die komplexe Entscheidungsstrukturen und Geschäftslogik verarbeiten können.
Vollständige Implementierung
Die vollständige funktionierende Implementierung finden Sie in diesem Beispiel im Agent Framework-Repository.
Was Sie erstellen werden
Sie erstellen einen E-Mail-Verarbeitungsworkflow, der das bedingte Routing veranschaulicht:
- Ein Spamerkennungs-Agent, der eingehende E-Mails analysiert
- Bedingte Kanten, die E-Mails basierend auf der Klassifizierung an verschiedene Handler weiterleiten
- Ein legitimer E-Mail-Handler, der professionelle Antworten erstellt
- Ein Spamhandler, der verdächtige E-Mails markiert
Behandelte Konzepte
Voraussetzungen
- Python 3.10 oder höher
- Agent Framework installiert:
pip install agent-framework-core --pre - Azure OpenAI-Dienst mit ordnungsgemäßen Umgebungsvariablen konfiguriert
- Azure CLI-Authentifizierung:
az login
Schritt 1: Importieren erforderlicher Abhängigkeiten
Importieren Sie zunächst die erforderlichen Komponenten für bedingte Workflows:
import asyncio
import os
from dataclasses import dataclass
from typing import Any, Literal
from uuid import uuid4
from typing_extensions import Never
from agent_framework import (
AgentExecutor,
AgentExecutorRequest,
AgentExecutorResponse,
ChatMessage,
Role,
WorkflowBuilder,
WorkflowContext,
executor,
Case,
Default,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
from pydantic import BaseModel
Schritt 2: Definieren von Datenmodellen
Erstellen Sie Pydantische Modelle für den strukturierten Datenaustausch zwischen Workflowkomponenten:
class DetectionResult(BaseModel):
"""Represents the result of spam detection."""
# is_spam drives the routing decision taken by edge conditions
is_spam: bool
# Human readable rationale from the detector
reason: str
# The agent must include the original email so downstream agents can operate without reloading content
email_content: str
class EmailResponse(BaseModel):
"""Represents the response from the email assistant."""
# The drafted reply that a user could copy or send
response: str
Schritt 3: Erstellen von Bedingungsfunktionen
Definieren Sie Bedingungsfunktionen, die Routingentscheidungen bestimmen:
def get_condition(expected_result: bool):
"""Create a condition callable that routes based on DetectionResult.is_spam."""
# The returned function will be used as an edge predicate.
# It receives whatever the upstream executor produced.
def condition(message: Any) -> bool:
# Defensive guard. If a non AgentExecutorResponse appears, let the edge pass to avoid dead ends.
if not isinstance(message, AgentExecutorResponse):
return True
try:
# Prefer parsing a structured DetectionResult from the agent JSON text.
# Using model_validate_json ensures type safety and raises if the shape is wrong.
detection = DetectionResult.model_validate_json(message.agent_run_response.text)
# Route only when the spam flag matches the expected path.
return detection.is_spam == expected_result
except Exception:
# Fail closed on parse errors so we do not accidentally route to the wrong path.
# Returning False prevents this edge from activating.
return False
return condition
Schritt 4: Erstellen von Handler-Exekutoren
Definieren von Executors zur Behandlung unterschiedlicher Routing-Ergebnisse
@executor(id="send_email")
async def handle_email_response(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
"""Handle legitimate emails by drafting a professional response."""
# Downstream of the email assistant. Parse a validated EmailResponse and yield the workflow output.
email_response = EmailResponse.model_validate_json(response.agent_run_response.text)
await ctx.yield_output(f"Email sent:\n{email_response.response}")
@executor(id="handle_spam")
async def handle_spam_classifier_response(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
"""Handle spam emails by marking them appropriately."""
# Spam path. Confirm the DetectionResult and yield the workflow output. Guard against accidental non spam input.
detection = DetectionResult.model_validate_json(response.agent_run_response.text)
if detection.is_spam:
await ctx.yield_output(f"Email marked as spam: {detection.reason}")
else:
# This indicates the routing predicate and executor contract are out of sync.
raise RuntimeError("This executor should only handle spam messages.")
@executor(id="to_email_assistant_request")
async def to_email_assistant_request(
response: AgentExecutorResponse, ctx: WorkflowContext[AgentExecutorRequest]
) -> None:
"""Transform spam detection response into a request for the email assistant."""
# Parse the detection result and extract the email content for the assistant
detection = DetectionResult.model_validate_json(response.agent_run_response.text)
# Create a new request for the email assistant with the original email content
request = AgentExecutorRequest(
messages=[ChatMessage(Role.USER, text=detection.email_content)],
should_respond=True
)
await ctx.send_message(request)
Schritt 5: Erstellen von KI-Agents
Richten Sie die Azure OpenAI-Agents mit strukturierter Ausgabeformatierung ein:
async def main() -> None:
# Create agents
# AzureCliCredential uses your current az login. This avoids embedding secrets in code.
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
# Agent 1. Classifies spam and returns a DetectionResult object.
# response_format enforces that the LLM returns parsable JSON for the Pydantic model.
spam_detection_agent = AgentExecutor(
chat_client.create_agent(
instructions=(
"You are a spam detection assistant that identifies spam emails. "
"Always return JSON with fields is_spam (bool), reason (string), and email_content (string). "
"Include the original email content in email_content."
),
response_format=DetectionResult,
),
id="spam_detection_agent",
)
# Agent 2. Drafts a professional reply. Also uses structured JSON output for reliability.
email_assistant_agent = AgentExecutor(
chat_client.create_agent(
instructions=(
"You are an email assistant that helps users draft professional responses to emails. "
"Your input might be a JSON object that includes 'email_content'; base your reply on that content. "
"Return JSON with a single field 'response' containing the drafted reply."
),
response_format=EmailResponse,
),
id="email_assistant_agent",
)
Schritt 6: Erstellen des bedingten Workflows
Erstellen Sie einen Workflow mit bedingten Kanten, die basierend auf den Ergebnissen der Spamerkennung weitergeleitet werden:
# Build the workflow graph.
# Start at the spam detector.
# If not spam, hop to a transformer that creates a new AgentExecutorRequest,
# then call the email assistant, then finalize.
# If spam, go directly to the spam handler and finalize.
workflow = (
WorkflowBuilder()
.set_start_executor(spam_detection_agent)
# Not spam path: transform response -> request for assistant -> assistant -> send email
.add_edge(spam_detection_agent, to_email_assistant_request, condition=get_condition(False))
.add_edge(to_email_assistant_request, email_assistant_agent)
.add_edge(email_assistant_agent, handle_email_response)
# Spam path: send to spam handler
.add_edge(spam_detection_agent, handle_spam_classifier_response, condition=get_condition(True))
.build()
)
Schritt 7: Ausführen des Workflows
Führen Sie den Workflow mit Beispiel-E-Mail-Inhalten aus:
# Read Email content from the sample resource file.
# This keeps the sample deterministic since the model sees the same email every run.
email_path = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "resources", "email.txt")
with open(email_path) as email_file: # noqa: ASYNC230
email = email_file.read()
# Execute the workflow. Since the start is an AgentExecutor, pass an AgentExecutorRequest.
# The workflow completes when it becomes idle (no more work to do).
request = AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email)], should_respond=True)
events = await workflow.run(request)
outputs = events.get_outputs()
if outputs:
print(f"Workflow output: {outputs[0]}")
if __name__ == "__main__":
asyncio.run(main())
Funktionsweise von bedingten Kanten
Bedingungsfunktionen: Die
get_condition()Funktion erstellt ein Prädikat, das den Nachrichteninhalt untersucht und zurückgibtTrueoderFalseob der Rand durchlaufen werden soll.Nachrichtenüberprüfung: Bedingungen können jeden Aspekt der Nachricht prüfen, einschließlich strukturierter Daten aus Agentantworten, die mit Pydantischen Modellen analysiert wurden.
Defensive Programmierung: Die Bedingungsfunktion enthält fehlerbehandlung, um Routingfehler beim Analysieren strukturierter Daten zu verhindern.
Dynamisches Routing: Basierend auf dem Spamerkennungsergebnis werden E-Mails automatisch an den E-Mail-Assistenten (für legitime E-Mails) oder den Spamhandler (für verdächtige E-Mails) weitergeleitet.
Wichtige Konzepte
- Edgebedingungen: Boolesche Prädikate, die bestimmen, ob ein Rand durchlaufen werden soll
-
Strukturierte Ausgaben: Die Verwendung von Pydantic-Modellen mit
response_formatsorgt für zuverlässiges Parsing von Daten. - Defensive Routing: Bedingungsfunktionen behandeln Randfälle, um Workflow-Sackgassen zu verhindern
- Nachrichtentransformation: Executors können Nachrichtentypen zwischen Workflowschritten transformieren.
Vollständige Implementierung
Die vollständige Arbeitsimplementierung finden Sie im edge_condition.py Beispiel im Agent Framework-Repository.
Switch-Case Verzweigungen
Erstellen von bedingten Kanten
Im vorherigen Beispiel für bedingte Kanten wurde das bidirektionale Routing (Spam und legitime E-Mails) veranschaulicht. Viele reale Szenarien erfordern jedoch anspruchsvollere Entscheidungsstrukturen. Switch-Case-Kanten bieten eine übersichtlichere, wartungsfreundlichere Lösung, wenn Sie basierend auf unterschiedlichen Bedingungen zu mehreren Zielen geleitet werden müssen.
Was Sie mit Switch-Case entwickeln werden
Sie erweitern den E-Mail-Verarbeitungsworkflow, um drei Entscheidungspfade zu verarbeiten:
- NotSpam → E-Mail-Assistent → Senden von E-Mails
- Spam -→ Behandeln von Spamausführern
- Unsicher → Unsicheren Executor behandeln (Standardfall)
Die wichtigste Verbesserung ist die Verwendung des SwitchBuilder Musters anstelle mehrerer einzelner bedingter Kanten, wodurch der Workflow leichter zu verstehen und zu verwalten ist, wenn die Entscheidungskomplexität wächst.
Behandelte Konzepte
Datenmodelle für Switch-Case
Aktualisieren Sie Ihre Datenmodelle, um die Drei-Wege-Klassifizierung zu unterstützen:
/// <summary>
/// Represents the possible decisions for spam detection.
/// </summary>
public enum SpamDecision
{
NotSpam,
Spam,
Uncertain
}
/// <summary>
/// Represents the result of spam detection with enhanced decision support.
/// </summary>
public sealed class DetectionResult
{
[JsonPropertyName("spam_decision")]
[JsonConverter(typeof(JsonStringEnumConverter))]
public SpamDecision spamDecision { get; set; }
[JsonPropertyName("reason")]
public string Reason { get; set; } = string.Empty;
// Email ID is generated by the executor, not the agent
[JsonIgnore]
public string EmailId { get; set; } = string.Empty;
}
/// <summary>
/// Represents an email stored in shared state.
/// </summary>
internal sealed class Email
{
[JsonPropertyName("email_id")]
public string EmailId { get; set; } = string.Empty;
[JsonPropertyName("email_content")]
public string EmailContent { get; set; } = string.Empty;
}
/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
[JsonPropertyName("response")]
public string Response { get; set; } = string.Empty;
}
/// <summary>
/// Constants for shared state scopes.
/// </summary>
internal static class EmailStateConstants
{
public const string EmailStateScope = "EmailState";
}
Bedingungsfabrik für switch-case
Erstellen Sie eine wiederverwendbare Bedingungsfactory, die Prädikate für jede Spamentscheidung generiert:
/// <summary>
/// Creates a condition for routing messages based on the expected spam detection result.
/// </summary>
/// <param name="expectedDecision">The expected spam detection decision</param>
/// <returns>A function that evaluates whether a message meets the expected result</returns>
private static Func<object?, bool> GetCondition(SpamDecision expectedDecision) =>
detectionResult => detectionResult is DetectionResult result && result.spamDecision == expectedDecision;
Dieser Fabrikansatz:
- Reduziert die Codeduplizierung: Eine Funktion generiert alle Bedingungs-Prädikate.
- Stellt Konsistenz sicher: Alle Bedingungen folgen demselben Muster.
- Vereinfacht die Wartung: Änderungen an der Bedingungslogik erfolgen an einer zentralen Stelle.
Erweiterter KI-Agent
Aktualisieren Sie den Spamerkennungs-Agent so, dass er weniger sicher ist und Drei-Wege-Klassifizierungen zurückgibt:
/// <summary>
/// Creates a spam detection agent with enhanced uncertainty handling.
/// </summary>
/// <returns>A ChatClientAgent configured for three-way spam detection</returns>
private static ChatClientAgent GetSpamDetectionAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails. Be less confident in your assessments.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<DetectionResult>()
}
});
/// <summary>
/// Creates an email assistant agent (unchanged from conditional edges example).
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft responses to emails with professionalism.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
}
});
Workflowausführer mit erweitertem Routing
Implementieren Sie Ausführungsinstanzen, die das Drei-Wege-Routing mit der Verwaltung eines gemeinsamen Zustands bearbeiten.
/// <summary>
/// Executor that detects spam using an AI agent with three-way classification.
/// </summary>
internal sealed class SpamDetectionExecutor : Executor<ChatMessage, DetectionResult>
{
private readonly AIAgent _spamDetectionAgent;
public SpamDetectionExecutor(AIAgent spamDetectionAgent) : base("SpamDetectionExecutor")
{
this._spamDetectionAgent = spamDetectionAgent;
}
public override async ValueTask<DetectionResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Generate a random email ID and store the email content in shared state
var newEmail = new Email
{
EmailId = Guid.NewGuid().ToString("N"),
EmailContent = message.Text
};
await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent for enhanced spam detection
var response = await this._spamDetectionAgent.RunAsync(message);
var detectionResult = JsonSerializer.Deserialize<DetectionResult>(response.Text);
detectionResult!.EmailId = newEmail.EmailId;
return detectionResult;
}
}
/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<DetectionResult, EmailResponse>
{
private readonly AIAgent _emailAssistantAgent;
public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
{
this._emailAssistantAgent = emailAssistantAgent;
}
public override async ValueTask<EmailResponse> HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Spam)
{
throw new ArgumentException("This executor should only handle non-spam messages.");
}
// Retrieve the email content from shared state
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent to draft a response
var response = await this._emailAssistantAgent.RunAsync(email!.EmailContent);
var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);
return emailResponse!;
}
}
/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed class SendEmailExecutor : Executor<EmailResponse>
{
public SendEmailExecutor() : base("SendEmailExecutor") { }
public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
await context.YieldOutputAsync($"Email sent: {message.Response}").ConfigureAwait(false);
}
/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed class HandleSpamExecutor : Executor<DetectionResult>
{
public HandleSpamExecutor() : base("HandleSpamExecutor") { }
public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Spam)
{
await context.YieldOutputAsync($"Email marked as spam: {message.Reason}").ConfigureAwait(false);
}
else
{
throw new ArgumentException("This executor should only handle spam messages.");
}
}
}
/// <summary>
/// Executor that handles uncertain emails requiring manual review.
/// </summary>
internal sealed class HandleUncertainExecutor : Executor<DetectionResult>
{
public HandleUncertainExecutor() : base("HandleUncertainExecutor") { }
public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Uncertain)
{
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
await context.YieldOutputAsync($"Email marked as uncertain: {message.Reason}. Email content: {email?.EmailContent}");
}
else
{
throw new ArgumentException("This executor should only handle uncertain spam decisions.");
}
}
}
Workflow mit Switch-Case-Struktur erstellen
Ersetzen Sie mehrere bedingte Kanten durch das übersichtlichere Switch-Case-Muster:
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential()).GetChatClient(deploymentName).AsIChatClient();
// Create agents
AIAgent spamDetectionAgent = GetSpamDetectionAgent(chatClient);
AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);
// Create executors
var spamDetectionExecutor = new SpamDetectionExecutor(spamDetectionAgent);
var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
var sendEmailExecutor = new SendEmailExecutor();
var handleSpamExecutor = new HandleSpamExecutor();
var handleUncertainExecutor = new HandleUncertainExecutor();
// Build the workflow using switch-case for cleaner three-way routing
WorkflowBuilder builder = new(spamDetectionExecutor);
builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
switchBuilder
.AddCase(
GetCondition(expectedDecision: SpamDecision.NotSpam),
emailAssistantExecutor
)
.AddCase(
GetCondition(expectedDecision: SpamDecision.Spam),
handleSpamExecutor
)
.WithDefault(
handleUncertainExecutor
)
)
// After the email assistant writes a response, it will be sent to the send email executor
.AddEdge(emailAssistantExecutor, sendEmailExecutor)
.WithOutputFrom(handleSpamExecutor, sendEmailExecutor, handleUncertainExecutor);
var workflow = builder.Build();
// Read an email from a text file (use ambiguous content for demonstration)
string email = Resources.Read("ambiguous_email.txt");
// Execute the workflow
StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, email));
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent outputEvent)
{
Console.WriteLine($"{outputEvent}");
}
}
}
}
Switch-Case Vorteile
-
Übersichtlichere Syntax: Dies
SwitchBuilderbietet eine besser lesbare Alternative zu mehreren bedingten Kanten - Sortierte Auswertung: Fälle werden sequenziell ausgewertet und bei der ersten Übereinstimmung beendet.
-
Garantiertes Routing: Die
WithDefault()Methode stellt sicher, dass Nachrichten nie hängen bleiben - Bessere Wartung: Das Hinzufügen neuer Fälle erfordert minimale Änderungen an der Workflowstruktur.
- Typsicherheit: Jeder Executor überprüft seine Eingabe, um Routingfehler frühzeitig abzufangen.
Mustervergleich
Vorher (Bedingte Kanten):
var workflow = new WorkflowBuilder(spamDetectionExecutor)
.AddEdge(spamDetectionExecutor, emailAssistantExecutor, condition: GetCondition(expectedResult: false))
.AddEdge(spamDetectionExecutor, handleSpamExecutor, condition: GetCondition(expectedResult: true))
// No clean way to handle a third case
.WithOutputFrom(handleSpamExecutor, sendEmailExecutor)
.Build();
Nach (Switch-Case):
WorkflowBuilder builder = new(spamDetectionExecutor);
builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
switchBuilder
.AddCase(GetCondition(SpamDecision.NotSpam), emailAssistantExecutor)
.AddCase(GetCondition(SpamDecision.Spam), handleSpamExecutor)
.WithDefault(handleUncertainExecutor) // Clean default case
)
// Continue building the rest of the workflow
Das Switch-Case-Muster wird wesentlich besser skaliert, da die Anzahl der Routingentscheidungen wächst, und der Standardfall bietet ein Sicherheitsnetz für unerwartete Werte.
Ausführen des Beispiels
Wenn Sie diesen Workflow mit mehrdeutigen E-Mail-Inhalten ausführen:
Email marked as uncertain: This email contains promotional language but might be from a legitimate business contact, requiring human review for proper classification.
Versuchen Sie, den E-Mail-Inhalt in etwas eindeutig Spam oder eindeutig legitim zu ändern, um die verschiedenen Routingpfade in Aktion zu erleben.
Vollständige Implementierung
Die vollständige funktionierende Implementierung finden Sie in diesem Beispiel im Agent Framework-Repository.
Erstellen von bedingten Kanten
Im vorherigen Beispiel für bedingte Kanten wurde das bidirektionale Routing (Spam und legitime E-Mails) veranschaulicht. Viele reale Szenarien erfordern jedoch anspruchsvollere Entscheidungsstrukturen. Switch-Case-Kanten bieten eine übersichtlichere, wartungsfreundlichere Lösung, wenn Sie basierend auf unterschiedlichen Bedingungen zu mehreren Zielen geleitet werden müssen.
Was Sie als Nächstes erstellen
Sie erweitern den E-Mail-Verarbeitungsworkflow, um drei Entscheidungspfade zu verarbeiten:
- NotSpam → E-Mail-Assistent → Senden von E-Mails
- Spam → Als Spam markieren
- Unsicher → Flag für manuelle Überprüfung (Standardfall)
Die Hauptverbesserung besteht darin, anstelle mehrerer einzelner bedingter Kanten eine einzige Switch-Case-Edgegruppe zu verwenden, wodurch der Workflow mit zunehmender Entscheidungskomplexität einfacher zu verstehen und zu verwalten ist.
Behandelte Konzepte
Erweiterte Datenmodelle
Aktualisieren Sie Ihre Datenmodelle, um die Drei-Wege-Klassifizierung zu unterstützen:
from typing import Literal
class DetectionResultAgent(BaseModel):
"""Structured output returned by the spam detection agent."""
# The agent classifies the email into one of three categories
spam_decision: Literal["NotSpam", "Spam", "Uncertain"]
reason: str
class EmailResponse(BaseModel):
"""Structured output returned by the email assistant agent."""
response: str
@dataclass
class DetectionResult:
"""Internal typed payload used for routing and downstream handling."""
spam_decision: str
reason: str
email_id: str
@dataclass
class Email:
"""In memory record of the email content stored in shared state."""
email_id: str
email_content: str
Switch-Case Bedingungsfactory
Erstellen Sie eine wiederverwendbare Bedingungsfactory, die Prädikate für jede Spamentscheidung generiert:
def get_case(expected_decision: str):
"""Factory that returns a predicate matching a specific spam_decision value."""
def condition(message: Any) -> bool:
# Only match when the upstream payload is a DetectionResult with the expected decision
return isinstance(message, DetectionResult) and message.spam_decision == expected_decision
return condition
Dieser Fabrikansatz:
- Reduziert die Codeduplizierung: Eine Funktion generiert alle Bedingungs-Prädikate.
- Stellt Konsistenz sicher: Alle Bedingungen folgen demselben Muster.
- Vereinfacht die Wartung: Änderungen an der Bedingungslogik erfolgen an einer zentralen Stelle.
Workflowausführer mit freigegebenem Status
Implementieren Sie Executoren, die den freigegebenen Zustand verwenden, um zu vermeiden, dass große E-Mail-Inhalte über jeden Workflowschritt übergeben werden:
EMAIL_STATE_PREFIX = "email:"
CURRENT_EMAIL_ID_KEY = "current_email_id"
@executor(id="store_email")
async def store_email(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Store email content once and pass around a lightweight ID reference."""
# Persist the raw email content in shared state
new_email = Email(email_id=str(uuid4()), email_content=email_text)
await ctx.set_shared_state(f"{EMAIL_STATE_PREFIX}{new_email.email_id}", new_email)
await ctx.set_shared_state(CURRENT_EMAIL_ID_KEY, new_email.email_id)
# Forward email to spam detection agent
await ctx.send_message(
AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=new_email.email_content)], should_respond=True)
)
@executor(id="to_detection_result")
async def to_detection_result(response: AgentExecutorResponse, ctx: WorkflowContext[DetectionResult]) -> None:
"""Transform agent response into a typed DetectionResult with email ID."""
# Parse the agent's structured JSON output
parsed = DetectionResultAgent.model_validate_json(response.agent_run_response.text)
email_id: str = await ctx.get_shared_state(CURRENT_EMAIL_ID_KEY)
# Create typed message for switch-case routing
await ctx.send_message(DetectionResult(
spam_decision=parsed.spam_decision,
reason=parsed.reason,
email_id=email_id
))
@executor(id="submit_to_email_assistant")
async def submit_to_email_assistant(detection: DetectionResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Handle NotSpam emails by forwarding to the email assistant."""
# Guard against misrouting
if detection.spam_decision != "NotSpam":
raise RuntimeError("This executor should only handle NotSpam messages.")
# Retrieve original email content from shared state
email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{detection.email_id}")
await ctx.send_message(
AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.email_content)], should_respond=True)
)
@executor(id="finalize_and_send")
async def finalize_and_send(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
"""Parse email assistant response and yield final output."""
parsed = EmailResponse.model_validate_json(response.agent_run_response.text)
await ctx.yield_output(f"Email sent: {parsed.response}")
@executor(id="handle_spam")
async def handle_spam(detection: DetectionResult, ctx: WorkflowContext[Never, str]) -> None:
"""Handle confirmed spam emails."""
if detection.spam_decision == "Spam":
await ctx.yield_output(f"Email marked as spam: {detection.reason}")
else:
raise RuntimeError("This executor should only handle Spam messages.")
@executor(id="handle_uncertain")
async def handle_uncertain(detection: DetectionResult, ctx: WorkflowContext[Never, str]) -> None:
"""Handle uncertain classifications that need manual review."""
if detection.spam_decision == "Uncertain":
# Include original content for human review
email: Email | None = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{detection.email_id}")
await ctx.yield_output(
f"Email marked as uncertain: {detection.reason}. Email content: {getattr(email, 'email_content', '')}"
)
else:
raise RuntimeError("This executor should only handle Uncertain messages.")
Erstellen eines erweiterten KI-Agents
Aktualisieren Sie den Spamerkennungs-Agent so, dass er weniger sicher ist und Drei-Wege-Klassifizierungen zurückgibt:
async def main():
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
# Enhanced spam detection agent with three-way classification
spam_detection_agent = AgentExecutor(
chat_client.create_agent(
instructions=(
"You are a spam detection assistant that identifies spam emails. "
"Be less confident in your assessments. "
"Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) "
"and 'reason' (string)."
),
response_format=DetectionResultAgent,
),
id="spam_detection_agent",
)
# Email assistant remains the same
email_assistant_agent = AgentExecutor(
chat_client.create_agent(
instructions=(
"You are an email assistant that helps users draft responses to emails with professionalism."
),
response_format=EmailResponse,
),
id="email_assistant_agent",
)
Erstellen eines Workflows mit der Switch-Case-Edge-Gruppe
Ersetzen Sie mehrere bedingte Kanten durch eine einzelne Switch-Case-Gruppe:
# Build workflow using switch-case for cleaner three-way routing
workflow = (
WorkflowBuilder()
.set_start_executor(store_email)
.add_edge(store_email, spam_detection_agent)
.add_edge(spam_detection_agent, to_detection_result)
.add_switch_case_edge_group(
to_detection_result,
[
# Explicit cases for specific decisions
Case(condition=get_case("NotSpam"), target=submit_to_email_assistant),
Case(condition=get_case("Spam"), target=handle_spam),
# Default case catches anything that doesn't match above
Default(target=handle_uncertain),
],
)
.add_edge(submit_to_email_assistant, email_assistant_agent)
.add_edge(email_assistant_agent, finalize_and_send)
.build()
)
Ausführen und Testen
Führen Sie den Workflow mit mehrdeutigen E-Mail-Inhalten aus, die das dreiseitige Routing veranschaulicht:
# Use ambiguous email content that might trigger uncertain classification
email = (
"Hey there, I noticed you might be interested in our latest offer—no pressure, but it expires soon. "
"Let me know if you'd like more details."
)
# Execute and display results
events = await workflow.run(email)
outputs = events.get_outputs()
if outputs:
for output in outputs:
print(f"Workflow output: {output}")
Wichtige Vorteile von Switch-Case Edges
- Übersichtlichere Syntax: Eine Randgruppe anstelle mehrerer bedingter Kanten
- Sortierte Auswertung: Fälle werden sequenziell ausgewertet und bei der ersten Übereinstimmung beendet.
- Garantiertes Routing: Der Standardfall stellt sicher, dass Nachrichten nie hängen bleiben
- Bessere Wartung: Das Hinzufügen neuer Fälle erfordert minimale Änderungen.
- Typsicherheit: Jeder Executor überprüft seine Eingabe, um Routingfehler abzufangen.
Vergleich: Bedingt im Vergleich zu Switch-Case
Vorher (Bedingte Kanten):
.add_edge(detector, handler_a, condition=lambda x: x.result == "A")
.add_edge(detector, handler_b, condition=lambda x: x.result == "B")
.add_edge(detector, handler_c, condition=lambda x: x.result == "C")
Nach (Switch-Case):
.add_switch_case_edge_group(
detector,
[
Case(condition=lambda x: x.result == "A", target=handler_a),
Case(condition=lambda x: x.result == "B", target=handler_b),
Default(target=handler_c), # Catches everything else
],
)
Das Switch-Case-Muster wird wesentlich besser skaliert, da die Anzahl der Routingentscheidungen wächst, und der Standardfall bietet ein Sicherheitsnetz für unerwartete Werte.
Switch-Case Beispielcode
Die vollständige Arbeitsimplementierung finden Sie im switch_case_edge_group.py Beispiel im Agent Framework-Repository.
Ränder für mehrfache Auswahl
Über Switch-Case hinaus: Multi-Auswahl-Routing
Während Switch-Case-Edges Nachrichten an genau ein Ziel weiterleiten, müssen Workflows in der Praxis häufig mehrere parallele Vorgänge basierend auf Datenmerkmalen auslösen. Partitionierte Kanten (implementiert als Fanout-Kanten mit Partitionierern) ermöglichen anspruchsvolle Fanoutmuster, bei denen eine einzelne Nachricht mehrere nachgeschaltete Executoren gleichzeitig aktivieren kann.
Erweiterter E-Mail-Verarbeitungsworkflow
Basierend auf dem Switch-Case-Beispiel erstellen Sie ein erweitertes E-Mail-Verarbeitungssystem, das komplexe Routinglogik veranschaulicht:
- Spam-E-Mails → einzelner Spam-Handler (z. B. Switch-Case)
- Legitime E-Mails → E-Mail-Assistent immer auslösen + Zusammenfassung für lange E-Mails bedingt auslösen
- Unklare E-Mails → Ein einzelner unklarer Verwalter (z. B. switch-case)
- Datenbankpersistenz → für kurze E-Mails und zusammengefasste lange E-Mails ausgelöst
Dieses Muster ermöglicht parallele Verarbeitungspipelinen, die sich an Inhaltseigenschaften anpassen.
Behandelte Konzepte
Datenmodelle für Mehrfachauswahl
Erweitern Sie die Datenmodelle, um die Analyse und Zusammenfassung der E-Mail-Länge zu unterstützen:
/// <summary>
/// Represents the result of enhanced email analysis with additional metadata.
/// </summary>
public sealed class AnalysisResult
{
[JsonPropertyName("spam_decision")]
[JsonConverter(typeof(JsonStringEnumConverter))]
public SpamDecision spamDecision { get; set; }
[JsonPropertyName("reason")]
public string Reason { get; set; } = string.Empty;
// Additional properties for sophisticated routing
[JsonIgnore]
public int EmailLength { get; set; }
[JsonIgnore]
public string EmailSummary { get; set; } = string.Empty;
[JsonIgnore]
public string EmailId { get; set; } = string.Empty;
}
/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
[JsonPropertyName("response")]
public string Response { get; set; } = string.Empty;
}
/// <summary>
/// Represents the response from the email summary agent.
/// </summary>
public sealed class EmailSummary
{
[JsonPropertyName("summary")]
public string Summary { get; set; } = string.Empty;
}
/// <summary>
/// A custom workflow event for database operations.
/// </summary>
internal sealed class DatabaseEvent(string message) : WorkflowEvent(message) { }
/// <summary>
/// Constants for email processing thresholds.
/// </summary>
public static class EmailProcessingConstants
{
public const int LongEmailThreshold = 100;
}
Zielzuweisungsfunktion: Das Herzstück der Mehrfachauswahl
Die Zielzuweisungsfunktion bestimmt, welche Executoren jede Nachricht empfangen sollen:
/// <summary>
/// Creates a target assigner for routing messages based on the analysis result.
/// </summary>
/// <returns>A function that takes an analysis result and returns the target partitions.</returns>
private static Func<AnalysisResult?, int, IEnumerable<int>> GetTargetAssigner()
{
return (analysisResult, targetCount) =>
{
if (analysisResult is not null)
{
if (analysisResult.spamDecision == SpamDecision.Spam)
{
return [0]; // Route only to spam handler (index 0)
}
else if (analysisResult.spamDecision == SpamDecision.NotSpam)
{
// Always route to email assistant (index 1)
List<int> targets = [1];
// Conditionally add summarizer for long emails (index 2)
if (analysisResult.EmailLength > EmailProcessingConstants.LongEmailThreshold)
{
targets.Add(2);
}
return targets;
}
else // Uncertain
{
return [3]; // Route only to uncertain handler (index 3)
}
}
throw new ArgumentException("Invalid analysis result.");
};
}
Wichtige Features der Zielzuweisungsfunktion
- Dynamische Zielauswahl: Gibt eine Liste der zu aktivierenden Executorindizes zurück.
- Inhaltsfähiges Routing: Trifft Entscheidungen basierend auf Nachrichteneigenschaften wie E-Mail-Länge
- Parallele Verarbeitung: Mehrere Ziele können gleichzeitig ausgeführt werden
- Bedingte Logik: Komplexe Verzweigung basierend auf mehreren Kriterien
Erweiterte Workflow-Executoren
Implementieren Sie Executoren, die die erweiterte Analyse und das Routing behandeln:
/// <summary>
/// Executor that analyzes emails using an AI agent with enhanced analysis.
/// </summary>
internal sealed class EmailAnalysisExecutor : Executor<ChatMessage, AnalysisResult>
{
private readonly AIAgent _emailAnalysisAgent;
public EmailAnalysisExecutor(AIAgent emailAnalysisAgent) : base("EmailAnalysisExecutor")
{
this._emailAnalysisAgent = emailAnalysisAgent;
}
public override async ValueTask<AnalysisResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Generate a random email ID and store the email content
var newEmail = new Email
{
EmailId = Guid.NewGuid().ToString("N"),
EmailContent = message.Text
};
await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent for enhanced analysis
var response = await this._emailAnalysisAgent.RunAsync(message);
var analysisResult = JsonSerializer.Deserialize<AnalysisResult>(response.Text);
// Enrich with metadata for routing decisions
analysisResult!.EmailId = newEmail.EmailId;
analysisResult.EmailLength = newEmail.EmailContent.Length;
return analysisResult;
}
}
/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<AnalysisResult, EmailResponse>
{
private readonly AIAgent _emailAssistantAgent;
public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
{
this._emailAssistantAgent = emailAssistantAgent;
}
public override async ValueTask<EmailResponse> HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Spam)
{
throw new ArgumentException("This executor should only handle non-spam messages.");
}
// Retrieve the email content from shared state
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent to draft a response
var response = await this._emailAssistantAgent.RunAsync(email!.EmailContent);
var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);
return emailResponse!;
}
}
/// <summary>
/// Executor that summarizes emails using an AI agent for long emails.
/// </summary>
internal sealed class EmailSummaryExecutor : Executor<AnalysisResult, AnalysisResult>
{
private readonly AIAgent _emailSummaryAgent;
public EmailSummaryExecutor(AIAgent emailSummaryAgent) : base("EmailSummaryExecutor")
{
this._emailSummaryAgent = emailSummaryAgent;
}
public override async ValueTask<AnalysisResult> HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Read the email content from shared state
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
// Generate summary for long emails
var response = await this._emailSummaryAgent.RunAsync(email!.EmailContent);
var emailSummary = JsonSerializer.Deserialize<EmailSummary>(response.Text);
// Enrich the analysis result with the summary
message.EmailSummary = emailSummary!.Summary;
return message;
}
}
/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed class SendEmailExecutor : Executor<EmailResponse>
{
public SendEmailExecutor() : base("SendEmailExecutor") { }
public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
await context.YieldOutputAsync($"Email sent: {message.Response}");
}
/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed class HandleSpamExecutor : Executor<AnalysisResult>
{
public HandleSpamExecutor() : base("HandleSpamExecutor") { }
public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Spam)
{
await context.YieldOutputAsync($"Email marked as spam: {message.Reason}");
}
else
{
throw new ArgumentException("This executor should only handle spam messages.");
}
}
}
/// <summary>
/// Executor that handles uncertain messages requiring manual review.
/// </summary>
internal sealed class HandleUncertainExecutor : Executor<AnalysisResult>
{
public HandleUncertainExecutor() : base("HandleUncertainExecutor") { }
public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Uncertain)
{
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
await context.YieldOutputAsync($"Email marked as uncertain: {message.Reason}. Email content: {email?.EmailContent}");
}
else
{
throw new ArgumentException("This executor should only handle uncertain spam decisions.");
}
}
}
/// <summary>
/// Executor that handles database access with custom events.
/// </summary>
internal sealed class DatabaseAccessExecutor : Executor<AnalysisResult>
{
public DatabaseAccessExecutor() : base("DatabaseAccessExecutor") { }
public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Simulate database operations
await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
await Task.Delay(100); // Simulate database access delay
// Emit custom database event for monitoring
await context.AddEventAsync(new DatabaseEvent($"Email {message.EmailId} saved to database."));
}
}
Erweiterte KI-Agenten
Erstellen Sie Agents für Analyse, Unterstützung und Zusammenfassung:
/// <summary>
/// Create an enhanced email analysis agent.
/// </summary>
/// <returns>A ChatClientAgent configured for comprehensive email analysis</returns>
private static ChatClientAgent GetEmailAnalysisAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<AnalysisResult>()
}
});
/// <summary>
/// Creates an email assistant agent.
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft responses to emails with professionalism.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
}
});
/// <summary>
/// Creates an agent that summarizes emails.
/// </summary>
/// <returns>A ChatClientAgent configured for email summarization</returns>
private static ChatClientAgent GetEmailSummaryAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are an assistant that helps users summarize emails.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailSummary>()
}
});
Konstruktion von Workflows mit Mehrfachauswahl
Erstellen Sie den Workflow mit anspruchsvollem Routing und paralleler Verarbeitung:
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential()).GetChatClient(deploymentName).AsIChatClient();
// Create agents
AIAgent emailAnalysisAgent = GetEmailAnalysisAgent(chatClient);
AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);
AIAgent emailSummaryAgent = GetEmailSummaryAgent(chatClient);
// Create executors
var emailAnalysisExecutor = new EmailAnalysisExecutor(emailAnalysisAgent);
var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
var emailSummaryExecutor = new EmailSummaryExecutor(emailSummaryAgent);
var sendEmailExecutor = new SendEmailExecutor();
var handleSpamExecutor = new HandleSpamExecutor();
var handleUncertainExecutor = new HandleUncertainExecutor();
var databaseAccessExecutor = new DatabaseAccessExecutor();
// Build the workflow with multi-selection fan-out
WorkflowBuilder builder = new(emailAnalysisExecutor);
builder.AddFanOutEdge(
emailAnalysisExecutor,
targets: [
handleSpamExecutor, // Index 0: Spam handler
emailAssistantExecutor, // Index 1: Email assistant (always for NotSpam)
emailSummaryExecutor, // Index 2: Summarizer (conditionally for long NotSpam)
handleUncertainExecutor, // Index 3: Uncertain handler
],
targetSelector: GetTargetAssigner()
)
// Email assistant branch
.AddEdge(emailAssistantExecutor, sendEmailExecutor)
// Database persistence: conditional routing
.AddEdge<AnalysisResult>(
emailAnalysisExecutor,
databaseAccessExecutor,
condition: analysisResult => analysisResult?.EmailLength <= EmailProcessingConstants.LongEmailThreshold) // Short emails
.AddEdge(emailSummaryExecutor, databaseAccessExecutor) // Long emails with summary
.WithOutputFrom(handleUncertainExecutor, handleSpamExecutor, sendEmailExecutor);
var workflow = builder.Build();
// Read a moderately long email to trigger both assistant and summarizer
string email = Resources.Read("email.txt");
// Execute the workflow with custom event handling
StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, email));
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent outputEvent)
{
Console.WriteLine($"Output: {outputEvent}");
}
if (evt is DatabaseEvent databaseEvent)
{
Console.WriteLine($"Database: {databaseEvent}");
}
}
}
}
Mustervergleich: Mehrfachauswahl im Vergleich zu Switch-Case
Switch-Case-Struktur (vorherig):
// One input → exactly one output
builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
switchBuilder
.AddCase(GetCondition(SpamDecision.NotSpam), emailAssistantExecutor)
.AddCase(GetCondition(SpamDecision.Spam), handleSpamExecutor)
.WithDefault(handleUncertainExecutor)
)
Mehrfachauswahlmuster:
// One input → one or more outputs (dynamic fan-out)
builder.AddFanOutEdge(
emailAnalysisExecutor,
targets: [handleSpamExecutor, emailAssistantExecutor, emailSummaryExecutor, handleUncertainExecutor],
targetSelector: GetTargetAssigner() // Returns list of target indices
)
Wichtige Vorteile von Mehrfachauswahl-Rändern
- Parallele Verarbeitung: Mehrere Zweige können gleichzeitig ausgeführt werden
- Bedingter Fanout: Die Anzahl der Ziele variiert je nach Inhalt.
- Inhaltsfähiges Routing: Entscheidungen basierend auf Nachrichteneigenschaften, nicht nur Typ
- Effiziente Ressourcennutzung: Nur notwendige Verzweigungen werden aktiviert.
- Komplexe Geschäftslogik: Unterstützt anspruchsvolle Routingszenarien
Ausführen des Beispiels für mehrfache Auswahl
Wenn Sie diesen Workflow mit einer langen E-Mail ausführen:
Output: Email sent: [Professional response generated by AI]
Database: Email abc123 saved to database.
Beim Ausführen mit einer kurzen E-Mail wird die Zusammenfassung übersprungen.
Output: Email sent: [Professional response generated by AI]
Database: Email def456 saved to database.
Real-World-Anwendungsfälle
- E-Mail-Systeme: Weiterleitung zum Antwortassistent + Archiv + Analytics (bedingt)
- Inhaltsverarbeitung: Triggertranskription + Übersetzung + Analyse (basierend auf Dem Inhaltstyp)
- Auftragsabwicklung: Weiterleitung zur Erfüllung + Abrechnung + Benachrichtigungen (basierend auf Auftragsmerkmalen)
- Datenpipelines: Auslösen unterschiedlicher Analyseflüsse basierend auf Datenmerkmalen
Vollständige Implementierung der Mehrfachauswahl
Die vollständige funktionierende Implementierung finden Sie in diesem Beispiel im Agent Framework-Repository.
Über Switch-Case hinaus: Multi-Auswahl-Routing
Während Switch-Case-Edges Nachrichten an genau ein Ziel weiterleiten, müssen Workflows in der Praxis häufig mehrere parallele Vorgänge basierend auf Datenmerkmalen auslösen. Partitionierte Kanten (implementiert als Multiauswahl-Edgegruppen) ermöglichen anspruchsvolle Fanoutmuster, bei denen eine einzelne Nachricht mehrere nachgeschaltete Executoren gleichzeitig aktivieren kann.
Erweiterter E-Mail-Verarbeitungsworkflow
Basierend auf dem Switch-Case-Beispiel erstellen Sie ein erweitertes E-Mail-Verarbeitungssystem, das komplexe Routinglogik veranschaulicht:
- Spam-E-Mails → einzelner Spam-Handler (z. B. Switch-Case)
- Legitime E-Mails → E-Mail-Assistent immer auslösen + Zusammenfassung für lange E-Mails bedingt auslösen
- Unklare E-Mails → Ein einzelner unklarer Verwalter (z. B. switch-case)
- Datenbankpersistenz → für kurze E-Mails und zusammengefasste lange E-Mails ausgelöst
Dieses Muster ermöglicht parallele Verarbeitungspipelinen, die sich an Inhaltseigenschaften anpassen.
Behandelte Konzepte
Erweiterte Datenmodelle für mehrfache Auswahl
Erweitern Sie die Datenmodelle, um die Analyse und Zusammenfassung der E-Mail-Länge zu unterstützen:
class AnalysisResultAgent(BaseModel):
"""Enhanced structured output from email analysis agent."""
spam_decision: Literal["NotSpam", "Spam", "Uncertain"]
reason: str
class EmailResponse(BaseModel):
"""Response from email assistant."""
response: str
class EmailSummaryModel(BaseModel):
"""Summary generated by email summary agent."""
summary: str
@dataclass
class AnalysisResult:
"""Internal analysis result with email metadata for routing decisions."""
spam_decision: str
reason: str
email_length: int # Used for conditional routing
email_summary: str # Populated by summary agent
email_id: str
@dataclass
class Email:
"""Email content stored in shared state."""
email_id: str
email_content: str
# Custom event for database operations
class DatabaseEvent(WorkflowEvent):
"""Custom event for tracking database operations."""
pass
Auswahlfunktion: Das Herzstück der Mehrfachauswahl
Die Auswahlfunktion bestimmt, welche Executoren jede Nachricht empfangen sollen:
LONG_EMAIL_THRESHOLD = 100
def select_targets(analysis: AnalysisResult, target_ids: list[str]) -> list[str]:
"""Intelligent routing based on spam decision and email characteristics."""
# Target order: [handle_spam, submit_to_email_assistant, summarize_email, handle_uncertain]
handle_spam_id, submit_to_email_assistant_id, summarize_email_id, handle_uncertain_id = target_ids
if analysis.spam_decision == "Spam":
# Route only to spam handler
return [handle_spam_id]
elif analysis.spam_decision == "NotSpam":
# Always route to email assistant
targets = [submit_to_email_assistant_id]
# Conditionally add summarizer for long emails
if analysis.email_length > LONG_EMAIL_THRESHOLD:
targets.append(summarize_email_id)
return targets
else: # Uncertain
# Route only to uncertain handler
return [handle_uncertain_id]
Wichtige Features von Auswahlfunktionen
- Dynamische Zielauswahl: Gibt eine Liste der zu aktivierenden Executor-IDs zurück.
- Inhaltsfähiges Routing: Trifft Entscheidungen basierend auf Nachrichteneigenschaften
- Parallele Verarbeitung: Mehrere Ziele können gleichzeitig ausgeführt werden
- Bedingte Logik: Komplexe Verzweigung basierend auf mehreren Kriterien
Workflowausführer mit mehrfacher Auswahl
Implementieren Sie Executoren, die die erweiterte Analyse und das Routing behandeln:
EMAIL_STATE_PREFIX = "email:"
CURRENT_EMAIL_ID_KEY = "current_email_id"
@executor(id="store_email")
async def store_email(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Store email and initiate analysis."""
new_email = Email(email_id=str(uuid4()), email_content=email_text)
await ctx.set_shared_state(f"{EMAIL_STATE_PREFIX}{new_email.email_id}", new_email)
await ctx.set_shared_state(CURRENT_EMAIL_ID_KEY, new_email.email_id)
await ctx.send_message(
AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=new_email.email_content)], should_respond=True)
)
@executor(id="to_analysis_result")
async def to_analysis_result(response: AgentExecutorResponse, ctx: WorkflowContext[AnalysisResult]) -> None:
"""Transform agent response into enriched analysis result."""
parsed = AnalysisResultAgent.model_validate_json(response.agent_run_response.text)
email_id: str = await ctx.get_shared_state(CURRENT_EMAIL_ID_KEY)
email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{email_id}")
# Create enriched analysis result with email length for routing decisions
await ctx.send_message(
AnalysisResult(
spam_decision=parsed.spam_decision,
reason=parsed.reason,
email_length=len(email.email_content), # Key for conditional routing
email_summary="",
email_id=email_id,
)
)
@executor(id="submit_to_email_assistant")
async def submit_to_email_assistant(analysis: AnalysisResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Handle legitimate emails by forwarding to email assistant."""
if analysis.spam_decision != "NotSpam":
raise RuntimeError("This executor should only handle NotSpam messages.")
email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
await ctx.send_message(
AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.email_content)], should_respond=True)
)
@executor(id="finalize_and_send")
async def finalize_and_send(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
"""Final step for email assistant branch."""
parsed = EmailResponse.model_validate_json(response.agent_run_response.text)
await ctx.yield_output(f"Email sent: {parsed.response}")
@executor(id="summarize_email")
async def summarize_email(analysis: AnalysisResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Generate summary for long emails (parallel branch)."""
# Only called for long NotSpam emails by selection function
email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
await ctx.send_message(
AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.email_content)], should_respond=True)
)
@executor(id="merge_summary")
async def merge_summary(response: AgentExecutorResponse, ctx: WorkflowContext[AnalysisResult]) -> None:
"""Merge summary back into analysis result for database persistence."""
summary = EmailSummaryModel.model_validate_json(response.agent_run_response.text)
email_id: str = await ctx.get_shared_state(CURRENT_EMAIL_ID_KEY)
email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{email_id}")
# Create analysis result with summary for database storage
await ctx.send_message(
AnalysisResult(
spam_decision="NotSpam",
reason="",
email_length=len(email.email_content),
email_summary=summary.summary, # Now includes summary
email_id=email_id,
)
)
@executor(id="handle_spam")
async def handle_spam(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
"""Handle spam emails (single target like switch-case)."""
if analysis.spam_decision == "Spam":
await ctx.yield_output(f"Email marked as spam: {analysis.reason}")
else:
raise RuntimeError("This executor should only handle Spam messages.")
@executor(id="handle_uncertain")
async def handle_uncertain(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
"""Handle uncertain emails (single target like switch-case)."""
if analysis.spam_decision == "Uncertain":
email: Email | None = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
await ctx.yield_output(
f"Email marked as uncertain: {analysis.reason}. Email content: {getattr(email, 'email_content', '')}"
)
else:
raise RuntimeError("This executor should only handle Uncertain messages.")
@executor(id="database_access")
async def database_access(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
"""Simulate database persistence with custom events."""
await asyncio.sleep(0.05) # Simulate DB operation
await ctx.add_event(DatabaseEvent(f"Email {analysis.email_id} saved to database."))
Erweiterte KI-Agenten
Erstellen Sie Agents für Analyse, Unterstützung und Zusammenfassung:
async def main() -> None:
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
# Enhanced analysis agent
email_analysis_agent = AgentExecutor(
chat_client.create_agent(
instructions=(
"You are a spam detection assistant that identifies spam emails. "
"Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) "
"and 'reason' (string)."
),
response_format=AnalysisResultAgent,
),
id="email_analysis_agent",
)
# Email assistant (same as before)
email_assistant_agent = AgentExecutor(
chat_client.create_agent(
instructions=(
"You are an email assistant that helps users draft responses to emails with professionalism."
),
response_format=EmailResponse,
),
id="email_assistant_agent",
)
# New: Email summary agent for long emails
email_summary_agent = AgentExecutor(
chat_client.create_agent(
instructions="You are an assistant that helps users summarize emails.",
response_format=EmailSummaryModel,
),
id="email_summary_agent",
)
Erstellen eines Multiauswahl-Workflows
Erstellen Sie den Workflow mit anspruchsvollem Routing und paralleler Verarbeitung:
workflow = (
WorkflowBuilder()
.set_start_executor(store_email)
.add_edge(store_email, email_analysis_agent)
.add_edge(email_analysis_agent, to_analysis_result)
# Multi-selection edge group: intelligent fan-out based on content
.add_multi_selection_edge_group(
to_analysis_result,
[handle_spam, submit_to_email_assistant, summarize_email, handle_uncertain],
selection_func=select_targets,
)
# Email assistant branch (always for NotSpam)
.add_edge(submit_to_email_assistant, email_assistant_agent)
.add_edge(email_assistant_agent, finalize_and_send)
# Summary branch (only for long NotSpam emails)
.add_edge(summarize_email, email_summary_agent)
.add_edge(email_summary_agent, merge_summary)
# Database persistence: conditional routing
.add_edge(to_analysis_result, database_access,
condition=lambda r: r.email_length <= LONG_EMAIL_THRESHOLD) # Short emails
.add_edge(merge_summary, database_access) # Long emails with summary
.build()
)
Ausführung mit Ereignisstreaming
Führen Sie den Workflow aus, und beobachten Sie die parallele Ausführung über benutzerdefinierte Ereignisse:
# Use a moderately long email to trigger both assistant and summarizer
email = """
Hello team, here are the updates for this week:
1. Project Alpha is on track and we should have the first milestone completed by Friday.
2. The client presentation has been scheduled for next Tuesday at 2 PM.
3. Please review the Q4 budget allocation and provide feedback by Wednesday.
Let me know if you have any questions or concerns.
Best regards,
Alex
"""
# Stream events to see parallel execution
async for event in workflow.run_stream(email):
if isinstance(event, DatabaseEvent):
print(f"Database: {event}")
elif isinstance(event, WorkflowOutputEvent):
print(f"Output: {event.data}")
Mehrfachauswahl vs. Switch-Case-Vergleich
Switch-Case-Struktur (vorherig):
# One input → exactly one output
.add_switch_case_edge_group(
source,
[
Case(condition=lambda x: x.result == "A", target=handler_a),
Case(condition=lambda x: x.result == "B", target=handler_b),
Default(target=handler_c),
],
)
Mehrfachauswahlmuster:
# One input → one or more outputs (dynamic fan-out)
.add_multi_selection_edge_group(
source,
[handler_a, handler_b, handler_c, handler_d],
selection_func=intelligent_router, # Returns list of target IDs
)
Vorteile der Mehrfachauswahl in C#
- Parallele Verarbeitung: Mehrere Zweige können gleichzeitig ausgeführt werden
- Bedingter Fanout: Die Anzahl der Ziele variiert je nach Inhalt.
- Inhaltsfähiges Routing: Entscheidungen basierend auf Nachrichteneigenschaften, nicht nur Typ
- Effiziente Ressourcennutzung: Nur notwendige Verzweigungen werden aktiviert.
- Komplexe Geschäftslogik: Unterstützt anspruchsvolle Routingszenarien
C#-Real-World-Anwendungen
- E-Mail-Systeme: Weiterleitung zum Antwortassistent + Archiv + Analytics (bedingt)
- Inhaltsverarbeitung: Triggertranskription + Übersetzung + Analyse (basierend auf Dem Inhaltstyp)
- Auftragsabwicklung: Weiterleitung zur Erfüllung + Abrechnung + Benachrichtigungen (basierend auf Auftragsmerkmalen)
- Datenpipelines: Auslösen unterschiedlicher Analyseflüsse basierend auf Datenmerkmalen
Beispielcode für mehrfache Auswahl
Die vollständige Arbeitsimplementierung finden Sie im multi_selection_edge_group.py Beispiel im Agent Framework-Repository.