Compartir a través de


Guía de migración de AutoGen to Microsoft Agent Framework

Una guía completa para migrar de AutoGen al SDK de Python de Microsoft Agent Framework.

Tabla de contenido

Contexto

AutoGen es un marco para crear agentes de INTELIGENCIA ARTIFICIAL y sistemas multiagente mediante modelos de lenguaje grande (LLM). Se inició como un proyecto de investigación en Microsoft Research y fue pionero en varios conceptos en la orquestación multiagente, como GroupChat y el entorno de ejecución del agente controlado por eventos. El proyecto ha sido una colaboración fructífero de la comunidad de código abierto y muchas características importantes procedentes de colaboradores externos.

Microsoft Agent Framework es un nuevo SDK de varios lenguajes para compilar agentes y flujos de trabajo de inteligencia artificial mediante LLM. Representa una evolución significativa de las ideas pioneras en AutoGen e incorpora lecciones aprendidas del uso real. Está desarrollado por los equipos principales de AutoGen y Kernel semántico en Microsoft, y está diseñado para ser una nueva base para crear aplicaciones de inteligencia artificial en el futuro.

En esta guía se describe una ruta de migración práctica: comienza escribiendo lo que permanece igual y qué cambios se producen de un vistazo. A continuación, abarca la configuración del cliente del modelo, las características de un solo agente y, por último, la orquestación multiagente con código concreto en paralelo. A lo largo del proceso, los vínculos a ejemplos ejecutables en el repositorio de Agent Framework le ayudan a validar cada paso.

Diferencias y similitudes clave

Qué sigue siendo el mismo

Los fundamentos son familiares. Todavía se crean agentes en torno a un cliente de modelo, se proporcionan instrucciones y se adjuntan herramientas. Ambas bibliotecas admiten herramientas de estilo de función, streaming de tokens, contenido multiplataforma y E/S asincrónica.

# Both frameworks follow similar patterns
# AutoGen
agent = AssistantAgent(name="assistant", model_client=client, tools=[my_tool])
result = await agent.run(task="Help me with this task")

# Agent Framework
agent = ChatAgent(name="assistant", chat_client=client, tools=[my_tool])
result = await agent.run("Help me with this task")

Diferencias clave

  1. Estilo de orquestación: AutoGen empareja un núcleo controlado por eventos con un nivel alto Team. Agent Framework se centra en una base Workflow de gráficos con tipo que enruta los datos a lo largo de los bordes y activa los ejecutores cuando las entradas están listas.

  2. Herramientas: AutoGen ajusta las funciones con FunctionTool. Agent Framework usa @ai_functionesquemas , deduce automáticamente los esquemas y agrega herramientas hospedadas como un intérprete de código y una búsqueda web.

  3. Comportamiento del agente: AssistantAgent es de un solo turno a menos que aumente max_tool_iterations. ChatAgent es multiturno de forma predeterminada y mantiene la invocación de herramientas hasta que pueda devolver una respuesta final.

  4. Runtime: AutoGen ofrece entornos de ejecución distribuidos insertados y experimentales. Agent Framework se centra en la composición de un solo proceso hoy en día; la ejecución distribuida está planeada.

Creación y configuración de cliente de modelos

Ambos marcos proporcionan clientes de modelos para proveedores de inteligencia artificial principales, con API similares, pero no idénticas.

Característica AutoGen Marco de trabajo del agente
Cliente openAI OpenAIChatCompletionClient OpenAIChatClient
Cliente de respuestas de OpenAI ❌ No disponible OpenAIResponsesClient
Azure OpenAI AzureOpenAIChatCompletionClient AzureOpenAIChatClient
Respuestas de Azure OpenAI ❌ No disponible AzureOpenAIResponsesClient
Azure AI AzureAIChatCompletionClient AzureAIAgentClient
Anthropic AnthropicChatCompletionClient 🚧 Planificado
Ollama OllamaChatCompletionClient 🚧 Planificado
Almacenamiento en memoria caché ChatCompletionCache envoltura 🚧 Planificado

Clientes de modelos de AutoGen

from autogen_ext.models.openai import OpenAIChatCompletionClient, AzureOpenAIChatCompletionClient

# OpenAI
client = OpenAIChatCompletionClient(
    model="gpt-5",
    api_key="your-key"
)

# Azure OpenAI
client = AzureOpenAIChatCompletionClient(
    azure_endpoint="https://your-endpoint.openai.azure.com/",
    azure_deployment="gpt-5",
    api_version="2024-12-01",
    api_key="your-key"
)

ChatClients de Marco de agente

from agent_framework.openai import OpenAIChatClient
from agent_framework.azure import AzureOpenAIChatClient

# OpenAI (reads API key from environment)
client = OpenAIChatClient(model_id="gpt-5")

# Azure OpenAI (uses environment or default credentials; see samples for auth options)
client = AzureOpenAIChatClient(model_id="gpt-5")

Para obtener ejemplos detallados, consulte:

Compatibilidad con la API de respuestas (agente framework exclusivo)

Agent Framework y AzureOpenAIResponsesClientOpenAIResponsesClient proporcionan compatibilidad especializada con modelos de razonamiento y respuestas estructuradas que no están disponibles en AutoGen:

from agent_framework.azure import AzureOpenAIResponsesClient
from agent_framework.openai import OpenAIResponsesClient

# Azure OpenAI with Responses API
azure_responses_client = AzureOpenAIResponsesClient(model_id="gpt-5")

# OpenAI with Responses API
openai_responses_client = OpenAIResponsesClient(model_id="gpt-5")

Para ver ejemplos de la API de respuestas, consulte:

Asignación de características de Single-Agent

En esta sección se asignan características de agente único entre AutoGen y Agent Framework. Con un cliente implementado, cree un agente, adjunte herramientas y elija entre la ejecución de streaming y streaming.

Creación y ejecución básicas del agente

Una vez configurado un cliente de modelo, el siguiente paso consiste en crear agentes. Ambos marcos proporcionan abstracciones de agente similares, pero con diferentes comportamientos predeterminados y opciones de configuración.

AutoGen AssistantAgent

from autogen_agentchat.agents import AssistantAgent

agent = AssistantAgent(
    name="assistant",
    model_client=client,
    system_message="You are a helpful assistant.",
    tools=[my_tool],
    max_tool_iterations=1  # Single-turn by default
)

# Execution
result = await agent.run(task="What's the weather?")

Agent Framework ChatAgent

from agent_framework import ChatAgent, ai_function
from agent_framework.openai import OpenAIChatClient

# Create simple tools for the example
@ai_function
def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

@ai_function
def get_time() -> str:
    """Get current time."""
    return "Current time: 2:30 PM"

# Create client
client = OpenAIChatClient(model_id="gpt-5")

async def example():
    # Direct creation
    agent = ChatAgent(
        name="assistant",
        chat_client=client,
        instructions="You are a helpful assistant.",
        tools=[get_weather]  # Multi-turn by default
    )

    # Factory method (more convenient)
    agent = client.create_agent(
        name="assistant",
        instructions="You are a helpful assistant.",
        tools=[get_weather]
    )

    # Execution with runtime tool configuration
    result = await agent.run(
        "What's the weather?",
        tools=[get_time],  # Can add tools at runtime
        tool_choice="auto"
    )

Diferencias clave:

  • Comportamiento predeterminado: ChatAgent itera automáticamente a través de llamadas a herramientas, mientras que AssistantAgent requiere una configuración explícita max_tool_iterations
  • Configuración en tiempo de ejecución: ChatAgent.run() acepta parámetros tools y tool_choice para la personalización por invocación
  • Métodos de fábrica: Agent Framework proporciona métodos de fábrica prácticos directamente desde clientes de chat
  • Administración de estado: ChatAgent no tiene estado y no mantiene el historial de conversaciones entre invocaciones, a diferencia AssistantAgent de lo que mantiene el historial de conversaciones como parte de su estado.

Administración del estado de conversación con AgentThread

Para continuar las conversaciones con ChatAgent, use AgentThread para administrar el historial de conversaciones:

# Assume we have an agent from previous examples
async def conversation_example():
    # Create a new thread that will be reused
    thread = agent.get_new_thread()

    # First interaction - thread is empty
    result1 = await agent.run("What's 2+2?", thread=thread)
    print(result1.text)  # "4"

    # Continue conversation - thread contains previous messages
    result2 = await agent.run("What about that number times 10?", thread=thread)
    print(result2.text)  # "40" (understands "that number" refers to 4)

    # AgentThread can use external storage, similar to ChatCompletionContext in AutoGen

Sin estado de forma predeterminada: demostración rápida

# Without a thread (two independent invocations)
r1 = await agent.run("What's 2+2?")
print(r1.text)  # for example, "4"

r2 = await agent.run("What about that number times 10?")
print(r2.text)  # Likely ambiguous without prior context; cannot be "40"

# With a thread (shared context across calls)
thread = agent.get_new_thread()
print((await agent.run("What's 2+2?", thread=thread)).text)  # "4"
print((await agent.run("What about that number times 10?", thread=thread)).text)  # "40"

Para ver ejemplos de administración de subprocesos, consulte:

Equivalencia del agente de OpenAI Assistant

Ambos marcos proporcionan integración de api de OpenAI Assistant:

# AutoGen OpenAIAssistantAgent
from autogen_ext.agents.openai import OpenAIAssistantAgent
# Agent Framework has OpenAI Assistants support via OpenAIAssistantsClient
from agent_framework.openai import OpenAIAssistantsClient

Para ver ejemplos de OpenAI Assistant, consulte:

Compatibilidad con streaming

Ambos marcos transmiten tokens en tiempo real (desde clientes y agentes) para mantener la capacidad de respuesta de las INTERFACES de usuario.

AutoGen Streaming

# Model client streaming
async for chunk in client.create_stream(messages):
    if isinstance(chunk, str):
        print(chunk, end="")

# Agent streaming
async for event in agent.run_stream(task="Hello"):
    if isinstance(event, ModelClientStreamingChunkEvent):
        print(event.content, end="")
    elif isinstance(event, TaskResult):
        print("Final result received")

Streaming del marco de trabajo del agente

# Assume we have client, agent, and tools from previous examples
async def streaming_example():
    # Chat client streaming
    async for chunk in client.get_streaming_response("Hello", tools=tools):
        if chunk.text:
            print(chunk.text, end="")

    # Agent streaming
    async for chunk in agent.run_stream("Hello"):
        if chunk.text:
            print(chunk.text, end="", flush=True)

Sugerencia: En Agent Framework, los clientes y los agentes producen la misma forma de actualización; puede leer chunk.text en cualquier caso.

Tipos de mensaje y creación

Comprender cómo funcionan los mensajes es fundamental para una comunicación eficaz del agente. Ambos marcos proporcionan enfoques diferentes para la creación y control de mensajes, con AutoGen mediante clases de mensajes independientes y El marco de trabajo del agente mediante un sistema de mensajes unificado.

Tipos de mensajes AutoGen

from autogen_agentchat.messages import TextMessage, MultiModalMessage
from autogen_core.models import UserMessage

# Text message
text_msg = TextMessage(content="Hello", source="user")

# Multi-modal message
multi_modal_msg = MultiModalMessage(
    content=["Describe this image", image_data],
    source="user"
)

# Convert to model format for use with model clients
user_message = text_msg.to_model_message()

Tipos de mensajes del marco de agente

from agent_framework import ChatMessage, TextContent, DataContent, UriContent, Role
import base64

# Text message
text_msg = ChatMessage(role=Role.USER, text="Hello")

# Supply real image bytes, or use a data: URI/URL via UriContent
image_bytes = b"<your_image_bytes>"
image_b64 = base64.b64encode(image_bytes).decode()
image_uri = f"data:image/jpeg;base64,{image_b64}"

# Multi-modal message with mixed content
multi_modal_msg = ChatMessage(
    role=Role.USER,
    contents=[
        TextContent(text="Describe this image"),
        DataContent(uri=image_uri, media_type="image/jpeg")
    ]
)

Diferencias clave:

  • AutoGen usa clases de mensaje independientes (TextMessage, MultiModalMessage) con un source campo
  • Agent Framework usa un unificado ChatMessage con objetos de contenido con tipo y un role campo
  • Los mensajes de Agent Framework usan Role enumeración (USER, ASSISTANT, SYSTEM, TOOL) en lugar de orígenes de cadena

Creación e integración de herramientas

Las herramientas amplían las funcionalidades del agente más allá de la generación de texto. Los marcos toman diferentes enfoques para la creación de herramientas, con Agent Framework que proporciona una generación de esquemas más automatizada.

AutoGen FunctionTool

from autogen_core.tools import FunctionTool

async def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

# Manual tool creation
tool = FunctionTool(
    func=get_weather,
    description="Get weather information"
)

# Use with agent
agent = AssistantAgent(name="assistant", model_client=client, tools=[tool])

Marco de trabajo del agente @ai_function

from agent_framework import ai_function
from typing import Annotated
from pydantic import Field

@ai_function
def get_weather(
    location: Annotated[str, Field(description="The location to get weather for")]
) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

# Direct use with agent (automatic conversion)
agent = ChatAgent(name="assistant", chat_client=client, tools=[get_weather])

Para obtener ejemplos detallados, consulte:

Herramientas hospedadas (exclusivo de Agent Framework)

Agent Framework proporciona herramientas hospedadas que no están disponibles en AutoGen:

from agent_framework import ChatAgent, HostedCodeInterpreterTool, HostedWebSearchTool
from agent_framework.azure import AzureOpenAIChatClient

# Azure OpenAI client with a model that supports hosted tools
client = AzureOpenAIChatClient(model_id="gpt-5")

# Code execution tool
code_tool = HostedCodeInterpreterTool()

# Web search tool
search_tool = HostedWebSearchTool()

agent = ChatAgent(
    name="researcher",
    chat_client=client,
    tools=[code_tool, search_tool]
)

Para obtener ejemplos detallados, consulte:

Requisitos y advertencias:

  • Las herramientas hospedadas solo están disponibles en modelos o cuentas que las admitan. Compruebe los derechos y la compatibilidad del modelo con el proveedor antes de habilitar estas herramientas.
  • La configuración difiere según el proveedor; siga los requisitos previos de cada ejemplo para la configuración y los permisos.
  • No todos los modelos admiten todas las herramientas hospedadas (por ejemplo, búsqueda web frente al intérprete de código). Elija un modelo compatible en su entorno.

Nota:

AutoGen admite herramientas de ejecución de código local, pero esta característica está planeada para futuras versiones de Agent Framework.

Diferencia clave: Agent Framework controla automáticamente la iteración de herramientas en el nivel de agente. A diferencia del parámetro de AutoGen, los agentes de max_tool_iterations Agent Framework continúan la ejecución de herramientas hasta la finalización de forma predeterminada, con mecanismos de seguridad integrados para evitar bucles infinitos.

Compatibilidad con el servidor MCP

Para la integración avanzada de herramientas, ambos marcos admiten el Protocolo de contexto de modelo (MCP), lo que permite a los agentes interactuar con los servicios externos y los orígenes de datos. Agent Framework proporciona compatibilidad integrada más completa.

Compatibilidad con MCP de AutoGen

AutoGen tiene compatibilidad básica con MCP a través de extensiones (los detalles de implementación específicos varían según la versión).

Compatibilidad con MCP de Agent Framework

from agent_framework import ChatAgent, MCPStdioTool, MCPStreamableHTTPTool, MCPWebsocketTool
from agent_framework.openai import OpenAIChatClient

# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")

# Stdio MCP server
mcp_tool = MCPStdioTool(
    name="filesystem",
    command="uvx mcp-server-filesystem",
    args=["/allowed/directory"]
)

# HTTP streaming MCP
http_mcp = MCPStreamableHTTPTool(
    name="http_mcp",
    url="http://localhost:8000/sse"
)

# WebSocket MCP
ws_mcp = MCPWebsocketTool(
    name="websocket_mcp",
    url="ws://localhost:8000/ws"
)

agent = ChatAgent(name="assistant", chat_client=client, tools=[mcp_tool])

Para ver ejemplos de MCP, consulte:

Patrón de agente como herramienta

Un patrón eficaz es usar agentes como herramientas, lo que permite arquitecturas de agente jerárquicas. Ambos marcos admiten este patrón con diferentes implementaciones.

AutoGen AgentTool

from autogen_agentchat.tools import AgentTool

# Create specialized agent
writer = AssistantAgent(
    name="writer",
    model_client=client,
    system_message="You are a creative writer."
)

# Wrap as tool
writer_tool = AgentTool(agent=writer)

# Use in coordinator (requires disabling parallel tool calls)
coordinator_client = OpenAIChatCompletionClient(
    model="gpt-5",
    parallel_tool_calls=False
)
coordinator = AssistantAgent(
    name="coordinator",
    model_client=coordinator_client,
    tools=[writer_tool]
)

Agent Framework as_tool()

from agent_framework import ChatAgent

# Assume we have client from previous examples
# Create specialized agent
writer = ChatAgent(
    name="writer",
    chat_client=client,
    instructions="You are a creative writer."
)

# Convert to tool
writer_tool = writer.as_tool(
    name="creative_writer",
    description="Generate creative content",
    arg_name="request",
    arg_description="What to write"
)

# Use in coordinator
coordinator = ChatAgent(
    name="coordinator",
    chat_client=client,
    tools=[writer_tool]
)

Nota de migración explícita: en AutoGen, establezca parallel_tool_calls=False en el cliente del modelo del coordinador al encapsular agentes como herramientas para evitar problemas de simultaneidad al invocar la misma instancia del agente. En Agent Framework, as_tool() no requiere deshabilitar las llamadas a herramientas paralelas, ya que los agentes no tienen estado de forma predeterminada.

Middleware (característica del marco de agente)

Agent Framework presenta funcionalidades de middleware que AutoGen carece. El middleware permite problemas transversales eficaces, como el registro, la seguridad y la supervisión del rendimiento.

from agent_framework import ChatAgent, AgentRunContext, FunctionInvocationContext
from typing import Callable, Awaitable

# Assume we have client from previous examples
async def logging_middleware(
    context: AgentRunContext,
    next: Callable[[AgentRunContext], Awaitable[None]]
) -> None:
    print(f"Agent {context.agent.name} starting")
    await next(context)
    print(f"Agent {context.agent.name} completed")

async def security_middleware(
    context: FunctionInvocationContext,
    next: Callable[[FunctionInvocationContext], Awaitable[None]]
) -> None:
    if "password" in str(context.arguments):
        print("Blocking function call with sensitive data")
        return  # Don't call next()
    await next(context)

agent = ChatAgent(
    name="secure_agent",
    chat_client=client,
    middleware=[logging_middleware, security_middleware]
)

Ventajas:

  • Seguridad: validación de entrada y filtrado de contenido
  • Observabilidad: registro, métricas y seguimiento
  • Rendimiento: almacenamiento en caché y limitación de velocidad
  • Control de errores: degradación correcta y lógica de reintento

Para obtener ejemplos detallados de middleware, consulte:

Agentes personalizados

A veces no quiere un agente respaldado por modelos, sino que quiere un agente con respaldo de API o determinista con lógica personalizada. Ambos marcos admiten la creación de agentes personalizados, pero los patrones difieren.

AutoGen: Subclase BaseChatAgent

from typing import Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseChatMessage, TextMessage, StopMessage
from autogen_core import CancellationToken

class StaticAgent(BaseChatAgent):
    def __init__(self, name: str = "static", description: str = "Static responder") -> None:
        super().__init__(name, description)

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:  # Which message types this agent produces
        return (TextMessage,)

    async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
        # Always return a static response
        return Response(chat_message=TextMessage(content="Hello from AutoGen custom agent", source=self.name))

Notes:

  • Implemente on_messages(...) y devuelva un Response con un mensaje de chat.
  • Opcionalmente, implemente on_reset(...) para borrar el estado interno entre ejecuciones.

Marco del agente: extender BaseAgent (compatible con subprocesos)

from collections.abc import AsyncIterable
from typing import Any
from agent_framework import (
    AgentRunResponse,
    AgentRunResponseUpdate,
    AgentThread,
    BaseAgent,
    ChatMessage,
    Role,
    TextContent,
)

class StaticAgent(BaseAgent):
    async def run(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AgentRunResponse:
        # Build a static reply
        reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])

        # Persist conversation to the provided AgentThread (if any)
        if thread is not None:
            normalized = self._normalize_messages(messages)
            await self._notify_thread_of_new_messages(thread, normalized, reply)

        return AgentRunResponse(messages=[reply])

    async def run_stream(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentRunResponseUpdate]:
        # Stream the same static response in a single chunk for simplicity
        yield AgentRunResponseUpdate(contents=[TextContent(text="Hello from AF custom agent")], role=Role.ASSISTANT)

        # Notify thread of input and the complete response once streaming ends
        if thread is not None:
            reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])
            normalized = self._normalize_messages(messages)
            await self._notify_thread_of_new_messages(thread, normalized, reply)

Notes:

  • AgentThreadmantiene el estado de conversación externamente; use agent.get_new_thread() y páselo arun/run_stream .
  • Llame self._notify_thread_of_new_messages(thread, input_messages, response_messages) a para que el subproceso tenga ambos lados del intercambio.
  • Consulte el ejemplo completo: Agente personalizado

A continuación, echemos un vistazo a la orquestación multiagente, el área en la que los marcos difieren más.

Asignación de características multiagente

Información general sobre el modelo de programación

Los modelos de programación multiagente representan la diferencia más significativa entre los dos marcos.

Enfoque de modelo dual de AutoGen

AutoGen proporciona dos modelos de programación:

  1. autogen-core: programación basada en eventos de bajo nivel con RoutedAgent suscripciones de mensajes y
  2. Team abstracción: modelo de alto nivel centrado en la ejecución basado en la base de autogen-core
# Low-level autogen-core (complex)
class MyAgent(RoutedAgent):
    @message_handler
    async def handle_message(self, message: TextMessage, ctx: MessageContext) -> None:
        # Handle specific message types
        pass

# High-level Team (easier but limited)
team = RoundRobinGroupChat(
    participants=[agent1, agent2],
    termination_condition=StopAfterNMessages(5)
)
result = await team.run(task="Collaborate on this task")

Desafíos:

  • El modelo de bajo nivel es demasiado complejo para la mayoría de los usuarios
  • El modelo de alto nivel puede convertirse en una limitación para comportamientos complejos
  • El puente entre los dos modelos agrega complejidad de implementación

Modelo de flujo de trabajo unificado de Agent Framework

Agent Framework proporciona una única Workflow abstracción que combina lo mejor de ambos enfoques:

from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

# Assume we have agent1 and agent2 from previous examples
@executor(id="agent1")
async def agent1_executor(input_msg: str, ctx: WorkflowContext[str]) -> None:
    response = await agent1.run(input_msg)
    await ctx.send_message(response.text)

@executor(id="agent2")
async def agent2_executor(input_msg: str, ctx: WorkflowContext[Never, str]) -> None:
    response = await agent2.run(input_msg)
    await ctx.yield_output(response.text)  # Final output

# Build typed data flow graph
workflow = (WorkflowBuilder()
           .add_edge(agent1_executor, agent2_executor)
           .set_start_executor(agent1_executor)
           .build())

# Example usage (would be in async context)
# result = await workflow.run("Initial input")

Para obtener ejemplos detallados de flujo de trabajo, consulte:

Ventajas:

  • Modelo unificado: abstracción única para todos los niveles de complejidad
  • Seguridad de tipos: entradas y salidas fuertemente tipadas
  • Visualización de grafos: borrar la representación del flujo de datos
  • Composición flexible: mezcla de agentes, funciones y sub flujos de trabajo

Flujo de trabajo frente a GraphFlow

La abstracción de Workflow Agent Framework está inspirada en la característica experimental GraphFlow de AutoGen, pero representa una evolución significativa en la filosofía de diseño:

  • GraphFlow: flujo de control basado en el que los bordes son transiciones y los mensajes se transmiten a todos los agentes; las transiciones están condicionadas en el contenido del mensaje transmitido
  • Flujo de trabajo: flujo de datos basado en el que los mensajes se enrutan a través de bordes y ejecutores específicos se activan mediante bordes, con compatibilidad con la ejecución simultánea.

Información general del objeto visual

El diagrama siguiente contrasta el flujo de control de GraphFlow (izquierda) de AutoGen con el flujo de trabajo de flujo de datos (derecha) de Agent Framework. GraphFlow modela agentes como nodos con transiciones condicionales y difusiones. Ejecutores de modelos de flujo de trabajo (agentes, funciones o sub flujos de trabajo) conectados por bordes tipados; también admite pausas de solicitud/respuesta y puntos de control.

flowchart LR

  subgraph AutoGenGraphFlow
    direction TB
    U[User / Task] --> A[Agent A]
    A -->|success| B[Agent B]
    A -->|retry| C[Agent C]
    A -. broadcast .- B
    A -. broadcast .- C
  end

  subgraph AgentFrameworkWorkflow
    direction TB
    I[Input] --> E1[Executor 1]
    E1 -->|"str"| E2[Executor 2]
    E1 -->|"image"| E3[Executor 3]
    E3 -->|"str"| E2
    E2 --> OUT[(Final Output)]
  end

  R[Request / Response Gate]
  E2 -. request .-> R
  R -. resume .-> E2

  CP[Checkpoint]
  E1 -. save .-> CP
  CP -. load .-> E1

En la práctica:

  • GraphFlow usa agentes como nodos y transmite mensajes; los bordes representan transiciones condicionales.
  • Las rutas de flujo de trabajo escriben mensajes a lo largo de los bordes. Los nodos (ejecutores) pueden ser agentes, funciones puras o sub flujos de trabajo.
  • Solicitud/respuesta permite pausar un flujo de trabajo para la entrada externa; la creación de puntos de control conserva el progreso y habilita la reanudación.

Comparación de código

1) Secuencial + Condicional
# AutoGen GraphFlow (fluent builder) — writer → reviewer → editor (conditional)
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow

writer = AssistantAgent(name="writer", description="Writes a draft", model_client=client)
reviewer = AssistantAgent(name="reviewer", description="Reviews the draft", model_client=client)
editor = AssistantAgent(name="editor", description="Finalizes the draft", model_client=client)

graph = (
    DiGraphBuilder()
    .add_node(writer).add_node(reviewer).add_node(editor)
    .add_edge(writer, reviewer)  # always
    .add_edge(reviewer, editor, condition=lambda msg: "approve" in msg.to_model_text())
    .set_entry_point(writer)
).build()

team = GraphFlow(participants=[writer, reviewer, editor], graph=graph)
result = await team.run(task="Draft a short paragraph about solar power")
# Agent Framework Workflow — sequential executors with conditional logic
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="writer")
async def writer_exec(task: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"Draft: {task}")

@executor(id="reviewer")
async def reviewer_exec(draft: str, ctx: WorkflowContext[str]) -> None:
    decision = "approve" if "solar" in draft.lower() else "revise"
    await ctx.send_message(f"{decision}:{draft}")

@executor(id="editor")
async def editor_exec(msg: str, ctx: WorkflowContext[Never, str]) -> None:
    if msg.startswith("approve:"):
        await ctx.yield_output(msg.split(":", 1)[1])
    else:
        await ctx.yield_output("Needs revision")

workflow_seq = (
    WorkflowBuilder()
    .add_edge(writer_exec, reviewer_exec)
    .add_edge(reviewer_exec, editor_exec)
    .set_start_executor(writer_exec)
    .build()
)
2) Fan-out + Join (ALL vs ANY)
# AutoGen GraphFlow — A → (B, C) → D with ALL/ANY join
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
A, B, C, D = agent_a, agent_b, agent_c, agent_d

# ALL (default): D runs after both B and C
g_all = (
    DiGraphBuilder()
    .add_node(A).add_node(B).add_node(C).add_node(D)
    .add_edge(A, B).add_edge(A, C)
    .add_edge(B, D).add_edge(C, D)
    .set_entry_point(A)
).build()

# ANY: D runs when either B or C completes
g_any = (
    DiGraphBuilder()
    .add_node(A).add_node(B).add_node(C).add_node(D)
    .add_edge(A, B).add_edge(A, C)
    .add_edge(B, D, activation_group="join_d", activation_condition="any")
    .add_edge(C, D, activation_group="join_d", activation_condition="any")
    .set_entry_point(A)
).build()
# Agent Framework Workflow — A → (B, C) → aggregator (ALL vs ANY)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="A")
async def start(task: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"B:{task}", target_id="B")
    await ctx.send_message(f"C:{task}", target_id="C")

@executor(id="B")
async def branch_b(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"B_done:{text}")

@executor(id="C")
async def branch_c(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"C_done:{text}")

@executor(id="join_any")
async def join_any(msg: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"First: {msg}")  # ANY join (first arrival)

@executor(id="join_all")
async def join_all(msg: str, ctx: WorkflowContext[str, str]) -> None:
    state = await ctx.get_executor_state() or {"items": []}
    state["items"].append(msg)
    await ctx.set_executor_state(state)
    if len(state["items"]) >= 2:
        await ctx.yield_output(" | ".join(state["items"]))  # ALL join

wf_any = (
    WorkflowBuilder()
    .add_edge(start, branch_b).add_edge(start, branch_c)
    .add_edge(branch_b, join_any).add_edge(branch_c, join_any)
    .set_start_executor(start)
    .build()
)

wf_all = (
    WorkflowBuilder()
    .add_edge(start, branch_b).add_edge(start, branch_c)
    .add_edge(branch_b, join_all).add_edge(branch_c, join_all)
    .set_start_executor(start)
    .build()
)
3) Enrutamiento dirigido (sin difusión)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="ingest")
async def ingest(task: str, ctx: WorkflowContext[str]) -> None:
    # Route selectively using target_id
    if task.startswith("image:"):
        await ctx.send_message(task.removeprefix("image:"), target_id="vision")
    else:
        await ctx.send_message(task, target_id="writer")

@executor(id="writer")
async def write(text: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"Draft: {text}")

@executor(id="vision")
async def caption(image_ref: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"Caption: {image_ref}")

workflow = (
    WorkflowBuilder()
    .add_edge(ingest, write)
    .add_edge(ingest, caption)
    .set_start_executor(ingest)
    .build()
)

# Example usage (async):
# await workflow.run("Summarize the benefits of solar power")
# await workflow.run("image:https://example.com/panel.jpg")

Qué se debe observar:

  • GraphFlow difunde mensajes y usa transiciones condicionales. El comportamiento de combinación se configura a través del lado de activation destino y por borde activation_group/activation_condition(por ejemplo, agrupa ambos bordes en join_d con ).activation_condition="any"
  • El flujo de trabajo enruta los datos explícitamente; use target_id para seleccionar ejecutores de bajada. El comportamiento de combinación reside en el ejecutor receptor (por ejemplo, el rendimiento de la primera entrada frente a esperar a todos) o a través de generadores o agregadores de orquestación.
  • Los ejecutores del flujo de trabajo son de forma libre: encapsulan un ChatAgent, una función o un sub-flujo de trabajo y los mezclan en el mismo gráfico.

Diferencias clave

En la tabla siguiente se resumen las diferencias fundamentales entre graphFlow de AutoGen y el flujo de trabajo de Agent Framework:

Aspecto AutoGen GraphFlow Flujo de trabajo del marco de trabajo del agente
Tipo de flujo Flujo de control (los bordes son transiciones) Flujo de datos (mensajes de ruta de bordes)
Tipos de nodo Solo agentes Agentes, funciones, sub-flujos de trabajo
Activación Difusión de mensajes Activación basada en edge
Seguridad de tipos Limitado Escritura segura a lo largo de todo
Capacidad de redacción Limitado Altamente composable

Patrones de anidamiento

Anidamiento de equipos de AutoGen

# Inner team
inner_team = RoundRobinGroupChat(
    participants=[specialist1, specialist2],
    termination_condition=StopAfterNMessages(3)
)

# Outer team with nested team as participant
outer_team = RoundRobinGroupChat(
    participants=[coordinator, inner_team, reviewer],  # Team as participant
    termination_condition=StopAfterNMessages(10)
)

# Messages are broadcasted to all participants including nested team
result = await outer_team.run("Complex task requiring collaboration")

Características de anidamiento de AutoGen:

  • El equipo anidado recibe todos los mensajes del equipo externo
  • Los mensajes del equipo anidados se transmiten a todos los participantes externos del equipo
  • Contexto de mensaje compartido en todos los niveles

Anidamiento de flujo de trabajo del marco de agente

from agent_framework import WorkflowExecutor, WorkflowBuilder

# Assume we have executors from previous examples
# specialist1_executor, specialist2_executor, coordinator_executor, reviewer_executor

# Create sub-workflow
sub_workflow = (WorkflowBuilder()
               .add_edge(specialist1_executor, specialist2_executor)
               .set_start_executor(specialist1_executor)
               .build())

# Wrap as executor
sub_workflow_executor = WorkflowExecutor(
    workflow=sub_workflow,
    id="sub_process"
)

# Use in parent workflow
parent_workflow = (WorkflowBuilder()
                  .add_edge(coordinator_executor, sub_workflow_executor)
                  .add_edge(sub_workflow_executor, reviewer_executor)
                  .set_start_executor(coordinator_executor)
                  .build())

Características de anidamiento del marco de agente:

  • Entrada y salida aisladas a través de WorkflowExecutor
  • Sin difusión de mensajes: los datos fluyen a través de conexiones específicas
  • Administración de estado independiente para cada nivel de flujo de trabajo

Patrones de chat en grupo

Los patrones de chat de grupo permiten que varios agentes colaboren en tareas complejas. Este es el modo en que los patrones comunes se traducen entre marcos de trabajo.

Patrón RoundRobinGroupChat

Implementación de AutoGen:

from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import StopAfterNMessages

team = RoundRobinGroupChat(
    participants=[agent1, agent2, agent3],
    termination_condition=StopAfterNMessages(10)
)
result = await team.run("Discuss this topic")

Implementación del marco de trabajo del agente:

from agent_framework import SequentialBuilder, WorkflowOutputEvent

# Assume we have agent1, agent2, agent3 from previous examples
# Sequential workflow through participants
workflow = SequentialBuilder().participants([agent1, agent2, agent3]).build()

# Example usage (would be in async context)
async def sequential_example():
    # Each agent appends to shared conversation
    async for event in workflow.run_stream("Discuss this topic"):
        if isinstance(event, WorkflowOutputEvent):
            conversation_history = event.data  # list[ChatMessage]

Para obtener ejemplos detallados de orquestación, consulte:

En el caso de los patrones de ejecución simultáneos, Agent Framework también proporciona:

from agent_framework import ConcurrentBuilder, WorkflowOutputEvent

# Assume we have agent1, agent2, agent3 from previous examples
# Concurrent workflow for parallel processing
workflow = (ConcurrentBuilder()
           .participants([agent1, agent2, agent3])
           .build())

# Example usage (would be in async context)
async def concurrent_example():
    # All agents process the input concurrently
    async for event in workflow.run_stream("Process this in parallel"):
        if isinstance(event, WorkflowOutputEvent):
            results = event.data  # Combined results from all agents

Para ver ejemplos de ejecución simultánea, consulte:

Patrón MagenticOneGroupChat

Implementación de AutoGen:

from autogen_agentchat.teams import MagenticOneGroupChat

team = MagenticOneGroupChat(
    participants=[researcher, coder, executor],
    model_client=coordinator_client,
    termination_condition=StopAfterNMessages(20)
)
result = await team.run("Complex research and analysis task")

Implementación del marco de trabajo del agente:

from typing import cast
from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    ChatAgent,
    ChatMessage,
    MagenticBuilder,
    WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient

# Create a manager agent for orchestration
manager_agent = ChatAgent(
    name="MagenticManager",
    description="Orchestrator that coordinates the workflow",
    instructions="You coordinate a team to complete complex tasks efficiently.",
    chat_client=OpenAIChatClient(),
)

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher, coder=coder)
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=20,
        max_stall_count=3,
        max_reset_count=2,
    )
    .build()
)

# Example usage (would be in async context)
async def magentic_example():
    output: str | None = None
    async for event in workflow.run_stream("Complex research task"):
        if isinstance(event, AgentRunUpdateEvent):
            props = event.data.additional_properties if event.data else None
            event_type = props.get("magentic_event_type") if props else None

            if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
                text = event.data.text if event.data else ""
                print(f"[ORCHESTRATOR]: {text}")
            elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
                agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
                if event.data and event.data.text:
                    print(f"[{agent_id}]: {event.data.text}", end="")

        elif isinstance(event, WorkflowOutputEvent):
            output_messages = cast(list[ChatMessage], event.data)
            if output_messages:
                output = output_messages[-1].text

Opciones de personalización del marco de agente:

El flujo de trabajo de Magentic proporciona amplias opciones de personalización:

  • Configuración del administrador: use un ChatAgent con instrucciones personalizadas y la configuración del modelo
  • Límites de redondeo: max_round_count, max_stall_count, max_reset_count
  • Streaming de eventos: uso AgentRunUpdateEvent con magentic_event_type metadatos
  • Especialización del agente: instrucciones y herramientas personalizadas por agente
  • Human-in-the-loop: revisión del plan, aprobación de herramientas y intervención de parada
# Advanced customization example with human-in-the-loop
from typing import cast
from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    ChatAgent,
    MagenticBuilder,
    MagenticHumanInterventionDecision,
    MagenticHumanInterventionKind,
    MagenticHumanInterventionReply,
    MagenticHumanInterventionRequest,
    RequestInfoEvent,
    WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient

# Create manager agent with custom configuration
manager_agent = ChatAgent(
    name="MagenticManager",
    description="Orchestrator for complex tasks",
    instructions="Custom orchestration instructions...",
    chat_client=OpenAIChatClient(model_id="gpt-4o"),
)

workflow = (
    MagenticBuilder()
    .participants(
        researcher=researcher_agent,
        coder=coder_agent,
        analyst=analyst_agent,
    )
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=15,      # Limit total rounds
        max_stall_count=2,       # Trigger stall handling
        max_reset_count=1,       # Allow one reset on failure
    )
    .with_plan_review()           # Enable human plan review
    .with_human_input_on_stall()  # Enable human intervention on stalls
    .build()
)

# Handle human intervention requests during execution
async for event in workflow.run_stream("Complex task"):
    if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
        req = cast(MagenticHumanInterventionRequest, event.data)
        if req.kind == MagenticHumanInterventionKind.PLAN_REVIEW:
            # Review and approve the plan
            reply = MagenticHumanInterventionReply(
                decision=MagenticHumanInterventionDecision.APPROVE
            )
            async for ev in workflow.send_responses_streaming({event.request_id: reply}):
                pass  # Handle continuation

Para obtener ejemplos detallados de Magentic, consulte:

Patrones futuros

La hoja de ruta de Agent Framework incluye varios patrones de AutoGen actualmente en desarrollo:

  • Patrón de enjambre: coordinación de agentes basado en entrega
  • SelectorGroupChat: selección del altavoz controlado por LLM

Human-in-the-Loop with Request Response

Una nueva característica clave de Agent Framework Workflow es el concepto de solicitud y respuesta, lo que permite a los flujos de trabajo pausar la ejecución y esperar a la entrada externa antes de continuar. Esta funcionalidad no está presente en la abstracción de Team AutoGen y permite sofisticados patrones humanos en el bucle.

Limitaciones de AutoGen

La abstracción de Team AutoGen se ejecuta continuamente una vez iniciada y no proporciona mecanismos integrados para pausar la ejecución de la entrada humana. Cualquier funcionalidad humana en bucle requiere implementaciones personalizadas fuera del marco.

Agent Framework Request-Response API

Agent Framework proporciona funcionalidades integradas de solicitud-respuesta en las que cualquier ejecutor puede enviar solicitudes mediante ctx.request_info() y controlar las respuestas con el @response_handler decorador.

from agent_framework import (
    RequestInfoEvent, WorkflowBuilder, WorkflowContext, 
    Executor, handler, response_handler
)
from dataclasses import dataclass

# Assume we have agent_executor defined elsewhere

# Define typed request payload
@dataclass
class ApprovalRequest:
    """Request human approval for agent output."""
    content: str = ""
    agent_name: str = ""

# Workflow executor that requests human approval
class ReviewerExecutor(Executor):
    
    @handler
    async def review_content(
        self,
        agent_response: str,
        ctx: WorkflowContext
    ) -> None:
        # Request human input with structured data
        approval_request = ApprovalRequest(
            content=agent_response,
            agent_name="writer_agent"
        )
        await ctx.request_info(request_data=approval_request, response_type=str)
    
    @response_handler
    async def handle_approval_response(
        self,
        original_request: ApprovalRequest,
        decision: str,
        ctx: WorkflowContext
    ) -> None:
        decision_lower = decision.strip().lower()
        original_content = original_request.content

        if decision_lower == "approved":
            await ctx.yield_output(f"APPROVED: {original_content}")
        else:
            await ctx.yield_output(f"REVISION NEEDED: {decision}")

# Build workflow with human-in-the-loop
reviewer = ReviewerExecutor(id="reviewer")

workflow = (WorkflowBuilder()
           .add_edge(agent_executor, reviewer)
           .set_start_executor(agent_executor)
           .build())

Ejecución de flujos de trabajo humanos en bucle

Agent Framework proporciona API de streaming para controlar el ciclo de pausa y reanudación:

from agent_framework import RequestInfoEvent, WorkflowOutputEvent

# Assume we have workflow defined from previous examples
async def run_with_human_input():
    pending_responses = None
    completed = False

    while not completed:
        # First iteration uses run_stream, subsequent use send_responses_streaming
        stream = (
            workflow.send_responses_streaming(pending_responses)
            if pending_responses
            else workflow.run_stream("initial input")
        )

        events = [event async for event in stream]
        pending_responses = None

        # Collect human requests and outputs
        for event in events:
            if isinstance(event, RequestInfoEvent):
                # Display request to human and collect response
                request_data = event.data  # ApprovalRequest instance
                print(f"Review needed: {request_data.content}")

                human_response = input("Enter 'approved' or revision notes: ")
                pending_responses = {event.request_id: human_response}

            elif isinstance(event, WorkflowOutputEvent):
                print(f"Final result: {event.data}")
                completed = True

Para ver ejemplos de flujo de trabajo de human-in-the-loop, consulte:

Puntos de control y reanudación de flujos de trabajo

Otra ventaja clave de la abstracción de Workflow Agent Framework sobre la abstracción de Team AutoGen es la compatibilidad integrada con puntos de control y reanudación de la ejecución. Esto permite pausar, conservar y reanudar flujos de trabajo más adelante desde cualquier punto de control, lo que proporciona tolerancia a errores y habilita flujos de trabajo de larga duración o asincrónicos.

Limitaciones de AutoGen

La abstracción de Team AutoGen no proporciona funcionalidades de punto de control integradas. Cualquier mecanismo de persistencia o recuperación debe implementarse externamente, a menudo requiere una lógica compleja de administración de estado y serialización.

Puntos de control del marco de agente

Agent Framework proporciona un punto de control completo a través FileCheckpointStorage de y el with_checkpointing() método en WorkflowBuilder. Captura de puntos de control:

  • Estado del ejecutor: estado local para cada ejecutor mediante ctx.set_executor_state()
  • Estado compartido: estado entre ejecutores mediante ctx.set_shared_state()
  • Colas de mensajes: mensajes pendientes entre ejecutores
  • Posición del flujo de trabajo: progreso de la ejecución actual y pasos siguientes
from agent_framework import (
    FileCheckpointStorage, WorkflowBuilder, WorkflowContext,
    Executor, handler
)
from typing_extensions import Never

class ProcessingExecutor(Executor):
    @handler
    async def process(self, data: str, ctx: WorkflowContext[str]) -> None:
        # Process the data
        result = f"Processed: {data.upper()}"
        print(f"Processing: '{data}' -> '{result}'")

        # Persist executor-local state
        prev_state = await ctx.get_executor_state() or {}
        count = prev_state.get("count", 0) + 1
        await ctx.set_executor_state({
            "count": count,
            "last_input": data,
            "last_output": result
        })

        # Persist shared state for other executors
        await ctx.set_shared_state("original_input", data)
        await ctx.set_shared_state("processed_output", result)

        await ctx.send_message(result)

class FinalizeExecutor(Executor):
    @handler
    async def finalize(self, data: str, ctx: WorkflowContext[Never, str]) -> None:
        result = f"Final: {data}"
        await ctx.yield_output(result)

# Configure checkpoint storage
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
processing_executor = ProcessingExecutor(id="processing")
finalize_executor = FinalizeExecutor(id="finalize")

# Build workflow with checkpointing enabled
workflow = (WorkflowBuilder()
           .add_edge(processing_executor, finalize_executor)
           .set_start_executor(processing_executor)
           .with_checkpointing(checkpoint_storage=checkpoint_storage)  # Enable checkpointing
           .build())

# Example usage (would be in async context)
async def checkpoint_example():
    # Run workflow - checkpoints are created automatically
    async for event in workflow.run_stream("input data"):
        print(f"Event: {event}")

Reanudación desde puntos de control

Agent Framework proporciona API para enumerar, inspeccionar y reanudar desde puntos de comprobación específicos:

from typing_extensions import Never

from agent_framework import (
    Executor,
    FileCheckpointStorage,
    WorkflowContext,
    WorkflowBuilder,
    get_checkpoint_summary,
    handler,
)

class UpperCaseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[str]) -> None:
        result = text.upper()
        await ctx.send_message(result)

class ReverseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
        result = text[::-1]
        await ctx.yield_output(result)

def create_workflow(checkpoint_storage: FileCheckpointStorage):
    """Create a workflow with two executors and checkpointing."""
    upper_executor = UpperCaseExecutor(id="upper")
    reverse_executor = ReverseExecutor(id="reverse")

    return (WorkflowBuilder()
           .add_edge(upper_executor, reverse_executor)
           .set_start_executor(upper_executor)
           .with_checkpointing(checkpoint_storage=checkpoint_storage)
           .build())

# Assume we have checkpoint_storage from previous examples
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")

async def checkpoint_resume_example():
    # List available checkpoints
    checkpoints = await checkpoint_storage.list_checkpoints()

    # Display checkpoint information
    for checkpoint in checkpoints:
        summary = get_checkpoint_summary(checkpoint)
        print(f"Checkpoint {summary.checkpoint_id}: iteration={summary.iteration_count}")

    # Resume from a specific checkpoint
    if checkpoints:
        chosen_checkpoint_id = checkpoints[0].checkpoint_id

        # Create new workflow instance and resume
        new_workflow = create_workflow(checkpoint_storage)
        async for event in new_workflow.run_stream(
            checkpoint_id=chosen_checkpoint_id,
            checkpoint_storage=checkpoint_storage
        ):
            print(f"Resumed event: {event}")

Características avanzadas de punto de control

Punto de control con integración de human-in-the-loop:

La creación de puntos de comprobación funciona perfectamente con flujos de trabajo humanos en bucle, lo que permite pausar los flujos de trabajo para la entrada humana y reanudarlos más adelante. Al reanudar desde un punto de control que contiene solicitudes pendientes, esas solicitudes se volverán a emitir como eventos:

# Assume we have workflow, checkpoint_id, and checkpoint_storage from previous examples
async def resume_with_pending_requests_example():
    # Resume from checkpoint - pending requests will be re-emitted
    request_info_events = []
    async for event in workflow.run_stream(
        checkpoint_id=checkpoint_id,
        checkpoint_storage=checkpoint_storage
    ):
        if isinstance(event, RequestInfoEvent):
            request_info_events.append(event)

    # Handle re-emitted pending request
    responses = {}
    for event in request_info_events:
        response = handle_request(event.data)
        responses[event.request_id] = response

    # Send response back to workflow
    async for event in workflow.send_responses_streaming(responses):
        print(f"Event: {event}")

Ventajas clave

En comparación con AutoGen, el punto de comprobación de Agent Framework proporciona:

  • Persistencia automática: no se requiere administración de estado manual
  • Recuperación granular: reanudar desde cualquier límite de superpaso
  • Aislamiento de estado: independiente del estado local y compartido del ejecutor
  • Integración de human-in-the-loop: pausa de conexión directa con entrada humana
  • Tolerancia a errores: recuperación sólida de errores o interrupciones

Ejemplos prácticos

Para obtener ejemplos completos de puntos de comprobación, consulte:


Observability

Tanto AutoGen como Agent Framework proporcionan funcionalidades de observabilidad, pero con diferentes enfoques y características.

Observabilidad de AutoGen

AutoGen tiene compatibilidad nativa con OpenTelemetry con instrumentación para:

  • Seguimiento en tiempo de ejecución: SingleThreadedAgentRuntime y GrpcWorkerAgentRuntime
  • Ejecución de herramientas: BaseTool con execute_tool intervalos siguientes convenciones semánticas de GenAI
  • Operaciones del agente: BaseChatAgent con create_agent intervalos y invoke_agent
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from autogen_core import SingleThreadedAgentRuntime

# Configure OpenTelemetry
tracer_provider = TracerProvider()
trace.set_tracer_provider(tracer_provider)

# Pass to runtime
runtime = SingleThreadedAgentRuntime(tracer_provider=tracer_provider)

Observabilidad del marco de agente

Agent Framework proporciona una observabilidad completa a través de varios enfoques:

  • Configuración de código cero: instrumentación automática a través de variables de entorno
  • Configuración manual: configuración mediante programación con parámetros personalizados
  • Telemetría enriquecida: agentes, flujos de trabajo y seguimiento de la ejecución de herramientas
  • Salida de la consola: registro y visualización de la consola integrada
from agent_framework import ChatAgent
from agent_framework.observability import setup_observability
from agent_framework.openai import OpenAIChatClient

# Zero-code setup via environment variables
# Set ENABLE_OTEL=true
# Set OTLP_ENDPOINT=http://localhost:4317

# Or manual setup
setup_observability(
    otlp_endpoint="http://localhost:4317"
)

# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")

async def observability_example():
    # Observability is automatically applied to all agents and workflows
    agent = ChatAgent(name="assistant", chat_client=client)
    result = await agent.run("Hello")  # Automatically traced

Diferencias clave:

  • Complejidad de la instalación: Agent Framework ofrece opciones más sencillas de configuración de código cero
  • Ámbito: Agent Framework proporciona una cobertura más amplia, incluida la observabilidad de nivel de flujo de trabajo.
  • Visualización: Agent Framework incluye la salida y la interfaz de usuario de desarrollo de la consola integrada
  • Configuración: Agent Framework ofrece opciones de configuración más flexibles

Para obtener ejemplos detallados de observabilidad, consulte:


Conclusión

Esta guía de migración proporciona una asignación completa entre AutoGen y Microsoft Agent Framework, que abarca todo, desde la creación básica de agentes hasta flujos de trabajo complejos de varios agentes. Conclusiones clave para la migración:

  • La migración de un solo agente es sencilla, con API similares y funcionalidades mejoradas en Agent Framework
  • Los patrones de varios agentes requieren replantearse el enfoque de las arquitecturas basadas en eventos a las arquitecturas basadas en flujo de datos, pero si ya está familiarizado con GraphFlow, la transición será más fácil.
  • Agent Framework ofrece características adicionales, como middleware, herramientas hospedadas y flujos de trabajo tipados.

Para obtener ejemplos adicionales e instrucciones de implementación detalladas, consulte el directorio ejemplos de Agent Framework .

Categorías de ejemplo adicionales

Agent Framework proporciona ejemplos en otras áreas importantes:

  • Subprocesos: ejemplos de subprocesos : administración del estado y el contexto de la conversación
  • Entrada bidireccional: muestras bidireccionales : trabajar con imágenes y otros tipos de medios
  • Proveedores de contexto: ejemplos de proveedores de contexto: patrones de integración de contexto externo

Pasos siguientes