Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Una guía completa para migrar de AutoGen al SDK de Python de Microsoft Agent Framework.
Tabla de contenido
- Antecedentes
- Diferencias y similitudes clave
- Creación y configuración de cliente de modelos
-
Asignación de características deSingle-Agent
- Creación y ejecución básicas del agente
- Administración del estado de conversación con AgentThread
- Equivalencia del agente de OpenAI Assistant
- Compatibilidad con streaming
- Tipos de mensaje y creación
- Creación e integración de herramientas
- Herramientas hospedadas (exclusivo de Agent Framework)
- Compatibilidad con el servidor MCP
- Patrón de agente como herramienta
- Middleware (característica del marco de agente)
- Agentes personalizados
- Asignación de características multiagente
- Observabilidad
- Conclusión
Contexto
AutoGen es un marco para crear agentes de INTELIGENCIA ARTIFICIAL y sistemas multiagente mediante modelos de lenguaje grande (LLM). Se inició como un proyecto de investigación en Microsoft Research y fue pionero en varios conceptos en la orquestación multiagente, como GroupChat y el entorno de ejecución del agente controlado por eventos. El proyecto ha sido una colaboración fructífero de la comunidad de código abierto y muchas características importantes procedentes de colaboradores externos.
Microsoft Agent Framework es un nuevo SDK de varios lenguajes para compilar agentes y flujos de trabajo de inteligencia artificial mediante LLM. Representa una evolución significativa de las ideas pioneras en AutoGen e incorpora lecciones aprendidas del uso real. Está desarrollado por los equipos principales de AutoGen y Kernel semántico en Microsoft, y está diseñado para ser una nueva base para crear aplicaciones de inteligencia artificial en el futuro.
En esta guía se describe una ruta de migración práctica: comienza escribiendo lo que permanece igual y qué cambios se producen de un vistazo. A continuación, abarca la configuración del cliente del modelo, las características de un solo agente y, por último, la orquestación multiagente con código concreto en paralelo. A lo largo del proceso, los vínculos a ejemplos ejecutables en el repositorio de Agent Framework le ayudan a validar cada paso.
Diferencias y similitudes clave
Qué sigue siendo el mismo
Los fundamentos son familiares. Todavía se crean agentes en torno a un cliente de modelo, se proporcionan instrucciones y se adjuntan herramientas. Ambas bibliotecas admiten herramientas de estilo de función, streaming de tokens, contenido multiplataforma y E/S asincrónica.
# Both frameworks follow similar patterns
# AutoGen
agent = AssistantAgent(name="assistant", model_client=client, tools=[my_tool])
result = await agent.run(task="Help me with this task")
# Agent Framework
agent = ChatAgent(name="assistant", chat_client=client, tools=[my_tool])
result = await agent.run("Help me with this task")
Diferencias clave
Estilo de orquestación: AutoGen empareja un núcleo controlado por eventos con un nivel alto
Team. Agent Framework se centra en una baseWorkflowde gráficos con tipo que enruta los datos a lo largo de los bordes y activa los ejecutores cuando las entradas están listas.Herramientas: AutoGen ajusta las funciones con
FunctionTool. Agent Framework usa@ai_functionesquemas , deduce automáticamente los esquemas y agrega herramientas hospedadas como un intérprete de código y una búsqueda web.Comportamiento del agente:
AssistantAgentes de un solo turno a menos que aumentemax_tool_iterations.ChatAgentes multiturno de forma predeterminada y mantiene la invocación de herramientas hasta que pueda devolver una respuesta final.Runtime: AutoGen ofrece entornos de ejecución distribuidos insertados y experimentales. Agent Framework se centra en la composición de un solo proceso hoy en día; la ejecución distribuida está planeada.
Creación y configuración de cliente de modelos
Ambos marcos proporcionan clientes de modelos para proveedores de inteligencia artificial principales, con API similares, pero no idénticas.
| Característica | AutoGen | Marco de trabajo del agente |
|---|---|---|
| Cliente openAI | OpenAIChatCompletionClient |
OpenAIChatClient |
| Cliente de respuestas de OpenAI | ❌ No disponible | OpenAIResponsesClient |
| Azure OpenAI | AzureOpenAIChatCompletionClient |
AzureOpenAIChatClient |
| Respuestas de Azure OpenAI | ❌ No disponible | AzureOpenAIResponsesClient |
| Azure AI | AzureAIChatCompletionClient |
AzureAIAgentClient |
| Anthropic | AnthropicChatCompletionClient |
🚧 Planificado |
| Ollama | OllamaChatCompletionClient |
🚧 Planificado |
| Almacenamiento en memoria caché |
ChatCompletionCache envoltura |
🚧 Planificado |
Clientes de modelos de AutoGen
from autogen_ext.models.openai import OpenAIChatCompletionClient, AzureOpenAIChatCompletionClient
# OpenAI
client = OpenAIChatCompletionClient(
model="gpt-5",
api_key="your-key"
)
# Azure OpenAI
client = AzureOpenAIChatCompletionClient(
azure_endpoint="https://your-endpoint.openai.azure.com/",
azure_deployment="gpt-5",
api_version="2024-12-01",
api_key="your-key"
)
ChatClients de Marco de agente
from agent_framework.openai import OpenAIChatClient
from agent_framework.azure import AzureOpenAIChatClient
# OpenAI (reads API key from environment)
client = OpenAIChatClient(model_id="gpt-5")
# Azure OpenAI (uses environment or default credentials; see samples for auth options)
client = AzureOpenAIChatClient(model_id="gpt-5")
Para obtener ejemplos detallados, consulte:
- Cliente de chat de OpenAI : configuración básica del cliente openAI
- Cliente de chat de Azure OpenAI : Azure OpenAI con autenticación
- Cliente de Azure AI : integración del agente de Azure AI
Compatibilidad con la API de respuestas (agente framework exclusivo)
Agent Framework y AzureOpenAIResponsesClientOpenAIResponsesClient proporcionan compatibilidad especializada con modelos de razonamiento y respuestas estructuradas que no están disponibles en AutoGen:
from agent_framework.azure import AzureOpenAIResponsesClient
from agent_framework.openai import OpenAIResponsesClient
# Azure OpenAI with Responses API
azure_responses_client = AzureOpenAIResponsesClient(model_id="gpt-5")
# OpenAI with Responses API
openai_responses_client = OpenAIResponsesClient(model_id="gpt-5")
Para ver ejemplos de la API de respuestas, consulte:
- Azure Responses Client Basic : Azure OpenAI con respuestas
- OpenAI Responses Client Basic : integración de respuestas de OpenAI
Asignación de características de Single-Agent
En esta sección se asignan características de agente único entre AutoGen y Agent Framework. Con un cliente implementado, cree un agente, adjunte herramientas y elija entre la ejecución de streaming y streaming.
Creación y ejecución básicas del agente
Una vez configurado un cliente de modelo, el siguiente paso consiste en crear agentes. Ambos marcos proporcionan abstracciones de agente similares, pero con diferentes comportamientos predeterminados y opciones de configuración.
AutoGen AssistantAgent
from autogen_agentchat.agents import AssistantAgent
agent = AssistantAgent(
name="assistant",
model_client=client,
system_message="You are a helpful assistant.",
tools=[my_tool],
max_tool_iterations=1 # Single-turn by default
)
# Execution
result = await agent.run(task="What's the weather?")
Agent Framework ChatAgent
from agent_framework import ChatAgent, ai_function
from agent_framework.openai import OpenAIChatClient
# Create simple tools for the example
@ai_function
def get_weather(location: str) -> str:
"""Get weather for a location."""
return f"Weather in {location}: sunny"
@ai_function
def get_time() -> str:
"""Get current time."""
return "Current time: 2:30 PM"
# Create client
client = OpenAIChatClient(model_id="gpt-5")
async def example():
# Direct creation
agent = ChatAgent(
name="assistant",
chat_client=client,
instructions="You are a helpful assistant.",
tools=[get_weather] # Multi-turn by default
)
# Factory method (more convenient)
agent = client.create_agent(
name="assistant",
instructions="You are a helpful assistant.",
tools=[get_weather]
)
# Execution with runtime tool configuration
result = await agent.run(
"What's the weather?",
tools=[get_time], # Can add tools at runtime
tool_choice="auto"
)
Diferencias clave:
-
Comportamiento predeterminado:
ChatAgentitera automáticamente a través de llamadas a herramientas, mientras queAssistantAgentrequiere una configuración explícitamax_tool_iterations -
Configuración en tiempo de ejecución:
ChatAgent.run()acepta parámetrostoolsytool_choicepara la personalización por invocación - Métodos de fábrica: Agent Framework proporciona métodos de fábrica prácticos directamente desde clientes de chat
-
Administración de estado:
ChatAgentno tiene estado y no mantiene el historial de conversaciones entre invocaciones, a diferenciaAssistantAgentde lo que mantiene el historial de conversaciones como parte de su estado.
Administración del estado de conversación con AgentThread
Para continuar las conversaciones con ChatAgent, use AgentThread para administrar el historial de conversaciones:
# Assume we have an agent from previous examples
async def conversation_example():
# Create a new thread that will be reused
thread = agent.get_new_thread()
# First interaction - thread is empty
result1 = await agent.run("What's 2+2?", thread=thread)
print(result1.text) # "4"
# Continue conversation - thread contains previous messages
result2 = await agent.run("What about that number times 10?", thread=thread)
print(result2.text) # "40" (understands "that number" refers to 4)
# AgentThread can use external storage, similar to ChatCompletionContext in AutoGen
Sin estado de forma predeterminada: demostración rápida
# Without a thread (two independent invocations)
r1 = await agent.run("What's 2+2?")
print(r1.text) # for example, "4"
r2 = await agent.run("What about that number times 10?")
print(r2.text) # Likely ambiguous without prior context; cannot be "40"
# With a thread (shared context across calls)
thread = agent.get_new_thread()
print((await agent.run("What's 2+2?", thread=thread)).text) # "4"
print((await agent.run("What about that number times 10?", thread=thread)).text) # "40"
Para ver ejemplos de administración de subprocesos, consulte:
- Azure AI con Thread: administración de estado de conversación
- Cliente de chat openAI con subproceso : patrones de uso de subprocesos
- Subprocesos respaldados por Redis : conservación del estado de conversación externamente
Equivalencia del agente de OpenAI Assistant
Ambos marcos proporcionan integración de api de OpenAI Assistant:
# AutoGen OpenAIAssistantAgent
from autogen_ext.agents.openai import OpenAIAssistantAgent
# Agent Framework has OpenAI Assistants support via OpenAIAssistantsClient
from agent_framework.openai import OpenAIAssistantsClient
Para ver ejemplos de OpenAI Assistant, consulte:
- OpenAI Assistants Basic : configuración básica del asistente
- OpenAI Assistants with Function Tools : integración de herramientas personalizadas
- Azure OpenAI Assistants Basic: configuración del asistente de Azure
- Asistentes de OpenAI con subprocesos: administración de subprocesos
Compatibilidad con streaming
Ambos marcos transmiten tokens en tiempo real (desde clientes y agentes) para mantener la capacidad de respuesta de las INTERFACES de usuario.
AutoGen Streaming
# Model client streaming
async for chunk in client.create_stream(messages):
if isinstance(chunk, str):
print(chunk, end="")
# Agent streaming
async for event in agent.run_stream(task="Hello"):
if isinstance(event, ModelClientStreamingChunkEvent):
print(event.content, end="")
elif isinstance(event, TaskResult):
print("Final result received")
Streaming del marco de trabajo del agente
# Assume we have client, agent, and tools from previous examples
async def streaming_example():
# Chat client streaming
async for chunk in client.get_streaming_response("Hello", tools=tools):
if chunk.text:
print(chunk.text, end="")
# Agent streaming
async for chunk in agent.run_stream("Hello"):
if chunk.text:
print(chunk.text, end="", flush=True)
Sugerencia: En Agent Framework, los clientes y los agentes producen la misma forma de actualización; puede leer chunk.text en cualquier caso.
Tipos de mensaje y creación
Comprender cómo funcionan los mensajes es fundamental para una comunicación eficaz del agente. Ambos marcos proporcionan enfoques diferentes para la creación y control de mensajes, con AutoGen mediante clases de mensajes independientes y El marco de trabajo del agente mediante un sistema de mensajes unificado.
Tipos de mensajes AutoGen
from autogen_agentchat.messages import TextMessage, MultiModalMessage
from autogen_core.models import UserMessage
# Text message
text_msg = TextMessage(content="Hello", source="user")
# Multi-modal message
multi_modal_msg = MultiModalMessage(
content=["Describe this image", image_data],
source="user"
)
# Convert to model format for use with model clients
user_message = text_msg.to_model_message()
Tipos de mensajes del marco de agente
from agent_framework import ChatMessage, TextContent, DataContent, UriContent, Role
import base64
# Text message
text_msg = ChatMessage(role=Role.USER, text="Hello")
# Supply real image bytes, or use a data: URI/URL via UriContent
image_bytes = b"<your_image_bytes>"
image_b64 = base64.b64encode(image_bytes).decode()
image_uri = f"data:image/jpeg;base64,{image_b64}"
# Multi-modal message with mixed content
multi_modal_msg = ChatMessage(
role=Role.USER,
contents=[
TextContent(text="Describe this image"),
DataContent(uri=image_uri, media_type="image/jpeg")
]
)
Diferencias clave:
- AutoGen usa clases de mensaje independientes (
TextMessage,MultiModalMessage) con unsourcecampo - Agent Framework usa un unificado
ChatMessagecon objetos de contenido con tipo y unrolecampo - Los mensajes de Agent Framework usan
Roleenumeración (USER, ASSISTANT, SYSTEM, TOOL) en lugar de orígenes de cadena
Creación e integración de herramientas
Las herramientas amplían las funcionalidades del agente más allá de la generación de texto. Los marcos toman diferentes enfoques para la creación de herramientas, con Agent Framework que proporciona una generación de esquemas más automatizada.
AutoGen FunctionTool
from autogen_core.tools import FunctionTool
async def get_weather(location: str) -> str:
"""Get weather for a location."""
return f"Weather in {location}: sunny"
# Manual tool creation
tool = FunctionTool(
func=get_weather,
description="Get weather information"
)
# Use with agent
agent = AssistantAgent(name="assistant", model_client=client, tools=[tool])
Marco de trabajo del agente @ai_function
from agent_framework import ai_function
from typing import Annotated
from pydantic import Field
@ai_function
def get_weather(
location: Annotated[str, Field(description="The location to get weather for")]
) -> str:
"""Get weather for a location."""
return f"Weather in {location}: sunny"
# Direct use with agent (automatic conversion)
agent = ChatAgent(name="assistant", chat_client=client, tools=[get_weather])
Para obtener ejemplos detallados, consulte:
- Agente de chat de OpenAI Básico : agente de chat de OpenAI simple
- OpenAI con Herramientas de funciones : agente con herramientas personalizadas
- Azure OpenAI Basic : configuración del agente de Azure OpenAI
Herramientas hospedadas (exclusivo de Agent Framework)
Agent Framework proporciona herramientas hospedadas que no están disponibles en AutoGen:
from agent_framework import ChatAgent, HostedCodeInterpreterTool, HostedWebSearchTool
from agent_framework.azure import AzureOpenAIChatClient
# Azure OpenAI client with a model that supports hosted tools
client = AzureOpenAIChatClient(model_id="gpt-5")
# Code execution tool
code_tool = HostedCodeInterpreterTool()
# Web search tool
search_tool = HostedWebSearchTool()
agent = ChatAgent(
name="researcher",
chat_client=client,
tools=[code_tool, search_tool]
)
Para obtener ejemplos detallados, consulte:
- Azure AI con el intérprete de código : herramienta de ejecución de código
- Azure AI con varias herramientas: varias herramientas hospedadas
- OpenAI con Web Search: integración de búsqueda web
Requisitos y advertencias:
- Las herramientas hospedadas solo están disponibles en modelos o cuentas que las admitan. Compruebe los derechos y la compatibilidad del modelo con el proveedor antes de habilitar estas herramientas.
- La configuración difiere según el proveedor; siga los requisitos previos de cada ejemplo para la configuración y los permisos.
- No todos los modelos admiten todas las herramientas hospedadas (por ejemplo, búsqueda web frente al intérprete de código). Elija un modelo compatible en su entorno.
Nota:
AutoGen admite herramientas de ejecución de código local, pero esta característica está planeada para futuras versiones de Agent Framework.
Diferencia clave: Agent Framework controla automáticamente la iteración de herramientas en el nivel de agente. A diferencia del parámetro de AutoGen, los agentes de max_tool_iterations Agent Framework continúan la ejecución de herramientas hasta la finalización de forma predeterminada, con mecanismos de seguridad integrados para evitar bucles infinitos.
Compatibilidad con el servidor MCP
Para la integración avanzada de herramientas, ambos marcos admiten el Protocolo de contexto de modelo (MCP), lo que permite a los agentes interactuar con los servicios externos y los orígenes de datos. Agent Framework proporciona compatibilidad integrada más completa.
Compatibilidad con MCP de AutoGen
AutoGen tiene compatibilidad básica con MCP a través de extensiones (los detalles de implementación específicos varían según la versión).
Compatibilidad con MCP de Agent Framework
from agent_framework import ChatAgent, MCPStdioTool, MCPStreamableHTTPTool, MCPWebsocketTool
from agent_framework.openai import OpenAIChatClient
# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")
# Stdio MCP server
mcp_tool = MCPStdioTool(
name="filesystem",
command="uvx mcp-server-filesystem",
args=["/allowed/directory"]
)
# HTTP streaming MCP
http_mcp = MCPStreamableHTTPTool(
name="http_mcp",
url="http://localhost:8000/sse"
)
# WebSocket MCP
ws_mcp = MCPWebsocketTool(
name="websocket_mcp",
url="ws://localhost:8000/ws"
)
agent = ChatAgent(name="assistant", chat_client=client, tools=[mcp_tool])
Para ver ejemplos de MCP, consulte:
- OpenAI con MCP local: uso de MCPStreamableHTTPTool con OpenAI
- OpenAI con MCP hospedado: uso de servicios DE MCP hospedados
- Azure AI con MCP local: uso de MCP con Azure AI
- Azure AI con MCP hospedado : uso de MCP hospedado con Azure AI
Patrón de agente como herramienta
Un patrón eficaz es usar agentes como herramientas, lo que permite arquitecturas de agente jerárquicas. Ambos marcos admiten este patrón con diferentes implementaciones.
AutoGen AgentTool
from autogen_agentchat.tools import AgentTool
# Create specialized agent
writer = AssistantAgent(
name="writer",
model_client=client,
system_message="You are a creative writer."
)
# Wrap as tool
writer_tool = AgentTool(agent=writer)
# Use in coordinator (requires disabling parallel tool calls)
coordinator_client = OpenAIChatCompletionClient(
model="gpt-5",
parallel_tool_calls=False
)
coordinator = AssistantAgent(
name="coordinator",
model_client=coordinator_client,
tools=[writer_tool]
)
Agent Framework as_tool()
from agent_framework import ChatAgent
# Assume we have client from previous examples
# Create specialized agent
writer = ChatAgent(
name="writer",
chat_client=client,
instructions="You are a creative writer."
)
# Convert to tool
writer_tool = writer.as_tool(
name="creative_writer",
description="Generate creative content",
arg_name="request",
arg_description="What to write"
)
# Use in coordinator
coordinator = ChatAgent(
name="coordinator",
chat_client=client,
tools=[writer_tool]
)
Nota de migración explícita: en AutoGen, establezca parallel_tool_calls=False en el cliente del modelo del coordinador al encapsular agentes como herramientas para evitar problemas de simultaneidad al invocar la misma instancia del agente.
En Agent Framework, as_tool() no requiere deshabilitar las llamadas a herramientas paralelas, ya que los agentes no tienen estado de forma predeterminada.
Middleware (característica del marco de agente)
Agent Framework presenta funcionalidades de middleware que AutoGen carece. El middleware permite problemas transversales eficaces, como el registro, la seguridad y la supervisión del rendimiento.
from agent_framework import ChatAgent, AgentRunContext, FunctionInvocationContext
from typing import Callable, Awaitable
# Assume we have client from previous examples
async def logging_middleware(
context: AgentRunContext,
next: Callable[[AgentRunContext], Awaitable[None]]
) -> None:
print(f"Agent {context.agent.name} starting")
await next(context)
print(f"Agent {context.agent.name} completed")
async def security_middleware(
context: FunctionInvocationContext,
next: Callable[[FunctionInvocationContext], Awaitable[None]]
) -> None:
if "password" in str(context.arguments):
print("Blocking function call with sensitive data")
return # Don't call next()
await next(context)
agent = ChatAgent(
name="secure_agent",
chat_client=client,
middleware=[logging_middleware, security_middleware]
)
Ventajas:
- Seguridad: validación de entrada y filtrado de contenido
- Observabilidad: registro, métricas y seguimiento
- Rendimiento: almacenamiento en caché y limitación de velocidad
- Control de errores: degradación correcta y lógica de reintento
Para obtener ejemplos detallados de middleware, consulte:
- Middleware basado en funciones: middleware de función simple
- Middleware basado en clases : middleware orientado a objetos
- Middleware de control de excepciones : patrones de control de errores
- Middleware de estado compartido : administración de estado entre agentes
Agentes personalizados
A veces no quiere un agente respaldado por modelos, sino que quiere un agente con respaldo de API o determinista con lógica personalizada. Ambos marcos admiten la creación de agentes personalizados, pero los patrones difieren.
AutoGen: Subclase BaseChatAgent
from typing import Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseChatMessage, TextMessage, StopMessage
from autogen_core import CancellationToken
class StaticAgent(BaseChatAgent):
def __init__(self, name: str = "static", description: str = "Static responder") -> None:
super().__init__(name, description)
@property
def produced_message_types(self) -> Sequence[type[BaseChatMessage]]: # Which message types this agent produces
return (TextMessage,)
async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
# Always return a static response
return Response(chat_message=TextMessage(content="Hello from AutoGen custom agent", source=self.name))
Notes:
- Implemente
on_messages(...)y devuelva unResponsecon un mensaje de chat. - Opcionalmente, implemente
on_reset(...)para borrar el estado interno entre ejecuciones.
Marco del agente: extender BaseAgent (compatible con subprocesos)
from collections.abc import AsyncIterable
from typing import Any
from agent_framework import (
AgentRunResponse,
AgentRunResponseUpdate,
AgentThread,
BaseAgent,
ChatMessage,
Role,
TextContent,
)
class StaticAgent(BaseAgent):
async def run(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AgentRunResponse:
# Build a static reply
reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])
# Persist conversation to the provided AgentThread (if any)
if thread is not None:
normalized = self._normalize_messages(messages)
await self._notify_thread_of_new_messages(thread, normalized, reply)
return AgentRunResponse(messages=[reply])
async def run_stream(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AsyncIterable[AgentRunResponseUpdate]:
# Stream the same static response in a single chunk for simplicity
yield AgentRunResponseUpdate(contents=[TextContent(text="Hello from AF custom agent")], role=Role.ASSISTANT)
# Notify thread of input and the complete response once streaming ends
if thread is not None:
reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])
normalized = self._normalize_messages(messages)
await self._notify_thread_of_new_messages(thread, normalized, reply)
Notes:
-
AgentThreadmantiene el estado de conversación externamente; useagent.get_new_thread()y páselo arun/run_stream. - Llame
self._notify_thread_of_new_messages(thread, input_messages, response_messages)a para que el subproceso tenga ambos lados del intercambio. - Consulte el ejemplo completo: Agente personalizado
A continuación, echemos un vistazo a la orquestación multiagente, el área en la que los marcos difieren más.
Asignación de características multiagente
Información general sobre el modelo de programación
Los modelos de programación multiagente representan la diferencia más significativa entre los dos marcos.
Enfoque de modelo dual de AutoGen
AutoGen proporciona dos modelos de programación:
-
autogen-core: programación basada en eventos de bajo nivel conRoutedAgentsuscripciones de mensajes y -
Teamabstracción: modelo de alto nivel centrado en la ejecución basado en la base deautogen-core
# Low-level autogen-core (complex)
class MyAgent(RoutedAgent):
@message_handler
async def handle_message(self, message: TextMessage, ctx: MessageContext) -> None:
# Handle specific message types
pass
# High-level Team (easier but limited)
team = RoundRobinGroupChat(
participants=[agent1, agent2],
termination_condition=StopAfterNMessages(5)
)
result = await team.run(task="Collaborate on this task")
Desafíos:
- El modelo de bajo nivel es demasiado complejo para la mayoría de los usuarios
- El modelo de alto nivel puede convertirse en una limitación para comportamientos complejos
- El puente entre los dos modelos agrega complejidad de implementación
Modelo de flujo de trabajo unificado de Agent Framework
Agent Framework proporciona una única Workflow abstracción que combina lo mejor de ambos enfoques:
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
# Assume we have agent1 and agent2 from previous examples
@executor(id="agent1")
async def agent1_executor(input_msg: str, ctx: WorkflowContext[str]) -> None:
response = await agent1.run(input_msg)
await ctx.send_message(response.text)
@executor(id="agent2")
async def agent2_executor(input_msg: str, ctx: WorkflowContext[Never, str]) -> None:
response = await agent2.run(input_msg)
await ctx.yield_output(response.text) # Final output
# Build typed data flow graph
workflow = (WorkflowBuilder()
.add_edge(agent1_executor, agent2_executor)
.set_start_executor(agent1_executor)
.build())
# Example usage (would be in async context)
# result = await workflow.run("Initial input")
Para obtener ejemplos detallados de flujo de trabajo, consulte:
- Conceptos básicos de flujo de trabajo : introducción a ejecutores y bordes
- Agentes en flujo de trabajo : integración de agentes en flujos de trabajo
- Streaming de flujo de trabajo: ejecución del flujo de trabajo en tiempo real
Ventajas:
- Modelo unificado: abstracción única para todos los niveles de complejidad
- Seguridad de tipos: entradas y salidas fuertemente tipadas
- Visualización de grafos: borrar la representación del flujo de datos
- Composición flexible: mezcla de agentes, funciones y sub flujos de trabajo
Flujo de trabajo frente a GraphFlow
La abstracción de Workflow Agent Framework está inspirada en la característica experimental GraphFlow de AutoGen, pero representa una evolución significativa en la filosofía de diseño:
- GraphFlow: flujo de control basado en el que los bordes son transiciones y los mensajes se transmiten a todos los agentes; las transiciones están condicionadas en el contenido del mensaje transmitido
- Flujo de trabajo: flujo de datos basado en el que los mensajes se enrutan a través de bordes y ejecutores específicos se activan mediante bordes, con compatibilidad con la ejecución simultánea.
Información general del objeto visual
El diagrama siguiente contrasta el flujo de control de GraphFlow (izquierda) de AutoGen con el flujo de trabajo de flujo de datos (derecha) de Agent Framework. GraphFlow modela agentes como nodos con transiciones condicionales y difusiones. Ejecutores de modelos de flujo de trabajo (agentes, funciones o sub flujos de trabajo) conectados por bordes tipados; también admite pausas de solicitud/respuesta y puntos de control.
flowchart LR
subgraph AutoGenGraphFlow
direction TB
U[User / Task] --> A[Agent A]
A -->|success| B[Agent B]
A -->|retry| C[Agent C]
A -. broadcast .- B
A -. broadcast .- C
end
subgraph AgentFrameworkWorkflow
direction TB
I[Input] --> E1[Executor 1]
E1 -->|"str"| E2[Executor 2]
E1 -->|"image"| E3[Executor 3]
E3 -->|"str"| E2
E2 --> OUT[(Final Output)]
end
R[Request / Response Gate]
E2 -. request .-> R
R -. resume .-> E2
CP[Checkpoint]
E1 -. save .-> CP
CP -. load .-> E1
En la práctica:
- GraphFlow usa agentes como nodos y transmite mensajes; los bordes representan transiciones condicionales.
- Las rutas de flujo de trabajo escriben mensajes a lo largo de los bordes. Los nodos (ejecutores) pueden ser agentes, funciones puras o sub flujos de trabajo.
- Solicitud/respuesta permite pausar un flujo de trabajo para la entrada externa; la creación de puntos de control conserva el progreso y habilita la reanudación.
Comparación de código
1) Secuencial + Condicional
# AutoGen GraphFlow (fluent builder) — writer → reviewer → editor (conditional)
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
writer = AssistantAgent(name="writer", description="Writes a draft", model_client=client)
reviewer = AssistantAgent(name="reviewer", description="Reviews the draft", model_client=client)
editor = AssistantAgent(name="editor", description="Finalizes the draft", model_client=client)
graph = (
DiGraphBuilder()
.add_node(writer).add_node(reviewer).add_node(editor)
.add_edge(writer, reviewer) # always
.add_edge(reviewer, editor, condition=lambda msg: "approve" in msg.to_model_text())
.set_entry_point(writer)
).build()
team = GraphFlow(participants=[writer, reviewer, editor], graph=graph)
result = await team.run(task="Draft a short paragraph about solar power")
# Agent Framework Workflow — sequential executors with conditional logic
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
@executor(id="writer")
async def writer_exec(task: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(f"Draft: {task}")
@executor(id="reviewer")
async def reviewer_exec(draft: str, ctx: WorkflowContext[str]) -> None:
decision = "approve" if "solar" in draft.lower() else "revise"
await ctx.send_message(f"{decision}:{draft}")
@executor(id="editor")
async def editor_exec(msg: str, ctx: WorkflowContext[Never, str]) -> None:
if msg.startswith("approve:"):
await ctx.yield_output(msg.split(":", 1)[1])
else:
await ctx.yield_output("Needs revision")
workflow_seq = (
WorkflowBuilder()
.add_edge(writer_exec, reviewer_exec)
.add_edge(reviewer_exec, editor_exec)
.set_start_executor(writer_exec)
.build()
)
2) Fan-out + Join (ALL vs ANY)
# AutoGen GraphFlow — A → (B, C) → D with ALL/ANY join
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
A, B, C, D = agent_a, agent_b, agent_c, agent_d
# ALL (default): D runs after both B and C
g_all = (
DiGraphBuilder()
.add_node(A).add_node(B).add_node(C).add_node(D)
.add_edge(A, B).add_edge(A, C)
.add_edge(B, D).add_edge(C, D)
.set_entry_point(A)
).build()
# ANY: D runs when either B or C completes
g_any = (
DiGraphBuilder()
.add_node(A).add_node(B).add_node(C).add_node(D)
.add_edge(A, B).add_edge(A, C)
.add_edge(B, D, activation_group="join_d", activation_condition="any")
.add_edge(C, D, activation_group="join_d", activation_condition="any")
.set_entry_point(A)
).build()
# Agent Framework Workflow — A → (B, C) → aggregator (ALL vs ANY)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
@executor(id="A")
async def start(task: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(f"B:{task}", target_id="B")
await ctx.send_message(f"C:{task}", target_id="C")
@executor(id="B")
async def branch_b(text: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(f"B_done:{text}")
@executor(id="C")
async def branch_c(text: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(f"C_done:{text}")
@executor(id="join_any")
async def join_any(msg: str, ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output(f"First: {msg}") # ANY join (first arrival)
@executor(id="join_all")
async def join_all(msg: str, ctx: WorkflowContext[str, str]) -> None:
state = await ctx.get_executor_state() or {"items": []}
state["items"].append(msg)
await ctx.set_executor_state(state)
if len(state["items"]) >= 2:
await ctx.yield_output(" | ".join(state["items"])) # ALL join
wf_any = (
WorkflowBuilder()
.add_edge(start, branch_b).add_edge(start, branch_c)
.add_edge(branch_b, join_any).add_edge(branch_c, join_any)
.set_start_executor(start)
.build()
)
wf_all = (
WorkflowBuilder()
.add_edge(start, branch_b).add_edge(start, branch_c)
.add_edge(branch_b, join_all).add_edge(branch_c, join_all)
.set_start_executor(start)
.build()
)
3) Enrutamiento dirigido (sin difusión)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
@executor(id="ingest")
async def ingest(task: str, ctx: WorkflowContext[str]) -> None:
# Route selectively using target_id
if task.startswith("image:"):
await ctx.send_message(task.removeprefix("image:"), target_id="vision")
else:
await ctx.send_message(task, target_id="writer")
@executor(id="writer")
async def write(text: str, ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output(f"Draft: {text}")
@executor(id="vision")
async def caption(image_ref: str, ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output(f"Caption: {image_ref}")
workflow = (
WorkflowBuilder()
.add_edge(ingest, write)
.add_edge(ingest, caption)
.set_start_executor(ingest)
.build()
)
# Example usage (async):
# await workflow.run("Summarize the benefits of solar power")
# await workflow.run("image:https://example.com/panel.jpg")
Qué se debe observar:
- GraphFlow difunde mensajes y usa transiciones condicionales. El comportamiento de combinación se configura a través del lado de
activationdestino y por bordeactivation_group/activation_condition(por ejemplo, agrupa ambos bordes enjoin_dcon ).activation_condition="any" - El flujo de trabajo enruta los datos explícitamente; use
target_idpara seleccionar ejecutores de bajada. El comportamiento de combinación reside en el ejecutor receptor (por ejemplo, el rendimiento de la primera entrada frente a esperar a todos) o a través de generadores o agregadores de orquestación. - Los ejecutores del flujo de trabajo son de forma libre: encapsulan un
ChatAgent, una función o un sub-flujo de trabajo y los mezclan en el mismo gráfico.
Diferencias clave
En la tabla siguiente se resumen las diferencias fundamentales entre graphFlow de AutoGen y el flujo de trabajo de Agent Framework:
| Aspecto | AutoGen GraphFlow | Flujo de trabajo del marco de trabajo del agente |
|---|---|---|
| Tipo de flujo | Flujo de control (los bordes son transiciones) | Flujo de datos (mensajes de ruta de bordes) |
| Tipos de nodo | Solo agentes | Agentes, funciones, sub-flujos de trabajo |
| Activación | Difusión de mensajes | Activación basada en edge |
| Seguridad de tipos | Limitado | Escritura segura a lo largo de todo |
| Capacidad de redacción | Limitado | Altamente composable |
Patrones de anidamiento
Anidamiento de equipos de AutoGen
# Inner team
inner_team = RoundRobinGroupChat(
participants=[specialist1, specialist2],
termination_condition=StopAfterNMessages(3)
)
# Outer team with nested team as participant
outer_team = RoundRobinGroupChat(
participants=[coordinator, inner_team, reviewer], # Team as participant
termination_condition=StopAfterNMessages(10)
)
# Messages are broadcasted to all participants including nested team
result = await outer_team.run("Complex task requiring collaboration")
Características de anidamiento de AutoGen:
- El equipo anidado recibe todos los mensajes del equipo externo
- Los mensajes del equipo anidados se transmiten a todos los participantes externos del equipo
- Contexto de mensaje compartido en todos los niveles
Anidamiento de flujo de trabajo del marco de agente
from agent_framework import WorkflowExecutor, WorkflowBuilder
# Assume we have executors from previous examples
# specialist1_executor, specialist2_executor, coordinator_executor, reviewer_executor
# Create sub-workflow
sub_workflow = (WorkflowBuilder()
.add_edge(specialist1_executor, specialist2_executor)
.set_start_executor(specialist1_executor)
.build())
# Wrap as executor
sub_workflow_executor = WorkflowExecutor(
workflow=sub_workflow,
id="sub_process"
)
# Use in parent workflow
parent_workflow = (WorkflowBuilder()
.add_edge(coordinator_executor, sub_workflow_executor)
.add_edge(sub_workflow_executor, reviewer_executor)
.set_start_executor(coordinator_executor)
.build())
Características de anidamiento del marco de agente:
- Entrada y salida aisladas a través de
WorkflowExecutor - Sin difusión de mensajes: los datos fluyen a través de conexiones específicas
- Administración de estado independiente para cada nivel de flujo de trabajo
Patrones de chat en grupo
Los patrones de chat de grupo permiten que varios agentes colaboren en tareas complejas. Este es el modo en que los patrones comunes se traducen entre marcos de trabajo.
Patrón RoundRobinGroupChat
Implementación de AutoGen:
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import StopAfterNMessages
team = RoundRobinGroupChat(
participants=[agent1, agent2, agent3],
termination_condition=StopAfterNMessages(10)
)
result = await team.run("Discuss this topic")
Implementación del marco de trabajo del agente:
from agent_framework import SequentialBuilder, WorkflowOutputEvent
# Assume we have agent1, agent2, agent3 from previous examples
# Sequential workflow through participants
workflow = SequentialBuilder().participants([agent1, agent2, agent3]).build()
# Example usage (would be in async context)
async def sequential_example():
# Each agent appends to shared conversation
async for event in workflow.run_stream("Discuss this topic"):
if isinstance(event, WorkflowOutputEvent):
conversation_history = event.data # list[ChatMessage]
Para obtener ejemplos detallados de orquestación, consulte:
- Agentes secuenciales : ejecución del agente de estilo Round Robin
- Ejecutores personalizados secuenciales : patrones de ejecutor personalizados
En el caso de los patrones de ejecución simultáneos, Agent Framework también proporciona:
from agent_framework import ConcurrentBuilder, WorkflowOutputEvent
# Assume we have agent1, agent2, agent3 from previous examples
# Concurrent workflow for parallel processing
workflow = (ConcurrentBuilder()
.participants([agent1, agent2, agent3])
.build())
# Example usage (would be in async context)
async def concurrent_example():
# All agents process the input concurrently
async for event in workflow.run_stream("Process this in parallel"):
if isinstance(event, WorkflowOutputEvent):
results = event.data # Combined results from all agents
Para ver ejemplos de ejecución simultánea, consulte:
- Agentes simultáneos : ejecución del agente paralelo
- Ejecutores personalizados simultáneos : patrones paralelos personalizados
- Simultáneo con el agregador personalizado : patrones de agregación de resultados
Patrón MagenticOneGroupChat
Implementación de AutoGen:
from autogen_agentchat.teams import MagenticOneGroupChat
team = MagenticOneGroupChat(
participants=[researcher, coder, executor],
model_client=coordinator_client,
termination_condition=StopAfterNMessages(20)
)
result = await team.run("Complex research and analysis task")
Implementación del marco de trabajo del agente:
from typing import cast
from agent_framework import (
MAGENTIC_EVENT_TYPE_AGENT_DELTA,
MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
AgentRunUpdateEvent,
ChatAgent,
ChatMessage,
MagenticBuilder,
WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient
# Create a manager agent for orchestration
manager_agent = ChatAgent(
name="MagenticManager",
description="Orchestrator that coordinates the workflow",
instructions="You coordinate a team to complete complex tasks efficiently.",
chat_client=OpenAIChatClient(),
)
workflow = (
MagenticBuilder()
.participants(researcher=researcher, coder=coder)
.with_standard_manager(
agent=manager_agent,
max_round_count=20,
max_stall_count=3,
max_reset_count=2,
)
.build()
)
# Example usage (would be in async context)
async def magentic_example():
output: str | None = None
async for event in workflow.run_stream("Complex research task"):
if isinstance(event, AgentRunUpdateEvent):
props = event.data.additional_properties if event.data else None
event_type = props.get("magentic_event_type") if props else None
if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
text = event.data.text if event.data else ""
print(f"[ORCHESTRATOR]: {text}")
elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
if event.data and event.data.text:
print(f"[{agent_id}]: {event.data.text}", end="")
elif isinstance(event, WorkflowOutputEvent):
output_messages = cast(list[ChatMessage], event.data)
if output_messages:
output = output_messages[-1].text
Opciones de personalización del marco de agente:
El flujo de trabajo de Magentic proporciona amplias opciones de personalización:
- Configuración del administrador: use un ChatAgent con instrucciones personalizadas y la configuración del modelo
-
Límites de redondeo:
max_round_count,max_stall_count,max_reset_count -
Streaming de eventos: uso
AgentRunUpdateEventconmagentic_event_typemetadatos - Especialización del agente: instrucciones y herramientas personalizadas por agente
- Human-in-the-loop: revisión del plan, aprobación de herramientas y intervención de parada
# Advanced customization example with human-in-the-loop
from typing import cast
from agent_framework import (
MAGENTIC_EVENT_TYPE_AGENT_DELTA,
MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
AgentRunUpdateEvent,
ChatAgent,
MagenticBuilder,
MagenticHumanInterventionDecision,
MagenticHumanInterventionKind,
MagenticHumanInterventionReply,
MagenticHumanInterventionRequest,
RequestInfoEvent,
WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient
# Create manager agent with custom configuration
manager_agent = ChatAgent(
name="MagenticManager",
description="Orchestrator for complex tasks",
instructions="Custom orchestration instructions...",
chat_client=OpenAIChatClient(model_id="gpt-4o"),
)
workflow = (
MagenticBuilder()
.participants(
researcher=researcher_agent,
coder=coder_agent,
analyst=analyst_agent,
)
.with_standard_manager(
agent=manager_agent,
max_round_count=15, # Limit total rounds
max_stall_count=2, # Trigger stall handling
max_reset_count=1, # Allow one reset on failure
)
.with_plan_review() # Enable human plan review
.with_human_input_on_stall() # Enable human intervention on stalls
.build()
)
# Handle human intervention requests during execution
async for event in workflow.run_stream("Complex task"):
if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
req = cast(MagenticHumanInterventionRequest, event.data)
if req.kind == MagenticHumanInterventionKind.PLAN_REVIEW:
# Review and approve the plan
reply = MagenticHumanInterventionReply(
decision=MagenticHumanInterventionDecision.APPROVE
)
async for ev in workflow.send_responses_streaming({event.request_id: reply}):
pass # Handle continuation
Para obtener ejemplos detallados de Magentic, consulte:
- Flujo de trabajo magentic básico : flujo de trabajo multiagente orquestado estándar
- Magentic con puntos de control : flujos de trabajo orquestados persistentes
- Actualización del plan humano magentic : revisión del plan human-in-the-loop
- Aclaración del agente magentic : aprobación de herramientas para la aclaración del agente
- Magentic Human Replan - Intervención humana en puestos
Patrones futuros
La hoja de ruta de Agent Framework incluye varios patrones de AutoGen actualmente en desarrollo:
- Patrón de enjambre: coordinación de agentes basado en entrega
- SelectorGroupChat: selección del altavoz controlado por LLM
Human-in-the-Loop with Request Response
Una nueva característica clave de Agent Framework Workflow es el concepto de solicitud y respuesta, lo que permite a los flujos de trabajo pausar la ejecución y esperar a la entrada externa antes de continuar. Esta funcionalidad no está presente en la abstracción de Team AutoGen y permite sofisticados patrones humanos en el bucle.
Limitaciones de AutoGen
La abstracción de Team AutoGen se ejecuta continuamente una vez iniciada y no proporciona mecanismos integrados para pausar la ejecución de la entrada humana. Cualquier funcionalidad humana en bucle requiere implementaciones personalizadas fuera del marco.
Agent Framework Request-Response API
Agent Framework proporciona funcionalidades integradas de solicitud-respuesta en las que cualquier ejecutor puede enviar solicitudes mediante ctx.request_info() y controlar las respuestas con el @response_handler decorador.
from agent_framework import (
RequestInfoEvent, WorkflowBuilder, WorkflowContext,
Executor, handler, response_handler
)
from dataclasses import dataclass
# Assume we have agent_executor defined elsewhere
# Define typed request payload
@dataclass
class ApprovalRequest:
"""Request human approval for agent output."""
content: str = ""
agent_name: str = ""
# Workflow executor that requests human approval
class ReviewerExecutor(Executor):
@handler
async def review_content(
self,
agent_response: str,
ctx: WorkflowContext
) -> None:
# Request human input with structured data
approval_request = ApprovalRequest(
content=agent_response,
agent_name="writer_agent"
)
await ctx.request_info(request_data=approval_request, response_type=str)
@response_handler
async def handle_approval_response(
self,
original_request: ApprovalRequest,
decision: str,
ctx: WorkflowContext
) -> None:
decision_lower = decision.strip().lower()
original_content = original_request.content
if decision_lower == "approved":
await ctx.yield_output(f"APPROVED: {original_content}")
else:
await ctx.yield_output(f"REVISION NEEDED: {decision}")
# Build workflow with human-in-the-loop
reviewer = ReviewerExecutor(id="reviewer")
workflow = (WorkflowBuilder()
.add_edge(agent_executor, reviewer)
.set_start_executor(agent_executor)
.build())
Ejecución de flujos de trabajo humanos en bucle
Agent Framework proporciona API de streaming para controlar el ciclo de pausa y reanudación:
from agent_framework import RequestInfoEvent, WorkflowOutputEvent
# Assume we have workflow defined from previous examples
async def run_with_human_input():
pending_responses = None
completed = False
while not completed:
# First iteration uses run_stream, subsequent use send_responses_streaming
stream = (
workflow.send_responses_streaming(pending_responses)
if pending_responses
else workflow.run_stream("initial input")
)
events = [event async for event in stream]
pending_responses = None
# Collect human requests and outputs
for event in events:
if isinstance(event, RequestInfoEvent):
# Display request to human and collect response
request_data = event.data # ApprovalRequest instance
print(f"Review needed: {request_data.content}")
human_response = input("Enter 'approved' or revision notes: ")
pending_responses = {event.request_id: human_response}
elif isinstance(event, WorkflowOutputEvent):
print(f"Final result: {event.data}")
completed = True
Para ver ejemplos de flujo de trabajo de human-in-the-loop, consulte:
- Adivinar juego con entrada humana : flujo de trabajo interactivo con comentarios del usuario
- Flujo de trabajo como agente con entrada humana : flujos de trabajo anidados con interacción humana
Puntos de control y reanudación de flujos de trabajo
Otra ventaja clave de la abstracción de Workflow Agent Framework sobre la abstracción de Team AutoGen es la compatibilidad integrada con puntos de control y reanudación de la ejecución. Esto permite pausar, conservar y reanudar flujos de trabajo más adelante desde cualquier punto de control, lo que proporciona tolerancia a errores y habilita flujos de trabajo de larga duración o asincrónicos.
Limitaciones de AutoGen
La abstracción de Team AutoGen no proporciona funcionalidades de punto de control integradas. Cualquier mecanismo de persistencia o recuperación debe implementarse externamente, a menudo requiere una lógica compleja de administración de estado y serialización.
Puntos de control del marco de agente
Agent Framework proporciona un punto de control completo a través FileCheckpointStorage de y el with_checkpointing() método en WorkflowBuilder. Captura de puntos de control:
-
Estado del ejecutor: estado local para cada ejecutor mediante
ctx.set_executor_state() -
Estado compartido: estado entre ejecutores mediante
ctx.set_shared_state() - Colas de mensajes: mensajes pendientes entre ejecutores
- Posición del flujo de trabajo: progreso de la ejecución actual y pasos siguientes
from agent_framework import (
FileCheckpointStorage, WorkflowBuilder, WorkflowContext,
Executor, handler
)
from typing_extensions import Never
class ProcessingExecutor(Executor):
@handler
async def process(self, data: str, ctx: WorkflowContext[str]) -> None:
# Process the data
result = f"Processed: {data.upper()}"
print(f"Processing: '{data}' -> '{result}'")
# Persist executor-local state
prev_state = await ctx.get_executor_state() or {}
count = prev_state.get("count", 0) + 1
await ctx.set_executor_state({
"count": count,
"last_input": data,
"last_output": result
})
# Persist shared state for other executors
await ctx.set_shared_state("original_input", data)
await ctx.set_shared_state("processed_output", result)
await ctx.send_message(result)
class FinalizeExecutor(Executor):
@handler
async def finalize(self, data: str, ctx: WorkflowContext[Never, str]) -> None:
result = f"Final: {data}"
await ctx.yield_output(result)
# Configure checkpoint storage
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
processing_executor = ProcessingExecutor(id="processing")
finalize_executor = FinalizeExecutor(id="finalize")
# Build workflow with checkpointing enabled
workflow = (WorkflowBuilder()
.add_edge(processing_executor, finalize_executor)
.set_start_executor(processing_executor)
.with_checkpointing(checkpoint_storage=checkpoint_storage) # Enable checkpointing
.build())
# Example usage (would be in async context)
async def checkpoint_example():
# Run workflow - checkpoints are created automatically
async for event in workflow.run_stream("input data"):
print(f"Event: {event}")
Reanudación desde puntos de control
Agent Framework proporciona API para enumerar, inspeccionar y reanudar desde puntos de comprobación específicos:
from typing_extensions import Never
from agent_framework import (
Executor,
FileCheckpointStorage,
WorkflowContext,
WorkflowBuilder,
get_checkpoint_summary,
handler,
)
class UpperCaseExecutor(Executor):
@handler
async def process(self, text: str, ctx: WorkflowContext[str]) -> None:
result = text.upper()
await ctx.send_message(result)
class ReverseExecutor(Executor):
@handler
async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
result = text[::-1]
await ctx.yield_output(result)
def create_workflow(checkpoint_storage: FileCheckpointStorage):
"""Create a workflow with two executors and checkpointing."""
upper_executor = UpperCaseExecutor(id="upper")
reverse_executor = ReverseExecutor(id="reverse")
return (WorkflowBuilder()
.add_edge(upper_executor, reverse_executor)
.set_start_executor(upper_executor)
.with_checkpointing(checkpoint_storage=checkpoint_storage)
.build())
# Assume we have checkpoint_storage from previous examples
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
async def checkpoint_resume_example():
# List available checkpoints
checkpoints = await checkpoint_storage.list_checkpoints()
# Display checkpoint information
for checkpoint in checkpoints:
summary = get_checkpoint_summary(checkpoint)
print(f"Checkpoint {summary.checkpoint_id}: iteration={summary.iteration_count}")
# Resume from a specific checkpoint
if checkpoints:
chosen_checkpoint_id = checkpoints[0].checkpoint_id
# Create new workflow instance and resume
new_workflow = create_workflow(checkpoint_storage)
async for event in new_workflow.run_stream(
checkpoint_id=chosen_checkpoint_id,
checkpoint_storage=checkpoint_storage
):
print(f"Resumed event: {event}")
Características avanzadas de punto de control
Punto de control con integración de human-in-the-loop:
La creación de puntos de comprobación funciona perfectamente con flujos de trabajo humanos en bucle, lo que permite pausar los flujos de trabajo para la entrada humana y reanudarlos más adelante. Al reanudar desde un punto de control que contiene solicitudes pendientes, esas solicitudes se volverán a emitir como eventos:
# Assume we have workflow, checkpoint_id, and checkpoint_storage from previous examples
async def resume_with_pending_requests_example():
# Resume from checkpoint - pending requests will be re-emitted
request_info_events = []
async for event in workflow.run_stream(
checkpoint_id=checkpoint_id,
checkpoint_storage=checkpoint_storage
):
if isinstance(event, RequestInfoEvent):
request_info_events.append(event)
# Handle re-emitted pending request
responses = {}
for event in request_info_events:
response = handle_request(event.data)
responses[event.request_id] = response
# Send response back to workflow
async for event in workflow.send_responses_streaming(responses):
print(f"Event: {event}")
Ventajas clave
En comparación con AutoGen, el punto de comprobación de Agent Framework proporciona:
- Persistencia automática: no se requiere administración de estado manual
- Recuperación granular: reanudar desde cualquier límite de superpaso
- Aislamiento de estado: independiente del estado local y compartido del ejecutor
- Integración de human-in-the-loop: pausa de conexión directa con entrada humana
- Tolerancia a errores: recuperación sólida de errores o interrupciones
Ejemplos prácticos
Para obtener ejemplos completos de puntos de comprobación, consulte:
- Punto de control con Resume: punto de control básico y reanudación interactiva
- Punto de control con human-in-the-Loop : flujos de trabajo persistentes con puertas de aprobación humana
- Punto de control de sub-flujo de trabajo : puntos de control de flujos de trabajo anidados
- Punto de control magentic : puntos de control de flujos de trabajo multiagente orquestados
Observability
Tanto AutoGen como Agent Framework proporcionan funcionalidades de observabilidad, pero con diferentes enfoques y características.
Observabilidad de AutoGen
AutoGen tiene compatibilidad nativa con OpenTelemetry con instrumentación para:
-
Seguimiento en tiempo de ejecución:
SingleThreadedAgentRuntimeyGrpcWorkerAgentRuntime -
Ejecución de herramientas:
BaseToolconexecute_toolintervalos siguientes convenciones semánticas de GenAI -
Operaciones del agente:
BaseChatAgentconcreate_agentintervalos yinvoke_agent
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from autogen_core import SingleThreadedAgentRuntime
# Configure OpenTelemetry
tracer_provider = TracerProvider()
trace.set_tracer_provider(tracer_provider)
# Pass to runtime
runtime = SingleThreadedAgentRuntime(tracer_provider=tracer_provider)
Observabilidad del marco de agente
Agent Framework proporciona una observabilidad completa a través de varios enfoques:
- Configuración de código cero: instrumentación automática a través de variables de entorno
- Configuración manual: configuración mediante programación con parámetros personalizados
- Telemetría enriquecida: agentes, flujos de trabajo y seguimiento de la ejecución de herramientas
- Salida de la consola: registro y visualización de la consola integrada
from agent_framework import ChatAgent
from agent_framework.observability import setup_observability
from agent_framework.openai import OpenAIChatClient
# Zero-code setup via environment variables
# Set ENABLE_OTEL=true
# Set OTLP_ENDPOINT=http://localhost:4317
# Or manual setup
setup_observability(
otlp_endpoint="http://localhost:4317"
)
# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")
async def observability_example():
# Observability is automatically applied to all agents and workflows
agent = ChatAgent(name="assistant", chat_client=client)
result = await agent.run("Hello") # Automatically traced
Diferencias clave:
- Complejidad de la instalación: Agent Framework ofrece opciones más sencillas de configuración de código cero
- Ámbito: Agent Framework proporciona una cobertura más amplia, incluida la observabilidad de nivel de flujo de trabajo.
- Visualización: Agent Framework incluye la salida y la interfaz de usuario de desarrollo de la consola integrada
- Configuración: Agent Framework ofrece opciones de configuración más flexibles
Para obtener ejemplos detallados de observabilidad, consulte:
- Configuración de código cero : configuración de variables de entorno
- Configuración manual : configuración mediante programación
- Observabilidad del agente : telemetría de agente único
- Observabilidad de flujo de trabajo: seguimiento de flujo de trabajo multiagente
Conclusión
Esta guía de migración proporciona una asignación completa entre AutoGen y Microsoft Agent Framework, que abarca todo, desde la creación básica de agentes hasta flujos de trabajo complejos de varios agentes. Conclusiones clave para la migración:
- La migración de un solo agente es sencilla, con API similares y funcionalidades mejoradas en Agent Framework
- Los patrones de varios agentes requieren replantearse el enfoque de las arquitecturas basadas en eventos a las arquitecturas basadas en flujo de datos, pero si ya está familiarizado con GraphFlow, la transición será más fácil.
- Agent Framework ofrece características adicionales, como middleware, herramientas hospedadas y flujos de trabajo tipados.
Para obtener ejemplos adicionales e instrucciones de implementación detalladas, consulte el directorio ejemplos de Agent Framework .
Categorías de ejemplo adicionales
Agent Framework proporciona ejemplos en otras áreas importantes:
- Subprocesos: ejemplos de subprocesos : administración del estado y el contexto de la conversación
- Entrada bidireccional: muestras bidireccionales : trabajar con imágenes y otros tipos de medios
- Proveedores de contexto: ejemplos de proveedores de contexto: patrones de integración de contexto externo