이 문서에서는 Microsoft 에이전트 프레임워크에서 워크플로를 에이전트로 사용하는 방법에 대한 개요를 제공합니다.
개요
경우에 따라 여러 에이전트, 사용자 지정 실행기 및 복잡한 논리를 사용하여 정교한 워크플로를 빌드했지만 다른 에이전트와 마찬가지로 사용하려고 합니다. 이것이 바로 워크플로 에이전트가 수행할 수 있는 작업입니다. 워크플로를 Agent로 래핑하면 간단한 채팅 에이전트에서 사용하는 익숙한 API를 통해 상호작용할 수 있습니다.
주요 이점
- 통합 인터페이스: 단순 에이전트와 동일한 API를 사용하여 복잡한 워크플로와 상호 작용
- API 호환성: 에이전트 인터페이스를 지원하는 기존 시스템과 워크플로 통합
- 구성성: 더 큰 에이전트 시스템 또는 기타 워크플로에서 워크플로 에이전트를 구성 요소로 사용
- 스레드 관리: 대화 상태, 검사점 및 다시 시작에 에이전트 스레드 활용
- 스트리밍 지원: 워크플로가 실행됨에 따라 실시간 업데이트 가져오기
작동 방식
워크플로를 에이전트로 변환하는 경우:
- 워크플로의 유효성을 검사하여 시작 실행기가 채팅 메시지를 수락할 수 있는지 확인합니다.
- 대화 상태 및 검사점을 관리하기 위해 스레드가 만들어집니다.
- 입력 메시지는 워크플로의 시작 실행기로 라우팅됩니다.
- 워크플로 이벤트가 에이전트 응답 업데이트로 변환됩니다.
- 외부 입력 요청(원본
RequestInfoExecutor)이 함수 호출로 표시됩니다.
요구 사항
워크플로를 에이전트로 사용하려면 워크플로의 시작 실행기가 입력으로 처리 IEnumerable<ChatMessage> 할 수 있어야 합니다.
ChatClientAgent 또는 다른 에이전트 기반 실행기를 사용할 때 이 조건은 자동으로 충족됩니다.
워크플로 에이전트 만들기
확장 메서드를 AsAgent() 사용하여 호환되는 워크플로를 에이전트로 변환합니다.
using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;
// First, build your workflow
var workflow = AgentWorkflowBuilder
.CreateSequentialPipeline(researchAgent, writerAgent, reviewerAgent)
.Build();
// Convert the workflow to an agent
AIAgent workflowAgent = workflow.AsAgent(
id: "content-pipeline",
name: "Content Pipeline Agent",
description: "A multi-agent workflow that researches, writes, and reviews content"
);
AsAgent 매개 변수
| 매개 변수 | 유형 | Description |
|---|---|---|
id |
string? |
에이전트에 대한 선택적 고유 식별자입니다. 제공되지 않은 경우 자동으로 생성됩니다. |
name |
string? |
에이전트의 선택적 표시 이름입니다. |
description |
string? |
에이전트의 용도에 대한 선택적 설명입니다. |
checkpointManager |
CheckpointManager? |
세션 간 지속성을 위한 선택적 검사점 관리자입니다. |
executionEnvironment |
IWorkflowExecutionEnvironment? |
선택적 실행 환경입니다. 기본값은 워크플로 구성에 따라 InProcessExecution.OffThread 또는 InProcessExecution.Concurrent로 설정됩니다. |
워크플로 에이전트 사용
스레드 만들기
워크플로 에이전트와의 각 대화에서 상태를 관리하려면 스레드가 필요합니다.
// Create a new thread for the conversation
AgentThread thread = workflowAgent.GetNewThread();
스트리밍이 아닌 실행
전체 응답을 원하는 간단한 사용 사례의 경우:
var messages = new List<ChatMessage>
{
new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};
AgentRunResponse response = await workflowAgent.RunAsync(messages, thread);
foreach (ChatMessage message in response.Messages)
{
Console.WriteLine($"{message.AuthorName}: {message.Text}");
}
스트리밍 실행
워크플로가 실행됨에 따라 실시간 업데이트의 경우:
var messages = new List<ChatMessage>
{
new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};
await foreach (AgentRunResponseUpdate update in workflowAgent.RunStreamingAsync(messages, thread))
{
// Process streaming updates from each agent in the workflow
if (!string.IsNullOrEmpty(update.Text))
{
Console.Write(update.Text);
}
}
외부 입력 요청 처리
워크플로에 외부 입력(사용 RequestInfoExecutor)을 요청하는 실행기가 포함된 경우 이러한 요청은 에이전트 응답에서 함수 호출로 표시됩니다.
await foreach (AgentRunResponseUpdate update in workflowAgent.RunStreamingAsync(messages, thread))
{
// Check for function call requests
foreach (AIContent content in update.Contents)
{
if (content is FunctionCallContent functionCall)
{
// Handle the external input request
Console.WriteLine($"Workflow requests input: {functionCall.Name}");
Console.WriteLine($"Request data: {functionCall.Arguments}");
// Provide the response in the next message
}
}
}
스레드 직렬화 및 재개
워크플로 에이전트 스레드는 지속성을 위해 직렬화되고 나중에 다시 시작될 수 있습니다.
// Serialize the thread state
JsonElement serializedThread = thread.Serialize();
// Store serializedThread to your persistence layer...
// Later, resume the thread
AgentThread resumedThread = workflowAgent.DeserializeThread(serializedThread);
// Continue the conversation
await foreach (var update in workflowAgent.RunStreamingAsync(newMessages, resumedThread))
{
Console.Write(update.Text);
}
워크플로 에이전트를 사용하여 검사점 지정
검사점을 사용하도록 설정하여 프로세스 다시 시작에서 워크플로 상태를 유지합니다.
// Create a checkpoint manager with your storage backend
var checkpointManager = new CheckpointManager(new FileCheckpointStorage("./checkpoints"));
// Create workflow agent with checkpointing enabled
AIAgent workflowAgent = workflow.AsAgent(
id: "persistent-workflow",
name: "Persistent Workflow Agent",
checkpointManager: checkpointManager
);
요구 사항
워크플로를 에이전트로 사용하려면 워크플로의 시작 실행기가 입력으로 처리 list[ChatMessage] 할 수 있어야 합니다. 이를 ChatAgent나 AgentExecutor를 사용할 때 자동으로 충족합니다.
워크플로 에이전트 만들기
호환되는 워크플로를 호출 as_agent() 하여 에이전트로 변환합니다.
from agent_framework import WorkflowBuilder, ChatAgent
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
# Create your chat client and agents
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
researcher = ChatAgent(
name="Researcher",
instructions="Research and gather information on the given topic.",
chat_client=chat_client,
)
writer = ChatAgent(
name="Writer",
instructions="Write clear, engaging content based on research.",
chat_client=chat_client,
)
# Build your workflow
workflow = (
WorkflowBuilder()
.set_start_executor(researcher)
.add_edge(researcher, writer)
.build()
)
# Convert the workflow to an agent
workflow_agent = workflow.as_agent(name="Content Pipeline Agent")
as_agent 매개 변수
| 매개 변수 | 유형 | Description |
|---|---|---|
name |
str | None |
에이전트의 선택적 표시 이름입니다. 제공되지 않은 경우 자동으로 생성됩니다. |
워크플로 에이전트 사용
스레드 만들기
워크플로 에이전트와의 각 대화에서 상태를 관리하려면 스레드가 필요합니다.
# Create a new thread for the conversation
thread = workflow_agent.get_new_thread()
스트리밍이 아닌 실행
전체 응답을 원하는 간단한 사용 사례의 경우:
from agent_framework import ChatMessage, Role
messages = [ChatMessage(role=Role.USER, content="Write an article about AI trends")]
response = await workflow_agent.run(messages, thread=thread)
for message in response.messages:
print(f"{message.author_name}: {message.text}")
스트리밍 실행
워크플로가 실행됨에 따라 실시간 업데이트의 경우:
messages = [ChatMessage(role=Role.USER, content="Write an article about AI trends")]
async for update in workflow_agent.run_stream(messages, thread=thread):
# Process streaming updates from each agent in the workflow
if update.text:
print(update.text, end="", flush=True)
외부 입력 요청 처리
워크플로에 외부 입력(사용 RequestInfoExecutor)을 요청하는 실행기가 포함되어 있으면 이러한 요청이 함수 호출로 표시됩니다. 워크플로 에이전트는 보류 중인 요청을 추적하고 계속하기 전에 응답을 예상합니다.
from agent_framework import (
FunctionCallContent,
FunctionApprovalRequestContent,
FunctionApprovalResponseContent,
)
async for update in workflow_agent.run_stream(messages, thread=thread):
for content in update.contents:
if isinstance(content, FunctionApprovalRequestContent):
# The workflow is requesting external input
request_id = content.id
function_call = content.function_call
print(f"Workflow requests input: {function_call.name}")
print(f"Request data: {function_call.arguments}")
# Store the request_id to provide a response later
# Check for pending requests
if workflow_agent.pending_requests:
print(f"Pending requests: {list(workflow_agent.pending_requests.keys())}")
보류 중인 요청에 대한 응답 제공
외부 입력 요청 후 워크플로 실행을 계속하려면 다음을 수행합니다.
# Create a response for the pending request
response_content = FunctionApprovalResponseContent(
id=request_id,
function_call=function_call,
approved=True,
)
response_message = ChatMessage(
role=Role.USER,
contents=[response_content],
)
# Continue the workflow with the response
async for update in workflow_agent.run_stream([response_message], thread=thread):
if update.text:
print(update.text, end="", flush=True)
완성된 예시
다음은 스트리밍 출력이 있는 워크플로 에이전트를 보여주는 전체 예제입니다.
import asyncio
from agent_framework import (
ChatAgent,
ChatMessage,
Role,
)
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework._workflows import SequentialBuilder
from azure.identity import AzureCliCredential
async def main():
# Set up the chat client
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
# Create specialized agents
researcher = ChatAgent(
name="Researcher",
instructions="Research the given topic and provide key facts.",
chat_client=chat_client,
)
writer = ChatAgent(
name="Writer",
instructions="Write engaging content based on the research provided.",
chat_client=chat_client,
)
reviewer = ChatAgent(
name="Reviewer",
instructions="Review the content and provide a final polished version.",
chat_client=chat_client,
)
# Build a sequential workflow
workflow = (
SequentialBuilder()
.add_agents([researcher, writer, reviewer])
.build()
)
# Convert to a workflow agent
workflow_agent = workflow.as_agent(name="Content Creation Pipeline")
# Create a thread and run the workflow
thread = workflow_agent.get_new_thread()
messages = [ChatMessage(role=Role.USER, content="Write about quantum computing")]
print("Starting workflow...")
print("=" * 60)
current_author = None
async for update in workflow_agent.run_stream(messages, thread=thread):
# Show when different agents are responding
if update.author_name and update.author_name != current_author:
if current_author:
print("\n" + "-" * 40)
print(f"\n[{update.author_name}]:")
current_author = update.author_name
if update.text:
print(update.text, end="", flush=True)
print("\n" + "=" * 60)
print("Workflow completed!")
if __name__ == "__main__":
asyncio.run(main())
이벤트 변환 이해
워크플로가 에이전트로 실행되면 워크플로 이벤트가 에이전트 응답으로 변환됩니다. 응답 유형은 사용하는 방법에 따라 달라집니다.
-
run(): 워크플로가 완료된 후 전체 결과를 포함하는AgentRunResponse을 반환 -
run_stream()AgentRunResponseUpdate: 워크플로가 실행되면 개체를 생성하여 실시간 업데이트를 제공합니다.
실행하는 동안 내부 워크플로 이벤트는 다음과 같이 에이전트 응답에 매핑됩니다.
| 워크플로 이벤트 | 에이전트 응답 |
|---|---|
AgentRunUpdateEvent |
(스트리밍)으로 AgentRunResponseUpdate 전달되거나 (비 스트리밍)으로 AgentRunResponse 집계됨 |
RequestInfoEvent |
FunctionCallContent 및 FunctionApprovalRequestContent로 변환됨 |
| 기타 이벤트 |
raw_representation 관찰성을 위해 포함됨 |
이 변환을 사용하면 필요할 때 자세한 워크플로 정보에 액세스할 수 있는 동시에 표준 에이전트 인터페이스를 사용할 수 있습니다.
사용 사례
1. 복잡한 에이전트 파이프라인
다중 에이전트 워크플로를 애플리케이션에서 사용할 단일 에이전트로 래핑합니다.
User Request --> [Workflow Agent] --> Final Response
|
+-- Researcher Agent
+-- Writer Agent
+-- Reviewer Agent
2. 에이전트 구성
더 큰 시스템에서 워크플로 에이전트를 구성 요소로 사용합니다.
- 워크플로 에이전트는 다른 에이전트가 도구로 사용할 수 있습니다.
- 여러 워크플로 에이전트를 함께 오케스트레이션할 수 있습니다.
- 워크플로 에이전트는 다른 워크플로 내에 중첩될 수 있습니다.
3. API 통합
표준 에이전트 인터페이스를 예상하는 API를 통해 복잡한 워크플로를 노출하고, 이를 활용할 수 있도록 합니다.
- 정교한 백 엔드 워크플로를 사용하는 채팅 인터페이스
- 기존 에이전트 기반 시스템과 통합
- 간단한 에이전트에서 복잡한 워크플로로 점진적 마이그레이션