In this tutorial, you will learn how to create a workflow with branching logic using Agent Framework. Branching logic allows your workflow to make decisions based on certain conditions, enabling more complex and dynamic behavior.
Conditional Edges
Conditional edges allow your workflow to make routing decisions based on the content or properties of messages flowing through the workflow. This enables dynamic branching where different execution paths are taken based on runtime conditions.
What You'll Build
You'll create an email processing workflow that demonstrates conditional routing:
- A spam detection agent that analyzes incoming emails and returns structured JSON.
- Conditional edges that route emails to different handlers based on classification.
- A legitimate email handler that drafts professional responses.
- A spam handler that marks suspicious emails.
- Shared state management to persist email data between workflow steps.
Prerequisites
- .NET 8.0 SDK or later.
- Azure OpenAI service endpoint and deployment configured.
- Azure CLI installed and authenticated (for Azure credential authentication).
- Basic understanding of C# and async programming.
- A new console application.
Install NuGet packages
First, install the required packages for your .NET project:
dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease
Define Data Models
Start by defining the data structures that will flow through your workflow:
using System.Text.Json.Serialization;
/// <summary>
/// Represents the result of spam detection.
/// </summary>
public sealed class DetectionResult
{
[JsonPropertyName("is_spam")]
public bool IsSpam { get; set; }
[JsonPropertyName("reason")]
public string Reason { get; set; } = string.Empty;
// Email ID is generated by the executor, not the agent
[JsonIgnore]
public string EmailId { get; set; } = string.Empty;
}
/// <summary>
/// Represents an email.
/// </summary>
internal sealed class Email
{
[JsonPropertyName("email_id")]
public string EmailId { get; set; } = string.Empty;
[JsonPropertyName("email_content")]
public string EmailContent { get; set; } = string.Empty;
}
/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
[JsonPropertyName("response")]
public string Response { get; set; } = string.Empty;
}
/// <summary>
/// Constants for shared state scopes.
/// </summary>
internal static class EmailStateConstants
{
public const string EmailStateScope = "EmailState";
}
Create Condition Functions
The condition function evaluates the spam detection result to determine which path the workflow should take:
/// <summary>
/// Creates a condition for routing messages based on the expected spam detection result.
/// </summary>
/// <param name="expectedResult">The expected spam detection result</param>
/// <returns>A function that evaluates whether a message meets the expected result</returns>
private static Func<object?, bool> GetCondition(bool expectedResult) =>
    detectionResult => detectionResult is DetectionResult result && result.IsSpam == expectedResult;
This condition function:
- Takes a bool expectedResult parameter (true for spam, false for non-spam).
- Returns a function that can be used as an edge condition.
- Safely checks whether the message is a DetectionResult and compares its IsSpam property.
Create AI Agents
Set up the AI agents that will handle spam detection and email assistance:
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;
/// <summary>
/// Creates a spam detection agent.
/// </summary>
/// <returns>A ChatClientAgent configured for spam detection</returns>
private static ChatClientAgent GetSpamDetectionAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema(AIJsonUtilities.CreateJsonSchema(typeof(DetectionResult)))
}
});
/// <summary>
/// Creates an email assistant agent.
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft professional responses to emails.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema(AIJsonUtilities.CreateJsonSchema(typeof(EmailResponse)))
}
});
Implement Executors
Create the workflow executors that handle different stages of email processing:
using Microsoft.Agents.AI.Workflows;
using System.Text.Json;
/// <summary>
/// Executor that detects spam using an AI agent.
/// </summary>
internal sealed class SpamDetectionExecutor : Executor<ChatMessage, DetectionResult>
{
private readonly AIAgent _spamDetectionAgent;
public SpamDetectionExecutor(AIAgent spamDetectionAgent) : base("SpamDetectionExecutor")
{
this._spamDetectionAgent = spamDetectionAgent;
}
public override async ValueTask<DetectionResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Generate a random email ID and store the email content to shared state
var newEmail = new Email
{
EmailId = Guid.NewGuid().ToString("N"),
EmailContent = message.Text
};
await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent for spam detection
var response = await this._spamDetectionAgent.RunAsync(message);
var detectionResult = JsonSerializer.Deserialize<DetectionResult>(response.Text);
detectionResult!.EmailId = newEmail.EmailId;
return detectionResult;
}
}
/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<DetectionResult, EmailResponse>
{
private readonly AIAgent _emailAssistantAgent;
public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
{
this._emailAssistantAgent = emailAssistantAgent;
}
public override async ValueTask<EmailResponse> HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.IsSpam)
{
throw new ArgumentException("This executor should only handle non-spam messages.");
}
// Retrieve the email content from shared state
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope)
?? throw new InvalidOperationException("Email not found.");
// Invoke the agent to draft a response
var response = await this._emailAssistantAgent.RunAsync(email.EmailContent);
var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);
return emailResponse!;
}
}
/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed class SendEmailExecutor : Executor<EmailResponse>
{
public SendEmailExecutor() : base("SendEmailExecutor") { }
public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
await context.YieldOutputAsync($"Email sent: {message.Response}");
}
/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed class HandleSpamExecutor : Executor<DetectionResult>
{
public HandleSpamExecutor() : base("HandleSpamExecutor") { }
public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.IsSpam)
{
await context.YieldOutputAsync($"Email marked as spam: {message.Reason}");
}
else
{
throw new ArgumentException("This executor should only handle spam messages.");
}
}
}
Build the Workflow with Conditional Edges
Now create the main program that builds and executes the workflow:
using Microsoft.Extensions.AI;
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT")
?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
.GetChatClient(deploymentName).AsIChatClient();
// Create agents
AIAgent spamDetectionAgent = GetSpamDetectionAgent(chatClient);
AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);
// Create executors
var spamDetectionExecutor = new SpamDetectionExecutor(spamDetectionAgent);
var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
var sendEmailExecutor = new SendEmailExecutor();
var handleSpamExecutor = new HandleSpamExecutor();
// Build the workflow with conditional edges
var workflow = new WorkflowBuilder(spamDetectionExecutor)
// Non-spam path: route to email assistant when IsSpam = false
.AddEdge(spamDetectionExecutor, emailAssistantExecutor, condition: GetCondition(expectedResult: false))
.AddEdge(emailAssistantExecutor, sendEmailExecutor)
// Spam path: route to spam handler when IsSpam = true
.AddEdge(spamDetectionExecutor, handleSpamExecutor, condition: GetCondition(expectedResult: true))
.WithOutputFrom(handleSpamExecutor, sendEmailExecutor)
.Build();
// Execute the workflow with sample spam email
string emailContent = "Congratulations! You've won $1,000,000! Click here to claim your prize now!";
StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, emailContent));
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent outputEvent)
{
Console.WriteLine($"{outputEvent}");
}
}
}
}
How It Works
Workflow Entry: The workflow starts with spamDetectionExecutor receiving a ChatMessage.
Spam Analysis: The spam detection agent analyzes the email and returns a structured DetectionResult with IsSpam and Reason properties.
Conditional Routing: Based on the IsSpam value:
- If spam (IsSpam = true): Routes to HandleSpamExecutor using GetCondition(true)
- If legitimate (IsSpam = false): Routes to EmailAssistantExecutor using GetCondition(false)
Response Generation: For legitimate emails, the email assistant drafts a professional response.
Final Output: The workflow yields either a spam notice or sends the drafted email response.
Key Features of Conditional Edges
Type-Safe Conditions: The GetCondition method creates reusable condition functions that safely evaluate message content.
Multiple Paths: A single executor can have multiple outgoing edges with different conditions, enabling complex branching logic.
Shared State: Email data persists across executors using scoped state management, allowing downstream executors to access original content.
Error Handling: Executors validate their inputs and throw meaningful exceptions when they receive a message that should have been routed to a different path.
Clean Architecture: Each executor has a single responsibility, making the workflow maintainable and testable.
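The shared-state idea above can be illustrated with a small, language-neutral sketch written in Python. It is not the Agent Framework API: ScopedState, Detection, detect, and draft_reply are hypothetical names, and the keyword check merely stands in for the spam detection agent. The point is the pattern: the first step stores the email body under a generated ID in a named scope, and only the ID travels in the message.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Optional
from uuid import uuid4

EMAIL_SCOPE = "EmailState"


class ScopedState:
    """Toy scoped key-value store (scope -> key -> value). Hypothetical, for illustration only."""

    def __init__(self) -> None:
        self._scopes: dict[str, dict[str, Any]] = defaultdict(dict)

    def write(self, key: str, value: Any, scope: str) -> None:
        self._scopes[scope][key] = value

    def read(self, key: str, scope: str) -> Optional[Any]:
        # .get avoids creating an empty scope as a side effect of reading.
        return self._scopes.get(scope, {}).get(key)


@dataclass
class Detection:
    is_spam: bool
    email_id: str  # reference into shared state instead of the full email body


def detect(email_text: str, state: ScopedState) -> Detection:
    """Store the email under a fresh ID, then classify it (keyword check stands in for the agent)."""
    email_id = uuid4().hex
    state.write(email_id, email_text, scope=EMAIL_SCOPE)
    return Detection(is_spam="prize" in email_text.lower(), email_id=email_id)


def draft_reply(detection: Detection, state: ScopedState) -> str:
    """Look the original email up by ID, failing loudly if it is missing."""
    body = state.read(detection.email_id, scope=EMAIL_SCOPE)
    if body is None:
        raise LookupError("Email not found.")
    return f"Drafting reply to: {body}"


state = ScopedState()
result = detect("Can we move the review to Friday?", state)
print(draft_reply(result, state))  # Drafting reply to: Can we move the review to Friday?
print(state.read(result.email_id, scope="OtherScope"))  # None
```

Keeping the body out of the routed message means the classifier's output stays small, and any downstream step that knows the scope name and ID can recover the original content.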
Running the Example
When you run this workflow with the sample spam email:
Email marked as spam: This email contains common spam indicators including monetary prizes, urgency tactics, and suspicious links that are typical of phishing attempts.
Try changing the email content to something legitimate:
string emailContent = "Hi, I wanted to follow up on our meeting yesterday and get your thoughts on the project proposal.";
The workflow will route to the email assistant and generate a professional response instead.
This conditional routing pattern forms the foundation for building sophisticated workflows that can handle complex decision trees and business logic.
Complete Implementation
For the complete working implementation, see this sample in the Agent Framework repository.
What You'll Build
You'll create an email processing workflow that demonstrates conditional routing:
- A spam detection agent that analyzes incoming emails
- Conditional edges that route emails to different handlers based on classification
- A legitimate email handler that drafts professional responses
- A spam handler that marks suspicious emails
Prerequisites
- Python 3.10 or later
- Agent Framework installed: pip install agent-framework-core
- Azure OpenAI service configured with proper environment variables
- Azure CLI authentication: az login
Step 1: Import Required Dependencies
Start by importing the necessary components for conditional workflows:
import asyncio
import os
from dataclasses import dataclass
from typing import Any, Literal
from uuid import uuid4
from typing_extensions import Never
from agent_framework import (
AgentExecutor,
AgentExecutorRequest,
AgentExecutorResponse,
ChatMessage,
Role,
WorkflowBuilder,
WorkflowContext,
executor,
Case,
Default,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
from pydantic import BaseModel
Step 2: Define Data Models
Create Pydantic models for structured data exchange between workflow components:
class DetectionResult(BaseModel):
    """Represents the result of spam detection."""
    # is_spam drives the routing decision taken by edge conditions
    is_spam: bool
    # Human readable rationale from the detector
    reason: str
    # The agent must include the original email so downstream agents can operate without reloading content
    email_content: str


class EmailResponse(BaseModel):
    """Represents the response from the email assistant."""
    # The drafted reply that a user could copy or send
    response: str
Step 3: Create Condition Functions
Define condition functions that will determine routing decisions:
def get_condition(expected_result: bool):
    """Create a condition callable that routes based on DetectionResult.is_spam."""

    # The returned function will be used as an edge predicate.
    # It receives whatever the upstream executor produced.
    def condition(message: Any) -> bool:
        # Defensive guard. If a non AgentExecutorResponse appears, let the edge pass to avoid dead ends.
        if not isinstance(message, AgentExecutorResponse):
            return True

        try:
            # Prefer parsing a structured DetectionResult from the agent JSON text.
            # Using model_validate_json ensures type safety and raises if the shape is wrong.
            detection = DetectionResult.model_validate_json(message.agent_run_response.text)
            # Route only when the spam flag matches the expected path.
            return detection.is_spam == expected_result
        except Exception:
            # Fail closed on parse errors so we do not accidentally route to the wrong path.
            # Returning False prevents this edge from activating.
            return False

    return condition
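To see the guard and the fail-closed branch in isolation, here is a framework-free sketch of the same predicate-factory shape that uses only the standard library. FakeResponse, parse_is_spam, and make_condition are hypothetical stand-ins (for AgentExecutorResponse, DetectionResult.model_validate_json, and get_condition respectively); they are assumptions for illustration and not part of Agent Framework.

```python
import json
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class FakeResponse:
    """Hypothetical stand-in for AgentExecutorResponse: carries only the agent's JSON text."""
    text: str


def parse_is_spam(text: str) -> bool:
    """Return the is_spam flag, raising ValueError if the payload has the wrong shape."""
    data = json.loads(text)  # json.JSONDecodeError is a subclass of ValueError
    if not isinstance(data, dict) or not isinstance(data.get("is_spam"), bool):
        raise ValueError("is_spam must be a boolean")
    return data["is_spam"]


def make_condition(expected: bool) -> Callable[[Any], bool]:
    """Mirror get_condition: pass unknown message types through, fail closed on parse errors."""
    def condition(message: Any) -> bool:
        if not isinstance(message, FakeResponse):
            return True
        try:
            return parse_is_spam(message.text) == expected
        except ValueError:
            return False
    return condition


spam_edge = make_condition(True)
ham_edge = make_condition(False)

spam = FakeResponse('{"is_spam": true, "reason": "prize scam"}')
broken = FakeResponse("not json")

print(spam_edge(spam), ham_edge(spam))      # True False
print(spam_edge(broken), ham_edge(broken))  # False False
print(spam_edge("raw"), ham_edge("raw"))    # True True
```

Two consequences of this design are worth noticing: a malformed payload activates neither edge, so nothing downstream runs for it, while an unrecognized message type passes the guard on both edges and would activate both paths.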
Step 4: Create Handler Executors
Define executors to handle different routing outcomes:
@executor(id="send_email")
async def handle_email_response(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Handle legitimate emails by drafting a professional response."""
    # Downstream of the email assistant. Parse a validated EmailResponse and yield the workflow output.
    email_response = EmailResponse.model_validate_json(response.agent_run_response.text)
    await ctx.yield_output(f"Email sent:\n{email_response.response}")


@executor(id="handle_spam")
async def handle_spam_classifier_response(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Handle spam emails by marking them appropriately."""
    # Spam path. Confirm the DetectionResult and yield the workflow output. Guard against accidental non spam input.
    detection = DetectionResult.model_validate_json(response.agent_run_response.text)
    if detection.is_spam:
        await ctx.yield_output(f"Email marked as spam: {detection.reason}")
    else:
        # This indicates the routing predicate and executor contract are out of sync.
        raise RuntimeError("This executor should only handle spam messages.")


@executor(id="to_email_assistant_request")
async def to_email_assistant_request(
    response: AgentExecutorResponse, ctx: WorkflowContext[AgentExecutorRequest]
) -> None:
    """Transform spam detection response into a request for the email assistant."""
    # Parse the detection result and extract the email content for the assistant
    detection = DetectionResult.model_validate_json(response.agent_run_response.text)

    # Create a new request for the email assistant with the original email content
    request = AgentExecutorRequest(
        messages=[ChatMessage(Role.USER, text=detection.email_content)],
        should_respond=True
    )
    await ctx.send_message(request)
Step 5: Create AI Agents
Set up the Azure OpenAI agents with structured output formatting:
async def main() -> None:
    # Create agents
    # AzureCliCredential uses your current az login. This avoids embedding secrets in code.
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

    # Agent 1. Classifies spam and returns a DetectionResult object.
    # response_format enforces that the LLM returns parsable JSON for the Pydantic model.
    spam_detection_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are a spam detection assistant that identifies spam emails. "
                "Always return JSON with fields is_spam (bool), reason (string), and email_content (string). "
                "Include the original email content in email_content."
            ),
            response_format=DetectionResult,
        ),
        id="spam_detection_agent",
    )

    # Agent 2. Drafts a professional reply. Also uses structured JSON output for reliability.
    email_assistant_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are an email assistant that helps users draft professional responses to emails. "
                "Your input might be a JSON object that includes 'email_content'; base your reply on that content. "
                "Return JSON with a single field 'response' containing the drafted reply."
            ),
            response_format=EmailResponse,
        ),
        id="email_assistant_agent",
    )
Step 6: Build the Conditional Workflow
Create a workflow with conditional edges that route based on spam detection results:
    # Build the workflow graph.
    # Start at the spam detector.
    # If not spam, hop to a transformer that creates a new AgentExecutorRequest,
    # then call the email assistant, then finalize.
    # If spam, go directly to the spam handler and finalize.
    workflow = (
        WorkflowBuilder()
        .set_start_executor(spam_detection_agent)
        # Not spam path: transform response -> request for assistant -> assistant -> send email
        .add_edge(spam_detection_agent, to_email_assistant_request, condition=get_condition(False))
        .add_edge(to_email_assistant_request, email_assistant_agent)
        .add_edge(email_assistant_agent, handle_email_response)
        # Spam path: send to spam handler
        .add_edge(spam_detection_agent, handle_spam_classifier_response, condition=get_condition(True))
        .build()
    )
Step 7: Execute the Workflow
Run the workflow with sample email content:
    # Read Email content from the sample resource file.
    # This keeps the sample deterministic since the model sees the same email every run.
    email_path = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "resources", "email.txt")

    with open(email_path) as email_file:  # noqa: ASYNC230
        email = email_file.read()

    # Execute the workflow. Since the start is an AgentExecutor, pass an AgentExecutorRequest.
    # The workflow completes when it becomes idle (no more work to do).
    request = AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email)], should_respond=True)
    events = await workflow.run(request)
    outputs = events.get_outputs()
    if outputs:
        print(f"Workflow output: {outputs[0]}")


if __name__ == "__main__":
    asyncio.run(main())
How Conditional Edges Work
Condition Functions: The get_condition() function creates a predicate that examines the message content and returns True or False to determine if the edge should be traversed.
Message Inspection: Conditions can inspect any aspect of the message, including structured data from agent responses parsed with Pydantic models.
Defensive Programming: The condition function includes error handling to prevent routing failures when parsing structured data.
Dynamic Routing: Based on the spam detection result, emails are automatically routed to either the email assistant (for legitimate emails) or the spam handler (for suspicious emails).
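The routing behavior described above can be mimicked with a toy dispatcher that needs no framework or model call. This is a minimal sketch under stated assumptions, not how Agent Framework is implemented: MiniWorkflow and its methods are hypothetical, and a keyword check stands in for the spam detection agent. Each edge carries an optional predicate, and a node's result is forwarded only along edges whose predicate passes.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Optional

Handler = Callable[[Any], Any]
Predicate = Callable[[Any], bool]


@dataclass
class MiniWorkflow:
    """Toy graph of handlers joined by optionally conditional edges (illustration only)."""
    handlers: dict[str, Handler]
    edges: dict[str, list[tuple[str, Optional[Predicate]]]] = field(default_factory=dict)

    def add_edge(self, source: str, target: str, condition: Optional[Predicate] = None) -> "MiniWorkflow":
        self.edges.setdefault(source, []).append((target, condition))
        return self

    def run(self, start: str, message: Any) -> list[str]:
        """Deliver the message to `start`, then follow every edge whose predicate passes."""
        visited: list[str] = []
        pending = [(start, message)]
        while pending:
            node, msg = pending.pop(0)
            visited.append(node)
            result = self.handlers[node](msg)
            for target, condition in self.edges.get(node, []):
                if condition is None or condition(result):
                    pending.append((target, result))
        return visited


def detect(email: str) -> dict:
    # Keyword check standing in for the spam detection agent.
    return {"is_spam": "prize" in email.lower(), "email_content": email}


wf = MiniWorkflow(handlers={
    "detect": detect,
    "assistant": lambda d: f"Reply to: {d['email_content']}",
    "send": lambda reply: reply,
    "spam": lambda d: "marked as spam",
})
wf.add_edge("detect", "assistant", lambda d: not d["is_spam"])
wf.add_edge("assistant", "send")
wf.add_edge("detect", "spam", lambda d: d["is_spam"])

print(wf.run("detect", "You've won a prize!"))          # ['detect', 'spam']
print(wf.run("detect", "Following up on our meeting"))  # ['detect', 'assistant', 'send']
```

Because every outgoing edge is checked independently, two predicates that both return True would send the result down both paths; keeping the conditions mutually exclusive is the caller's responsibility.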
Key Concepts
- Edge Conditions: Boolean predicates that determine whether an edge should be traversed
- Structured Outputs: Using Pydantic models with response_format ensures reliable data parsing
- Defensive Routing: Condition functions handle edge cases to prevent workflow dead-ends
- Message Transformation: Executors can transform message types between workflow steps
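As a small illustration of the message transformation concept, the sketch below converts a detector's JSON output into a fresh request for the next step, using only the standard library. AssistantRequest and to_assistant_request are hypothetical stand-ins for AgentExecutorRequest and the to_email_assistant_request executor; the JSON field names follow the DetectionResult model defined earlier.

```python
import json
from dataclasses import dataclass


@dataclass
class AssistantRequest:
    """Hypothetical stand-in for AgentExecutorRequest."""
    messages: list[str]
    should_respond: bool


def to_assistant_request(detector_json: str) -> AssistantRequest:
    """Turn the detector's JSON output into a new request carrying only the email body."""
    detection = json.loads(detector_json)
    return AssistantRequest(messages=[detection["email_content"]], should_respond=True)


detector_output = json.dumps(
    {"is_spam": False, "reason": "Routine follow-up", "email_content": "Can we meet Friday?"}
)
request = to_assistant_request(detector_output)
print(request.messages)  # ['Can we meet Friday?']
```

The transformer drops the classification fields on purpose, so the assistant sees the original email rather than the detector's verdict.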
Complete Implementation
For the complete working implementation, see the edge_condition.py sample in the Agent Framework repository.
Switch-Case Edges
Building on Conditional Edges
The previous conditional edges example demonstrated two-way routing (spam vs. legitimate emails). However, many real-world scenarios require more sophisticated decision trees. Switch-case edges provide a cleaner, more maintainable solution when you need to route to multiple destinations based on different conditions.
What You'll Build with Switch-Case
You'll extend the email processing workflow to handle three decision paths:
- NotSpam → Email Assistant → Send Email
- Spam → Handle Spam Executor
- Uncertain → Handle Uncertain Executor (default case)
The key improvement is using the SwitchBuilder pattern instead of multiple individual conditional edges, making the workflow easier to understand and maintain as decision complexity grows.
Data Models for Switch-Case
Update your data models to support the three-way classification:
/// <summary>
/// Represents the possible decisions for spam detection.
/// </summary>
public enum SpamDecision
{
NotSpam,
Spam,
Uncertain
}
/// <summary>
/// Represents the result of spam detection with enhanced decision support.
/// </summary>
public sealed class DetectionResult
{
[JsonPropertyName("spam_decision")]
[JsonConverter(typeof(JsonStringEnumConverter))]
public SpamDecision spamDecision { get; set; }
[JsonPropertyName("reason")]
public string Reason { get; set; } = string.Empty;
// Email ID is generated by the executor, not the agent
[JsonIgnore]
public string EmailId { get; set; } = string.Empty;
}
/// <summary>
/// Represents an email stored in shared state.
/// </summary>
internal sealed class Email
{
[JsonPropertyName("email_id")]
public string EmailId { get; set; } = string.Empty;
[JsonPropertyName("email_content")]
public string EmailContent { get; set; } = string.Empty;
}
/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
[JsonPropertyName("response")]
public string Response { get; set; } = string.Empty;
}
/// <summary>
/// Constants for shared state scopes.
/// </summary>
internal static class EmailStateConstants
{
public const string EmailStateScope = "EmailState";
}
Condition Factory for Switch-Case
Create a reusable condition factory that generates predicates for each spam decision:
/// <summary>
/// Creates a condition for routing messages based on the expected spam detection result.
/// </summary>
/// <param name="expectedDecision">The expected spam detection decision</param>
/// <returns>A function that evaluates whether a message meets the expected result</returns>
private static Func<object?, bool> GetCondition(SpamDecision expectedDecision) =>
    detectionResult => detectionResult is DetectionResult result && result.spamDecision == expectedDecision;
This factory approach:
- Reduces Code Duplication: One function generates all condition predicates
- Ensures Consistency: All conditions follow the same pattern
- Simplifies Maintenance: Changes to condition logic happen in one place
Enhanced AI Agent
Update the spam detection agent to be less confident and return three-way classifications:
/// <summary>
/// Creates a spam detection agent with enhanced uncertainty handling.
/// </summary>
/// <returns>A ChatClientAgent configured for three-way spam detection</returns>
private static ChatClientAgent GetSpamDetectionAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails. Be less confident in your assessments.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<DetectionResult>()
}
});
/// <summary>
/// Creates an email assistant agent (unchanged from conditional edges example).
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft responses to emails with professionalism.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
}
});
Workflow Executors with Enhanced Routing
Implement executors that handle the three-way routing with shared state management:
/// <summary>
/// Executor that detects spam using an AI agent with three-way classification.
/// </summary>
internal sealed class SpamDetectionExecutor : Executor<ChatMessage, DetectionResult>
{
private readonly AIAgent _spamDetectionAgent;
public SpamDetectionExecutor(AIAgent spamDetectionAgent) : base("SpamDetectionExecutor")
{
this._spamDetectionAgent = spamDetectionAgent;
}
public override async ValueTask<DetectionResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Generate a random email ID and store the email content in shared state
var newEmail = new Email
{
EmailId = Guid.NewGuid().ToString("N"),
EmailContent = message.Text
};
await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent for enhanced spam detection
var response = await this._spamDetectionAgent.RunAsync(message);
var detectionResult = JsonSerializer.Deserialize<DetectionResult>(response.Text);
detectionResult!.EmailId = newEmail.EmailId;
return detectionResult;
}
}
/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<DetectionResult, EmailResponse>
{
private readonly AIAgent _emailAssistantAgent;
public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
{
this._emailAssistantAgent = emailAssistantAgent;
}
public override async ValueTask<EmailResponse> HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Spam)
{
throw new ArgumentException("This executor should only handle non-spam messages.");
}
// Retrieve the email content from shared state
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope)
?? throw new InvalidOperationException("Email not found.");
// Invoke the agent to draft a response
var response = await this._emailAssistantAgent.RunAsync(email.EmailContent);
var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);
return emailResponse!;
}
}
/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed class SendEmailExecutor : Executor<EmailResponse>
{
public SendEmailExecutor() : base("SendEmailExecutor") { }
public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
await context.YieldOutputAsync($"Email sent: {message.Response}").ConfigureAwait(false);
}
/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed class HandleSpamExecutor : Executor<DetectionResult>
{
public HandleSpamExecutor() : base("HandleSpamExecutor") { }
public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Spam)
{
await context.YieldOutputAsync($"Email marked as spam: {message.Reason}").ConfigureAwait(false);
}
else
{
throw new ArgumentException("This executor should only handle spam messages.");
}
}
}
/// <summary>
/// Executor that handles uncertain emails requiring manual review.
/// </summary>
internal sealed class HandleUncertainExecutor : Executor<DetectionResult>
{
public HandleUncertainExecutor() : base("HandleUncertainExecutor") { }
public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Uncertain)
{
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
await context.YieldOutputAsync($"Email marked as uncertain: {message.Reason}. Email content: {email?.EmailContent}");
}
else
{
throw new ArgumentException("This executor should only handle uncertain spam decisions.");
}
}
}
Build Workflow with Switch-Case Pattern
Replace multiple conditional edges with the cleaner switch-case pattern:
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential()).GetChatClient(deploymentName).AsIChatClient();
// Create agents
AIAgent spamDetectionAgent = GetSpamDetectionAgent(chatClient);
AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);
// Create executors
var spamDetectionExecutor = new SpamDetectionExecutor(spamDetectionAgent);
var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
var sendEmailExecutor = new SendEmailExecutor();
var handleSpamExecutor = new HandleSpamExecutor();
var handleUncertainExecutor = new HandleUncertainExecutor();
// Build the workflow using switch-case for cleaner three-way routing
WorkflowBuilder builder = new(spamDetectionExecutor);
builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
switchBuilder
.AddCase(
GetCondition(expectedDecision: SpamDecision.NotSpam),
emailAssistantExecutor
)
.AddCase(
GetCondition(expectedDecision: SpamDecision.Spam),
handleSpamExecutor
)
.WithDefault(
handleUncertainExecutor
)
)
// After the email assistant writes a response, it will be sent to the send email executor
.AddEdge(emailAssistantExecutor, sendEmailExecutor)
.WithOutputFrom(handleSpamExecutor, sendEmailExecutor, handleUncertainExecutor);
var workflow = builder.Build();
// Read an email from a text file (use ambiguous content for demonstration)
string email = Resources.Read("ambiguous_email.txt");
// Execute the workflow
StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, email));
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent outputEvent)
{
Console.WriteLine($"{outputEvent}");
}
}
}
}
Switch-Case Benefits
- Cleaner Syntax: The SwitchBuilder provides a more readable alternative to multiple conditional edges
- Ordered Evaluation: Cases are evaluated sequentially, stopping at the first match
- Guaranteed Routing: The WithDefault() method ensures messages never get stuck
- Better Maintainability: Adding new cases requires minimal changes to the workflow structure
- Type Safety: Each executor validates its input to catch routing errors early
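The ordered-evaluation and default-case semantics listed above can be captured in a few lines. The following is a language-neutral sketch written in Python, not the SwitchBuilder implementation: switch_route is a hypothetical helper, and the target names are plain strings standing in for executors.

```python
from typing import Any, Callable, Sequence

Predicate = Callable[[Any], bool]


def switch_route(message: Any, cases: Sequence[tuple[Predicate, str]], default: str) -> str:
    """Return the target of the first matching case, or the default target if none match."""
    for predicate, target in cases:
        if predicate(message):
            return target
    return default


cases = [
    (lambda d: d == "NotSpam", "email_assistant"),
    (lambda d: d == "Spam", "handle_spam"),
]

for decision in ("NotSpam", "Spam", "Uncertain", "SomethingUnexpected"):
    print(decision, "->", switch_route(decision, cases, default="handle_uncertain"))
# NotSpam -> email_assistant
# Spam -> handle_spam
# Uncertain -> handle_uncertain
# SomethingUnexpected -> handle_uncertain

# Order matters when cases overlap: the first match wins.
overlapping = [(lambda n: n > 0, "positive"), (lambda n: n > 10, "large")]
print(switch_route(50, overlapping, default="other"))  # positive
```

Unlike independent conditional edges, exactly one target is chosen per message, which is why an unexpected value still lands somewhere instead of silently stopping the workflow.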
Pattern Comparison
Before (Conditional Edges):
var workflow = new WorkflowBuilder(spamDetectionExecutor)
    .AddEdge(spamDetectionExecutor, emailAssistantExecutor, condition: GetCondition(expectedResult: false))
    .AddEdge(spamDetectionExecutor, handleSpamExecutor, condition: GetCondition(expectedResult: true))
    // No clean way to handle a third case
    .WithOutputFrom(handleSpamExecutor, sendEmailExecutor)
    .Build();
After (Switch-Case):
WorkflowBuilder builder = new(spamDetectionExecutor);
builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
    switchBuilder
        .AddCase(GetCondition(SpamDecision.NotSpam), emailAssistantExecutor)
        .AddCase(GetCondition(SpamDecision.Spam), handleSpamExecutor)
        .WithDefault(handleUncertainExecutor) // Clean default case
)
// Continue building the rest of the workflow
The switch-case pattern scales much better as the number of routing decisions grows, and the default case provides a safety net for unexpected values.
Running the Example
When you run this workflow with ambiguous email content:
Email marked as uncertain: This email contains promotional language but might be from a legitimate business contact, requiring human review for proper classification.
Try changing the email content to something clearly spam or clearly legitimate to see the different routing paths in action.
Complete Implementation
For the complete working implementation, see this sample in the Agent Framework repository.
Building on Conditional Edges
The previous conditional edges example demonstrated two-way routing (spam vs. legitimate emails). However, many real-world scenarios require more sophisticated decision trees. Switch-case edges provide a cleaner, more maintainable solution when you need to route to multiple destinations based on different conditions.
What You'll Build Next
You'll extend the email processing workflow to handle three decision paths:
- NotSpam → Email Assistant → Send Email
- Spam → Mark as Spam
- Uncertain → Flag for Manual Review (default case)
The key improvement is using a single switch-case edge group instead of multiple individual conditional edges, making the workflow easier to understand and maintain as decision complexity grows.
Concepts Covered
Enhanced Data Models
Update your data models to support the three-way classification:
from dataclasses import dataclass
from typing import Literal

from pydantic import BaseModel

class DetectionResultAgent(BaseModel):
    """Structured output returned by the spam detection agent."""
    # The agent classifies the email into one of three categories
    spam_decision: Literal["NotSpam", "Spam", "Uncertain"]
    reason: str

class EmailResponse(BaseModel):
    """Structured output returned by the email assistant agent."""
    response: str

@dataclass
class DetectionResult:
    """Internal typed payload used for routing and downstream handling."""
    spam_decision: str
    reason: str
    email_id: str

@dataclass
class Email:
    """In memory record of the email content stored in shared state."""
    email_id: str
    email_content: str
Switch-Case Condition Factory
Create a reusable condition factory that generates predicates for each spam decision:
from typing import Any

def get_case(expected_decision: str):
    """Factory that returns a predicate matching a specific spam_decision value."""
    def condition(message: Any) -> bool:
        # Only match when the upstream payload is a DetectionResult with the expected decision
        return isinstance(message, DetectionResult) and message.spam_decision == expected_decision
    return condition
This factory approach:
- Reduces Code Duplication: One function generates all condition predicates
- Ensures Consistency: All conditions follow the same pattern
- Simplifies Maintenance: Changes to condition logic happen in one place
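As a minimal, framework-free sketch of how these predicates behave, the snippet below repeats get_case next to a stand-in DetectionResult dataclass and calls the generated predicates directly. The sample values (such as the "demo-1" ID) are made up for illustration.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class DetectionResult:
    """Stand-in for the tutorial's routing payload."""
    spam_decision: str
    reason: str
    email_id: str


def get_case(expected_decision: str) -> Callable[[Any], bool]:
    """Return a predicate that matches one specific spam_decision value."""
    def condition(message: Any) -> bool:
        return isinstance(message, DetectionResult) and message.spam_decision == expected_decision
    return condition


# One factory call per routing branch
is_spam = get_case("Spam")
is_not_spam = get_case("NotSpam")

# Illustrative payload; the values are assumptions, not framework output
sample = DetectionResult(spam_decision="Spam", reason="Suspicious link", email_id="demo-1")

print(is_spam(sample))                     # True
print(is_not_spam(sample))                 # False
print(is_spam({"spam_decision": "Spam"}))  # False: not a DetectionResult instance
```

The isinstance check means a predicate returns False for any payload of an unexpected type, so such messages would fall through to the default case instead of raising inside the condition.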
Workflow Executors with Shared State
Implement executors that use shared state to avoid passing large email content through every workflow step:
EMAIL_STATE_PREFIX = "email:"
CURRENT_EMAIL_ID_KEY = "current_email_id"

@executor(id="store_email")
async def store_email(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Store email content once and pass around a lightweight ID reference."""
    # Persist the raw email content in shared state
    new_email = Email(email_id=str(uuid4()), email_content=email_text)
    await ctx.set_shared_state(f"{EMAIL_STATE_PREFIX}{new_email.email_id}", new_email)
    await ctx.set_shared_state(CURRENT_EMAIL_ID_KEY, new_email.email_id)
    # Forward email to spam detection agent
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=new_email.email_content)], should_respond=True)
    )

@executor(id="to_detection_result")
async def to_detection_result(response: AgentExecutorResponse, ctx: WorkflowContext[DetectionResult]) -> None:
    """Transform agent response into a typed DetectionResult with email ID."""
    # Parse the agent's structured JSON output
    parsed = DetectionResultAgent.model_validate_json(response.agent_run_response.text)
    email_id: str = await ctx.get_shared_state(CURRENT_EMAIL_ID_KEY)
    # Create typed message for switch-case routing
    await ctx.send_message(DetectionResult(
        spam_decision=parsed.spam_decision,
        reason=parsed.reason,
        email_id=email_id
    ))

@executor(id="submit_to_email_assistant")
async def submit_to_email_assistant(detection: DetectionResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Handle NotSpam emails by forwarding to the email assistant."""
    # Guard against misrouting
    if detection.spam_decision != "NotSpam":
        raise RuntimeError("This executor should only handle NotSpam messages.")
    # Retrieve original email content from shared state
    email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{detection.email_id}")
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.email_content)], should_respond=True)
    )

@executor(id="finalize_and_send")
async def finalize_and_send(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Parse email assistant response and yield final output."""
    parsed = EmailResponse.model_validate_json(response.agent_run_response.text)
    await ctx.yield_output(f"Email sent: {parsed.response}")

@executor(id="handle_spam")
async def handle_spam(detection: DetectionResult, ctx: WorkflowContext[Never, str]) -> None:
    """Handle confirmed spam emails."""
    if detection.spam_decision == "Spam":
        await ctx.yield_output(f"Email marked as spam: {detection.reason}")
    else:
        raise RuntimeError("This executor should only handle Spam messages.")

@executor(id="handle_uncertain")
async def handle_uncertain(detection: DetectionResult, ctx: WorkflowContext[Never, str]) -> None:
    """Handle uncertain classifications that need manual review."""
    if detection.spam_decision == "Uncertain":
        # Include original content for human review
        email: Email | None = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{detection.email_id}")
        await ctx.yield_output(
            f"Email marked as uncertain: {detection.reason}. Email content: {getattr(email, 'email_content', '')}"
        )
    else:
        raise RuntimeError("This executor should only handle Uncertain messages.")
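The store-once, pass-an-ID pattern used by these executors can be tried in isolation. The sketch below is an assumption-laden stand-in, not the framework's implementation: InMemoryState is a hypothetical class that only imitates the set_shared_state and get_shared_state calls, and the store and load helpers are illustrative names.

```python
import asyncio
from dataclasses import dataclass
from uuid import uuid4

EMAIL_STATE_PREFIX = "email:"


@dataclass
class Email:
    email_id: str
    email_content: str


class InMemoryState:
    """Hypothetical stand-in for the workflow's shared state (a plain dict)."""

    def __init__(self) -> None:
        self._store: dict[str, object] = {}

    async def set_shared_state(self, key: str, value: object) -> None:
        self._store[key] = value

    async def get_shared_state(self, key: str) -> object:
        return self._store[key]


async def store(state: InMemoryState, text: str) -> str:
    """Persist the full email once and hand back only its ID."""
    email = Email(email_id=str(uuid4()), email_content=text)
    await state.set_shared_state(f"{EMAIL_STATE_PREFIX}{email.email_id}", email)
    return email.email_id


async def load(state: InMemoryState, email_id: str) -> Email:
    """Look the email up again from the lightweight ID reference."""
    return await state.get_shared_state(f"{EMAIL_STATE_PREFIX}{email_id}")


async def demo() -> tuple[str, str]:
    state = InMemoryState()
    email_id = await store(state, "Hello, can we meet tomorrow?")
    email = await load(state, email_id)
    return email_id, email.email_content


email_id, content = asyncio.run(demo())
print(content)  # Hello, can we meet tomorrow?
```

Only the short ID travels between steps here; any step that needs the body reads it back by key, which is the same reason the executors above carry email_id in DetectionResult.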
Create Enhanced AI Agent
Update the spam detection agent to be less confident and return three-way classifications:
async def main():
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
    # Enhanced spam detection agent with three-way classification
    spam_detection_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are a spam detection assistant that identifies spam emails. "
                "Be less confident in your assessments. "
                "Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) "
                "and 'reason' (string)."
            ),
            response_format=DetectionResultAgent,
        ),
        id="spam_detection_agent",
    )
    # Email assistant remains the same
    email_assistant_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are an email assistant that helps users draft responses to emails with professionalism."
            ),
            response_format=EmailResponse,
        ),
        id="email_assistant_agent",
    )
Build Workflow with Switch-Case Edge Group
Replace multiple conditional edges with a single switch-case group:
    # Build workflow using switch-case for cleaner three-way routing
    workflow = (
        WorkflowBuilder()
        .set_start_executor(store_email)
        .add_edge(store_email, spam_detection_agent)
        .add_edge(spam_detection_agent, to_detection_result)
        .add_switch_case_edge_group(
            to_detection_result,
            [
                # Explicit cases for specific decisions
                Case(condition=get_case("NotSpam"), target=submit_to_email_assistant),
                Case(condition=get_case("Spam"), target=handle_spam),
                # Default case catches anything that doesn't match above
                Default(target=handle_uncertain),
            ],
        )
        .add_edge(submit_to_email_assistant, email_assistant_agent)
        .add_edge(email_assistant_agent, finalize_and_send)
        .build()
    )
Execute and Test
Run the workflow with ambiguous email content that demonstrates the three-way routing:
    # Use ambiguous email content that might trigger uncertain classification
    email = (
        "Hey there, I noticed you might be interested in our latest offer—no pressure, but it expires soon. "
        "Let me know if you'd like more details."
    )
    # Execute and display results
    events = await workflow.run(email)
    outputs = events.get_outputs()
    if outputs:
        for output in outputs:
            print(f"Workflow output: {output}")
Key Advantages of Switch-Case Edges
- Cleaner Syntax: One edge group instead of multiple conditional edges
- Ordered Evaluation: Cases are evaluated sequentially, stopping at the first match
- Guaranteed Routing: The default case ensures messages never get stuck
- Better Maintainability: Adding new cases requires minimal changes
- Type Safety: Each executor validates its input to catch routing errors
Comparison: Conditional vs. Switch-Case
Before (Conditional Edges):
.add_edge(detector, handler_a, condition=lambda x: x.result == "A")
.add_edge(detector, handler_b, condition=lambda x: x.result == "B")
.add_edge(detector, handler_c, condition=lambda x: x.result == "C")
After (Switch-Case):
.add_switch_case_edge_group(
    detector,
    [
        Case(condition=lambda x: x.result == "A", target=handler_a),
        Case(condition=lambda x: x.result == "B", target=handler_b),
        Default(target=handler_c),  # Catches everything else
    ],
)
The switch-case pattern scales much better as the number of routing decisions grows, and the default case provides a safety net for unexpected values.
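The difference in delivery semantics can be modelled in a few lines of plain Python. This is an illustrative sketch rather than the framework's implementation: deliver_conditional and deliver_switch are hypothetical helpers, and messages are reduced to plain strings instead of objects with a result attribute.

```python
from typing import Callable

Condition = Callable[[str], bool]


def deliver_conditional(message: str, edges: list[tuple[Condition, str]]) -> list[str]:
    """Model of independent conditional edges: each edge is tested on its own,
    so zero, one, or several targets may receive the message."""
    return [target for condition, target in edges if condition(message)]


def deliver_switch(message: str, cases: list[tuple[Condition, str]], default: str) -> list[str]:
    """Model of a switch-case group: the first matching case wins,
    and the default target is used when no case matches."""
    for condition, target in cases:
        if condition(message):
            return [target]
    return [default]


edges = [
    (lambda r: r == "A", "handler_a"),
    (lambda r: r == "B", "handler_b"),
    (lambda r: r == "C", "handler_c"),
]
cases = edges[:2]  # handler_c becomes the default instead of a third case

print(deliver_conditional("B", edges))          # ['handler_b']
print(deliver_conditional("D", edges))          # []  (nothing receives the message)
print(deliver_switch("B", cases, "handler_c"))  # ['handler_b']
print(deliver_switch("D", cases, "handler_c"))  # ['handler_c']
```

In this model the unexpected value "D" is silently dropped by the conditional edges but still reaches a handler through the default case.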
Switch-Case Sample Code
For the complete working implementation, see the switch_case_edge_group.py sample in the Agent Framework repository.
Multi-Selection Edges
Beyond Switch-Case: Multi-Selection Routing
While switch-case edges route messages to exactly one destination, real-world workflows often need to trigger multiple parallel operations based on data characteristics. Partitioned edges (implemented as fan-out edges with partitioners) enable sophisticated fan-out patterns where a single message can activate multiple downstream executors simultaneously.
Advanced Email Processing Workflow
Building on the switch-case example, you'll create an enhanced email processing system that demonstrates sophisticated routing logic:
- Spam emails → Single spam handler (like switch-case)
- Legitimate emails → Always trigger email assistant + Conditionally trigger summarizer for long emails
- Uncertain emails → Single uncertain handler (like switch-case)
- Database persistence → Triggered for both short emails and summarized long emails
This pattern enables parallel processing pipelines that adapt to content characteristics.
Concepts Covered
Data Models for Multi-Selection
Extend the data models to support email length analysis and summarization:
/// <summary>
/// Represents the result of enhanced email analysis with additional metadata.
/// </summary>
public sealed class AnalysisResult
{
[JsonPropertyName("spam_decision")]
[JsonConverter(typeof(JsonStringEnumConverter))]
public SpamDecision spamDecision { get; set; }
[JsonPropertyName("reason")]
public string Reason { get; set; } = string.Empty;
// Additional properties for sophisticated routing
[JsonIgnore]
public int EmailLength { get; set; }
[JsonIgnore]
public string EmailSummary { get; set; } = string.Empty;
[JsonIgnore]
public string EmailId { get; set; } = string.Empty;
}
/// <summary>
/// Represents the response from the email assistant.
/// </summary>
public sealed class EmailResponse
{
[JsonPropertyName("response")]
public string Response { get; set; } = string.Empty;
}
/// <summary>
/// Represents the response from the email summary agent.
/// </summary>
public sealed class EmailSummary
{
[JsonPropertyName("summary")]
public string Summary { get; set; } = string.Empty;
}
/// <summary>
/// A custom workflow event for database operations.
/// </summary>
internal sealed class DatabaseEvent(string message) : WorkflowEvent(message) { }
/// <summary>
/// Constants for email processing thresholds.
/// </summary>
public static class EmailProcessingConstants
{
public const int LongEmailThreshold = 100;
}
Target Assigner Function: The Heart of Multi-Selection
The target assigner function determines which executors should receive each message:
/// <summary>
/// Creates a target assigner for routing messages based on the analysis result.
/// </summary>
/// <returns>A function that takes an analysis result and returns the target partitions.</returns>
private static Func<AnalysisResult?, int, IEnumerable<int>> GetTargetAssigner()
{
    return (analysisResult, targetCount) =>
    {
        if (analysisResult is not null)
        {
            if (analysisResult.spamDecision == SpamDecision.Spam)
            {
                return [0]; // Route only to spam handler (index 0)
            }
            else if (analysisResult.spamDecision == SpamDecision.NotSpam)
            {
                // Always route to email assistant (index 1)
                List<int> targets = [1];
                // Conditionally add summarizer for long emails (index 2)
                if (analysisResult.EmailLength > EmailProcessingConstants.LongEmailThreshold)
                {
                    targets.Add(2);
                }
                return targets;
            }
            else // Uncertain
            {
                return [3]; // Route only to uncertain handler (index 3)
            }
        }
        throw new ArgumentException("Invalid analysis result.");
    };
}
Key Features of the Target Assigner Function
- Dynamic Target Selection: Returns a list of executor indices to activate
- Content-Aware Routing: Makes decisions based on message properties like email length
- Parallel Processing: Multiple targets can execute simultaneously
- Conditional Logic: Complex branching based on multiple criteria
Enhanced Workflow Executors
Implement executors that handle the advanced analysis and routing:
/// <summary>
/// Executor that analyzes emails using an AI agent with enhanced analysis.
/// </summary>
internal sealed class EmailAnalysisExecutor : Executor<ChatMessage, AnalysisResult>
{
private readonly AIAgent _emailAnalysisAgent;
public EmailAnalysisExecutor(AIAgent emailAnalysisAgent) : base("EmailAnalysisExecutor")
{
this._emailAnalysisAgent = emailAnalysisAgent;
}
public override async ValueTask<AnalysisResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Generate a random email ID and store the email content
var newEmail = new Email
{
EmailId = Guid.NewGuid().ToString("N"),
EmailContent = message.Text
};
await context.QueueStateUpdateAsync(newEmail.EmailId, newEmail, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent for enhanced analysis
var response = await this._emailAnalysisAgent.RunAsync(message);
var analysisResult = JsonSerializer.Deserialize<AnalysisResult>(response.Text);
// Enrich with metadata for routing decisions
analysisResult!.EmailId = newEmail.EmailId;
analysisResult.EmailLength = newEmail.EmailContent.Length;
return analysisResult;
}
}
/// <summary>
/// Executor that assists with email responses using an AI agent.
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<AnalysisResult, EmailResponse>
{
private readonly AIAgent _emailAssistantAgent;
public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
{
this._emailAssistantAgent = emailAssistantAgent;
}
public override async ValueTask<EmailResponse> HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Spam)
{
throw new ArgumentException("This executor should only handle non-spam messages.");
}
// Retrieve the email content from shared state
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
// Invoke the agent to draft a response
var response = await this._emailAssistantAgent.RunAsync(email!.EmailContent);
var emailResponse = JsonSerializer.Deserialize<EmailResponse>(response.Text);
return emailResponse!;
}
}
/// <summary>
/// Executor that summarizes emails using an AI agent for long emails.
/// </summary>
internal sealed class EmailSummaryExecutor : Executor<AnalysisResult, AnalysisResult>
{
private readonly AIAgent _emailSummaryAgent;
public EmailSummaryExecutor(AIAgent emailSummaryAgent) : base("EmailSummaryExecutor")
{
this._emailSummaryAgent = emailSummaryAgent;
}
public override async ValueTask<AnalysisResult> HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Read the email content from shared state
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
// Generate summary for long emails
var response = await this._emailSummaryAgent.RunAsync(email!.EmailContent);
var emailSummary = JsonSerializer.Deserialize<EmailSummary>(response.Text);
// Enrich the analysis result with the summary
message.EmailSummary = emailSummary!.Summary;
return message;
}
}
/// <summary>
/// Executor that sends emails.
/// </summary>
internal sealed class SendEmailExecutor : Executor<EmailResponse>
{
public SendEmailExecutor() : base("SendEmailExecutor") { }
public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
await context.YieldOutputAsync($"Email sent: {message.Response}");
}
/// <summary>
/// Executor that handles spam messages.
/// </summary>
internal sealed class HandleSpamExecutor : Executor<AnalysisResult>
{
public HandleSpamExecutor() : base("HandleSpamExecutor") { }
public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Spam)
{
await context.YieldOutputAsync($"Email marked as spam: {message.Reason}");
}
else
{
throw new ArgumentException("This executor should only handle spam messages.");
}
}
}
/// <summary>
/// Executor that handles uncertain messages requiring manual review.
/// </summary>
internal sealed class HandleUncertainExecutor : Executor<AnalysisResult>
{
public HandleUncertainExecutor() : base("HandleUncertainExecutor") { }
public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.spamDecision == SpamDecision.Uncertain)
{
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
await context.YieldOutputAsync($"Email marked as uncertain: {message.Reason}. Email content: {email?.EmailContent}");
}
else
{
throw new ArgumentException("This executor should only handle uncertain spam decisions.");
}
}
}
/// <summary>
/// Executor that handles database access with custom events.
/// </summary>
internal sealed class DatabaseAccessExecutor : Executor<AnalysisResult>
{
public DatabaseAccessExecutor() : base("DatabaseAccessExecutor") { }
public override async ValueTask HandleAsync(AnalysisResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Simulate database operations
await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope);
await Task.Delay(100); // Simulate database access delay
// Emit custom database event for monitoring
await context.AddEventAsync(new DatabaseEvent($"Email {message.EmailId} saved to database."));
}
}
Enhanced AI Agents
Create agents for analysis, assistance, and summarization:
/// <summary>
/// Create an enhanced email analysis agent.
/// </summary>
/// <returns>A ChatClientAgent configured for comprehensive email analysis</returns>
private static ChatClientAgent GetEmailAnalysisAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are a spam detection assistant that identifies spam emails.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<AnalysisResult>()
}
});
/// <summary>
/// Creates an email assistant agent.
/// </summary>
/// <returns>A ChatClientAgent configured for email assistance</returns>
private static ChatClientAgent GetEmailAssistantAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are an email assistant that helps users draft responses to emails with professionalism.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
}
});
/// <summary>
/// Creates an agent that summarizes emails.
/// </summary>
/// <returns>A ChatClientAgent configured for email summarization</returns>
private static ChatClientAgent GetEmailSummaryAgent(IChatClient chatClient) =>
new(chatClient, new ChatClientAgentOptions(instructions: "You are an assistant that helps users summarize emails.")
{
ChatOptions = new()
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailSummary>()
}
});
Multi-Selection Workflow Construction
Construct the workflow with sophisticated routing and parallel processing:
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential()).GetChatClient(deploymentName).AsIChatClient();
// Create agents
AIAgent emailAnalysisAgent = GetEmailAnalysisAgent(chatClient);
AIAgent emailAssistantAgent = GetEmailAssistantAgent(chatClient);
AIAgent emailSummaryAgent = GetEmailSummaryAgent(chatClient);
// Create executors
var emailAnalysisExecutor = new EmailAnalysisExecutor(emailAnalysisAgent);
var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
var emailSummaryExecutor = new EmailSummaryExecutor(emailSummaryAgent);
var sendEmailExecutor = new SendEmailExecutor();
var handleSpamExecutor = new HandleSpamExecutor();
var handleUncertainExecutor = new HandleUncertainExecutor();
var databaseAccessExecutor = new DatabaseAccessExecutor();
// Build the workflow with multi-selection fan-out
WorkflowBuilder builder = new(emailAnalysisExecutor);
builder.AddFanOutEdge(
emailAnalysisExecutor,
targets: [
handleSpamExecutor, // Index 0: Spam handler
emailAssistantExecutor, // Index 1: Email assistant (always for NotSpam)
emailSummaryExecutor, // Index 2: Summarizer (conditionally for long NotSpam)
handleUncertainExecutor, // Index 3: Uncertain handler
],
targetSelector: GetTargetAssigner()
)
// Email assistant branch
.AddEdge(emailAssistantExecutor, sendEmailExecutor)
// Database persistence: conditional routing
.AddEdge<AnalysisResult>(
emailAnalysisExecutor,
databaseAccessExecutor,
condition: analysisResult => analysisResult?.EmailLength <= EmailProcessingConstants.LongEmailThreshold) // Short emails
.AddEdge(emailSummaryExecutor, databaseAccessExecutor) // Long emails with summary
.WithOutputFrom(handleUncertainExecutor, handleSpamExecutor, sendEmailExecutor);
var workflow = builder.Build();
// Read a moderately long email to trigger both assistant and summarizer
string email = Resources.Read("email.txt");
// Execute the workflow with custom event handling
StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, email));
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent outputEvent)
{
Console.WriteLine($"Output: {outputEvent}");
}
if (evt is DatabaseEvent databaseEvent)
{
Console.WriteLine($"Database: {databaseEvent}");
}
}
}
}
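To see which executors this wiring activates for a given message, here is a small Python model of the edges above. It uses plain functions rather than the Agent Framework API, and the executor labels are shortened names chosen for illustration. One thing the model makes visible: as the conditional edge is written, it tests only the email length, so under this reading a short spam or uncertain email would also reach the database executor.

```python
LONG_EMAIL_THRESHOLD = 100  # mirrors EmailProcessingConstants.LongEmailThreshold


def activated_executors(spam_decision: str, email_length: int) -> list[str]:
    """Return the executors reached for one analysis result, in wiring order."""
    activated: list[str] = []

    # Fan-out edge: mirrors the target assigner's selection
    if spam_decision == "Spam":
        activated.append("handle_spam")
    elif spam_decision == "NotSpam":
        activated.append("email_assistant")
        if email_length > LONG_EMAIL_THRESHOLD:
            activated.append("email_summary")
    else:
        activated.append("handle_uncertain")

    # Conditional edge analysis -> database: checks length only
    if email_length <= LONG_EMAIL_THRESHOLD:
        activated.append("database_access")

    # Unconditional edge summary -> database
    if "email_summary" in activated:
        activated.append("database_access")

    return activated


print(activated_executors("NotSpam", 250))  # ['email_assistant', 'email_summary', 'database_access']
print(activated_executors("NotSpam", 40))   # ['email_assistant', 'database_access']
print(activated_executors("Spam", 40))      # ['handle_spam', 'database_access']
```

If persistence should apply to legitimate emails only, the edge condition would also need to test the spam decision; that is a design choice the sample leaves open.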
Pattern Comparison: Multi-Selection vs. Switch-Case
Switch-Case Pattern (Previous):
// One input → exactly one output
builder.AddSwitch(spamDetectionExecutor, switchBuilder =>
    switchBuilder
        .AddCase(GetCondition(SpamDecision.NotSpam), emailAssistantExecutor)
        .AddCase(GetCondition(SpamDecision.Spam), handleSpamExecutor)
        .WithDefault(handleUncertainExecutor)
)
Multi-Selection Pattern:
// One input → one or more outputs (dynamic fan-out)
builder.AddFanOutEdge(
    emailAnalysisExecutor,
    targets: [handleSpamExecutor, emailAssistantExecutor, emailSummaryExecutor, handleUncertainExecutor],
    targetSelector: GetTargetAssigner() // Returns list of target indices
)
Key Advantages of Multi-Selection Edges
- Parallel Processing: Multiple branches can execute simultaneously
- Conditional Fan-out: Number of targets varies based on content
- Content-Aware Routing: Decisions based on message properties, not just type
- Efficient Resource Usage: Only necessary branches are activated
- Complex Business Logic: Supports sophisticated routing scenarios
Running the Multi-Selection Example
When you run this workflow with a long email:
Output: Email sent: [Professional response generated by AI]
Database: Email abc123 saved to database.
When you run with a short email, the summarizer is skipped:
Output: Email sent: [Professional response generated by AI]
Database: Email def456 saved to database.
Real-World Use Cases
- Email Systems: Route to reply assistant + archive + analytics (conditionally)
- Content Processing: Trigger transcription + translation + analysis (based on content type)
- Order Processing: Route to fulfillment + billing + notifications (based on order properties)
- Data Pipelines: Trigger different analytics flows based on data characteristics
Multi-Selection Complete Implementation
For the complete working implementation, see this sample in the Agent Framework repository.
Beyond Switch-Case: Multi-Selection Routing
While switch-case edges route messages to exactly one destination, real-world workflows often need to trigger multiple parallel operations based on data characteristics. Partitioned edges (implemented as multi-selection edge groups) enable sophisticated fan-out patterns where a single message can activate multiple downstream executors simultaneously.
Advanced Email Processing Workflow
Building on the switch-case example, you'll create an enhanced email processing system that demonstrates sophisticated routing logic:
- Spam emails → Single spam handler (like switch-case)
- Legitimate emails → Always trigger email assistant + Conditionally trigger summarizer for long emails
- Uncertain emails → Single uncertain handler (like switch-case)
- Database persistence → Triggered for both short emails and summarized long emails
This pattern enables parallel processing pipelines that adapt to content characteristics.
Concepts Covered
Enhanced Data Models for Multi-Selection
Extend the data models to support email length analysis and summarization:
class AnalysisResultAgent(BaseModel):
    """Enhanced structured output from email analysis agent."""
    spam_decision: Literal["NotSpam", "Spam", "Uncertain"]
    reason: str

class EmailResponse(BaseModel):
    """Response from email assistant."""
    response: str

class EmailSummaryModel(BaseModel):
    """Summary generated by email summary agent."""
    summary: str

@dataclass
class AnalysisResult:
    """Internal analysis result with email metadata for routing decisions."""
    spam_decision: str
    reason: str
    email_length: int  # Used for conditional routing
    email_summary: str  # Populated by summary agent
    email_id: str

@dataclass
class Email:
    """Email content stored in shared state."""
    email_id: str
    email_content: str

# Custom event for database operations
class DatabaseEvent(WorkflowEvent):
    """Custom event for tracking database operations."""
    pass
Selection Function: The Heart of Multi-Selection
The selection function determines which executors should receive each message:
LONG_EMAIL_THRESHOLD = 100

def select_targets(analysis: AnalysisResult, target_ids: list[str]) -> list[str]:
    """Intelligent routing based on spam decision and email characteristics."""
    # Target order: [handle_spam, submit_to_email_assistant, summarize_email, handle_uncertain]
    handle_spam_id, submit_to_email_assistant_id, summarize_email_id, handle_uncertain_id = target_ids
    if analysis.spam_decision == "Spam":
        # Route only to spam handler
        return [handle_spam_id]
    elif analysis.spam_decision == "NotSpam":
        # Always route to email assistant
        targets = [submit_to_email_assistant_id]
        # Conditionally add summarizer for long emails
        if analysis.email_length > LONG_EMAIL_THRESHOLD:
            targets.append(summarize_email_id)
        return targets
    else:  # Uncertain
        # Route only to uncertain handler
        return [handle_uncertain_id]
Key Features of Selection Functions
- Dynamic Target Selection: Returns a list of executor IDs to activate
- Content-Aware Routing: Makes decisions based on message properties
- Parallel Processing: Multiple targets can execute simultaneously
- Conditional Logic: Complex branching based on multiple criteria
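A selection function is an ordinary function, so it can be exercised without building a workflow. The sketch below repeats select_targets with a stand-in AnalysisResult dataclass; the target IDs and the sample analysis values are assumptions made for the demonstration.

```python
from dataclasses import dataclass

LONG_EMAIL_THRESHOLD = 100


@dataclass
class AnalysisResult:
    """Stand-in for the tutorial's routing payload."""
    spam_decision: str
    reason: str
    email_length: int
    email_summary: str
    email_id: str


def select_targets(analysis: AnalysisResult, target_ids: list[str]) -> list[str]:
    """Pick the target IDs that should receive this analysis result."""
    handle_spam_id, assistant_id, summarize_id, handle_uncertain_id = target_ids
    if analysis.spam_decision == "Spam":
        return [handle_spam_id]
    if analysis.spam_decision == "NotSpam":
        targets = [assistant_id]
        # Strictly greater than the threshold, so exactly 100 characters is still "short"
        if analysis.email_length > LONG_EMAIL_THRESHOLD:
            targets.append(summarize_id)
        return targets
    return [handle_uncertain_id]


# Illustrative IDs in the same order as the workflow's target list
TARGET_IDS = ["handle_spam", "submit_to_email_assistant", "summarize_email", "handle_uncertain"]


def make(decision: str, length: int) -> AnalysisResult:
    """Build a sample result; reason and ID are placeholder values."""
    return AnalysisResult(decision, "demo reason", length, "", "demo-id")


print(select_targets(make("NotSpam", 150), TARGET_IDS))  # ['submit_to_email_assistant', 'summarize_email']
print(select_targets(make("NotSpam", 100), TARGET_IDS))  # ['submit_to_email_assistant']
print(select_targets(make("Spam", 150), TARGET_IDS))     # ['handle_spam']
```

Because the function unpacks target_ids positionally, the order of that list has to match the order in which the targets are registered on the edge group.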
Multi-Selection Workflow Executors
Implement executors that handle the enhanced analysis and routing:
EMAIL_STATE_PREFIX = "email:"
CURRENT_EMAIL_ID_KEY = "current_email_id"
@executor(id="store_email")
async def store_email(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Store email and initiate analysis."""
new_email = Email(email_id=str(uuid4()), email_content=email_text)
await ctx.set_shared_state(f"{EMAIL_STATE_PREFIX}{new_email.email_id}", new_email)
await ctx.set_shared_state(CURRENT_EMAIL_ID_KEY, new_email.email_id)
await ctx.send_message(
AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=new_email.email_content)], should_respond=True)
)
@executor(id="to_analysis_result")
async def to_analysis_result(response: AgentExecutorResponse, ctx: WorkflowContext[AnalysisResult]) -> None:
"""Transform agent response into enriched analysis result."""
parsed = AnalysisResultAgent.model_validate_json(response.agent_run_response.text)
email_id: str = await ctx.get_shared_state(CURRENT_EMAIL_ID_KEY)
email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{email_id}")
# Create enriched analysis result with email length for routing decisions
await ctx.send_message(
AnalysisResult(
spam_decision=parsed.spam_decision,
reason=parsed.reason,
email_length=len(email.email_content), # Key for conditional routing
email_summary="",
email_id=email_id,
)
)
@executor(id="submit_to_email_assistant")
async def submit_to_email_assistant(analysis: AnalysisResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Handle legitimate emails by forwarding to email assistant."""
if analysis.spam_decision != "NotSpam":
raise RuntimeError("This executor should only handle NotSpam messages.")
email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
await ctx.send_message(
AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.email_content)], should_respond=True)
)
@executor(id="finalize_and_send")
async def finalize_and_send(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
"""Final step for email assistant branch."""
parsed = EmailResponse.model_validate_json(response.agent_run_response.text)
await ctx.yield_output(f"Email sent: {parsed.response}")
@executor(id="summarize_email")
async def summarize_email(analysis: AnalysisResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Generate summary for long emails (parallel branch)."""
# Only called for long NotSpam emails by selection function
email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
await ctx.send_message(
AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.email_content)], should_respond=True)
)
@executor(id="merge_summary")
async def merge_summary(response: AgentExecutorResponse, ctx: WorkflowContext[AnalysisResult]) -> None:
    """Merge summary back into analysis result for database persistence."""
    summary = EmailSummaryModel.model_validate_json(response.agent_run_response.text)
    email_id: str = await ctx.get_shared_state(CURRENT_EMAIL_ID_KEY)
    email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{email_id}")
    # Create analysis result with summary for database storage
    await ctx.send_message(
        AnalysisResult(
            spam_decision="NotSpam",
            reason="",
            email_length=len(email.email_content),
            email_summary=summary.summary,  # Now includes summary
            email_id=email_id,
        )
    )
@executor(id="handle_spam")
async def handle_spam(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
    """Handle spam emails (single target like switch-case)."""
    if analysis.spam_decision == "Spam":
        await ctx.yield_output(f"Email marked as spam: {analysis.reason}")
    else:
        raise RuntimeError("This executor should only handle Spam messages.")

@executor(id="handle_uncertain")
async def handle_uncertain(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
    """Handle uncertain emails (single target like switch-case)."""
    if analysis.spam_decision == "Uncertain":
        email: Email | None = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{analysis.email_id}")
        await ctx.yield_output(
            f"Email marked as uncertain: {analysis.reason}. Email content: {getattr(email, 'email_content', '')}"
        )
    else:
        raise RuntimeError("This executor should only handle Uncertain messages.")

@executor(id="database_access")
async def database_access(analysis: AnalysisResult, ctx: WorkflowContext[Never, str]) -> None:
    """Simulate database persistence with custom events."""
    await asyncio.sleep(0.05)  # Simulate DB operation
    await ctx.add_event(DatabaseEvent(f"Email {analysis.email_id} saved to database."))
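The executors above pass only a small AnalysisResult between steps and look up the full email by ID when they need it. The following is a minimal sketch of that store-then-look-up pattern using a plain dictionary. `SharedState`, `store`, `load_current`, and the two key values are hypothetical stand-ins for illustration, not the Agent Framework API or the tutorial's actual constants.

```python
import asyncio
from dataclasses import dataclass

# Hypothetical key values; the tutorial defines its own constants elsewhere.
EMAIL_STATE_PREFIX = "email:"
CURRENT_EMAIL_ID_KEY = "current_email_id"


@dataclass
class Email:
    email_id: str
    email_content: str


class SharedState:
    """In-memory stand-in for workflow shared state (not the framework API)."""

    def __init__(self) -> None:
        self._data: dict[str, object] = {}

    async def set(self, key: str, value: object) -> None:
        self._data[key] = value

    async def get(self, key: str) -> object:
        return self._data[key]


async def store(state: SharedState, email: Email) -> None:
    # Store the full email under a per-email key and record which ID is current.
    await state.set(f"{EMAIL_STATE_PREFIX}{email.email_id}", email)
    await state.set(CURRENT_EMAIL_ID_KEY, email.email_id)


async def load_current(state: SharedState) -> Email:
    # Later steps resolve the current ID first, then fetch the email itself.
    email_id = await state.get(CURRENT_EMAIL_ID_KEY)
    email = await state.get(f"{EMAIL_STATE_PREFIX}{email_id}")
    assert isinstance(email, Email)
    return email


async def demo() -> int:
    state = SharedState()
    await store(state, Email(email_id="abc123", email_content="Hello team"))
    email = await load_current(state)
    return len(email.email_content)


demo_length = asyncio.run(demo())
print(demo_length)
```

Keeping the email body in shared state means the messages on each edge stay small, and any branch can recompute values such as the email length from the stored copy.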
Enhanced AI Agents
Create agents for analysis, assistance, and summarization:
async def main() -> None:
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

    # Enhanced analysis agent
    email_analysis_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are a spam detection assistant that identifies spam emails. "
                "Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) "
                "and 'reason' (string)."
            ),
            response_format=AnalysisResultAgent,
        ),
        id="email_analysis_agent",
    )

    # Email assistant (same as before)
    email_assistant_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are an email assistant that helps users draft responses to emails with professionalism."
            ),
            response_format=EmailResponse,
        ),
        id="email_assistant_agent",
    )

    # New: Email summary agent for long emails
    email_summary_agent = AgentExecutor(
        chat_client.create_agent(
            instructions="You are an assistant that helps users summarize emails.",
            response_format=EmailSummaryModel,
        ),
        id="email_summary_agent",
    )
Build Multi-Selection Workflow
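Construct the workflow. A multi-selection edge group fans out from to_analysis_result, and two edges converge on database_access: a conditional edge for short emails and an edge from merge_summary for long ones: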
    workflow = (
        WorkflowBuilder()
        .set_start_executor(store_email)
        .add_edge(store_email, email_analysis_agent)
        .add_edge(email_analysis_agent, to_analysis_result)
        # Multi-selection edge group: intelligent fan-out based on content
        .add_multi_selection_edge_group(
            to_analysis_result,
            [handle_spam, submit_to_email_assistant, summarize_email, handle_uncertain],
            selection_func=select_targets,
        )
        # Email assistant branch (always for NotSpam)
        .add_edge(submit_to_email_assistant, email_assistant_agent)
        .add_edge(email_assistant_agent, finalize_and_send)
        # Summary branch (only for long NotSpam emails)
        .add_edge(summarize_email, email_summary_agent)
        .add_edge(email_summary_agent, merge_summary)
        # Database persistence: conditional routing
        .add_edge(
            to_analysis_result,
            database_access,
            condition=lambda r: r.email_length <= LONG_EMAIL_THRESHOLD,  # Short emails
        )
        .add_edge(merge_summary, database_access)  # Long emails with summary
        .build()
    )
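To check which executors this graph activates for a given analysis result, the routing can be traced without the framework. The sketch below is an assumption-heavy illustration: the body of `select_targets` is not shown in this section, so the version here is reconstructed from the branch comments in the builder code, the `AnalysisResult` dataclass is a trimmed stand-in, `first_hop` is a hypothetical helper, and the threshold value of 100 is chosen only for the example.

```python
from dataclasses import dataclass

LONG_EMAIL_THRESHOLD = 100  # Hypothetical value for illustration.


@dataclass
class AnalysisResult:
    spam_decision: str  # "NotSpam", "Spam", or "Uncertain"
    email_length: int


def select_targets(analysis: AnalysisResult, target_ids: list[str]) -> list[str]:
    """Assumed selection logic, reconstructed from the branch comments."""
    # Unpack in the same order as the target list passed to the edge group.
    spam_id, assistant_id, summarize_id, uncertain_id = target_ids
    if analysis.spam_decision == "Spam":
        return [spam_id]
    if analysis.spam_decision == "NotSpam":
        targets = [assistant_id]
        if analysis.email_length > LONG_EMAIL_THRESHOLD:
            targets.append(summarize_id)
        return targets
    return [uncertain_id]


def first_hop(analysis: AnalysisResult) -> list[str]:
    """List every executor that receives the analysis result directly."""
    ids = ["handle_spam", "submit_to_email_assistant", "summarize_email", "handle_uncertain"]
    hops = select_targets(analysis, ids)
    # The separate conditional edge sends short emails straight to the database.
    if analysis.email_length <= LONG_EMAIL_THRESHOLD:
        hops.append("database_access")
    return hops


short_ham = first_hop(AnalysisResult("NotSpam", 40))
long_ham = first_hop(AnalysisResult("NotSpam", 400))
long_spam = first_hop(AnalysisResult("Spam", 400))
print(short_ham)
print(long_ham)
print(long_spam)
```

Under these assumptions, a long NotSpam email reaches database_access only through merge_summary, while a long Spam or Uncertain email never reaches it, because neither the conditional edge nor the summary branch fires for it.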
Execution with Event Streaming
Run the workflow and observe parallel execution through custom events:
    # Use a moderately long email to trigger both assistant and summarizer
    email = """
Hello team, here are the updates for this week:
1. Project Alpha is on track and we should have the first milestone completed by Friday.
2. The client presentation has been scheduled for next Tuesday at 2 PM.
3. Please review the Q4 budget allocation and provide feedback by Wednesday.
Let me know if you have any questions or concerns.
Best regards,
Alex
"""

    # Stream events to see parallel execution
    async for event in workflow.run_stream(email):
        if isinstance(event, DatabaseEvent):
            print(f"Database: {event}")
        elif isinstance(event, WorkflowOutputEvent):
            print(f"Output: {event.data}")
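The loop above filters a mixed event stream by type. A minimal, framework-free sketch of that pattern follows. The event classes and `fake_run_stream` are hypothetical stand-ins written for this example, and the event order is hard-coded, whereas in a real run it would depend on which branch finishes first.

```python
import asyncio
from collections.abc import AsyncIterator


class WorkflowEvent:
    """Hypothetical base event, standing in for the framework's event type."""

    def __init__(self, data: str) -> None:
        self.data = data


class DatabaseEvent(WorkflowEvent):
    """Custom event emitted by a side branch."""


class OutputEvent(WorkflowEvent):
    """Stand-in for the event that carries a workflow output."""


async def fake_run_stream() -> AsyncIterator[WorkflowEvent]:
    # Hard-coded sequence: one event the consumer ignores, then two it handles.
    yield WorkflowEvent("executor invoked")
    await asyncio.sleep(0)
    yield DatabaseEvent("Email 42 saved to database.")
    yield OutputEvent("Email sent: Thanks for the update.")


async def collect() -> list[str]:
    lines: list[str] = []
    async for event in fake_run_stream():
        # Handle only the event types of interest; everything else is skipped.
        if isinstance(event, DatabaseEvent):
            lines.append(f"Database: {event.data}")
        elif isinstance(event, OutputEvent):
            lines.append(f"Output: {event.data}")
    return lines


printed = asyncio.run(collect())
print("\n".join(printed))
```

Because the consumer dispatches on type, new custom events can be added to a branch without changing how existing events are handled.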
Multi-Selection vs. Switch-Case Comparison
Switch-Case Pattern (Previous):
# One input → exactly one output
.add_switch_case_edge_group(
    source,
    [
        Case(condition=lambda x: x.result == "A", target=handler_a),
        Case(condition=lambda x: x.result == "B", target=handler_b),
        Default(target=handler_c),
    ],
)
Multi-Selection Pattern:
# One input → one or more outputs (dynamic fan-out)
.add_multi_selection_edge_group(
    source,
    [handler_a, handler_b, handler_c, handler_d],
    selection_func=intelligent_router,  # Returns list of target IDs
)
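The difference between the two patterns comes down to how many targets a single message can reach. The sketch below models both semantics in plain Python. `switch_case`, `multi_select`, `route_order`, and the handler names are hypothetical illustrations, not the framework's implementation. In particular, the unknown-target check is a design choice of this sketch only.

```python
from collections.abc import Callable

Message = dict[str, int]


def switch_case(
    message: Message,
    cases: list[tuple[Callable[[Message], bool], str]],
    default: str,
) -> list[str]:
    """First matching case wins, so exactly one target is returned."""
    for condition, target in cases:
        if condition(message):
            return [target]
    return [default]


def multi_select(
    message: Message,
    targets: list[str],
    selection_func: Callable[[Message, list[str]], list[str]],
) -> list[str]:
    """The selection function may return any subset of the declared targets."""
    chosen = selection_func(message, targets)
    unknown = set(chosen) - set(targets)
    if unknown:
        raise ValueError(f"Unknown targets: {sorted(unknown)}")
    return chosen


def route_order(message: Message, targets: list[str]) -> list[str]:
    # Hypothetical rule: always bill, and also notify for large orders.
    billing, notify = targets
    return [billing, notify] if message["total"] > 100 else [billing]


cases = [
    (lambda m: m["total"] > 100, "large_handler"),
    (lambda m: m["total"] > 10, "medium_handler"),
]
# Both conditions are true for 500, but only the first match is taken.
single = switch_case({"total": 500}, cases, default="small_handler")
fan_out = multi_select({"total": 500}, ["billing", "notify"], route_order)
print(single)
print(fan_out)
```

With switch-case, overlapping conditions are resolved by order. With multi-selection, overlap is expressed directly by returning several targets.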
Multi-Selection Benefits
- Parallel Processing: Multiple branches can execute simultaneously
- Conditional Fan-out: Number of targets varies based on content
- Content-Aware Routing: Decisions based on message properties, not just type
- Efficient Resource Usage: Only necessary branches are activated
- Complex Business Logic: Supports sophisticated routing scenarios
Real-World Applications
- Email Systems: Route to reply assistant + archive + analytics (conditionally)
- Content Processing: Trigger transcription + translation + analysis (based on content type)
- Order Processing: Route to fulfillment + billing + notifications (based on order properties)
- Data Pipelines: Trigger different analytics flows based on data characteristics
Multi-Selection Sample Code
For the complete working implementation, see the multi_selection_edge_group.py sample in the Agent Framework repository.