Microsoft代理框架工作流编排 - 群聊

群组聊天编排对多个参与者之间的协作对话建模,由经理协调确定发言者选择和对话流程。 此模式非常适合需要迭代优化、协作解决问题或多透视分析的方案。

群聊和其他模式之间的差异

与其他多代理模式相比,群聊业务流程具有不同的特征:

  • 集中协调:与代理直接转移控制权的交接模式不同,群聊使用经理来协调下一次说话的人员
  • 迭代优化:代理可以在多个轮中查看并互相借鉴彼此的反馈进行改进
  • 灵活说话人选择:经理可以使用各种策略(轮循机制、基于提示的自定义逻辑)来选择说话人
  • 共享上下文:所有代理都会看到完整的对话历史记录,从而实现协作优化

学习内容

  • 如何为组协作创建专用代理
  • 如何配置说话人选择策略
  • 如何使用迭代代理优化生成工作流
  • 如何使用自定义管理器自定义聊天流

设置 Azure OpenAI 客户端

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;

// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
    throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
    .GetChatClient(deploymentName)
    .AsIChatClient();

定义代理

在组会话中为不同角色创建专用代理:

// Create a copywriter agent
ChatClientAgent writer = new(client,
    "You are a creative copywriter. Generate catchy slogans and marketing copy. Be concise and impactful.",
    "CopyWriter",
    "A creative copywriter agent");

// Create a reviewer agent
ChatClientAgent reviewer = new(client,
    "You are a marketing reviewer. Evaluate slogans for clarity, impact, and brand alignment. " +
    "Provide constructive feedback or approval.",
    "Reviewer",
    "A marketing review agent");

使用 Round-Robin 管理器配置群组聊天

使用 AgentWorkflowBuilder以下命令生成群组聊天工作流:

// Build group chat with round-robin speaker selection
// The manager factory receives the list of agents and returns a configured manager
var workflow = AgentWorkflowBuilder
    .CreateGroupChatBuilderWith(agents => 
        new RoundRobinGroupChatManager(agents) 
        { 
            MaximumIterationCount = 5  // Maximum number of turns
        })
    .AddParticipants(writer, reviewer)
    .Build();

运行群组聊天工作流

执行工作流并观察迭代对话:

// Start the group chat
var messages = new List<ChatMessage> { 
    new(ChatRole.User, "Create a slogan for an eco-friendly electric vehicle.") 
};

StreamingRun run = await InProcessExecution.StreamAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is AgentRunUpdateEvent update)
    {
        // Process streaming agent responses
        AgentRunResponse response = update.AsResponse();
        foreach (ChatMessage message in response.Messages)
        {
            Console.WriteLine($"[{update.ExecutorId}]: {message.Text}");
        }
    }
    else if (evt is WorkflowOutputEvent output)
    {
        // Workflow completed
        var conversationHistory = output.As<List<ChatMessage>>();
        Console.WriteLine("\n=== Final Conversation ===");
        foreach (var message in conversationHistory)
        {
            Console.WriteLine($"{message.AuthorName}: {message.Text}");
        }
        break;
    }
}

示例交互

[CopyWriter]: "Green Dreams, Zero Emissions" - Drive the future with style and sustainability.

[Reviewer]: The slogan is good, but "Green Dreams" might be a bit abstract. Consider something 
more direct like "Pure Power, Zero Impact" to emphasize both performance and environmental benefit.

[CopyWriter]: "Pure Power, Zero Impact" - Experience electric excellence without compromise.

[Reviewer]: Excellent! This slogan is clear, impactful, and directly communicates the key benefits. 
The tagline reinforces the message perfectly. Approved for use.

[CopyWriter]: Thank you! The final slogan is: "Pure Power, Zero Impact" - Experience electric 
excellence without compromise.

设置聊天客户端

from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential

# Initialize the Azure OpenAI chat client
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

定义代理

创建具有不同角色的专用代理:

from agent_framework import ChatAgent

# Create a researcher agent
researcher = ChatAgent(
    name="Researcher",
    description="Collects relevant background information.",
    instructions="Gather concise facts that help answer the question. Be brief and factual.",
    chat_client=chat_client,
)

# Create a writer agent
writer = ChatAgent(
    name="Writer",
    description="Synthesizes polished answers using gathered information.",
    instructions="Compose clear, structured answers using any notes provided. Be comprehensive.",
    chat_client=chat_client,
)

使用简单选择器配置群组聊天

使用自定义说话人选择逻辑生成群组聊天:

from agent_framework import GroupChatBuilder, GroupChatStateSnapshot

def select_next_speaker(state: GroupChatStateSnapshot) -> str | None:
    """Alternate between researcher and writer for collaborative refinement.

    Args:
        state: Contains task, participants, conversation, history, and round_index

    Returns:
        Name of next speaker, or None to finish
    """
    round_idx = state["round_index"]
    history = state["history"]

    # Finish after 4 turns (researcher → writer → researcher → writer)
    if round_idx >= 4:
        return None

    # Alternate speakers
    last_speaker = history[-1].speaker if history else None
    if last_speaker == "Researcher":
        return "Writer"
    return "Researcher"

# Build the group chat workflow
workflow = (
    GroupChatBuilder()
    .set_select_speakers_func(select_next_speaker, display_name="Orchestrator")
    .participants([researcher, writer])
    .build()
)

使用 Agent-Based 管理器配置群组聊天

或者,使用基于代理的管理器进行智能扬声器选择。 经理是一个完整的ChatAgent,具有访问工具、上下文和可观测性的权限。

# Create coordinator agent for speaker selection
coordinator = ChatAgent(
    name="Coordinator",
    description="Coordinates multi-agent collaboration by selecting speakers",
    instructions="""
You coordinate a team conversation to solve the user's task.

Review the conversation history and select the next participant to speak.

Guidelines:
- Start with Researcher to gather information
- Then have Writer synthesize the final answer
- Only finish after both have contributed meaningfully
- Allow for multiple rounds of information gathering if needed
""",
    chat_client=chat_client,
)

# Build group chat with agent-based manager
workflow = (
    GroupChatBuilder()
    .set_manager(coordinator, display_name="Orchestrator")
    .with_termination_condition(lambda messages: sum(1 for msg in messages if msg.role == Role.ASSISTANT) >= 4)
    .participants([researcher, writer])
    .build()
)

运行群组聊天工作流

执行工作流和处理事件:

from typing import cast
from agent_framework import AgentRunUpdateEvent, Role, WorkflowOutputEvent

task = "What are the key benefits of async/await in Python?"

print(f"Task: {task}\n")
print("=" * 80)

final_conversation: list[ChatMessage] = []
last_executor_id: str | None = None

# Run the workflow
async for event in workflow.run_stream(task):
    if isinstance(event, AgentRunUpdateEvent):
        # Print streaming agent updates
        eid = event.executor_id
        if eid != last_executor_id:
            if last_executor_id is not None:
                print()
            print(f"[{eid}]:", end=" ", flush=True)
            last_executor_id = eid
        print(event.data, end="", flush=True)
    elif isinstance(event, WorkflowOutputEvent):
        # Workflow completed - data is a list of ChatMessage
        final_conversation = cast(list[ChatMessage], event.data)

if final_conversation:
    print("\n\n" + "=" * 80)
    print("Final Conversation:")
    for msg in final_conversation:
        author = getattr(msg, "author_name", "Unknown")
        text = getattr(msg, "text", str(msg))
        print(f"\n[{author}]\n{text}")
        print("-" * 80)

print("\nWorkflow completed.")

示例交互

Task: What are the key benefits of async/await in Python?

================================================================================

[Researcher]: Async/await in Python provides non-blocking I/O operations, enabling 
concurrent execution without threading overhead. Key benefits include improved 
performance for I/O-bound tasks, better resource utilization, and simplified 
concurrent code structure using native coroutines.

[Writer]: The key benefits of async/await in Python are:

1. **Non-blocking Operations**: Allows I/O operations to run concurrently without 
   blocking the main thread, significantly improving performance for network 
   requests, file I/O, and database queries.

2. **Resource Efficiency**: Avoids the overhead of thread creation and context 
   switching, making it more memory-efficient than traditional threading.

3. **Simplified Concurrency**: Provides a clean, synchronous-looking syntax for 
   asynchronous code, making concurrent programs easier to write and maintain.

4. **Scalability**: Enables handling thousands of concurrent connections with 
   minimal resource consumption, ideal for high-performance web servers and APIs.

--------------------------------------------------------------------------------

Workflow completed.

关键概念

  • 集中管理器:群组聊天使用经理协调说话人选择和流
  • AgentWorkflowBuilder.CreateGroupChatBuilderWith():使用管理器工厂函数创建工作流
  • RoundRobinGroupChatManager:在轮流机制中轮换发言者的内置管理器
  • MaximumIterationCount:控制终止前代理的最大轮次数
  • 自定义管理器:扩展 RoundRobinGroupChatManager 或实现自定义逻辑
  • 迭代优化:代理评审和改进彼此的贡献
  • 共享上下文:所有参与者都看到完整的对话历史记录
  • 灵活管理器策略:在简单的选择器、基于代理的经理或自定义逻辑之间进行选择
  • GroupChatBuilder:创建具有可配置扬声器选择的工作流
  • set_select_speakers_func():为说话人选择定义自定义 Python 函数
  • set_manager():使用基于代理的管理器进行智能扬声器协调
  • GroupChatStateSnapshot:为选择决策提供聊天状态
  • 迭代协作:代理基于彼此的贡献而构建
  • 事件流处理:实时处理 AgentRunUpdateEventWorkflowOutputEvent
  • list[ChatMessage] 输出:所有业务流程返回聊天消息列表

高级:自定义扬声器选择

可以通过创建自定义群组聊天管理器来实现自定义管理器逻辑:

public class ApprovalBasedManager : RoundRobinGroupChatManager
{
    private readonly string _approverName;

    public ApprovalBasedManager(IReadOnlyList<AIAgent> agents, string approverName) 
        : base(agents)
    {
        _approverName = approverName;
    }

    // Override to add custom termination logic
    protected override ValueTask<bool> ShouldTerminateAsync(
        IReadOnlyList<ChatMessage> history, 
        CancellationToken cancellationToken = default)
    {
        var last = history.LastOrDefault();
        bool shouldTerminate = last?.AuthorName == _approverName &&
            last.Text?.Contains("approve", StringComparison.OrdinalIgnoreCase) == true;

        return ValueTask.FromResult(shouldTerminate);
    }
}

// Use custom manager in workflow
var workflow = AgentWorkflowBuilder
    .CreateGroupChatBuilderWith(agents => 
        new ApprovalBasedManager(agents, "Reviewer") 
        { 
            MaximumIterationCount = 10 
        })
    .AddParticipants(writer, reviewer)
    .Build();

可以根据聊天状态实现复杂的选择逻辑:

def smart_selector(state: GroupChatStateSnapshot) -> str | None:
    """Select speakers based on conversation content and context."""
    round_idx = state["round_index"]
    conversation = state["conversation"]
    history = state["history"]

    # Maximum 10 rounds
    if round_idx >= 10:
        return None

    # First round: always start with researcher
    if round_idx == 0:
        return "Researcher"

    # Check last message content
    last_message = conversation[-1] if conversation else None
    last_text = getattr(last_message, "text", "").lower()

    # If researcher asked a question, let writer respond
    if "?" in last_text and history[-1].speaker == "Researcher":
        return "Writer"

    # If writer provided info, let researcher validate or extend
    if history[-1].speaker == "Writer":
        return "Researcher"

    # Default alternation
    return "Writer" if history[-1].speaker == "Researcher" else "Researcher"

workflow = (
    GroupChatBuilder()
    .set_select_speakers_func(smart_selector, display_name="SmartOrchestrator")
    .participants([researcher, writer])
    .build()
)

何时使用群组聊天

群组聊天管理很适合:

  • 迭代优化:多轮评审和改进
  • 协作问题解决:具有互补专业知识的代理协同工作
  • 内容创建:用于创建文档的编写者-审阅者工作流
  • 多角度分析:从相同输入获取不同视角的观点
  • 质量保证:自动评审和审批流程

在以下情况下考虑替代方法:

  • 您需要严格的顺序处理(使用顺序编排)
  • 代理应完全独立工作(使用并发协调)
  • 需要代理间的直接切换(使用 Handoff 编排)
  • 需要复杂的动态规划(使用 Magentic 编排)

后续步骤