Microsoft Agent Framework 支持从 AIAgent 类继承并实现所需方法来生成自定义代理。
本文档演示如何构建一个简单的自定义代理程序,将用户输入转换为大写并重复输出。 在大多数情况下,构建自己的代理将涉及更复杂的逻辑和与 AI 服务的集成。
入门
将所需的 NuGet 包添加到项目。
dotnet add package Microsoft.Agents.AI.Abstractions --prerelease
创建自定义代理
代理线程
若要创建自定义代理,还需要一个线程,该线程用于跟踪单个会话的状态,包括消息历史记录,以及代理需要维护的任何其他状态。
为了便于入门,可以从实现常见线程存储机制的各种基类继承。
-
InMemoryAgentThread- 将聊天历史记录存储在内存中,并且可以序列化为 JSON。 -
ServiceIdAgentThread- 不存储任何聊天历史记录,但允许您将 ID 与线程相关联,在该线程下,聊天历史记录可以存储在外部。
对于此示例,我们将使用 InMemoryAgentThread 自定义线程的基类。
internal sealed class CustomAgentThread : InMemoryAgentThread
{
internal CustomAgentThread() : base() { }
internal CustomAgentThread(JsonElement serializedThreadState, JsonSerializerOptions? jsonSerializerOptions = null)
: base(serializedThreadState, jsonSerializerOptions) { }
}
Agent 类
接下来,我们希望通过从 AIAgent 类继承来创建代理类本身。
internal sealed class UpperCaseParrotAgent : AIAgent
{
}
创建线程
线程始终通过代理类上的两个工厂方法创建。 这允许代理控制线程的创建和反序列化方式。 因此,代理可以在构造时将任何其他状态或行为附加到线程。
需要实现两种方法:
public override AgentThread GetNewThread() => new CustomAgentThread();
public override AgentThread DeserializeThread(JsonElement serializedThread, JsonSerializerOptions? jsonSerializerOptions = null)
=> new CustomAgentThread(serializedThread, jsonSerializerOptions);
核心代理逻辑
代理的核心逻辑是获取任何输入消息,将其文本转换为大写,并将其作为响应消息返回。
我们希望添加以下方法来包含此逻辑。
我们正在克隆输入消息,因为输入消息的各个方面必须修改为有效的响应消息。 例如,角色必须更改为 Assistant。
private static IEnumerable<ChatMessage> CloneAndToUpperCase(IEnumerable<ChatMessage> messages, string agentName) => messages.Select(x =>
{
var messageClone = x.Clone();
messageClone.Role = ChatRole.Assistant;
messageClone.MessageId = Guid.NewGuid().ToString();
messageClone.AuthorName = agentName;
messageClone.Contents = x.Contents.Select(c => c is TextContent tc ? new TextContent(tc.Text.ToUpperInvariant())
{
AdditionalProperties = tc.AdditionalProperties,
Annotations = tc.Annotations,
RawRepresentation = tc.RawRepresentation
} : c).ToList();
return messageClone;
});
代理运行方法
最后,我们需要实现用于运行代理的两种核心方法。 一个用于非流式处理,一个用于流式处理。
对于这两种方法,我们需要确保提供了一个线程,如果不是,则创建一个新线程。
然后,可以通过调用 NotifyThreadOfNewMessagesAsync 来用新消息更新线程。
如果不执行此作,用户将无法与代理进行多轮次对话,并且每次运行都将是一个新的交互。
public override async Task<AgentRunResponse> RunAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
thread ??= this.GetNewThread();
List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();
await NotifyThreadOfNewMessagesAsync(thread, messages.Concat(responseMessages), cancellationToken);
return new AgentRunResponse
{
AgentId = this.Id,
ResponseId = Guid.NewGuid().ToString(),
Messages = responseMessages
};
}
public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
thread ??= this.GetNewThread();
List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();
await NotifyThreadOfNewMessagesAsync(thread, messages.Concat(responseMessages), cancellationToken);
foreach (var message in responseMessages)
{
yield return new AgentRunResponseUpdate
{
AgentId = this.Id,
AuthorName = this.DisplayName,
Role = ChatRole.Assistant,
Contents = message.Contents,
ResponseId = Guid.NewGuid().ToString(),
MessageId = Guid.NewGuid().ToString()
};
}
}
使用代理
AIAgent如果方法都正确实现,则代理是标准AIAgent代理并支持标准代理操作。
有关如何运行和与代理交互的详细信息,请参阅 代理入门教程 。
Microsoft Agent Framework 支持从 BaseAgent 类继承并实现所需方法来生成自定义代理。
本文档演示如何构建一个简单的自定义代理,该代理会带前缀地回显用户输入。 在大多数情况下,构建自己的代理将涉及更复杂的逻辑和与 AI 服务的集成。
入门
将所需的 Python 包添加到项目。
pip install agent-framework-core --pre
创建自定义代理
代理协议
框架提供的协议 AgentProtocol 定义了所有代理必须实现的接口。 自定义代理程序可以直接实现此协议或为方便起见扩展 BaseAgent 类。
from agent_framework import AgentProtocol, AgentRunResponse, AgentRunResponseUpdate, AgentThread, ChatMessage
from collections.abc import AsyncIterable
from typing import Any
class MyCustomAgent(AgentProtocol):
"""A custom agent that implements the AgentProtocol directly."""
@property
def id(self) -> str:
"""Returns the ID of the agent."""
...
async def run(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AgentRunResponse:
"""Execute the agent and return a complete response."""
...
def run_stream(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AsyncIterable[AgentRunResponseUpdate]:
"""Execute the agent and yield streaming response updates."""
...
使用 BaseAgent
建议的方法是扩展 BaseAgent 类,该类提供常见功能和简化实现:
from agent_framework import (
BaseAgent,
AgentRunResponse,
AgentRunResponseUpdate,
AgentThread,
ChatMessage,
Role,
TextContent,
)
from collections.abc import AsyncIterable
from typing import Any
class EchoAgent(BaseAgent):
"""A simple custom agent that echoes user messages with a prefix."""
echo_prefix: str = "Echo: "
def __init__(
self,
*,
name: str | None = None,
description: str | None = None,
echo_prefix: str = "Echo: ",
**kwargs: Any,
) -> None:
"""Initialize the EchoAgent.
Args:
name: The name of the agent.
description: The description of the agent.
echo_prefix: The prefix to add to echoed messages.
**kwargs: Additional keyword arguments passed to BaseAgent.
"""
super().__init__(
name=name,
description=description,
echo_prefix=echo_prefix,
**kwargs,
)
async def run(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AgentRunResponse:
"""Execute the agent and return a complete response.
Args:
messages: The message(s) to process.
thread: The conversation thread (optional).
**kwargs: Additional keyword arguments.
Returns:
An AgentRunResponse containing the agent's reply.
"""
# Normalize input messages to a list
normalized_messages = self._normalize_messages(messages)
if not normalized_messages:
response_message = ChatMessage(
role=Role.ASSISTANT,
contents=[TextContent(text="Hello! I'm a custom echo agent. Send me a message and I'll echo it back.")],
)
else:
# For simplicity, echo the last user message
last_message = normalized_messages[-1]
if last_message.text:
echo_text = f"{self.echo_prefix}{last_message.text}"
else:
echo_text = f"{self.echo_prefix}[Non-text message received]"
response_message = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text=echo_text)])
# Notify the thread of new messages if provided
if thread is not None:
await self._notify_thread_of_new_messages(thread, normalized_messages, response_message)
return AgentRunResponse(messages=[response_message])
async def run_stream(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AsyncIterable[AgentRunResponseUpdate]:
"""Execute the agent and yield streaming response updates.
Args:
messages: The message(s) to process.
thread: The conversation thread (optional).
**kwargs: Additional keyword arguments.
Yields:
AgentRunResponseUpdate objects containing chunks of the response.
"""
# Normalize input messages to a list
normalized_messages = self._normalize_messages(messages)
if not normalized_messages:
response_text = "Hello! I'm a custom echo agent. Send me a message and I'll echo it back."
else:
# For simplicity, echo the last user message
last_message = normalized_messages[-1]
if last_message.text:
response_text = f"{self.echo_prefix}{last_message.text}"
else:
response_text = f"{self.echo_prefix}[Non-text message received]"
# Simulate streaming by yielding the response word by word
words = response_text.split()
for i, word in enumerate(words):
# Add space before word except for the first one
chunk_text = f" {word}" if i > 0 else word
yield AgentRunResponseUpdate(
contents=[TextContent(text=chunk_text)],
role=Role.ASSISTANT,
)
# Small delay to simulate streaming
await asyncio.sleep(0.1)
# Notify the thread of the complete response if provided
if thread is not None:
complete_response = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text=response_text)])
await self._notify_thread_of_new_messages(thread, normalized_messages, complete_response)
使用代理
如果代理方法都正确实现,那么代理将支持所有标准代理操作。
有关如何运行和与代理交互的详细信息,请参阅 代理入门教程 。