다음을 통해 공유


AutoGen에서 Microsoft 에이전트 프레임워크로 마이그레이션 가이드

AutoGen에서 Microsoft Agent Framework Python SDK로 마이그레이션하기 위한 포괄적인 가이드입니다.

목차

배경

AutoGen 은 LLM(대규모 언어 모델)을 사용하여 AI 에이전트 및 다중 에이전트 시스템을 빌드하기 위한 프레임워크입니다. Microsoft Research에서 연구 프로젝트로 시작하여 GroupChat 및 이벤트 기반 에이전트 런타임과 같은 다중 에이전트 오케스트레이션의 여러 개념을 개척했습니다. 이 프로젝트는 오픈 소스 커뮤니티의 유익한 협업이었으며 외부 기여자들로부터 많은 중요한 기능이 제공되었습니다.

Microsoft Agent Framework 는 LLM을 사용하여 AI 에이전트 및 워크플로를 빌드하기 위한 새로운 다국어 SDK입니다. AutoGen에서 개척한 아이디어의 중요한 진화를 나타내며 실제 사용에서 배운 교훈을 통합합니다. Microsoft의 핵심 AutoGen 및 의미 체계 커널 팀에서 개발했으며 앞으로 AI 애플리케이션을 빌드하기 위한 새로운 기반이 되도록 설계되었습니다.

이 가이드에서는 실제 마이그레이션 경로에 대해 설명합니다. 즉, 동일하게 유지되는 내용과 변경 내용을 한눈에 살펴보는 것으로 시작합니다. 그런 다음, 구체적인 코드를 나란히 사용하여 모델 클라이언트 설정, 단일 에이전트 기능 및 마지막으로 다중 에이전트 오케스트레이션을 다룹니다. 이 과정에서 에이전트 프레임워크 리포지토리에서 실행 가능한 샘플에 대한 링크를 통해 각 단계의 유효성을 검사할 수 있습니다.

주요 유사점 및 차이점

동일하게 유지되는 내용

기초는 익숙합니다. 모델 클라이언트를 중심으로 에이전트를 만들고, 지침을 제공하고, 도구를 연결합니다. 두 라이브러리 모두 함수 스타일 도구, 토큰 스트리밍, 다중 모드 콘텐츠 및 비동기 I/O를 지원합니다.

# Both frameworks follow similar patterns
# AutoGen
agent = AssistantAgent(name="assistant", model_client=client, tools=[my_tool])
result = await agent.run(task="Help me with this task")

# Agent Framework
agent = ChatAgent(name="assistant", chat_client=client, tools=[my_tool])
result = await agent.run("Help me with this task")

주요 차이점

  1. 오케스트레이션 스타일: AutoGen은 이벤트 기반 코어를 상위 수준 Team과 페어링합니다. 에이전트 프레임워크는 입력이 준비되면 에지를 따라 데이터를 라우팅하고 실행기를 활성화하는 형식화된 그래프 기반을 중심으로 Workflow 합니다.

  2. 도구: AutoGen은 함수를 .로 FunctionTool래핑합니다. 에이전트 프레임워크는 @ai_function스키마를 자동으로 유추하고 코드 인터프리터 및 웹 검색과 같은 호스트된 도구를 추가합니다.

  3. 에이전트 동작: AssistantAgent 증가 max_tool_iterations하지 않는 한 단일 턴입니다. ChatAgent 는 기본적으로 다중 턴이며 최종 답변을 반환할 때까지 도구를 계속 호출합니다.

  4. 런타임: AutoGen은 포함된 실험적 분산 런타임을 제공합니다. 에이전트 프레임워크는 현재 단일 프로세스 구성에 중점을 둡니다. 분산 실행이 계획되어 있습니다.

모델 클라이언트 만들기 및 구성

두 프레임워크는 유사하지만 동일하지 않은 API를 사용하여 주요 AI 공급자를 위한 모델 클라이언트를 제공합니다.

특징 AutoGen 에이전트 프레임워크
OpenAI 클라이언트 OpenAIChatCompletionClient OpenAIChatClient
OpenAI 응답 클라이언트 ❌ 사용할 수 없음 OpenAIResponsesClient
Azure OpenAI AzureOpenAIChatCompletionClient AzureOpenAIChatClient
Azure OpenAI 응답 ❌ 사용할 수 없음 AzureOpenAIResponsesClient
Azure AI AzureAIChatCompletionClient AzureAIAgentClient
인위적 AnthropicChatCompletionClient 🚧 계획
Ollama OllamaChatCompletionClient 🚧 계획
Caching ChatCompletionCache 래퍼 🚧 계획

AutoGen 모델 클라이언트

from autogen_ext.models.openai import OpenAIChatCompletionClient, AzureOpenAIChatCompletionClient

# OpenAI
client = OpenAIChatCompletionClient(
    model="gpt-5",
    api_key="your-key"
)

# Azure OpenAI
client = AzureOpenAIChatCompletionClient(
    azure_endpoint="https://your-endpoint.openai.azure.com/",
    azure_deployment="gpt-5",
    api_version="2024-12-01",
    api_key="your-key"
)

에이전트 프레임워크 ChatClients

from agent_framework.openai import OpenAIChatClient
from agent_framework.azure import AzureOpenAIChatClient

# OpenAI (reads API key from environment)
client = OpenAIChatClient(model_id="gpt-5")

# Azure OpenAI (uses environment or default credentials; see samples for auth options)
client = AzureOpenAIChatClient(model_id="gpt-5")

자세한 예제는 다음을 참조하세요.

응답 API 지원(에이전트 프레임워크 전용)

에이전트 프레임워크는 AzureOpenAIResponsesClientOpenAIResponsesClient AutoGen에서 사용할 수 없는 추론 모델 및 구조적 응답에 대한 특수한 지원을 제공합니다.

from agent_framework.azure import AzureOpenAIResponsesClient
from agent_framework.openai import OpenAIResponsesClient

# Azure OpenAI with Responses API
azure_responses_client = AzureOpenAIResponsesClient(model_id="gpt-5")

# OpenAI with Responses API
openai_responses_client = OpenAIResponsesClient(model_id="gpt-5")

응답 API 예제는 다음을 참조하세요.

Single-Agent 기능 매핑

이 섹션에서는 AutoGen과 Agent Framework 간에 단일 에이전트 기능을 매핑합니다. 클라이언트가 있는 상태에서 에이전트를 만들고, 도구를 연결하고, 비 스트리밍 및 스트리밍 실행 중에서 선택합니다.

기본 에이전트 만들기 및 실행

모델 클라이언트가 구성되면 다음 단계는 에이전트를 만드는 것입니다. 두 프레임워크 모두 비슷한 에이전트 추상화는 제공하지만 기본 동작과 구성 옵션은 서로 다릅니다.

AutoGen AssistantAgent

from autogen_agentchat.agents import AssistantAgent

agent = AssistantAgent(
    name="assistant",
    model_client=client,
    system_message="You are a helpful assistant.",
    tools=[my_tool],
    max_tool_iterations=1  # Single-turn by default
)

# Execution
result = await agent.run(task="What's the weather?")

에이전트 프레임워크 ChatAgent

from agent_framework import ChatAgent, ai_function
from agent_framework.openai import OpenAIChatClient

# Create simple tools for the example
@ai_function
def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

@ai_function
def get_time() -> str:
    """Get current time."""
    return "Current time: 2:30 PM"

# Create client
client = OpenAIChatClient(model_id="gpt-5")

async def example():
    # Direct creation
    agent = ChatAgent(
        name="assistant",
        chat_client=client,
        instructions="You are a helpful assistant.",
        tools=[get_weather]  # Multi-turn by default
    )

    # Factory method (more convenient)
    agent = client.create_agent(
        name="assistant",
        instructions="You are a helpful assistant.",
        tools=[get_weather]
    )

    # Execution with runtime tool configuration
    result = await agent.run(
        "What's the weather?",
        tools=[get_time],  # Can add tools at runtime
        tool_choice="auto"
    )

주요 차이점:

  • 기본 동작: ChatAgent 도구 호출을 통해 자동으로 반복되지만 AssistantAgent 명시적 max_tool_iterations 설정이 필요합니다.
  • 런타임 구성: ChatAgent.run() 호출별 사용자 지정에 대한 수락 toolstool_choice 매개 변수
  • 팩터리 메서드: 에이전트 프레임워크는 채팅 클라이언트에서 직접 편리한 팩터리 메서드를 제공합니다.
  • 상태 관리: ChatAgent 상태 비정상 상태이며 호출 간의 대화 기록을 유지 관리하지 않습니다. 상태의 일부로 대화 기록을 유지 관리하는 것과는 다릅니다 AssistantAgent .

AgentThread를 사용하여 대화 상태 관리

대화를 계속하려면 대화 ChatAgent기록을 관리하는 데 사용합니다 AgentThread .

# Assume we have an agent from previous examples
async def conversation_example():
    # Create a new thread that will be reused
    thread = agent.get_new_thread()

    # First interaction - thread is empty
    result1 = await agent.run("What's 2+2?", thread=thread)
    print(result1.text)  # "4"

    # Continue conversation - thread contains previous messages
    result2 = await agent.run("What about that number times 10?", thread=thread)
    print(result2.text)  # "40" (understands "that number" refers to 4)

    # AgentThread can use external storage, similar to ChatCompletionContext in AutoGen

기본적으로 상태 비지정: 빠른 데모

# Without a thread (two independent invocations)
r1 = await agent.run("What's 2+2?")
print(r1.text)  # for example, "4"

r2 = await agent.run("What about that number times 10?")
print(r2.text)  # Likely ambiguous without prior context; cannot be "40"

# With a thread (shared context across calls)
thread = agent.get_new_thread()
print((await agent.run("What's 2+2?", thread=thread)).text)  # "4"
print((await agent.run("What about that number times 10?", thread=thread)).text)  # "40"

스레드 관리 예제는 다음을 참조하세요.

OpenAI Assistant 에이전트 동등성

두 프레임워크 모두 OpenAI Assistant API 통합을 제공합니다.

# AutoGen OpenAIAssistantAgent
from autogen_ext.agents.openai import OpenAIAssistantAgent
# Agent Framework has OpenAI Assistants support via OpenAIAssistantsClient
from agent_framework.openai import OpenAIAssistantsClient

OpenAI Assistant 예제는 다음을 참조하세요.

스트리밍 지원

두 프레임워크 모두 클라이언트와 에이전트에서 실시간으로 토큰을 스트리밍하여 UI의 응답성을 유지합니다.

자동 생성 스트리밍

# Model client streaming
async for chunk in client.create_stream(messages):
    if isinstance(chunk, str):
        print(chunk, end="")

# Agent streaming
async for event in agent.run_stream(task="Hello"):
    if isinstance(event, ModelClientStreamingChunkEvent):
        print(event.content, end="")
    elif isinstance(event, TaskResult):
        print("Final result received")

에이전트 프레임워크 스트리밍

# Assume we have client, agent, and tools from previous examples
async def streaming_example():
    # Chat client streaming
    async for chunk in client.get_streaming_response("Hello", tools=tools):
        if chunk.text:
            print(chunk.text, end="")

    # Agent streaming
    async for chunk in agent.run_stream("Hello"):
        if chunk.text:
            print(chunk.text, end="", flush=True)

팁: 에이전트 프레임워크에서 클라이언트와 에이전트는 모두 동일한 업데이트 모양을 생성합니다. 두 경우 모두 읽을 chunk.text 수 있습니다.

메시지 형식 및 만들기

효과적인 에이전트 통신에는 메시지 작동 방식을 이해하는 것이 중요합니다. 두 프레임워크는 별도의 메시지 클래스를 사용하는 AutoGen과 통합 메시지 시스템을 사용하는 에이전트 프레임워크를 사용하여 메시지 생성 및 처리에 대해 서로 다른 접근 방식을 제공합니다.

자동 생성 메시지 유형

from autogen_agentchat.messages import TextMessage, MultiModalMessage
from autogen_core.models import UserMessage

# Text message
text_msg = TextMessage(content="Hello", source="user")

# Multi-modal message
multi_modal_msg = MultiModalMessage(
    content=["Describe this image", image_data],
    source="user"
)

# Convert to model format for use with model clients
user_message = text_msg.to_model_message()

에이전트 프레임워크 메시지 형식

from agent_framework import ChatMessage, TextContent, DataContent, UriContent, Role
import base64

# Text message
text_msg = ChatMessage(role=Role.USER, text="Hello")

# Supply real image bytes, or use a data: URI/URL via UriContent
image_bytes = b"<your_image_bytes>"
image_b64 = base64.b64encode(image_bytes).decode()
image_uri = f"data:image/jpeg;base64,{image_b64}"

# Multi-modal message with mixed content
multi_modal_msg = ChatMessage(
    role=Role.USER,
    contents=[
        TextContent(text="Describe this image"),
        DataContent(uri=image_uri, media_type="image/jpeg")
    ]
)

주요 차이점:

  • AutoGen은 필드와 함께 별도의 메시지 클래스(TextMessage, MultiModalMessage)를 source 사용합니다.
  • 에이전트 프레임워크는 형식화된 콘텐츠 개체 및 필드와 통합 ChatMessage 된 개체를 role 사용합니다.
  • 에이전트 프레임워크 메시지는 문자열 원본 대신 열거형(USER, ASSISTANT, SYSTEM, TOOL)을 사용합니다 Role .

도구 만들기 및 통합

도구는 텍스트 생성을 넘어 에이전트 기능을 확장합니다. 프레임워크는 더 자동화된 스키마 생성을 제공하는 에이전트 프레임워크를 사용하여 도구 만들기에 다양한 접근 방식을 사용합니다.

AutoGen FunctionTool

from autogen_core.tools import FunctionTool

async def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

# Manual tool creation
tool = FunctionTool(
    func=get_weather,
    description="Get weather information"
)

# Use with agent
agent = AssistantAgent(name="assistant", model_client=client, tools=[tool])

에이전트 프레임워크 @ai_function

from agent_framework import ai_function
from typing import Annotated
from pydantic import Field

@ai_function
def get_weather(
    location: Annotated[str, Field(description="The location to get weather for")]
) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

# Direct use with agent (automatic conversion)
agent = ChatAgent(name="assistant", chat_client=client, tools=[get_weather])

자세한 예제는 다음을 참조하세요.

호스트된 도구(에이전트 프레임워크 전용)

에이전트 프레임워크는 AutoGen에서 사용할 수 없는 호스트된 도구를 제공합니다.

from agent_framework import ChatAgent, HostedCodeInterpreterTool, HostedWebSearchTool
from agent_framework.azure import AzureOpenAIChatClient

# Azure OpenAI client with a model that supports hosted tools
client = AzureOpenAIChatClient(model_id="gpt-5")

# Code execution tool
code_tool = HostedCodeInterpreterTool()

# Web search tool
search_tool = HostedWebSearchTool()

agent = ChatAgent(
    name="researcher",
    chat_client=client,
    tools=[code_tool, search_tool]
)

자세한 예제는 다음을 참조하세요.

요구 사항 및 주의 사항:

  • 호스트된 도구는 지원되는 모델/계정에서만 사용할 수 있습니다. 이러한 도구를 사용하도록 설정하기 전에 공급자에 대한 자격 및 모델 지원을 확인합니다.
  • 구성은 공급자에 따라 다릅니다. 설정 및 사용 권한에 대한 각 샘플의 필수 구성 요소를 따릅니다.
  • 모든 모델이 모든 호스트된 도구(예: 웹 검색 및 코드 인터프리터)를 지원하는 것은 아닙니다. 사용자 환경에서 호환되는 모델을 선택합니다.

비고

AutoGen은 로컬 코드 실행 도구를 지원하지만 이 기능은 향후 Agent Framework 버전에 대해 계획되어 있습니다.

주요 차이점: 에이전트 프레임워크는 에이전트 수준에서 자동으로 도구 반복을 처리합니다. AutoGen의 max_tool_iterations 매개 변수와 달리 에이전트 프레임워크 에이전트는 무한 루프를 방지하기 위한 기본 제공 안전 메커니즘을 사용하여 기본적으로 완료될 때까지 도구 실행을 계속합니다.

MCP 서버 지원

고급 도구 통합의 경우 두 프레임워크는 MCP(모델 컨텍스트 프로토콜)를 지원하여 에이전트가 외부 서비스 및 데이터 원본과 상호 작용할 수 있도록 합니다. 에이전트 프레임워크는 보다 포괄적인 기본 제공 지원을 제공합니다.

AutoGen MCP 지원

AutoGen에는 확장을 통한 기본 MCP 지원이 있습니다(특정 구현 세부 정보는 버전에 따라 다름).

에이전트 프레임워크 MCP 지원

from agent_framework import ChatAgent, MCPStdioTool, MCPStreamableHTTPTool, MCPWebsocketTool
from agent_framework.openai import OpenAIChatClient

# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")

# Stdio MCP server
mcp_tool = MCPStdioTool(
    name="filesystem",
    command="uvx mcp-server-filesystem",
    args=["/allowed/directory"]
)

# HTTP streaming MCP
http_mcp = MCPStreamableHTTPTool(
    name="http_mcp",
    url="http://localhost:8000/sse"
)

# WebSocket MCP
ws_mcp = MCPWebsocketTool(
    name="websocket_mcp",
    url="ws://localhost:8000/ws"
)

agent = ChatAgent(name="assistant", chat_client=client, tools=[mcp_tool])

MCP 예제는 다음을 참조하세요.

에이전트-도구 패턴

한 가지 강력한 패턴은 에이전트 자체를 도구로 사용하여 계층적 에이전트 아키텍처를 사용하도록 설정하는 것입니다. 두 프레임워크는 서로 다른 구현으로 이 패턴을 지원합니다.

AutoGen AgentTool

from autogen_agentchat.tools import AgentTool

# Create specialized agent
writer = AssistantAgent(
    name="writer",
    model_client=client,
    system_message="You are a creative writer."
)

# Wrap as tool
writer_tool = AgentTool(agent=writer)

# Use in coordinator (requires disabling parallel tool calls)
coordinator_client = OpenAIChatCompletionClient(
    model="gpt-5",
    parallel_tool_calls=False
)
coordinator = AssistantAgent(
    name="coordinator",
    model_client=coordinator_client,
    tools=[writer_tool]
)

Agent Framework as_tool()

from agent_framework import ChatAgent

# Assume we have client from previous examples
# Create specialized agent
writer = ChatAgent(
    name="writer",
    chat_client=client,
    instructions="You are a creative writer."
)

# Convert to tool
writer_tool = writer.as_tool(
    name="creative_writer",
    description="Generate creative content",
    arg_name="request",
    arg_description="What to write"
)

# Use in coordinator
coordinator = ChatAgent(
    name="coordinator",
    chat_client=client,
    tools=[writer_tool]
)

명시적 마이그레이션 참고 사항: AutoGen에서 동일한 에이전트 인스턴스를 호출할 때 동시성 문제를 방지하기 위해 에이전트를 도구로 래핑할 때 코디네이터의 모델 클라이언트에서 설정합니다 parallel_tool_calls=False . 에이전트 프레임워크 as_tool() 에서는 에이전트가 기본적으로 상태 비저장이므로 병렬 도구 호출을 사용하지 않도록 설정하지 않아도 됩니다.

미들웨어(에이전트 프레임워크 기능)

에이전트 프레임워크는 AutoGen에 부족한 미들웨어 기능을 도입합니다. 미들웨어를 사용하면 로깅, 보안 및 성능 모니터링과 같은 강력한 교차 절삭 문제를 수행할 수 있습니다.

from agent_framework import ChatAgent, AgentRunContext, FunctionInvocationContext
from typing import Callable, Awaitable

# Assume we have client from previous examples
async def logging_middleware(
    context: AgentRunContext,
    next: Callable[[AgentRunContext], Awaitable[None]]
) -> None:
    print(f"Agent {context.agent.name} starting")
    await next(context)
    print(f"Agent {context.agent.name} completed")

async def security_middleware(
    context: FunctionInvocationContext,
    next: Callable[[FunctionInvocationContext], Awaitable[None]]
) -> None:
    if "password" in str(context.arguments):
        print("Blocking function call with sensitive data")
        return  # Don't call next()
    await next(context)

agent = ChatAgent(
    name="secure_agent",
    chat_client=client,
    middleware=[logging_middleware, security_middleware]
)

이점:

  • 보안: 입력 유효성 검사 및 콘텐츠 필터링
  • 관찰 가능성: 로깅, 메트릭 및 추적
  • 성능: 캐싱 및 속도 제한
  • 오류 처리: 정상 성능 저하 및 재시도 논리

자세한 미들웨어 예제는 다음을 참조하세요.

사용자 지정 에이전트

모델 지원 에이전트를 전혀 원하지 않는 경우가 있습니다. 사용자 지정 논리를 사용하여 결정적 또는 API 지원 에이전트를 원할 수 있습니다. 두 프레임워크 모두 사용자 지정 에이전트 빌드를 지원하지만 패턴은 다릅니다.

AutoGen: Subclass BaseChatAgent

from typing import Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseChatMessage, TextMessage, StopMessage
from autogen_core import CancellationToken

class StaticAgent(BaseChatAgent):
    def __init__(self, name: str = "static", description: str = "Static responder") -> None:
        super().__init__(name, description)

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:  # Which message types this agent produces
        return (TextMessage,)

    async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
        # Always return a static response
        return Response(chat_message=TextMessage(content="Hello from AutoGen custom agent", source=self.name))

Notes:

  • 채팅 메시지를 사용하여 on_messages(...) 구현 Response 하고 반환합니다.
  • 선택적으로 실행 간에 내부 상태를 지우도록 구현 on_reset(...) 합니다.

에이전트 프레임워크: BaseAgent 확장(스레드 인식)

from collections.abc import AsyncIterable
from typing import Any
from agent_framework import (
    AgentRunResponse,
    AgentRunResponseUpdate,
    AgentThread,
    BaseAgent,
    ChatMessage,
    Role,
    TextContent,
)

class StaticAgent(BaseAgent):
    async def run(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AgentRunResponse:
        # Build a static reply
        reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])

        # Persist conversation to the provided AgentThread (if any)
        if thread is not None:
            normalized = self._normalize_messages(messages)
            await self._notify_thread_of_new_messages(thread, normalized, reply)

        return AgentRunResponse(messages=[reply])

    async def run_stream(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentRunResponseUpdate]:
        # Stream the same static response in a single chunk for simplicity
        yield AgentRunResponseUpdate(contents=[TextContent(text="Hello from AF custom agent")], role=Role.ASSISTANT)

        # Notify thread of input and the complete response once streaming ends
        if thread is not None:
            reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])
            normalized = self._normalize_messages(messages)
            await self._notify_thread_of_new_messages(thread, normalized, reply)

Notes:

  • AgentThread 는 대화 상태를 외부적으로 유지 관리합니다. 사용 agent.get_new_thread() 하 고 전달 합니다 run/run_stream.
  • 스레드가 교환의 양면을 가지도록 호출 self._notify_thread_of_new_messages(thread, input_messages, response_messages) 합니다.
  • 전체 샘플 참조: 사용자 지정 에이전트

다음으로, 프레임워크가 가장 다른 영역인 다중 에이전트 오케스트레이션을 살펴보겠습니다.

다중 에이전트 기능 매핑

프로그래밍 모델 개요

다중 에이전트 프로그래밍 모델은 두 프레임워크 간의 가장 중요한 차이점을 나타냅니다.

AutoGen의 이중 모델 접근 방식

AutoGen은 다음 두 가지 프로그래밍 모델을 제공합니다.

  1. autogen-core: 낮은 수준의 이벤트 기반 프로그래밍 및 RoutedAgent 메시지 구독
  2. Team 추상화: 상위 수준 실행 중심 모델 autogen-core
# Low-level autogen-core (complex)
class MyAgent(RoutedAgent):
    @message_handler
    async def handle_message(self, message: TextMessage, ctx: MessageContext) -> None:
        # Handle specific message types
        pass

# High-level Team (easier but limited)
team = RoundRobinGroupChat(
    participants=[agent1, agent2],
    termination_condition=StopAfterNMessages(5)
)
result = await team.run(task="Collaborate on this task")

도전:

  • 하위 수준 모델은 대부분의 사용자에게 너무 복잡합니다.
  • 상위 수준 모델은 복잡한 동작에 대해 제한될 수 있습니다.
  • 두 모델 간의 브리징은 구현 복잡성을 더합니다.

에이전트 프레임워크의 통합 워크플로 모델

에이전트 프레임워크는 두 방법 중 가장 좋은 방법을 결합하는 단일 Workflow 추상화 기능을 제공합니다.

from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

# Assume we have agent1 and agent2 from previous examples
@executor(id="agent1")
async def agent1_executor(input_msg: str, ctx: WorkflowContext[str]) -> None:
    response = await agent1.run(input_msg)
    await ctx.send_message(response.text)

@executor(id="agent2")
async def agent2_executor(input_msg: str, ctx: WorkflowContext[Never, str]) -> None:
    response = await agent2.run(input_msg)
    await ctx.yield_output(response.text)  # Final output

# Build typed data flow graph
workflow = (WorkflowBuilder()
           .add_edge(agent1_executor, agent2_executor)
           .set_start_executor(agent1_executor)
           .build())

# Example usage (would be in async context)
# result = await workflow.run("Initial input")

자세한 워크플로 예제는 다음을 참조하세요.

이점:

  • 통합 모델: 모든 복잡성 수준에 대한 단일 추상화
  • 형식 안전성: 강력한 형식의 입력 및 출력
  • 그래프 시각화: 데이터 흐름 표현 지우기
  • 유연한 컴퍼지션: 에이전트, 함수 및 하위 워크플로 혼합

워크플로 및 GraphFlow

에이전트 프레임워크의 Workflow 추상화는 AutoGen의 실험적 GraphFlow 기능에서 영감을 받았지만 디자인 철학의 상당한 발전을 나타냅니다.

  • GraphFlow: 에지가 전환되고 메시지가 모든 에이전트에 브로드캐스트되는 제어 흐름 전환은 브로드캐스트된 메시지 콘텐츠에서 조건화됩니다.
  • 워크플로: 메시지가 특정 에지를 통해 라우팅되고 실행기가 에지에 의해 활성화되는 데이터 흐름 기반이며, 동시 실행을 지원합니다.

시각적 개체 개요

아래 다이어그램은 AutoGen의 제어 흐름 GraphFlow(왼쪽)와 에이전트 프레임워크의 데이터 흐름 워크플로(오른쪽)를 대조합니다. GraphFlow는 에이전트를 조건부 전환 및 브로드캐스트가 있는 노드로 모델화합니다. 형식화된 에지에 의해 연결된 워크플로 모델 실행기(에이전트, 함수 또는 하위 워크플로) 또한 요청/응답 일시 중지 및 검사점을 지원합니다.

flowchart LR

  subgraph AutoGenGraphFlow
    direction TB
    U[User / Task] --> A[Agent A]
    A -->|success| B[Agent B]
    A -->|retry| C[Agent C]
    A -. broadcast .- B
    A -. broadcast .- C
  end

  subgraph AgentFrameworkWorkflow
    direction TB
    I[Input] --> E1[Executor 1]
    E1 -->|"str"| E2[Executor 2]
    E1 -->|"image"| E3[Executor 3]
    E3 -->|"str"| E2
    E2 --> OUT[(Final Output)]
  end

  R[Request / Response Gate]
  E2 -. request .-> R
  R -. resume .-> E2

  CP[Checkpoint]
  E1 -. save .-> CP
  CP -. load .-> E1

실제로 다음을 수행합니다.

  • GraphFlow는 에이전트를 노드로 사용하고 메시지를 브로드캐스트합니다. 에지는 조건부 전환을 나타냅니다.
  • 워크플로는 입력된 메시지를 가장자리를 따라 라우팅합니다. 노드(실행기)는 에이전트, 순수 함수 또는 하위 워크플로일 수 있습니다.
  • 요청/응답을 사용하면 외부 입력에 대해 워크플로를 일시 중지할 수 있습니다. 검사점은 진행 상태를 유지하며 다시 시작을 사용하도록 설정합니다.

코드 비교

1) 순차 + 조건부
# AutoGen GraphFlow (fluent builder) — writer → reviewer → editor (conditional)
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow

writer = AssistantAgent(name="writer", description="Writes a draft", model_client=client)
reviewer = AssistantAgent(name="reviewer", description="Reviews the draft", model_client=client)
editor = AssistantAgent(name="editor", description="Finalizes the draft", model_client=client)

graph = (
    DiGraphBuilder()
    .add_node(writer).add_node(reviewer).add_node(editor)
    .add_edge(writer, reviewer)  # always
    .add_edge(reviewer, editor, condition=lambda msg: "approve" in msg.to_model_text())
    .set_entry_point(writer)
).build()

team = GraphFlow(participants=[writer, reviewer, editor], graph=graph)
result = await team.run(task="Draft a short paragraph about solar power")
# Agent Framework Workflow — sequential executors with conditional logic
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="writer")
async def writer_exec(task: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"Draft: {task}")

@executor(id="reviewer")
async def reviewer_exec(draft: str, ctx: WorkflowContext[str]) -> None:
    decision = "approve" if "solar" in draft.lower() else "revise"
    await ctx.send_message(f"{decision}:{draft}")

@executor(id="editor")
async def editor_exec(msg: str, ctx: WorkflowContext[Never, str]) -> None:
    if msg.startswith("approve:"):
        await ctx.yield_output(msg.split(":", 1)[1])
    else:
        await ctx.yield_output("Needs revision")

workflow_seq = (
    WorkflowBuilder()
    .add_edge(writer_exec, reviewer_exec)
    .add_edge(reviewer_exec, editor_exec)
    .set_start_executor(writer_exec)
    .build()
)
2) 팬아웃 + 조인(ALL vs ANY)
# AutoGen GraphFlow — A → (B, C) → D with ALL/ANY join
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
A, B, C, D = agent_a, agent_b, agent_c, agent_d

# ALL (default): D runs after both B and C
g_all = (
    DiGraphBuilder()
    .add_node(A).add_node(B).add_node(C).add_node(D)
    .add_edge(A, B).add_edge(A, C)
    .add_edge(B, D).add_edge(C, D)
    .set_entry_point(A)
).build()

# ANY: D runs when either B or C completes
g_any = (
    DiGraphBuilder()
    .add_node(A).add_node(B).add_node(C).add_node(D)
    .add_edge(A, B).add_edge(A, C)
    .add_edge(B, D, activation_group="join_d", activation_condition="any")
    .add_edge(C, D, activation_group="join_d", activation_condition="any")
    .set_entry_point(A)
).build()
# Agent Framework Workflow — A → (B, C) → aggregator (ALL vs ANY)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="A")
async def start(task: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"B:{task}", target_id="B")
    await ctx.send_message(f"C:{task}", target_id="C")

@executor(id="B")
async def branch_b(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"B_done:{text}")

@executor(id="C")
async def branch_c(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"C_done:{text}")

@executor(id="join_any")
async def join_any(msg: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"First: {msg}")  # ANY join (first arrival)

@executor(id="join_all")
async def join_all(msg: str, ctx: WorkflowContext[str, str]) -> None:
    state = await ctx.get_executor_state() or {"items": []}
    state["items"].append(msg)
    await ctx.set_executor_state(state)
    if len(state["items"]) >= 2:
        await ctx.yield_output(" | ".join(state["items"]))  # ALL join

wf_any = (
    WorkflowBuilder()
    .add_edge(start, branch_b).add_edge(start, branch_c)
    .add_edge(branch_b, join_any).add_edge(branch_c, join_any)
    .set_start_executor(start)
    .build()
)

wf_all = (
    WorkflowBuilder()
    .add_edge(start, branch_b).add_edge(start, branch_c)
    .add_edge(branch_b, join_all).add_edge(branch_c, join_all)
    .set_start_executor(start)
    .build()
)
3) 대상 라우팅(브로드캐스트 없음)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="ingest")
async def ingest(task: str, ctx: WorkflowContext[str]) -> None:
    # Route selectively using target_id
    if task.startswith("image:"):
        await ctx.send_message(task.removeprefix("image:"), target_id="vision")
    else:
        await ctx.send_message(task, target_id="writer")

@executor(id="writer")
async def write(text: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"Draft: {text}")

@executor(id="vision")
async def caption(image_ref: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"Caption: {image_ref}")

workflow = (
    WorkflowBuilder()
    .add_edge(ingest, write)
    .add_edge(ingest, caption)
    .set_start_executor(ingest)
    .build()
)

# Example usage (async):
# await workflow.run("Summarize the benefits of solar power")
# await workflow.run("image:https://example.com/panel.jpg")

주의해야 할 사항:

  • GraphFlow는 메시지를 브로드캐스트하고 조건부 전환을 사용합니다. 조인 동작은 대상 쪽 activation 및 에지activation_group/activation_condition당(예: 두 가장자리를 함께 그룹화)을 통해 구성됩니다.join_dactivation_condition="any"
  • 워크플로는 데이터를 명시적으로 라우팅합니다. 를 사용하여 target_id 다운스트림 실행기를 선택합니다. 조인 동작은 수신 실행기(예: 첫 번째 입력 시 수율 및 모두 대기) 또는 오케스트레이션 작성기/집계를 통해 상주합니다.
  • 워크플로의 실행기는 자유 형식입니다. 즉, 래핑, ChatAgent함수 또는 하위 워크플로를 동일한 그래프 내에서 혼합합니다.

주요 차이점

아래 표에는 AutoGen의 GraphFlow와 에이전트 프레임워크 워크플로 간의 기본적인 차이점이 요약되어 있습니다.

측면 AutoGen GraphFlow 에이전트 프레임워크 워크플로
흐름 유형 제어 흐름(에지는 전환임) 데이터 흐름(에지 경로 메시지)
노드 형식 에이전트만 에이전트, 함수, 하위 워크플로
활성화 메시지 브로드캐스트 에지 기반 활성화
형식 안전성 한정 전체에서 강력한 입력
작성 가능성 한정 고도로 구성 가능

중첩 패턴

AutoGen 팀 중첩

# Inner team
inner_team = RoundRobinGroupChat(
    participants=[specialist1, specialist2],
    termination_condition=StopAfterNMessages(3)
)

# Outer team with nested team as participant
outer_team = RoundRobinGroupChat(
    participants=[coordinator, inner_team, reviewer],  # Team as participant
    termination_condition=StopAfterNMessages(10)
)

# Messages are broadcasted to all participants including nested team
result = await outer_team.run("Complex task requiring collaboration")

AutoGen 중첩 특성:

  • 중첩 팀은 외부 팀에서 모든 메시지를 받습니다.
  • 중첩된 팀 메시지는 모든 외부 팀 참가자에게 브로드캐스트됩니다.
  • 모든 수준의 공유 메시지 컨텍스트

에이전트 프레임워크 워크플로 중첩

from agent_framework import WorkflowExecutor, WorkflowBuilder

# Assume we have executors from previous examples
# specialist1_executor, specialist2_executor, coordinator_executor, reviewer_executor

# Create sub-workflow
sub_workflow = (WorkflowBuilder()
               .add_edge(specialist1_executor, specialist2_executor)
               .set_start_executor(specialist1_executor)
               .build())

# Wrap as executor
sub_workflow_executor = WorkflowExecutor(
    workflow=sub_workflow,
    id="sub_process"
)

# Use in parent workflow
parent_workflow = (WorkflowBuilder()
                  .add_edge(coordinator_executor, sub_workflow_executor)
                  .add_edge(sub_workflow_executor, reviewer_executor)
                  .set_start_executor(coordinator_executor)
                  .build())

에이전트 프레임워크 중첩 특성:

  • 를 통한 격리된 입력/출력 WorkflowExecutor
  • 메시지 브로드캐스팅 없음 - 특정 연결을 통한 데이터 흐름
  • 각 워크플로 수준에 대한 독립 상태 관리

그룹 채팅 패턴

그룹 채팅 패턴을 사용하면 여러 에이전트가 복잡한 작업에 대해 공동 작업할 수 있습니다. 일반적인 패턴이 프레임워크 간에 변환되는 방법은 다음과 같습니다.

RoundRobinGroupChat 패턴

AutoGen 구현:

from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import StopAfterNMessages

team = RoundRobinGroupChat(
    participants=[agent1, agent2, agent3],
    termination_condition=StopAfterNMessages(10)
)
result = await team.run("Discuss this topic")

에이전트 프레임워크 구현:

from agent_framework import SequentialBuilder, WorkflowOutputEvent

# Assume we have agent1, agent2, agent3 from previous examples
# Sequential workflow through participants
workflow = SequentialBuilder().participants([agent1, agent2, agent3]).build()

# Example usage (would be in async context)
async def sequential_example():
    # Each agent appends to shared conversation
    async for event in workflow.run_stream("Discuss this topic"):
        if isinstance(event, WorkflowOutputEvent):
            conversation_history = event.data  # list[ChatMessage]

자세한 오케스트레이션 예제는 다음을 참조하세요.

에이전트 프레임워크는 동시 실행 패턴의 경우 다음을 제공합니다.

from agent_framework import ConcurrentBuilder, WorkflowOutputEvent

# Assume we have agent1, agent2, agent3 from previous examples
# Concurrent workflow for parallel processing
workflow = (ConcurrentBuilder()
           .participants([agent1, agent2, agent3])
           .build())

# Example usage (would be in async context)
async def concurrent_example():
    # All agents process the input concurrently
    async for event in workflow.run_stream("Process this in parallel"):
        if isinstance(event, WorkflowOutputEvent):
            results = event.data  # Combined results from all agents

동시 실행 예제는 다음을 참조하세요.

MagenticOneGroupChat 패턴

AutoGen 구현:

from autogen_agentchat.teams import MagenticOneGroupChat

team = MagenticOneGroupChat(
    participants=[researcher, coder, executor],
    model_client=coordinator_client,
    termination_condition=StopAfterNMessages(20)
)
result = await team.run("Complex research and analysis task")

에이전트 프레임워크 구현:

from typing import cast
from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    ChatAgent,
    ChatMessage,
    MagenticBuilder,
    WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient

# Create a manager agent for orchestration
manager_agent = ChatAgent(
    name="MagenticManager",
    description="Orchestrator that coordinates the workflow",
    instructions="You coordinate a team to complete complex tasks efficiently.",
    chat_client=OpenAIChatClient(),
)

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher, coder=coder)
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=20,
        max_stall_count=3,
        max_reset_count=2,
    )
    .build()
)

# Example usage (would be in async context)
async def magentic_example():
    output: str | None = None
    async for event in workflow.run_stream("Complex research task"):
        if isinstance(event, AgentRunUpdateEvent):
            props = event.data.additional_properties if event.data else None
            event_type = props.get("magentic_event_type") if props else None

            if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
                text = event.data.text if event.data else ""
                print(f"[ORCHESTRATOR]: {text}")
            elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
                agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
                if event.data and event.data.text:
                    print(f"[{agent_id}]: {event.data.text}", end="")

        elif isinstance(event, WorkflowOutputEvent):
            output_messages = cast(list[ChatMessage], event.data)
            if output_messages:
                output = output_messages[-1].text

에이전트 프레임워크 사용자 지정 옵션:

Magentic 워크플로는 광범위한 사용자 지정 옵션을 제공합니다.

  • 관리자 구성: 사용자 지정 지침 및 모델 설정과 함께 ChatAgent 사용
  • 라운드 제한: max_round_count, , max_stall_countmax_reset_count
  • 이벤트 스트리밍: 메타데이터와 함께 magentic_event_type 사용 AgentRunUpdateEvent
  • 에이전트 전문화: 에이전트당 사용자 지정 지침 및 도구
  • 휴먼 인 더 루프: 계획 검토, 도구 승인 및 중단 개입
# Advanced customization example with human-in-the-loop
from typing import cast
from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    ChatAgent,
    MagenticBuilder,
    MagenticHumanInterventionDecision,
    MagenticHumanInterventionKind,
    MagenticHumanInterventionReply,
    MagenticHumanInterventionRequest,
    RequestInfoEvent,
    WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient

# Create manager agent with custom configuration
manager_agent = ChatAgent(
    name="MagenticManager",
    description="Orchestrator for complex tasks",
    instructions="Custom orchestration instructions...",
    chat_client=OpenAIChatClient(model_id="gpt-4o"),
)

workflow = (
    MagenticBuilder()
    .participants(
        researcher=researcher_agent,
        coder=coder_agent,
        analyst=analyst_agent,
    )
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=15,      # Limit total rounds
        max_stall_count=2,       # Trigger stall handling
        max_reset_count=1,       # Allow one reset on failure
    )
    .with_plan_review()           # Enable human plan review
    .with_human_input_on_stall()  # Enable human intervention on stalls
    .build()
)

# Handle human intervention requests during execution
async for event in workflow.run_stream("Complex task"):
    if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
        req = cast(MagenticHumanInterventionRequest, event.data)
        if req.kind == MagenticHumanInterventionKind.PLAN_REVIEW:
            # Review and approve the plan
            reply = MagenticHumanInterventionReply(
                decision=MagenticHumanInterventionDecision.APPROVE
            )
            async for ev in workflow.send_responses_streaming({event.request_id: reply}):
                pass  # Handle continuation

자세한 Magentic 예제는 다음을 참조하세요.

미래 패턴

에이전트 프레임워크 로드맵에는 현재 개발 중인 몇 가지 AutoGen 패턴이 포함되어 있습니다.

  • Swarm 패턴: 핸드오프 기반 에이전트 조정
  • SelectorGroupChat: LLM 기반 스피커 선택

요청 응답을 사용하여 휴먼 인 더 루프

에이전트 프레임워크의 Workflow 새로운 주요 기능은 요청 및 응답의 개념으로, 워크플로가 실행을 일시 중지하고 계속하기 전에 외부 입력을 기다릴 수 있도록 합니다. 이 기능은 AutoGen의 Team 추상화에 존재하지 않으며 정교한 휴먼 인 더 루프 패턴을 가능하게 합니다.

자동 생성 제한 사항

AutoGen의 Team 추상화는 시작되면 지속적으로 실행되며 사용자 입력에 대한 실행을 일시 중지하는 기본 제공 메커니즘을 제공하지 않습니다. 모든 휴먼 인 더 루프 기능을 사용하려면 프레임워크 외부의 사용자 지정 구현이 필요합니다.

에이전트 프레임워크 Request-Response API

Agent Framework는 모든 실행기가 데코레이터를 사용하여 ctx.request_info() 요청을 보내고 응답을 처리할 수 있는 기본 제공 요청 응답 @response_handler 기능을 제공합니다.

from agent_framework import (
    RequestInfoEvent, WorkflowBuilder, WorkflowContext, 
    Executor, handler, response_handler
)
from dataclasses import dataclass

# Assume we have agent_executor defined elsewhere

# Define typed request payload
@dataclass
class ApprovalRequest:
    """Request human approval for agent output."""
    content: str = ""
    agent_name: str = ""

# Workflow executor that requests human approval
class ReviewerExecutor(Executor):
    
    @handler
    async def review_content(
        self,
        agent_response: str,
        ctx: WorkflowContext
    ) -> None:
        # Request human input with structured data
        approval_request = ApprovalRequest(
            content=agent_response,
            agent_name="writer_agent"
        )
        await ctx.request_info(request_data=approval_request, response_type=str)
    
    @response_handler
    async def handle_approval_response(
        self,
        original_request: ApprovalRequest,
        decision: str,
        ctx: WorkflowContext
    ) -> None:
        decision_lower = decision.strip().lower()
        original_content = original_request.content

        if decision_lower == "approved":
            await ctx.yield_output(f"APPROVED: {original_content}")
        else:
            await ctx.yield_output(f"REVISION NEEDED: {decision}")

# Build workflow with human-in-the-loop
reviewer = ReviewerExecutor(id="reviewer")

workflow = (WorkflowBuilder()
           .add_edge(agent_executor, reviewer)
           .set_start_executor(agent_executor)
           .build())

휴먼 인 더 루프 워크플로 실행

에이전트 프레임워크는 일시 중지-다시 시작 주기를 처리하는 스트리밍 API를 제공합니다.

from agent_framework import RequestInfoEvent, WorkflowOutputEvent

# Assume we have workflow defined from previous examples
async def run_with_human_input():
    pending_responses = None
    completed = False

    while not completed:
        # First iteration uses run_stream, subsequent use send_responses_streaming
        stream = (
            workflow.send_responses_streaming(pending_responses)
            if pending_responses
            else workflow.run_stream("initial input")
        )

        events = [event async for event in stream]
        pending_responses = None

        # Collect human requests and outputs
        for event in events:
            if isinstance(event, RequestInfoEvent):
                # Display request to human and collect response
                request_data = event.data  # ApprovalRequest instance
                print(f"Review needed: {request_data.content}")

                human_response = input("Enter 'approved' or revision notes: ")
                pending_responses = {event.request_id: human_response}

            elif isinstance(event, WorkflowOutputEvent):
                print(f"Final result: {event.data}")
                completed = True

휴먼 인 더 루프 워크플로 예제는 다음을 참조하세요.

워크플로 검사점 지정 및 재개

AutoGen의 Workflow 추상화에 비해 에이전트 프레임워크의 Team 또 다른 주요 이점은 검사점 지정 및 실행 재개에 대한 기본 제공 지원입니다. 이렇게 하면 나중에 검사점에서 워크플로를 일시 중지, 유지 및 다시 시작하여 내결함성을 제공하고 장기 실행 또는 비동기 워크플로를 사용할 수 있습니다.

자동 생성 제한 사항

AutoGen의 Team 추상화는 기본 제공 검사점 기능을 제공하지 않습니다. 지속성 또는 복구 메커니즘은 외부에서 구현되어야 하며, 종종 복잡한 상태 관리 및 serialization 논리가 필요한 경우가 많습니다.

에이전트 프레임워크 검사점

에이전트 프레임워크는 전체 검사점 FileCheckpointStorage 및 메서드를 with_checkpointing()WorkflowBuilder제공합니다. 검사점 캡처:

  • 실행기 상태: 다음을 사용하는 각 실행자에 대한 로컬 상태 ctx.set_executor_state()
  • 공유 상태: 다음을 사용하여 실행기 간 상태 ctx.set_shared_state()
  • 메시지 큐: 실행기 간에 보류 중인 메시지
  • 워크플로 위치: 현재 실행 진행률 및 다음 단계
from agent_framework import (
    FileCheckpointStorage, WorkflowBuilder, WorkflowContext,
    Executor, handler
)
from typing_extensions import Never

class ProcessingExecutor(Executor):
    @handler
    async def process(self, data: str, ctx: WorkflowContext[str]) -> None:
        # Process the data
        result = f"Processed: {data.upper()}"
        print(f"Processing: '{data}' -> '{result}'")

        # Persist executor-local state
        prev_state = await ctx.get_executor_state() or {}
        count = prev_state.get("count", 0) + 1
        await ctx.set_executor_state({
            "count": count,
            "last_input": data,
            "last_output": result
        })

        # Persist shared state for other executors
        await ctx.set_shared_state("original_input", data)
        await ctx.set_shared_state("processed_output", result)

        await ctx.send_message(result)

class FinalizeExecutor(Executor):
    @handler
    async def finalize(self, data: str, ctx: WorkflowContext[Never, str]) -> None:
        result = f"Final: {data}"
        await ctx.yield_output(result)

# Configure checkpoint storage
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
processing_executor = ProcessingExecutor(id="processing")
finalize_executor = FinalizeExecutor(id="finalize")

# Build workflow with checkpointing enabled
workflow = (WorkflowBuilder()
           .add_edge(processing_executor, finalize_executor)
           .set_start_executor(processing_executor)
           .with_checkpointing(checkpoint_storage=checkpoint_storage)  # Enable checkpointing
           .build())

# Example usage (would be in async context)
async def checkpoint_example():
    # Run workflow - checkpoints are created automatically
    async for event in workflow.run_stream("input data"):
        print(f"Event: {event}")

검사점에서 다시 시작하기

에이전트 프레임워크는 특정 검사점에서 나열, 검사 및 다시 시작하는 API를 제공합니다.

from typing_extensions import Never

from agent_framework import (
    Executor,
    FileCheckpointStorage,
    WorkflowContext,
    WorkflowBuilder,
    get_checkpoint_summary,
    handler,
)

class UpperCaseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[str]) -> None:
        result = text.upper()
        await ctx.send_message(result)

class ReverseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
        result = text[::-1]
        await ctx.yield_output(result)

def create_workflow(checkpoint_storage: FileCheckpointStorage):
    """Create a workflow with two executors and checkpointing."""
    upper_executor = UpperCaseExecutor(id="upper")
    reverse_executor = ReverseExecutor(id="reverse")

    return (WorkflowBuilder()
           .add_edge(upper_executor, reverse_executor)
           .set_start_executor(upper_executor)
           .with_checkpointing(checkpoint_storage=checkpoint_storage)
           .build())

# Assume we have checkpoint_storage from previous examples
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")

async def checkpoint_resume_example():
    # List available checkpoints
    checkpoints = await checkpoint_storage.list_checkpoints()

    # Display checkpoint information
    for checkpoint in checkpoints:
        summary = get_checkpoint_summary(checkpoint)
        print(f"Checkpoint {summary.checkpoint_id}: iteration={summary.iteration_count}")

    # Resume from a specific checkpoint
    if checkpoints:
        chosen_checkpoint_id = checkpoints[0].checkpoint_id

        # Create new workflow instance and resume
        new_workflow = create_workflow(checkpoint_storage)
        async for event in new_workflow.run_stream(
            checkpoint_id=chosen_checkpoint_id,
            checkpoint_storage=checkpoint_storage
        ):
            print(f"Resumed event: {event}")

고급 검사점 기능

휴먼 인 더 루프 통합을 사용하는 검사점:

검사점은 휴먼 인 더 루프 워크플로와 원활하게 작동하므로 사용자 입력에 대해 워크플로를 일시 중지하고 나중에 다시 시작합니다. 보류 중인 요청이 포함된 검사점에서 다시 시작하면 해당 요청이 이벤트로 다시 내보내됩니다.

# Assume we have workflow, checkpoint_id, and checkpoint_storage from previous examples
async def resume_with_pending_requests_example():
    # Resume from checkpoint - pending requests will be re-emitted
    request_info_events = []
    async for event in workflow.run_stream(
        checkpoint_id=checkpoint_id,
        checkpoint_storage=checkpoint_storage
    ):
        if isinstance(event, RequestInfoEvent):
            request_info_events.append(event)

    # Handle re-emitted pending request
    responses = {}
    for event in request_info_events:
        response = handle_request(event.data)
        responses[event.request_id] = response

    # Send response back to workflow
    async for event in workflow.send_responses_streaming(responses):
        print(f"Event: {event}")

주요 이점

AutoGen에 비해 에이전트 프레임워크의 검사점은 다음을 제공합니다.

  • 자동 지속성: 수동 상태 관리가 필요하지 않음
  • 세분화된 복구: 모든 슈퍼스텝 경계에서 다시 시작
  • 상태 격리: 별도의 실행기-로컬 및 공유 상태
  • 휴먼 인 더 루프 통합: 사용자 입력을 사용하여 원활한 일시 중지-다시 시작
  • 내결함성: 오류 또는 중단으로부터 강력한 복구

실제 예제

포괄적인 검사점 예제는 다음을 참조하세요.


Observability

AutoGen 및 에이전트 프레임워크는 모두 관찰 기능을 제공하지만 다양한 접근 방식과 기능을 제공합니다.

AutoGen 관찰 가능성

AutoGen은 다음을 위한 계측을 사용하여 OpenTelemetry 에 대한 기본 지원을 제공합니다.

  • 런타임 추적: SingleThreadedAgentRuntimeGrpcWorkerAgentRuntime
  • 도구 실행: BaseToolexecute_tool GenAI 의미 체계 규칙에 따른 범위 사용
  • 에이전트 작업: BaseChatAgentcreate_agentinvoke_agent 범위
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from autogen_core import SingleThreadedAgentRuntime

# Configure OpenTelemetry
tracer_provider = TracerProvider()
trace.set_tracer_provider(tracer_provider)

# Pass to runtime
runtime = SingleThreadedAgentRuntime(tracer_provider=tracer_provider)

에이전트 프레임워크 관찰 가능성

에이전트 프레임워크는 다음과 같은 여러 방법을 통해 포괄적인 관찰 가능성을 제공합니다.

  • 제로 코드 설정: 환경 변수를 통한 자동 계측
  • 수동 구성: 사용자 지정 매개 변수를 사용하여 프로그래밍 방식 설정
  • 다양한 원격 분석: 에이전트, 워크플로 및 도구 실행 추적
  • 콘솔 출력: 기본 제공 콘솔 로깅 및 시각화
from agent_framework import ChatAgent
from agent_framework.observability import setup_observability
from agent_framework.openai import OpenAIChatClient

# Zero-code setup via environment variables
# Set ENABLE_OTEL=true
# Set OTLP_ENDPOINT=http://localhost:4317

# Or manual setup
setup_observability(
    otlp_endpoint="http://localhost:4317"
)

# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")

async def observability_example():
    # Observability is automatically applied to all agents and workflows
    agent = ChatAgent(name="assistant", chat_client=client)
    result = await agent.run("Hello")  # Automatically traced

주요 차이점:

  • 설치 복잡성: 에이전트 프레임워크는 더 간단한 제로 코드 설정 옵션을 제공합니다.
  • 범위: 에이전트 프레임워크는 워크플로 수준 관찰 가능성을 포함하여 더 광범위한 범위를 제공합니다.
  • 시각화: 에이전트 프레임워크에는 기본 제공 콘솔 출력 및 개발 UI가 포함됩니다.
  • 구성: 에이전트 프레임워크는 보다 유연한 구성 옵션을 제공합니다.

자세한 관찰 가능성 예제는 다음을 참조하세요.


결론

이 마이그레이션 가이드에서는 기본 에이전트 만들기부터 복잡한 다중 에이전트 워크플로에 이르기까지 모든 것을 다루는 AutoGen과 Microsoft Agent Framework 간의 포괄적인 매핑을 제공합니다. 마이그레이션에 대한 주요 내용:

  • 에이전트 프레임워크에서 유사한 API 및 향상된 기능을 사용하여 단일 에이전트 마이그레이션은 간단합니다.
  • 다중 에이전트 패턴은 이벤트 기반에서 데이터 흐름 기반 아키텍처로의 접근 방식을 재고해야 하지만 GraphFlow에 이미 익숙한 경우 전환이 더 쉬워질 것입니다.
  • 에이전트 프레임워크는 미들웨어, 호스트된 도구 및 형식화된 워크플로와 같은 추가 기능을 제공합니다.

추가 예제 및 자세한 구현 지침은 Agent Framework 샘플 디렉터리를 참조하세요.

추가 샘플 범주

에이전트 프레임워크는 다른 여러 중요한 영역에 샘플을 제공합니다.

다음 단계