다음을 통해 공유


사용자 지정 에이전트

Microsoft Agent Framework는 클래스에서 AIAgent 상속하고 필요한 메서드를 구현하여 사용자 지정 에이전트를 빌드할 수 있습니다.

이 문서에서는 사용자 입력을 대문자로 다시 반복하는 간단한 사용자 지정 에이전트를 빌드하는 방법을 보여 줍니다. 대부분의 경우 자체 에이전트를 빌드하는 데는 더 복잡한 논리와 AI 서비스와의 통합이 포함됩니다.

시작하기

필요한 NuGet 패키지를 프로젝트에 추가합니다.

dotnet add package Microsoft.Agents.AI.Abstractions --prerelease

사용자 지정 에이전트 만들기

에이전트 스레드

사용자 지정 에이전트를 만들려면 메시지 기록 및 에이전트가 유지 관리해야 하는 다른 상태를 포함하여 단일 대화의 상태를 추적하는 데 사용되는 스레드도 필요합니다.

쉽게 시작할 수 있도록 공통 스레드 스토리지 메커니즘을 구현하는 다양한 기본 클래스에서 상속할 수 있습니다.

  1. InMemoryAgentThread - 채팅 기록을 메모리에 저장하고 JSON으로 serialize할 수 있습니다.
  2. ServiceIdAgentThread - 채팅 기록을 저장하지 않지만 채팅 기록을 외부에 저장할 수 있는 스레드와 ID를 연결할 수 있습니다.

이 예제에서는 사용자 지정 스레드의 InMemoryAgentThread 기본 클래스로 사용합니다.

internal sealed class CustomAgentThread : InMemoryAgentThread
{
    internal CustomAgentThread() : base() { }
    internal CustomAgentThread(JsonElement serializedThreadState, JsonSerializerOptions? jsonSerializerOptions = null)
        : base(serializedThreadState, jsonSerializerOptions) { }
}

에이전트 클래스

다음으로, AIAgent 클래스를 상속하여 에이전트 클래스 자체를 생성하려고 합니다.

internal sealed class UpperCaseParrotAgent : AIAgent
{
}

스레드 생성

스레드는 항상 에이전트 클래스에서 두 개의 팩터리 메서드를 통해 만들어집니다. 이렇게 하면 에이전트가 스레드를 만들고 역직렬화하는 방법을 제어할 수 있습니다. 따라서 에이전트는 생성 시 스레드에 필요한 추가 상태 또는 동작을 연결할 수 있습니다.

다음 두 가지 메서드를 구현해야 합니다.

    public override AgentThread GetNewThread() => new CustomAgentThread();

    public override AgentThread DeserializeThread(JsonElement serializedThread, JsonSerializerOptions? jsonSerializerOptions = null)
        => new CustomAgentThread(serializedThread, jsonSerializerOptions);

핵심 에이전트 논리

에이전트의 핵심 논리는 입력 메시지를 가져와서 텍스트를 대문자로 변환하고 응답 메시지로 반환하는 것입니다.

이 논리를 포함하도록 다음 메서드를 추가하려고 합니다. 입력 메시지의 다양한 측면을 올바른 응답 메시지로 수정해야 하므로 입력 메시지를 복제하고 있습니다. 예를 들어 역할은 반드시 Assistant로 변경해야 합니다.

    private static IEnumerable<ChatMessage> CloneAndToUpperCase(IEnumerable<ChatMessage> messages, string agentName) => messages.Select(x =>
        {
            var messageClone = x.Clone();
            messageClone.Role = ChatRole.Assistant;
            messageClone.MessageId = Guid.NewGuid().ToString();
            messageClone.AuthorName = agentName;
            messageClone.Contents = x.Contents.Select(c => c is TextContent tc ? new TextContent(tc.Text.ToUpperInvariant())
            {
                AdditionalProperties = tc.AdditionalProperties,
                Annotations = tc.Annotations,
                RawRepresentation = tc.RawRepresentation
            } : c).ToList();
            return messageClone;
        });

에이전트 실행 메서드

마지막으로 에이전트를 실행하는 데 사용되는 두 가지 핵심 메서드를 구현해야 합니다. 하나는 비 스트리밍용이고 다른 하나는 스트리밍용입니다.

두 방법 모두 스레드가 제공되었는지 확인하고 그렇지 않은 경우 새 스레드를 만들어야 합니다. 그런 다음, 스레드를 호출 NotifyThreadOfNewMessagesAsync하여 새 메시지로 업데이트할 수 있습니다. 이렇게 하지 않으면 사용자는 에이전트와 멀티 턴 대화를 할 수 없으며 각 실행은 새로운 상호 작용이 됩니다.

    public override async Task<AgentRunResponse> RunAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
    {
        thread ??= this.GetNewThread();
        List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();
        await NotifyThreadOfNewMessagesAsync(thread, messages.Concat(responseMessages), cancellationToken);
        return new AgentRunResponse
        {
            AgentId = this.Id,
            ResponseId = Guid.NewGuid().ToString(),
            Messages = responseMessages
        };
    }

    public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        thread ??= this.GetNewThread();
        List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();
        await NotifyThreadOfNewMessagesAsync(thread, messages.Concat(responseMessages), cancellationToken);
        foreach (var message in responseMessages)
        {
            yield return new AgentRunResponseUpdate
            {
                AgentId = this.Id,
                AuthorName = this.DisplayName,
                Role = ChatRole.Assistant,
                Contents = message.Contents,
                ResponseId = Guid.NewGuid().ToString(),
                MessageId = Guid.NewGuid().ToString()
            };
        }
    }

에이전트 사용

메서드가 AIAgent 모두 올바르게 구현된 경우 에이전트는 표준이 되며 표준 AIAgent 에이전트 작업을 지원합니다.

에이전트를 실행하고 상호 작용하는 방법에 대한 자세한 내용은 에이전트 시작 자습서 를 참조하세요.

Microsoft Agent Framework는 클래스에서 BaseAgent 상속하고 필요한 메서드를 구현하여 사용자 지정 에이전트를 빌드할 수 있습니다.

이 문서에서는 접두사를 사용하여 사용자 입력을 다시 에코하는 간단한 사용자 지정 에이전트를 빌드하는 방법을 보여줍니다. 대부분의 경우 자체 에이전트를 빌드하는 데는 더 복잡한 논리와 AI 서비스와의 통합이 포함됩니다.

시작하기

필요한 Python 패키지를 프로젝트에 추가합니다.

pip install agent-framework-core --pre

사용자 지정 에이전트 만들기

에이전트 프로토콜

프레임워크는 모든 에이전트가 AgentProtocol 구현해야 하는 인터페이스를 정의하는 프로토콜을 제공합니다. 사용자 지정 에이전트는 이 프로토콜을 직접 구현하거나 편의를 위해 클래스를 확장할 BaseAgent 수 있습니다.

from agent_framework import AgentProtocol, AgentRunResponse, AgentRunResponseUpdate, AgentThread, ChatMessage
from collections.abc import AsyncIterable
from typing import Any

class MyCustomAgent(AgentProtocol):
    """A custom agent that implements the AgentProtocol directly."""

    @property
    def id(self) -> str:
        """Returns the ID of the agent."""
        ...

    async def run(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AgentRunResponse:
        """Execute the agent and return a complete response."""
        ...

    def run_stream(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentRunResponseUpdate]:
        """Execute the agent and yield streaming response updates."""
        ...

BaseAgent 사용

일반적인 기능을 제공하고 구현을 간소화하는 BaseAgent 클래스를 확장하는 것을 권장합니다.

from agent_framework import (
    BaseAgent,
    AgentRunResponse,
    AgentRunResponseUpdate,
    AgentThread,
    ChatMessage,
    Role,
    TextContent,
)
from collections.abc import AsyncIterable
from typing import Any


class EchoAgent(BaseAgent):
    """A simple custom agent that echoes user messages with a prefix."""

    echo_prefix: str = "Echo: "

    def __init__(
        self,
        *,
        name: str | None = None,
        description: str | None = None,
        echo_prefix: str = "Echo: ",
        **kwargs: Any,
    ) -> None:
        """Initialize the EchoAgent.

        Args:
            name: The name of the agent.
            description: The description of the agent.
            echo_prefix: The prefix to add to echoed messages.
            **kwargs: Additional keyword arguments passed to BaseAgent.
        """
        super().__init__(
            name=name,
            description=description,
            echo_prefix=echo_prefix,
            **kwargs,
        )

    async def run(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AgentRunResponse:
        """Execute the agent and return a complete response.

        Args:
            messages: The message(s) to process.
            thread: The conversation thread (optional).
            **kwargs: Additional keyword arguments.

        Returns:
            An AgentRunResponse containing the agent's reply.
        """
        # Normalize input messages to a list
        normalized_messages = self._normalize_messages(messages)

        if not normalized_messages:
            response_message = ChatMessage(
                role=Role.ASSISTANT,
                contents=[TextContent(text="Hello! I'm a custom echo agent. Send me a message and I'll echo it back.")],
            )
        else:
            # For simplicity, echo the last user message
            last_message = normalized_messages[-1]
            if last_message.text:
                echo_text = f"{self.echo_prefix}{last_message.text}"
            else:
                echo_text = f"{self.echo_prefix}[Non-text message received]"

            response_message = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text=echo_text)])

        # Notify the thread of new messages if provided
        if thread is not None:
            await self._notify_thread_of_new_messages(thread, normalized_messages, response_message)

        return AgentRunResponse(messages=[response_message])

    async def run_stream(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentRunResponseUpdate]:
        """Execute the agent and yield streaming response updates.

        Args:
            messages: The message(s) to process.
            thread: The conversation thread (optional).
            **kwargs: Additional keyword arguments.

        Yields:
            AgentRunResponseUpdate objects containing chunks of the response.
        """
        # Normalize input messages to a list
        normalized_messages = self._normalize_messages(messages)

        if not normalized_messages:
            response_text = "Hello! I'm a custom echo agent. Send me a message and I'll echo it back."
        else:
            # For simplicity, echo the last user message
            last_message = normalized_messages[-1]
            if last_message.text:
                response_text = f"{self.echo_prefix}{last_message.text}"
            else:
                response_text = f"{self.echo_prefix}[Non-text message received]"

        # Simulate streaming by yielding the response word by word
        words = response_text.split()
        for i, word in enumerate(words):
            # Add space before word except for the first one
            chunk_text = f" {word}" if i > 0 else word

            yield AgentRunResponseUpdate(
                contents=[TextContent(text=chunk_text)],
                role=Role.ASSISTANT,
            )

            # Small delay to simulate streaming
            await asyncio.sleep(0.1)

        # Notify the thread of the complete response if provided
        if thread is not None:
            complete_response = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text=response_text)])
            await self._notify_thread_of_new_messages(thread, normalized_messages, complete_response)

에이전트 사용

에이전트 메서드가 모두 올바르게 구현되면 에이전트는 모든 표준 에이전트 작업을 지원합니다.

에이전트를 실행하고 상호 작용하는 방법에 대한 자세한 내용은 에이전트 시작 자습서 를 참조하세요.

다음 단계