다음을 통해 공유


Microsoft 에이전트 프레임워크 워크플로우 - 에이전트와 작업하기

이 페이지에서는 Microsoft 에이전트 프레임워크 워크플로 내에서 에이전트를 사용하는 방법에 대한 개요를 제공합니다.

개요

워크플로에 인텔리전스를 추가하려면 워크플로 실행의 일부로 AI 에이전트를 활용할 수 있습니다. AI 에이전트를 워크플로에 쉽게 통합할 수 있으므로 이전에는 달성하기 어려웠던 복잡하고 지능적인 솔루션을 만들 수 있습니다.

워크플로에 에이전트 직접 추가

에지를 통해 워크플로에 에이전트를 추가할 수 있습니다.

using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;

// Create the agents first
AIAgent agentA = new ChatClientAgent(chatClient, instructions);
AIAgent agentB = new ChatClientAgent(chatClient, instructions);

// Build a workflow with the agents
WorkflowBuilder builder = new(agentA);
builder.AddEdge(agentA, agentB);
Workflow<ChatMessage> workflow = builder.Build<ChatMessage>();

워크플로 실행

위에서 만든 워크플로 내에서 에이전트는 실제로 워크플로의 다른 부분과 에이전트 통신을 처리하는 실행기 내부에 래핑됩니다. 실행기는 다음 세 가지 메시지 유형을 처리할 수 있습니다.

  • ChatMessage: 단일 채팅 메시지
  • List<ChatMessage>: 채팅 메시지 목록
  • TurnToken: 새 턴의 시작을 알리는 턴 토큰

실행기는 에이전트가 을 받을 TurnToken때까지 응답하도록 트리거하지 않습니다. TurnToken 이전에 수신된 모든 메시지는 버퍼링되고, TurnToken 수신 시 에이전트에게 전송됩니다.

StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, "Hello World!"));
// Must send the turn token to trigger the agents. The agents are wrapped as executors.
// When they receive messages, they will cache the messages and only start processing
// when they receive a TurnToken. The turn token will be passed from one agent to the next.
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    // The agents will run in streaming mode and an AgentRunUpdateEvent
    // will be emitted as new chunks are generated.
    if (evt is AgentRunUpdateEvent agentRunUpdate)
    {
        Console.WriteLine($"{agentRunUpdate.ExecutorId}: {agentRunUpdate.Data}");
    }
}

기본 제공 에이전트 실행기 사용

에지를 통해 워크플로에 에이전트를 추가할 수 있습니다.

from agent_framework import WorkflowBuilder
from agent_framework.azure import AzureChatClient
from azure.identity import AzureCliCredential

# Create the agents first
chat_client = AzureChatClient(credential=AzureCliCredential())
writer_agent: ChatAgent = chat_client.create_agent(
    instructions=(
        "You are an excellent content writer. You create new content and edit contents based on the feedback."
    ),
    name="writer_agent",
)
reviewer_agent = chat_client.create_agent(
    instructions=(
        "You are an excellent content reviewer."
        "Provide actionable feedback to the writer about the provided content."
        "Provide the feedback in the most concise manner possible."
    ),
    name="reviewer_agent",
)

# Build a workflow with the agents
builder = WorkflowBuilder()
builder.set_start_executor(writer_agent)
builder.add_edge(writer_agent, reviewer_agent)
workflow = builder.build()

워크플로 실행

위에서 만든 워크플로 내에서 에이전트는 실제로 워크플로의 다른 부분과 에이전트 통신을 처리하는 실행기 내부에 래핑됩니다. 실행기는 다음 세 가지 메시지 유형을 처리할 수 있습니다.

  • str: 문자열 형식의 단일 채팅 메시지
  • ChatMessage: 단일 채팅 메시지
  • List<ChatMessage>: 채팅 메시지 목록

실행기가 이러한 형식 중 하나의 메시지를 받을 때마다 에이전트가 응답하도록 트리거되고 응답 형식이 개체가 AgentExecutorResponse 됩니다. 이 클래스에는 다음을 포함하여 에이전트의 응답에 대한 유용한 정보가 포함되어 있습니다.

  • executor_id: 이 응답을 생성한 실행기의 ID입니다.
  • agent_run_response: 에이전트의 전체 응답
  • full_conversation: 이 시점까지의 전체 대화 기록

워크플로를 실행할 때 에이전트의 응답과 관련된 두 가지 가능한 이벤트 유형을 내보낼 수 있습니다.

  • AgentRunUpdateEvent 스트리밍 모드에서 생성되는 에이전트 응답의 청크를 포함합니다.
  • AgentRunEvent 비 스트리밍 모드에서 에이전트의 전체 응답을 포함합니다.

기본적으로 에이전트는 스트리밍 모드에서 실행되는 실행기에 래핑됩니다. 사용자 지정 실행기를 만들어 이 동작을 사용자 지정할 수 있습니다. 자세한 내용은 다음 섹션을 참조하세요.

last_executor_id = None
async for event in workflow.run_streaming("Write a short blog post about AI agents."):
    if isinstance(event, AgentRunUpdateEvent):
        if event.executor_id != last_executor_id:
            if last_executor_id is not None:
                print()
            print(f"{event.executor_id}:", end=" ", flush=True)
            last_executor_id = event.executor_id
        print(event.data, end="", flush=True)

사용자 지정 에이전트 실행기 사용

경우에 따라 AI 에이전트를 워크플로에 통합하는 방법을 사용자 지정하려고 할 수 있습니다. 사용자 지정 실행기를 만들어 이 작업을 수행할 수 있습니다. 이렇게 하면 다음을 제어할 수 있습니다.

  • 에이전트 호출: 스트리밍 또는 비 스트리밍
  • 사용자 지정 메시지 유형을 포함하여 에이전트가 처리할 메시지 형식
  • 초기화 및 정리를 포함하여 에이전트의 수명 주기
  • 에이전트 스레드 및 기타 리소스 사용
  • 사용자 지정 이벤트를 포함하여 에이전트 실행 중에 내보내는 추가 이벤트
  • 공유 상태 및 요청/응답과 같은 다른 워크플로 기능과의 통합
internal sealed class CustomAgentExecutor : Executor<CustomInput, CustomOutput>("CustomAgentExecutor")
{
    private readonly AIAgent _agent;

    /// <summary>
    /// Creates a new instance of the <see cref="CustomAgentExecutor"/> class.
    /// </summary>
    /// <param name="agent">The AI agent used for custom processing</param>
    public CustomAgentExecutor(AIAgent agent) : base("CustomAgentExecutor")
    {
        this._agent = agent;
    }

    public async ValueTask<CustomOutput> HandleAsync(CustomInput message, IWorkflowContext context)
    {
        // Retrieve any shared states if needed
        var sharedState = await context.ReadStateAsync<SharedStateType>("sharedStateId", scopeName: "SharedStateScope");

        // Render the input for the agent
        var agentInput = RenderInput(message, sharedState);

        // Invoke the agent
        // Assume the agent is configured with structured outputs with type `CustomOutput`
        var response = await this._agent.RunAsync(agentInput);
        var customOutput = JsonSerializer.Deserialize<CustomOutput>(response.Text);

        return customOutput;
    }
}
from agent_framework import (
    ChatAgent,
    ChatMessage,
    Executor,
    WorkflowContext,
    handler
)

class Writer(Executor):

    agent: ChatAgent

    def __init__(self, chat_client: AzureChatClient, id: str = "writer"):
        # Create a domain specific agent using your configured AzureChatClient.
        agent = chat_client.create_agent(
            instructions=(
                "You are an excellent content writer. You create new content and edit contents based on the feedback."
            ),
        )
        # Associate the agent with this executor node. The base Executor stores it on self.agent.
        super().__init__(agent=agent, id=id)

    @handler
    async def handle(self, message: ChatMessage, ctx: WorkflowContext[list[ChatMessage]]) -> None:
        """Handles a single chat message and forwards the accumulated messages to the next executor in the workflow."""
        # Invoke the agent with the incoming message and get the response
        messages: list[ChatMessage] = [message]
        response = await self.agent.run(messages)
        # Accumulate messages and send them to the next executor in the workflow.
        messages.extend(response.messages)
        await ctx.send_message(messages)

다음 단계