Microsoft代理框架工作流编排 - 移交

交接协调允许代理根据上下文或用户请求将控制权转移给另一个代理。 每个代理都可以通过适当的专业知识将对话“移交”到另一个代理,确保正确的代理处理任务的每个部分。 这在客户支持、专家系统或任何需要动态委派的方案中特别有用。

交接业务流程

Handoff 与工具化代理之间的差异

虽然代理作为工具通常被视为多代理模式,乍一看可能与转换类似,但两者之间存在根本差异:

  • 控制流:在移交业务流程中,根据定义的规则在代理之间显式传递控制。 每个代理都可以决定将整个任务移交给另一个代理。 没有中央权威管理工作流。 相比之下,代理即工具涉及将子任务委托给其他代理的主代理,一旦代理完成子任务,控制就会返回到主代理。
  • 任务所有权:在交接中,接收移交的代理拥有任务的完全所有权。 在代理即工具中,主代理将保留任务的总体责任,而其他代理则被视为辅助特定子任务的工具。
  • 上下文管理:在交接业务流程中,对话完全移交给另一个代理。 接收代理对到目前为止已完成的工作有完整的了解。 在代理即工具中,主代理管理整个上下文,并且可能仅根据需要向工具代理提供相关信息。

学习内容

  • 如何为不同的域创建专用代理
  • 如何配置代理之间的移交规则
  • 如何使用动态代理路由生成交互式工作流
  • 如何使用代理切换处理多轮次对话
  • 如何为敏感操作(HITL)实施工具审批
  • 如何对持久化交接工作流使用检查点机制

在交接业务流程中,代理可以基于上下文将控制权传输到彼此,从而允许动态路由和专业知识处理。

设置 Azure OpenAI 客户端

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;

// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
    throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
    .GetChatClient(deploymentName)
    .AsIChatClient();

定义专用代理

创建领域专用代理和用于路由的分诊代理:

// 2) Create specialized agents
ChatClientAgent historyTutor = new(client,
    "You provide assistance with historical queries. Explain important events and context clearly. Only respond about history.",
    "history_tutor",
    "Specialist agent for historical questions");

ChatClientAgent mathTutor = new(client,
    "You provide help with math problems. Explain your reasoning at each step and include examples. Only respond about math.",
    "math_tutor",
    "Specialist agent for math questions");

ChatClientAgent triageAgent = new(client,
    "You determine which agent to use based on the user's homework question. ALWAYS handoff to another agent.",
    "triage_agent",
    "Routes messages to the appropriate specialist agent");

配置交接规则

定义哪些代理可以移交给其他代理:

// 3) Build handoff workflow with routing rules
var workflow = AgentWorkflowBuilder.StartHandoffWith(triageAgent)
    .WithHandoffs(triageAgent, [mathTutor, historyTutor]) // Triage can route to either specialist
    .WithHandoff(mathTutor, triageAgent)                  // Math tutor can return to triage
    .WithHandoff(historyTutor, triageAgent)               // History tutor can return to triage
    .Build();

运行交互式移交工作流

使用动态代理切换处理多轮次对话:

// 4) Process multi-turn conversations
List<ChatMessage> messages = new();

while (true)
{
    Console.Write("Q: ");
    string userInput = Console.ReadLine()!;
    messages.Add(new(ChatRole.User, userInput));

    // Execute workflow and process events
    StreamingRun run = await InProcessExecution.StreamAsync(workflow, messages);
    await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

    List<ChatMessage> newMessages = new();
    await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
    {
        if (evt is AgentRunUpdateEvent e)
        {
            Console.WriteLine($"{e.ExecutorId}: {e.Data}");
        }
        else if (evt is WorkflowOutputEvent outputEvt)
        {
            newMessages = (List<ChatMessage>)outputEvt.Data!;
            break;
        }
    }

    // Add new messages to conversation history
    messages.AddRange(newMessages.Skip(messages.Count));
}

示例交互

Q: What is the derivative of x^2?
triage_agent: This is a math question. I'll hand this off to the math tutor.
math_tutor: The derivative of x^2 is 2x. Using the power rule, we bring down the exponent (2) and multiply it by the coefficient (1), then reduce the exponent by 1: d/dx(x^2) = 2x^(2-1) = 2x.

Q: Tell me about World War 2
triage_agent: This is a history question. I'll hand this off to the history tutor.
history_tutor: World War 2 was a global conflict from 1939 to 1945. It began when Germany invaded Poland and involved most of the world's nations. Key events included the Holocaust, Pearl Harbor attack, D-Day invasion, and ended with atomic bombs on Japan.

Q: Can you help me with calculus integration?
triage_agent: This is another math question. I'll route this to the math tutor.
math_tutor: I'd be happy to help with calculus integration! Integration is the reverse of differentiation. The basic power rule for integration is: ∫x^n dx = x^(n+1)/(n+1) + C, where C is the constant of integration.

设置聊天客户端

from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential

# Initialize the Azure OpenAI chat client
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

定义专用代理

使用协调器来为路由创建特定于域的代理:

# Create triage/coordinator agent
triage_agent = chat_client.create_agent(
    instructions=(
        "You are frontline support triage. Read the latest user message and decide whether "
        "to hand off to refund_agent, order_agent, or support_agent. Provide a brief natural-language "
        "response for the user. When delegation is required, call the matching handoff tool "
        "(`handoff_to_refund_agent`, `handoff_to_order_agent`, or `handoff_to_support_agent`)."
    ),
    name="triage_agent",
)

# Create specialist agents
refund_agent = chat_client.create_agent(
    instructions=(
        "You handle refund workflows. Ask for any order identifiers you require and outline the refund steps."
    ),
    name="refund_agent",
)

order_agent = chat_client.create_agent(
    instructions=(
        "You resolve shipping and fulfillment issues. Clarify the delivery problem and describe the actions "
        "you will take to remedy it."
    ),
    name="order_agent",
)

support_agent = chat_client.create_agent(
    instructions=(
        "You are a general support agent. Offer empathetic troubleshooting and gather missing details if the "
        "issue does not match other specialists."
    ),
    name="support_agent",
)

配置交接规则

使用 HandoffBuilder 构建移交工作流:

from agent_framework import HandoffBuilder

# Build the handoff workflow
workflow = (
    HandoffBuilder(
        name="customer_support_handoff",
        participants=[triage_agent, refund_agent, order_agent, support_agent],
    )
    .set_coordinator("triage_agent")
    .with_termination_condition(
        # Terminate after a certain number of user messages
        lambda conv: sum(1 for msg in conv if msg.role.value == "user") >= 10
    )
    .build()
)

对于更高级的路由,可以配置专家到专家的交接:

# Enable return-to-previous and add specialist-to-specialist handoffs
workflow = (
    HandoffBuilder(
        name="advanced_handoff",
        participants=[coordinator, technical, account, billing],
    )
    .set_coordinator(coordinator)
    .add_handoff(coordinator, [technical, account, billing])  # Coordinator routes to all specialists
    .add_handoff(technical, [billing, account])  # Technical can route to billing or account
    .add_handoff(account, [technical, billing])  # Account can route to technical or billing
    .add_handoff(billing, [technical, account])  # Billing can route to technical or account
    .enable_return_to_previous(True)  # User inputs route directly to current specialist
    .build()
)

运行交互式移交工作流

使用用户输入请求管理多轮会话:

from agent_framework import RequestInfoEvent, HandoffUserInputRequest, WorkflowOutputEvent

# Start workflow with initial user message
events = [event async for event in workflow.run_stream("I need help with my order")]

# Process events and collect pending input requests
pending_requests = []
for event in events:
    if isinstance(event, RequestInfoEvent):
        pending_requests.append(event)
        request_data = event.data
        print(f"Agent {request_data.awaiting_agent_id} is awaiting your input")
        for msg in request_data.conversation[-3:]:
            print(f"{msg.author_name}: {msg.text}")

# Interactive loop: respond to requests
while pending_requests:
    user_input = input("You: ")

    # Send responses to all pending requests
    responses = {req.request_id: user_input for req in pending_requests}
    events = [event async for event in workflow.send_responses_streaming(responses)]

    # Process new events
    pending_requests = []
    for event in events:
        if isinstance(event, RequestInfoEvent):
            pending_requests.append(event)
        elif isinstance(event, WorkflowOutputEvent):
            print("Workflow completed!")
            conversation = event.data
            for msg in conversation:
                print(f"{msg.author_name}: {msg.text}")

高级:交接工作流中的工具审批

交接工作流可以包含代理,这些代理携带的工具在执行前需要人工批准。 这对于处理退款、进行购买或执行不可逆操作等敏感操作很有用。

定义需要审批的工具

from typing import Annotated
from agent_framework import ai_function

@ai_function(approval_mode="always_require")
def submit_refund(
    refund_description: Annotated[str, "Description of the refund reason"],
    amount: Annotated[str, "Refund amount"],
    order_id: Annotated[str, "Order ID for the refund"],
) -> str:
    """Submit a refund request for manual review before processing."""
    return f"Refund recorded for order {order_id} (amount: {amount}): {refund_description}"

使用 Approval-Required 工具创建代理

from agent_framework import ChatAgent
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential

client = AzureOpenAIChatClient(credential=AzureCliCredential())

triage_agent = client.create_agent(
    name="triage_agent",
    instructions=(
        "You are a customer service triage agent. Listen to customer issues and determine "
        "if they need refund help or order tracking. Use handoff_to_refund_agent or "
        "handoff_to_order_agent to transfer them."
    ),
)

refund_agent = client.create_agent(
    name="refund_agent",
    instructions=(
        "You are a refund specialist. Help customers with refund requests. "
        "When the user confirms they want a refund and supplies order details, "
        "call submit_refund to record the request."
    ),
    tools=[submit_refund],
)

order_agent = client.create_agent(
    name="order_agent",
    instructions="You are an order tracking specialist. Help customers track their orders.",
)

处理用户输入和工具审批请求

from agent_framework import (
    FunctionApprovalRequestContent,
    HandoffBuilder,
    HandoffUserInputRequest,
    RequestInfoEvent,
    WorkflowOutputEvent,
)

workflow = (
    HandoffBuilder(
        name="support_with_approvals",
        participants=[triage_agent, refund_agent, order_agent],
    )
    .set_coordinator("triage_agent")
    .build()
)

pending_requests: list[RequestInfoEvent] = []

# Start workflow
async for event in workflow.run_stream("My order 12345 arrived damaged. I need a refund."):
    if isinstance(event, RequestInfoEvent):
        pending_requests.append(event)

# Process pending requests - could be user input OR tool approval
while pending_requests:
    responses: dict[str, object] = {}

    for request in pending_requests:
        if isinstance(request.data, HandoffUserInputRequest):
            # Agent needs user input
            print(f"Agent {request.data.awaiting_agent_id} asks:")
            for msg in request.data.conversation[-2:]:
                print(f"  {msg.author_name}: {msg.text}")

            user_input = input("You: ")
            responses[request.request_id] = user_input

        elif isinstance(request.data, FunctionApprovalRequestContent):
            # Agent wants to call a tool that requires approval
            func_call = request.data.function_call
            args = func_call.parse_arguments() or {}

            print(f"\nTool approval requested: {func_call.name}")
            print(f"Arguments: {args}")

            approval = input("Approve? (y/n): ").strip().lower() == "y"
            responses[request.request_id] = request.data.create_response(approved=approval)

    # Send all responses and collect new requests
    pending_requests = []
    async for event in workflow.send_responses_streaming(responses):
        if isinstance(event, RequestInfoEvent):
            pending_requests.append(event)
        elif isinstance(event, WorkflowOutputEvent):
            print("\nWorkflow completed!")

持久化工作流检查点

对于长时间运行的工作流,由于工具审批可能在数小时或几天后进行,建议使用检查点机制:

from agent_framework import FileCheckpointStorage

storage = FileCheckpointStorage(storage_path="./checkpoints")

workflow = (
    HandoffBuilder(
        name="durable_support",
        participants=[triage_agent, refund_agent, order_agent],
    )
    .set_coordinator("triage_agent")
    .with_checkpointing(storage)
    .build()
)

# Initial run - workflow pauses when approval is needed
pending_requests = []
async for event in workflow.run_stream("I need a refund for order 12345"):
    if isinstance(event, RequestInfoEvent):
        pending_requests.append(event)

# Process can exit here - checkpoint is saved automatically

# Later: Resume from checkpoint and provide approval
checkpoints = await storage.list_checkpoints()
latest = sorted(checkpoints, key=lambda c: c.timestamp, reverse=True)[0]

# Step 1: Restore checkpoint to reload pending requests
restored_requests = []
async for event in workflow.run_stream(checkpoint_id=latest.checkpoint_id):
    if isinstance(event, RequestInfoEvent):
        restored_requests.append(event)

# Step 2: Send responses
responses = {}
for req in restored_requests:
    if isinstance(req.data, FunctionApprovalRequestContent):
        responses[req.request_id] = req.data.create_response(approved=True)
    elif isinstance(req.data, HandoffUserInputRequest):
        responses[req.request_id] = "Yes, please process the refund."

async for event in workflow.send_responses_streaming(responses):
    if isinstance(event, WorkflowOutputEvent):
        print("Refund workflow completed!")

## Sample Interaction

```plaintext
User: I need help with my order

triage_agent: I'd be happy to help you with your order. Could you please provide more details about the issue?

User: My order 1234 arrived damaged

triage_agent: I'm sorry to hear that your order arrived damaged. I will connect you with a specialist.

support_agent: I'm sorry about the damaged order. To assist you better, could you please:
- Describe the damage
- Would you prefer a replacement or refund?

User: I'd like a refund

triage_agent: I'll connect you with the refund specialist.

refund_agent: I'll process your refund for order 1234. Here's what will happen next:
1. Verification of the damaged items
2. Refund request submission
3. Return instructions if needed
4. Refund processing within 5-10 business days

Could you provide photos of the damage to expedite the process?

关键概念

  • 动态路由:代理可以决定哪个代理应根据上下文处理下一次交互
  • AgentWorkflowBuilder.StartHandoffWith():定义启动工作流的初始代理
  • WithHandoff()WithHandoffs():配置特定代理之间的移交规则
  • 上下文保存:在所有交接中维护完整对话历史记录
  • 多轮次支持:支持使用无缝代理切换的持续对话
  • 专业知识:每个代理专注于自己的领域,同时通过交接进行协作
  • 动态路由:代理可以决定哪个代理应根据上下文处理下一次交互
  • HandoffBuilder:使用自动移交工具注册创建工作流
  • set_coordinator():定义代理首先接收用户输入
  • add_handoff():配置代理之间的特定交接关系
  • enable_return_to_previous():将用户输入直接路由到当前专家,跳过协调器重新评估
  • 上下文保存:在所有交接中维护完整对话历史记录
  • 请求/响应周期:工作流请求用户输入、处理响应并继续,直到满足终止条件
  • 工具审批:用于需要人工审批的敏感操作
  • FunctionApprovalRequestContent:在代理调用需要审批的工具时发出,用于响应 create_response(approved=...)
  • 检查点:使用 with_checkpointing() 实现可以在进程重启时暂停和恢复的持久工作流
  • 专业知识:每个代理专注于自己的领域,同时通过交接进行协作

后续步骤