自定义代理

Microsoft Agent Framework 支持从 AIAgent 类继承并实现所需方法来生成自定义代理。

本文档演示如何构建一个简单的自定义代理程序,将用户输入转换为大写并重复输出。 在大多数情况下,构建自己的代理将涉及更复杂的逻辑和与 AI 服务的集成。

入门

将所需的 NuGet 包添加到项目。

dotnet add package Microsoft.Agents.AI.Abstractions --prerelease

创建自定义代理

代理线程

若要创建自定义代理,还需要一个线程,该线程用于跟踪单个会话的状态,包括消息历史记录,以及代理需要维护的任何其他状态。

为了便于入门,可以从实现常见线程存储机制的各种基类继承。

  1. InMemoryAgentThread - 将聊天历史记录存储在内存中,并且可以序列化为 JSON。
  2. ServiceIdAgentThread - 不存储任何聊天历史记录,但允许您将 ID 与线程相关联,在该线程下,聊天历史记录可以存储在外部。

对于此示例,我们将使用 InMemoryAgentThread 自定义线程的基类。

internal sealed class CustomAgentThread : InMemoryAgentThread
{
    internal CustomAgentThread() : base() { }
    internal CustomAgentThread(JsonElement serializedThreadState, JsonSerializerOptions? jsonSerializerOptions = null)
        : base(serializedThreadState, jsonSerializerOptions) { }
}

Agent 类

接下来,我们希望通过从 AIAgent 类继承来创建代理类本身。

internal sealed class UpperCaseParrotAgent : AIAgent
{
}

创建线程

线程始终通过代理类上的两个工厂方法创建。 这允许代理控制线程的创建和反序列化方式。 因此,代理可以在构造时将任何其他状态或行为附加到线程。

需要实现两种方法:

    public override AgentThread GetNewThread() => new CustomAgentThread();

    public override AgentThread DeserializeThread(JsonElement serializedThread, JsonSerializerOptions? jsonSerializerOptions = null)
        => new CustomAgentThread(serializedThread, jsonSerializerOptions);

核心代理逻辑

代理的核心逻辑是获取任何输入消息,将其文本转换为大写,并将其作为响应消息返回。

我们希望添加以下方法来包含此逻辑。 我们正在克隆输入消息,因为输入消息的各个方面必须修改为有效的响应消息。 例如,角色必须更改为 Assistant

    private static IEnumerable<ChatMessage> CloneAndToUpperCase(IEnumerable<ChatMessage> messages, string agentName) => messages.Select(x =>
        {
            var messageClone = x.Clone();
            messageClone.Role = ChatRole.Assistant;
            messageClone.MessageId = Guid.NewGuid().ToString();
            messageClone.AuthorName = agentName;
            messageClone.Contents = x.Contents.Select(c => c is TextContent tc ? new TextContent(tc.Text.ToUpperInvariant())
            {
                AdditionalProperties = tc.AdditionalProperties,
                Annotations = tc.Annotations,
                RawRepresentation = tc.RawRepresentation
            } : c).ToList();
            return messageClone;
        });

代理运行方法

最后,我们需要实现用于运行代理的两种核心方法。 一个用于非流式处理,一个用于流式处理。

对于这两种方法,我们需要确保提供了一个线程,如果不是,则创建一个新线程。 然后,可以通过调用 NotifyThreadOfNewMessagesAsync 来用新消息更新线程。 如果不执行此作,用户将无法与代理进行多轮次对话,并且每次运行都将是一个新的交互。

    public override async Task<AgentRunResponse> RunAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
    {
        thread ??= this.GetNewThread();
        List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();
        await NotifyThreadOfNewMessagesAsync(thread, messages.Concat(responseMessages), cancellationToken);
        return new AgentRunResponse
        {
            AgentId = this.Id,
            ResponseId = Guid.NewGuid().ToString(),
            Messages = responseMessages
        };
    }

    public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        thread ??= this.GetNewThread();
        List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();
        await NotifyThreadOfNewMessagesAsync(thread, messages.Concat(responseMessages), cancellationToken);
        foreach (var message in responseMessages)
        {
            yield return new AgentRunResponseUpdate
            {
                AgentId = this.Id,
                AuthorName = this.DisplayName,
                Role = ChatRole.Assistant,
                Contents = message.Contents,
                ResponseId = Guid.NewGuid().ToString(),
                MessageId = Guid.NewGuid().ToString()
            };
        }
    }

使用代理

AIAgent如果方法都正确实现,则代理是标准AIAgent代理并支持标准代理操作。

有关如何运行和与代理交互的详细信息,请参阅 代理入门教程

Microsoft Agent Framework 支持从 BaseAgent 类继承并实现所需方法来生成自定义代理。

本文档演示如何构建一个简单的自定义代理,该代理会带前缀地回显用户输入。 在大多数情况下,构建自己的代理将涉及更复杂的逻辑和与 AI 服务的集成。

入门

将所需的 Python 包添加到项目。

pip install agent-framework-core --pre

创建自定义代理

代理协议

框架提供的协议 AgentProtocol 定义了所有代理必须实现的接口。 自定义代理程序可以直接实现此协议或为方便起见扩展 BaseAgent 类。

from agent_framework import AgentProtocol, AgentRunResponse, AgentRunResponseUpdate, AgentThread, ChatMessage
from collections.abc import AsyncIterable
from typing import Any

class MyCustomAgent(AgentProtocol):
    """A custom agent that implements the AgentProtocol directly."""

    @property
    def id(self) -> str:
        """Returns the ID of the agent."""
        ...

    async def run(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AgentRunResponse:
        """Execute the agent and return a complete response."""
        ...

    def run_stream(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentRunResponseUpdate]:
        """Execute the agent and yield streaming response updates."""
        ...

使用 BaseAgent

建议的方法是扩展 BaseAgent 类,该类提供常见功能和简化实现:

from agent_framework import (
    BaseAgent,
    AgentRunResponse,
    AgentRunResponseUpdate,
    AgentThread,
    ChatMessage,
    Role,
    TextContent,
)
from collections.abc import AsyncIterable
from typing import Any


class EchoAgent(BaseAgent):
    """A simple custom agent that echoes user messages with a prefix."""

    echo_prefix: str = "Echo: "

    def __init__(
        self,
        *,
        name: str | None = None,
        description: str | None = None,
        echo_prefix: str = "Echo: ",
        **kwargs: Any,
    ) -> None:
        """Initialize the EchoAgent.

        Args:
            name: The name of the agent.
            description: The description of the agent.
            echo_prefix: The prefix to add to echoed messages.
            **kwargs: Additional keyword arguments passed to BaseAgent.
        """
        super().__init__(
            name=name,
            description=description,
            echo_prefix=echo_prefix,
            **kwargs,
        )

    async def run(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AgentRunResponse:
        """Execute the agent and return a complete response.

        Args:
            messages: The message(s) to process.
            thread: The conversation thread (optional).
            **kwargs: Additional keyword arguments.

        Returns:
            An AgentRunResponse containing the agent's reply.
        """
        # Normalize input messages to a list
        normalized_messages = self._normalize_messages(messages)

        if not normalized_messages:
            response_message = ChatMessage(
                role=Role.ASSISTANT,
                contents=[TextContent(text="Hello! I'm a custom echo agent. Send me a message and I'll echo it back.")],
            )
        else:
            # For simplicity, echo the last user message
            last_message = normalized_messages[-1]
            if last_message.text:
                echo_text = f"{self.echo_prefix}{last_message.text}"
            else:
                echo_text = f"{self.echo_prefix}[Non-text message received]"

            response_message = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text=echo_text)])

        # Notify the thread of new messages if provided
        if thread is not None:
            await self._notify_thread_of_new_messages(thread, normalized_messages, response_message)

        return AgentRunResponse(messages=[response_message])

    async def run_stream(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentRunResponseUpdate]:
        """Execute the agent and yield streaming response updates.

        Args:
            messages: The message(s) to process.
            thread: The conversation thread (optional).
            **kwargs: Additional keyword arguments.

        Yields:
            AgentRunResponseUpdate objects containing chunks of the response.
        """
        # Normalize input messages to a list
        normalized_messages = self._normalize_messages(messages)

        if not normalized_messages:
            response_text = "Hello! I'm a custom echo agent. Send me a message and I'll echo it back."
        else:
            # For simplicity, echo the last user message
            last_message = normalized_messages[-1]
            if last_message.text:
                response_text = f"{self.echo_prefix}{last_message.text}"
            else:
                response_text = f"{self.echo_prefix}[Non-text message received]"

        # Simulate streaming by yielding the response word by word
        words = response_text.split()
        for i, word in enumerate(words):
            # Add space before word except for the first one
            chunk_text = f" {word}" if i > 0 else word

            yield AgentRunResponseUpdate(
                contents=[TextContent(text=chunk_text)],
                role=Role.ASSISTANT,
            )

            # Small delay to simulate streaming
            await asyncio.sleep(0.1)

        # Notify the thread of the complete response if provided
        if thread is not None:
            complete_response = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text=response_text)])
            await self._notify_thread_of_new_messages(thread, normalized_messages, complete_response)

使用代理

如果代理方法都正确实现,那么代理将支持所有标准代理操作。

有关如何运行和与代理交互的详细信息,请参阅 代理入门教程

后续步骤