群组聊天编排对多个参与者之间的协作对话建模,由经理协调确定发言者选择和对话流程。 此模式非常适合需要迭代优化、协作解决问题或多透视分析的方案。
群聊和其他模式之间的差异
与其他多代理模式相比,群聊业务流程具有不同的特征:
- 集中协调:与代理直接转移控制权的交接模式不同,群聊使用经理来协调下一次说话的人员
- 迭代优化:代理可以在多个轮中查看并互相借鉴彼此的反馈进行改进
- 灵活说话人选择:经理可以使用各种策略(轮循机制、基于提示的自定义逻辑)来选择说话人
- 共享上下文:所有代理都会看到完整的对话历史记录,从而实现协作优化
学习内容
- 如何为组协作创建专用代理
- 如何配置说话人选择策略
- 如何使用迭代代理优化生成工作流
- 如何使用自定义管理器自定义聊天流
设置 Azure OpenAI 客户端
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
.GetChatClient(deploymentName)
.AsIChatClient();
定义代理
在组会话中为不同角色创建专用代理:
// Create a copywriter agent
ChatClientAgent writer = new(client,
"You are a creative copywriter. Generate catchy slogans and marketing copy. Be concise and impactful.",
"CopyWriter",
"A creative copywriter agent");
// Create a reviewer agent
ChatClientAgent reviewer = new(client,
"You are a marketing reviewer. Evaluate slogans for clarity, impact, and brand alignment. " +
"Provide constructive feedback or approval.",
"Reviewer",
"A marketing review agent");
使用 Round-Robin 管理器配置群组聊天
使用 AgentWorkflowBuilder以下命令生成群组聊天工作流:
// Build group chat with round-robin speaker selection
// The manager factory receives the list of agents and returns a configured manager
var workflow = AgentWorkflowBuilder
.CreateGroupChatBuilderWith(agents =>
new RoundRobinGroupChatManager(agents)
{
MaximumIterationCount = 5 // Maximum number of turns
})
.AddParticipants(writer, reviewer)
.Build();
运行群组聊天工作流
执行工作流并观察迭代对话:
// Start the group chat
var messages = new List<ChatMessage> {
new(ChatRole.User, "Create a slogan for an eco-friendly electric vehicle.")
};
StreamingRun run = await InProcessExecution.StreamAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is AgentRunUpdateEvent update)
{
// Process streaming agent responses
AgentRunResponse response = update.AsResponse();
foreach (ChatMessage message in response.Messages)
{
Console.WriteLine($"[{update.ExecutorId}]: {message.Text}");
}
}
else if (evt is WorkflowOutputEvent output)
{
// Workflow completed
var conversationHistory = output.As<List<ChatMessage>>();
Console.WriteLine("\n=== Final Conversation ===");
foreach (var message in conversationHistory)
{
Console.WriteLine($"{message.AuthorName}: {message.Text}");
}
break;
}
}
示例交互
[CopyWriter]: "Green Dreams, Zero Emissions" - Drive the future with style and sustainability.
[Reviewer]: The slogan is good, but "Green Dreams" might be a bit abstract. Consider something
more direct like "Pure Power, Zero Impact" to emphasize both performance and environmental benefit.
[CopyWriter]: "Pure Power, Zero Impact" - Experience electric excellence without compromise.
[Reviewer]: Excellent! This slogan is clear, impactful, and directly communicates the key benefits.
The tagline reinforces the message perfectly. Approved for use.
[CopyWriter]: Thank you! The final slogan is: "Pure Power, Zero Impact" - Experience electric
excellence without compromise.
设置聊天客户端
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
# Initialize the Azure OpenAI chat client
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
定义代理
创建具有不同角色的专用代理:
from agent_framework import ChatAgent
# Create a researcher agent
researcher = ChatAgent(
name="Researcher",
description="Collects relevant background information.",
instructions="Gather concise facts that help answer the question. Be brief and factual.",
chat_client=chat_client,
)
# Create a writer agent
writer = ChatAgent(
name="Writer",
description="Synthesizes polished answers using gathered information.",
instructions="Compose clear, structured answers using any notes provided. Be comprehensive.",
chat_client=chat_client,
)
使用简单选择器配置群组聊天
使用自定义说话人选择逻辑生成群组聊天:
from agent_framework import GroupChatBuilder, GroupChatStateSnapshot
def select_next_speaker(state: GroupChatStateSnapshot) -> str | None:
"""Alternate between researcher and writer for collaborative refinement.
Args:
state: Contains task, participants, conversation, history, and round_index
Returns:
Name of next speaker, or None to finish
"""
round_idx = state["round_index"]
history = state["history"]
# Finish after 4 turns (researcher → writer → researcher → writer)
if round_idx >= 4:
return None
# Alternate speakers
last_speaker = history[-1].speaker if history else None
if last_speaker == "Researcher":
return "Writer"
return "Researcher"
# Build the group chat workflow
workflow = (
GroupChatBuilder()
.set_select_speakers_func(select_next_speaker, display_name="Orchestrator")
.participants([researcher, writer])
.build()
)
使用 Agent-Based 管理器配置群组聊天
或者,使用基于代理的管理器进行智能扬声器选择。 经理是一个完整的ChatAgent,具有访问工具、上下文和可观测性的权限。
# Create coordinator agent for speaker selection
coordinator = ChatAgent(
name="Coordinator",
description="Coordinates multi-agent collaboration by selecting speakers",
instructions="""
You coordinate a team conversation to solve the user's task.
Review the conversation history and select the next participant to speak.
Guidelines:
- Start with Researcher to gather information
- Then have Writer synthesize the final answer
- Only finish after both have contributed meaningfully
- Allow for multiple rounds of information gathering if needed
""",
chat_client=chat_client,
)
# Build group chat with agent-based manager
workflow = (
GroupChatBuilder()
.set_manager(coordinator, display_name="Orchestrator")
.with_termination_condition(lambda messages: sum(1 for msg in messages if msg.role == Role.ASSISTANT) >= 4)
.participants([researcher, writer])
.build()
)
运行群组聊天工作流
执行工作流和处理事件:
from typing import cast
from agent_framework import AgentRunUpdateEvent, Role, WorkflowOutputEvent
task = "What are the key benefits of async/await in Python?"
print(f"Task: {task}\n")
print("=" * 80)
final_conversation: list[ChatMessage] = []
last_executor_id: str | None = None
# Run the workflow
async for event in workflow.run_stream(task):
if isinstance(event, AgentRunUpdateEvent):
# Print streaming agent updates
eid = event.executor_id
if eid != last_executor_id:
if last_executor_id is not None:
print()
print(f"[{eid}]:", end=" ", flush=True)
last_executor_id = eid
print(event.data, end="", flush=True)
elif isinstance(event, WorkflowOutputEvent):
# Workflow completed - data is a list of ChatMessage
final_conversation = cast(list[ChatMessage], event.data)
if final_conversation:
print("\n\n" + "=" * 80)
print("Final Conversation:")
for msg in final_conversation:
author = getattr(msg, "author_name", "Unknown")
text = getattr(msg, "text", str(msg))
print(f"\n[{author}]\n{text}")
print("-" * 80)
print("\nWorkflow completed.")
示例交互
Task: What are the key benefits of async/await in Python?
================================================================================
[Researcher]: Async/await in Python provides non-blocking I/O operations, enabling
concurrent execution without threading overhead. Key benefits include improved
performance for I/O-bound tasks, better resource utilization, and simplified
concurrent code structure using native coroutines.
[Writer]: The key benefits of async/await in Python are:
1. **Non-blocking Operations**: Allows I/O operations to run concurrently without
blocking the main thread, significantly improving performance for network
requests, file I/O, and database queries.
2. **Resource Efficiency**: Avoids the overhead of thread creation and context
switching, making it more memory-efficient than traditional threading.
3. **Simplified Concurrency**: Provides a clean, synchronous-looking syntax for
asynchronous code, making concurrent programs easier to write and maintain.
4. **Scalability**: Enables handling thousands of concurrent connections with
minimal resource consumption, ideal for high-performance web servers and APIs.
--------------------------------------------------------------------------------
Workflow completed.
关键概念
- 集中管理器:群组聊天使用经理协调说话人选择和流
- AgentWorkflowBuilder.CreateGroupChatBuilderWith():使用管理器工厂函数创建工作流
- RoundRobinGroupChatManager:在轮流机制中轮换发言者的内置管理器
- MaximumIterationCount:控制终止前代理的最大轮次数
-
自定义管理器:扩展
RoundRobinGroupChatManager或实现自定义逻辑 - 迭代优化:代理评审和改进彼此的贡献
- 共享上下文:所有参与者都看到完整的对话历史记录
- 灵活管理器策略:在简单的选择器、基于代理的经理或自定义逻辑之间进行选择
- GroupChatBuilder:创建具有可配置扬声器选择的工作流
- set_select_speakers_func():为说话人选择定义自定义 Python 函数
- set_manager():使用基于代理的管理器进行智能扬声器协调
- GroupChatStateSnapshot:为选择决策提供聊天状态
- 迭代协作:代理基于彼此的贡献而构建
-
事件流处理:实时处理
AgentRunUpdateEvent和WorkflowOutputEvent - list[ChatMessage] 输出:所有业务流程返回聊天消息列表
高级:自定义扬声器选择
可以通过创建自定义群组聊天管理器来实现自定义管理器逻辑:
public class ApprovalBasedManager : RoundRobinGroupChatManager
{
private readonly string _approverName;
public ApprovalBasedManager(IReadOnlyList<AIAgent> agents, string approverName)
: base(agents)
{
_approverName = approverName;
}
// Override to add custom termination logic
protected override ValueTask<bool> ShouldTerminateAsync(
IReadOnlyList<ChatMessage> history,
CancellationToken cancellationToken = default)
{
var last = history.LastOrDefault();
bool shouldTerminate = last?.AuthorName == _approverName &&
last.Text?.Contains("approve", StringComparison.OrdinalIgnoreCase) == true;
return ValueTask.FromResult(shouldTerminate);
}
}
// Use custom manager in workflow
var workflow = AgentWorkflowBuilder
.CreateGroupChatBuilderWith(agents =>
new ApprovalBasedManager(agents, "Reviewer")
{
MaximumIterationCount = 10
})
.AddParticipants(writer, reviewer)
.Build();
可以根据聊天状态实现复杂的选择逻辑:
def smart_selector(state: GroupChatStateSnapshot) -> str | None:
"""Select speakers based on conversation content and context."""
round_idx = state["round_index"]
conversation = state["conversation"]
history = state["history"]
# Maximum 10 rounds
if round_idx >= 10:
return None
# First round: always start with researcher
if round_idx == 0:
return "Researcher"
# Check last message content
last_message = conversation[-1] if conversation else None
last_text = getattr(last_message, "text", "").lower()
# If researcher asked a question, let writer respond
if "?" in last_text and history[-1].speaker == "Researcher":
return "Writer"
# If writer provided info, let researcher validate or extend
if history[-1].speaker == "Writer":
return "Researcher"
# Default alternation
return "Writer" if history[-1].speaker == "Researcher" else "Researcher"
workflow = (
GroupChatBuilder()
.set_select_speakers_func(smart_selector, display_name="SmartOrchestrator")
.participants([researcher, writer])
.build()
)
何时使用群组聊天
群组聊天管理很适合:
- 迭代优化:多轮评审和改进
- 协作问题解决:具有互补专业知识的代理协同工作
- 内容创建:用于创建文档的编写者-审阅者工作流
- 多角度分析:从相同输入获取不同视角的观点
- 质量保证:自动评审和审批流程
在以下情况下考虑替代方法:
- 您需要严格的顺序处理(使用顺序编排)
- 代理应完全独立工作(使用并发协调)
- 需要代理间的直接切换(使用 Handoff 编排)
- 需要复杂的动态规划(使用 Magentic 编排)