Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Ein umfassendes Handbuch für die Migration von AutoGen zum Microsoft Agent Framework Python SDK.
Inhaltsverzeichnis
- Hintergrund
- Wichtige Ähnlichkeiten und Unterschiede
- Modellclienterstellung und -konfiguration
-
Single-Agent Featurezuordnung
- Grundlegende Agent-Erstellung und -Ausführung
- Verwalten des Unterhaltungsstatus mit AgentThread
- OpenAI Assistant Agent Equivalence
- Streamingunterstützung
- Nachrichtentypen und Erstellung
- Toolerstellung und -integration
- Gehostete Tools (Agent Framework Exklusiv)
- MCP-Serverunterstützung
- Agent-as-a-Tool-Muster
- Middleware (Agent Framework-Feature)
- Benutzerdefinierte Agents
- Zuordnung von Multi-Agent-Features
- Einblick
- Schlussfolgerung
Hintergrund
AutoGen ist ein Framework zum Erstellen von KI-Agents und Multi-Agent-Systemen mit großen Sprachmodellen (LLMs). Es begann als Forschungsprojekt bei Microsoft Research und hat mehrere Konzepte in der Multi-Agent-Orchestrierung, z. B. GroupChat und ereignisgesteuerte Agent-Laufzeit, ins Leben gerufen. Das Projekt war eine fruchtbare Zusammenarbeit der Open-Source-Community, und viele wichtige Features stammen von externen Mitwirkenden.
Microsoft Agent Framework ist ein neues mehrsprachiges SDK zum Erstellen von KI-Agents und Workflows mithilfe von LLMs. Es stellt eine bedeutende Weiterentwicklung der Ideen dar, die in AutoGen pionieriert wurden, und beinhaltet Erkenntnisse aus der praxisnahen Nutzung. Es wird von den wichtigsten AutoGen- und Semantic Kernel-Teams bei Microsoft entwickelt und ist eine neue Grundlage für die zukünftige Entwicklung von KI-Anwendungen.
Dieser Leitfaden beschreibt einen praktischen Migrationspfad: Er beginnt mit dem, was gleich bleibt und was sich auf einen Blick ändert. Anschließend werden modellbasierte Clienteinrichtung, Einzel-Agent-Features und schließlich Multi-Agent-Orchestrierung mit konkretem Code nebeneinander behandelt. Links zu ausführungsfähigen Beispielen im Agent Framework-Repository helfen Ihnen dabei, jeden Schritt zu überprüfen.
Wichtige Ähnlichkeiten und Unterschiede
Was bleibt gleich
Die Grundlagen sind vertraut. Sie erstellen weiterhin Agents um einen Modellclient, stellen Anweisungen bereit und fügen Tools an. Beide Bibliotheken unterstützen Funktionsstiltools, Tokenstreaming, multimodale Inhalte und asynchrone E/A.
# Both frameworks follow similar patterns
# AutoGen
agent = AssistantAgent(name="assistant", model_client=client, tools=[my_tool])
result = await agent.run(task="Help me with this task")
# Agent Framework
agent = ChatAgent(name="assistant", chat_client=client, tools=[my_tool])
result = await agent.run("Help me with this task")
Zentrale Unterschiede
Orchestrierungsstil: AutoGen koppelt einen ereignisgesteuerten Kern mit einem übergeordneten
Team. Das Agent-Framework zentriert sich auf einem typierten, graphbasiertenWorkflowDiagramm, das Daten entlang der Kanten weiter leitet und Executoren aktiviert, wenn Eingaben bereit sind.Tools: AutoGen umschließt Funktionen mit
FunctionTool. Agent Framework verwendet@ai_functionschemas automatisch, leitet Schemas ab und fügt gehostete Tools wie z. B. einen Codedolmetscher und eine Websuche hinzu.Agent-Verhalten:
AssistantAgentist eine einzelne Drehung, es sei denn, Sie erhöhenmax_tool_iterations.ChatAgentist standardmäßig multidrehend und fordert Tools immer wieder auf, bis sie eine endgültige Antwort zurückgeben kann.Laufzeit: AutoGen bietet eingebettete und experimentelle verteilte Laufzeiten. Agent Framework konzentriert sich heute auf die Komposition mit einem einzigen Prozess; die verteilte Ausführung ist geplant.
Modellclienterstellung und -konfiguration
Beide Frameworks bieten Modellclients für wichtige KI-Anbieter mit ähnlichen, aber nicht identischen APIs.
| Merkmal | AutoGen | Agenten-Framework |
|---|---|---|
| OpenAI-Client | OpenAIChatCompletionClient |
OpenAIChatClient |
| OpenAI-Antwortclient | ❌ Nicht verfügbar | OpenAIResponsesClient |
| Azure OpenAI | AzureOpenAIChatCompletionClient |
AzureOpenAIChatClient |
| Azure OpenAI-Antworten | ❌ Nicht verfügbar | AzureOpenAIResponsesClient |
| Azure AI | AzureAIChatCompletionClient |
AzureAIAgentClient |
| Anthropic | AnthropicChatCompletionClient |
🚧 Geplant |
| Ollama | OllamaChatCompletionClient |
🚧 Geplant |
| Zwischenspeicherung |
ChatCompletionCache Umschlag |
🚧 Geplant |
AutoGen-Modellclients
from autogen_ext.models.openai import OpenAIChatCompletionClient, AzureOpenAIChatCompletionClient
# OpenAI
client = OpenAIChatCompletionClient(
model="gpt-5",
api_key="your-key"
)
# Azure OpenAI
client = AzureOpenAIChatCompletionClient(
azure_endpoint="https://your-endpoint.openai.azure.com/",
azure_deployment="gpt-5",
api_version="2024-12-01",
api_key="your-key"
)
Agent Framework ChatClients
from agent_framework.openai import OpenAIChatClient
from agent_framework.azure import AzureOpenAIChatClient
# OpenAI (reads API key from environment)
client = OpenAIChatClient(model_id="gpt-5")
# Azure OpenAI (uses environment or default credentials; see samples for auth options)
client = AzureOpenAIChatClient(model_id="gpt-5")
Ausführliche Beispiele finden Sie unter:
- OpenAI-Chatclient – Grundlegendes OpenAI-Clientsetup
- Azure OpenAI-Chatclient – Azure OpenAI mit Authentifizierung
- Azure AI-Client – Integration des Azure AI-Agents
Unterstützung der Antwort-API (Agent-Framework exklusiv)
Agent Frameworks AzureOpenAIResponsesClient und OpenAIResponsesClient bieten spezielle Unterstützung für Gründemodelle und strukturierte Antworten, die in AutoGen nicht verfügbar sind:
from agent_framework.azure import AzureOpenAIResponsesClient
from agent_framework.openai import OpenAIResponsesClient
# Azure OpenAI with Responses API
azure_responses_client = AzureOpenAIResponsesClient(model_id="gpt-5")
# OpenAI with Responses API
openai_responses_client = OpenAIResponsesClient(model_id="gpt-5")
Beispiele für die Antwort-API finden Sie unter:
- Azure Responses Client Basic – Azure OpenAI mit Antworten
- OpenAI Responses Client Basic – OpenAI-Antwortintegration
Single-Agent Featurezuordnung
In diesem Abschnitt werden Einzel-Agent-Features zwischen AutoGen und Agent Framework zugeordnet. Erstellen Sie mit einem Client einen Agent, fügen Sie Tools an, und wählen Sie zwischen Nicht-Streaming- und Streamingausführung aus.
Grundlegende Agent-Erstellung und -Ausführung
Nachdem Sie einen Modellclient konfiguriert haben, erstellt der nächste Schritt Agents. Beide Frameworks bieten ähnliche Agentabstraktionen, aber mit unterschiedlichen Standardverhaltens- und Konfigurationsoptionen.
AutoGen AssistantAgent
from autogen_agentchat.agents import AssistantAgent
agent = AssistantAgent(
name="assistant",
model_client=client,
system_message="You are a helpful assistant.",
tools=[my_tool],
max_tool_iterations=1 # Single-turn by default
)
# Execution
result = await agent.run(task="What's the weather?")
Agent Framework ChatAgent
from agent_framework import ChatAgent, ai_function
from agent_framework.openai import OpenAIChatClient
# Create simple tools for the example
@ai_function
def get_weather(location: str) -> str:
"""Get weather for a location."""
return f"Weather in {location}: sunny"
@ai_function
def get_time() -> str:
"""Get current time."""
return "Current time: 2:30 PM"
# Create client
client = OpenAIChatClient(model_id="gpt-5")
async def example():
# Direct creation
agent = ChatAgent(
name="assistant",
chat_client=client,
instructions="You are a helpful assistant.",
tools=[get_weather] # Multi-turn by default
)
# Factory method (more convenient)
agent = client.create_agent(
name="assistant",
instructions="You are a helpful assistant.",
tools=[get_weather]
)
# Execution with runtime tool configuration
result = await agent.run(
"What's the weather?",
tools=[get_time], # Can add tools at runtime
tool_choice="auto"
)
Wichtige Unterschiede:
-
Standardverhalten:
ChatAgentAutomatisches Durchlaufen von Toolaufrufen, erfordert jedochAssistantAgenteine explizitemax_tool_iterationsEinstellung -
Laufzeitkonfiguration:
ChatAgent.run()Akzeptierttoolsundtool_choiceParameter für anpassungsbezogene Aufrufe - Factorymethoden: Agent Framework bietet praktische Factorymethoden direkt von Chatclients
-
Zustandsverwaltung:
ChatAgentist zustandslos und verwaltet keine Unterhaltungshistorie zwischen Aufrufen, im Gegensatz dazuAssistantAgent, dass der Unterhaltungsverlauf im Rahmen seines Zustands beibehalten wird.
Verwalten des Unterhaltungsstatus mit AgentThread
Verwenden Sie die Verwendung ChatAgent zum Verwalten des Unterhaltungsverlaufs, um Unterhaltungen AgentThreadfortzusetzen:
# Assume we have an agent from previous examples
async def conversation_example():
# Create a new thread that will be reused
thread = agent.get_new_thread()
# First interaction - thread is empty
result1 = await agent.run("What's 2+2?", thread=thread)
print(result1.text) # "4"
# Continue conversation - thread contains previous messages
result2 = await agent.run("What about that number times 10?", thread=thread)
print(result2.text) # "40" (understands "that number" refers to 4)
# AgentThread can use external storage, similar to ChatCompletionContext in AutoGen
Statuslos standardmäßig: schnelle Demo
# Without a thread (two independent invocations)
r1 = await agent.run("What's 2+2?")
print(r1.text) # for example, "4"
r2 = await agent.run("What about that number times 10?")
print(r2.text) # Likely ambiguous without prior context; cannot be "40"
# With a thread (shared context across calls)
thread = agent.get_new_thread()
print((await agent.run("What's 2+2?", thread=thread)).text) # "4"
print((await agent.run("What about that number times 10?", thread=thread)).text) # "40"
Beispiele für die Threadverwaltung finden Sie unter:
- Azure AI mit Thread – Verwaltung des Unterhaltungszustands
- OpenAI-Chatclient mit Thread – Threadverwendungsmuster
- Redis-backed Threads – Beibehalten des Unterhaltungszustands extern
OpenAI Assistant Agent Equivalence
Beide Frameworks bieten die Integration der OpenAI-Assistenten-API:
# AutoGen OpenAIAssistantAgent
from autogen_ext.agents.openai import OpenAIAssistantAgent
# Agent Framework has OpenAI Assistants support via OpenAIAssistantsClient
from agent_framework.openai import OpenAIAssistantsClient
Beispiele für OpenAI-Assistenten finden Sie unter:
- OpenAI Assistants Basic – Einrichtung des Einfachen Assistenten
- OpenAI-Assistenten mit Funktionstools – Integration von benutzerdefinierten Tools
- Azure OpenAI Assistants Basic – Einrichtung des Azure-Assistenten
- OpenAI-Assistenten mit Thread - Threadverwaltung
Streamingunterstützung
Beide Frameworks streamen Token in Echtzeit – von Clients und von Agents – um UIs reaktionsfähig zu halten.
AutoGen Streaming
# Model client streaming
async for chunk in client.create_stream(messages):
if isinstance(chunk, str):
print(chunk, end="")
# Agent streaming
async for event in agent.run_stream(task="Hello"):
if isinstance(event, ModelClientStreamingChunkEvent):
print(event.content, end="")
elif isinstance(event, TaskResult):
print("Final result received")
Agent Framework Streaming
# Assume we have client, agent, and tools from previous examples
async def streaming_example():
# Chat client streaming
async for chunk in client.get_streaming_response("Hello", tools=tools):
if chunk.text:
print(chunk.text, end="")
# Agent streaming
async for chunk in agent.run_stream("Hello"):
if chunk.text:
print(chunk.text, end="", flush=True)
Tipp: Im Agent-Framework liefern Sowohl Clients als auch Agents das gleiche Update-Shape; Sie können in beiden Fällen lesen chunk.text .
Nachrichtentypen und Erstellung
Das Verständnis, wie Nachrichten funktionieren, ist für die effektive Agentkommunikation von entscheidender Bedeutung. Beide Frameworks bieten unterschiedliche Ansätze für die Nachrichtenerstellung und -behandlung, wobei AutoGen separate Nachrichtenklassen und Agent Framework mit einem einheitlichen Nachrichtensystem verwendet.
AutoGen-Nachrichtentypen
from autogen_agentchat.messages import TextMessage, MultiModalMessage
from autogen_core.models import UserMessage
# Text message
text_msg = TextMessage(content="Hello", source="user")
# Multi-modal message
multi_modal_msg = MultiModalMessage(
content=["Describe this image", image_data],
source="user"
)
# Convert to model format for use with model clients
user_message = text_msg.to_model_message()
Agent Framework-Nachrichtentypen
from agent_framework import ChatMessage, TextContent, DataContent, UriContent, Role
import base64
# Text message
text_msg = ChatMessage(role=Role.USER, text="Hello")
# Supply real image bytes, or use a data: URI/URL via UriContent
image_bytes = b"<your_image_bytes>"
image_b64 = base64.b64encode(image_bytes).decode()
image_uri = f"data:image/jpeg;base64,{image_b64}"
# Multi-modal message with mixed content
multi_modal_msg = ChatMessage(
role=Role.USER,
contents=[
TextContent(text="Describe this image"),
DataContent(uri=image_uri, media_type="image/jpeg")
]
)
Wichtige Unterschiede:
- AutoGen verwendet separate Nachrichtenklassen (
TextMessage,MultiModalMessage) mit einemsourceFeld - Agent Framework verwendet ein Einheitliches
ChatMessagemit typierten Inhaltsobjekten und einemroleFeld. - Agent Framework-Nachrichten verwenden
RoleEnum (USER, ASSISTANT, SYSTEM, TOOL) anstelle von Zeichenfolgenquellen
Toolerstellung und -integration
Tools erweitern Agent-Funktionen über die Textgenerierung hinaus. Die Frameworks nutzen unterschiedliche Ansätze für die Toolerstellung, wobei Agent Framework eine automatisiertere Schemagenerierung bereitstellt.
AutoGen FunctionTool
from autogen_core.tools import FunctionTool
async def get_weather(location: str) -> str:
"""Get weather for a location."""
return f"Weather in {location}: sunny"
# Manual tool creation
tool = FunctionTool(
func=get_weather,
description="Get weather information"
)
# Use with agent
agent = AssistantAgent(name="assistant", model_client=client, tools=[tool])
Agent Framework @ai_function
from agent_framework import ai_function
from typing import Annotated
from pydantic import Field
@ai_function
def get_weather(
location: Annotated[str, Field(description="The location to get weather for")]
) -> str:
"""Get weather for a location."""
return f"Weather in {location}: sunny"
# Direct use with agent (automatic conversion)
agent = ChatAgent(name="assistant", chat_client=client, tools=[get_weather])
Ausführliche Beispiele finden Sie unter:
- OpenAI Chat Agent Basic - Einfacher OpenAI-Chat-Agent
- OpenAI mit Funktionstools – Agent mit benutzerdefinierten Tools
- Azure OpenAI Basic – Einrichtung des Azure OpenAI-Agents
Gehostete Tools (Agent Framework Exklusiv)
Agent Framework stellt gehostete Tools bereit, die in AutoGen nicht verfügbar sind:
from agent_framework import ChatAgent, HostedCodeInterpreterTool, HostedWebSearchTool
from agent_framework.azure import AzureOpenAIChatClient
# Azure OpenAI client with a model that supports hosted tools
client = AzureOpenAIChatClient(model_id="gpt-5")
# Code execution tool
code_tool = HostedCodeInterpreterTool()
# Web search tool
search_tool = HostedWebSearchTool()
agent = ChatAgent(
name="researcher",
chat_client=client,
tools=[code_tool, search_tool]
)
Ausführliche Beispiele finden Sie unter:
- Azure AI mit Codedolmetscher – Codeausführungstool
- Azure AI mit mehreren Tools – mehrere gehostete Tools
- OpenAI mit Web search - Web search integration
Anforderungen und Vorbehalte:
- Gehostete Tools sind nur für Modelle/Konten verfügbar, die sie unterstützen. Überprüfen Sie die Berechtigungen und Modellunterstützung für Ihren Anbieter, bevor Sie diese Tools aktivieren.
- Die Konfiguration unterscheidet sich von Anbieter; befolgen Sie die Voraussetzungen in den einzelnen Beispielen für Setup und Berechtigungen.
- Nicht jedes Modell unterstützt jedes gehostete Tool (z. B. Websuche und Codedolmetscher). Wählen Sie ein kompatibles Modell in Ihrer Umgebung aus.
Hinweis
AutoGen unterstützt lokale Codeausführungstools, dieses Feature ist jedoch für zukünftige Agent Framework-Versionen geplant.
Schlüsselunterschied: Das Agentframework verarbeitet die Tooliteration automatisch auf Agentebene. Im Gegensatz zum Parameter von max_tool_iterations AutoGen setzen Agent Framework-Agents die Toolausführung bis zum Abschluss standardmäßig mit integrierten Sicherheitsmechanismen fort, um endlose Schleifen zu verhindern.
MCP-Serverunterstützung
Für die erweiterte Toolintegration unterstützen beide Frameworks Model Context Protocol (MCP), sodass Agents mit externen Diensten und Datenquellen interagieren können. Agent Framework bietet umfassendere integrierte Unterstützung.
AutoGen MCP-Unterstützung
AutoGen verfügt über grundlegende MCP-Unterstützung durch Erweiterungen (spezifische Implementierungsdetails variieren je nach Version).
Agent Framework MCP-Unterstützung
from agent_framework import ChatAgent, MCPStdioTool, MCPStreamableHTTPTool, MCPWebsocketTool
from agent_framework.openai import OpenAIChatClient
# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")
# Stdio MCP server
mcp_tool = MCPStdioTool(
name="filesystem",
command="uvx mcp-server-filesystem",
args=["/allowed/directory"]
)
# HTTP streaming MCP
http_mcp = MCPStreamableHTTPTool(
name="http_mcp",
url="http://localhost:8000/sse"
)
# WebSocket MCP
ws_mcp = MCPWebsocketTool(
name="websocket_mcp",
url="ws://localhost:8000/ws"
)
agent = ChatAgent(name="assistant", chat_client=client, tools=[mcp_tool])
Beispiele für MCP finden Sie unter:
- OpenAI mit lokalem MCP – Verwenden von MCPStreamableHTTPTool mit OpenAI
- OpenAI mit gehosteten MCP - Verwendung gehosteter MCP-Dienste
- Azure AI mit lokalem MCP – Verwenden von MCP mit Azure AI
- Azure AI mit gehostetem MCP – Verwenden gehosteter MCP mit Azure AI
Agent-as-a-Tool-Muster
Ein leistungsfähiges Muster ist die Verwendung von Agents selbst als Tools, die hierarchische Agentarchitekturen ermöglichen. Beide Frameworks unterstützen dieses Muster mit unterschiedlichen Implementierungen.
AutoGen AgentTool
from autogen_agentchat.tools import AgentTool
# Create specialized agent
writer = AssistantAgent(
name="writer",
model_client=client,
system_message="You are a creative writer."
)
# Wrap as tool
writer_tool = AgentTool(agent=writer)
# Use in coordinator (requires disabling parallel tool calls)
coordinator_client = OpenAIChatCompletionClient(
model="gpt-5",
parallel_tool_calls=False
)
coordinator = AssistantAgent(
name="coordinator",
model_client=coordinator_client,
tools=[writer_tool]
)
Agent Framework as_tool()
from agent_framework import ChatAgent
# Assume we have client from previous examples
# Create specialized agent
writer = ChatAgent(
name="writer",
chat_client=client,
instructions="You are a creative writer."
)
# Convert to tool
writer_tool = writer.as_tool(
name="creative_writer",
description="Generate creative content",
arg_name="request",
arg_description="What to write"
)
# Use in coordinator
coordinator = ChatAgent(
name="coordinator",
chat_client=client,
tools=[writer_tool]
)
Expliziter Migrationshinweis: Legen Sie in AutoGen beim Umschließen von Agents als Tools auf dem Modellclient des Koordinators fest, parallel_tool_calls=False um Parallelitätsprobleme beim Aufrufen derselben Agentinstanz zu vermeiden.
Im Agent Framework ist es nicht erforderlich, parallele Toolaufrufe zu deaktivieren, as_tool() da Agents standardmäßig zustandslos sind.
Middleware (Agent Framework-Feature)
Das Agent Framework führt Middleware-Funktionen ein, die AutoGen fehlt. Middleware ermöglicht leistungsstarke cross-cutting-Bedenken wie Protokollierung, Sicherheit und Leistungsüberwachung.
from agent_framework import ChatAgent, AgentRunContext, FunctionInvocationContext
from typing import Callable, Awaitable
# Assume we have client from previous examples
async def logging_middleware(
context: AgentRunContext,
next: Callable[[AgentRunContext], Awaitable[None]]
) -> None:
print(f"Agent {context.agent.name} starting")
await next(context)
print(f"Agent {context.agent.name} completed")
async def security_middleware(
context: FunctionInvocationContext,
next: Callable[[FunctionInvocationContext], Awaitable[None]]
) -> None:
if "password" in str(context.arguments):
print("Blocking function call with sensitive data")
return # Don't call next()
await next(context)
agent = ChatAgent(
name="secure_agent",
chat_client=client,
middleware=[logging_middleware, security_middleware]
)
Vorteile:
- Sicherheit: Eingabeüberprüfung und Inhaltsfilterung
- Observability: Protokollierung, Metriken und Ablaufverfolgung
- Leistung: Zwischenspeichern und Einschränken der Rate
- Fehlerbehandlung: Ordnungsgemäße Beeinträchtigung und Wiederholungslogik
Ausführliche Middlewarebeispiele finden Sie unter:
- Funktionsbasierte Middleware – Einfache Funktions-Middleware
- Klassenbasierte Middleware – objektorientierte Middleware
- Ausnahmebehandlungs-Middleware – Fehlerbehandlungsmuster
- Shared State Middleware – Zustandsverwaltung über Agents hinweg
Benutzerdefinierte Agenten
Manchmal möchten Sie überhaupt keinen modellgestützten Agent – Sie möchten einen deterministischen oder API-gesicherten Agent mit benutzerdefinierter Logik. Beide Frameworks unterstützen das Erstellen von benutzerdefinierten Agents, aber die Muster unterscheiden sich.
AutoGen: Subclass BaseChatAgent
from typing import Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseChatMessage, TextMessage, StopMessage
from autogen_core import CancellationToken
class StaticAgent(BaseChatAgent):
def __init__(self, name: str = "static", description: str = "Static responder") -> None:
super().__init__(name, description)
@property
def produced_message_types(self) -> Sequence[type[BaseChatMessage]]: # Which message types this agent produces
return (TextMessage,)
async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
# Always return a static response
return Response(chat_message=TextMessage(content="Hello from AutoGen custom agent", source=self.name))
Notes:
- Implementieren und zurückgeben Sie
on_messages(...)eineResponsemit einer Chatnachricht. - Optional implementieren
on_reset(...), um den internen Zustand zwischen Denläufen zu löschen.
Agent-Framework: Erweitern von BaseAgent (threadfähig)
from collections.abc import AsyncIterable
from typing import Any
from agent_framework import (
AgentRunResponse,
AgentRunResponseUpdate,
AgentThread,
BaseAgent,
ChatMessage,
Role,
TextContent,
)
class StaticAgent(BaseAgent):
async def run(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AgentRunResponse:
# Build a static reply
reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])
# Persist conversation to the provided AgentThread (if any)
if thread is not None:
normalized = self._normalize_messages(messages)
await self._notify_thread_of_new_messages(thread, normalized, reply)
return AgentRunResponse(messages=[reply])
async def run_stream(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AsyncIterable[AgentRunResponseUpdate]:
# Stream the same static response in a single chunk for simplicity
yield AgentRunResponseUpdate(contents=[TextContent(text="Hello from AF custom agent")], role=Role.ASSISTANT)
# Notify thread of input and the complete response once streaming ends
if thread is not None:
reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])
normalized = self._normalize_messages(messages)
await self._notify_thread_of_new_messages(thread, normalized, reply)
Notes:
-
AgentThreadverwaltet den Unterhaltungszustand extern; verwendenagent.get_new_thread()und übergeben Sie es anrun/run_stream. - Rufen Sie
self._notify_thread_of_new_messages(thread, input_messages, response_messages)auf, damit der Thread beide Seiten des Austauschs hat. - Sehen Sie sich das vollständige Beispiel an: Benutzerdefinierter Agent
Als Nächstes betrachten wir die Multi-Agent-Orchestrierung – der Bereich, in dem sich die Frameworks am meisten unterscheiden.
Zuordnung von Multi-Agent-Features
Übersicht über das Programmiermodell
Die Multi-Agent-Programmiermodelle stellen den wichtigsten Unterschied zwischen den beiden Frameworks dar.
Der Dualmodellansatz von AutoGen
AutoGen bietet zwei Programmiermodelle:
-
autogen-core: Low-Level, ereignisgesteuerte Programmierung mitRoutedAgentund Nachrichtenabonnements -
TeamAbstraktion: Allgemeines, lauforientiertes Modell, das auf der Grundlage vonautogen-core
# Low-level autogen-core (complex)
class MyAgent(RoutedAgent):
@message_handler
async def handle_message(self, message: TextMessage, ctx: MessageContext) -> None:
# Handle specific message types
pass
# High-level Team (easier but limited)
team = RoundRobinGroupChat(
participants=[agent1, agent2],
termination_condition=StopAfterNMessages(5)
)
result = await team.run(task="Collaborate on this task")
Herausforderungen:
- Das Modell auf niedriger Ebene ist für die meisten Benutzer zu komplex.
- Allgemeines Modell kann für komplexe Verhaltensweisen eingeschränkt werden
- Die Überbrückung zwischen den beiden Modellen erhöht die Implementierungskomplexität
Einheitliches Workflowmodell des Agent-Frameworks
Agent Framework bietet eine einzelne Workflow Abstraktion, die das Beste aus beiden Ansätzen kombiniert:
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
# Assume we have agent1 and agent2 from previous examples
@executor(id="agent1")
async def agent1_executor(input_msg: str, ctx: WorkflowContext[str]) -> None:
response = await agent1.run(input_msg)
await ctx.send_message(response.text)
@executor(id="agent2")
async def agent2_executor(input_msg: str, ctx: WorkflowContext[Never, str]) -> None:
response = await agent2.run(input_msg)
await ctx.yield_output(response.text) # Final output
# Build typed data flow graph
workflow = (WorkflowBuilder()
.add_edge(agent1_executor, agent2_executor)
.set_start_executor(agent1_executor)
.build())
# Example usage (would be in async context)
# result = await workflow.run("Initial input")
Ausführliche Workflowbeispiele finden Sie unter:
- Workflowgrundlagen – Einführung in Ausführungs- und Edges
- Agents in Workflow – Integrieren von Agents in Workflows
- Workflowstreaming – Workflowausführung in Echtzeit
Vorteile:
- Einheitliches Modell: Einzelne Abstraktion für alle Komplexitätsstufen
- Typsicherheit: Stark typierte Eingaben und Ausgaben
- Graph-Visualisierung: Datenflussdarstellung löschen
- Flexible Komposition: Mix-Agents, Funktionen und Unterworkflows
Workflow vs GraphFlow
Die Abstraktion des Workflow Agent Frameworks ist von AutoGens experimentellem GraphFlow Feature inspiriert, stellt aber eine bedeutende Entwicklung in der Designphilosophie dar:
- GraphFlow: Steuerungsfluss basierend auf Übergängen und Nachrichten werden an alle Agents übertragen; Übergänge sind auf übertragenen Nachrichteninhalten bedingt
- Workflow: Datenflussbasiert, bei dem Nachrichten über bestimmte Kanten und Executoren weitergeleitet werden, werden von Rändern aktiviert, mit Unterstützung für die gleichzeitige Ausführung.
Visuelle Übersicht
Das folgende Diagramm kontrastiert autoGens Steuerfluss-GraphFlow (links) mit dem Datenflussworkflow-Workflow von Agent Framework (rechts). GraphFlow modelliert Agents als Knoten mit bedingten Übergängen und Übertragungen. Workflowmodelle-Executoren (Agents, Funktionen oder Unterworkflows), die durch typierte Kanten verbunden sind; sie unterstützt auch Anforderungs-/Antwort-Pausen und Prüfpunkte.
flowchart LR
subgraph AutoGenGraphFlow
direction TB
U[User / Task] --> A[Agent A]
A -->|success| B[Agent B]
A -->|retry| C[Agent C]
A -. broadcast .- B
A -. broadcast .- C
end
subgraph AgentFrameworkWorkflow
direction TB
I[Input] --> E1[Executor 1]
E1 -->|"str"| E2[Executor 2]
E1 -->|"image"| E3[Executor 3]
E3 -->|"str"| E2
E2 --> OUT[(Final Output)]
end
R[Request / Response Gate]
E2 -. request .-> R
R -. resume .-> E2
CP[Checkpoint]
E1 -. save .-> CP
CP -. load .-> E1
Praktisch:
- GraphFlow verwendet Agents als Knoten und sendet Nachrichten; Kanten stellen bedingte Übergänge dar.
- Workflow leitet Nachrichten entlang der Ränder weiter. Knoten (Executoren) können Agents, reine Funktionen oder Unterworkflows sein.
- Mit Anforderung/Antwort kann ein Workflow für externe Eingaben angehalten werden; Die Prüfpunkterstellung behält den Fortschritt bei und aktiviert den Fortsetzen.
Codevergleich
1) Sequenzielle + bedingte
# AutoGen GraphFlow (fluent builder) — writer → reviewer → editor (conditional)
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
writer = AssistantAgent(name="writer", description="Writes a draft", model_client=client)
reviewer = AssistantAgent(name="reviewer", description="Reviews the draft", model_client=client)
editor = AssistantAgent(name="editor", description="Finalizes the draft", model_client=client)
graph = (
DiGraphBuilder()
.add_node(writer).add_node(reviewer).add_node(editor)
.add_edge(writer, reviewer) # always
.add_edge(reviewer, editor, condition=lambda msg: "approve" in msg.to_model_text())
.set_entry_point(writer)
).build()
team = GraphFlow(participants=[writer, reviewer, editor], graph=graph)
result = await team.run(task="Draft a short paragraph about solar power")
# Agent Framework Workflow — sequential executors with conditional logic
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
@executor(id="writer")
async def writer_exec(task: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(f"Draft: {task}")
@executor(id="reviewer")
async def reviewer_exec(draft: str, ctx: WorkflowContext[str]) -> None:
decision = "approve" if "solar" in draft.lower() else "revise"
await ctx.send_message(f"{decision}:{draft}")
@executor(id="editor")
async def editor_exec(msg: str, ctx: WorkflowContext[Never, str]) -> None:
if msg.startswith("approve:"):
await ctx.yield_output(msg.split(":", 1)[1])
else:
await ctx.yield_output("Needs revision")
workflow_seq = (
WorkflowBuilder()
.add_edge(writer_exec, reviewer_exec)
.add_edge(reviewer_exec, editor_exec)
.set_start_executor(writer_exec)
.build()
)
2) Fan-out + Join (ALL vs ANY)
# AutoGen GraphFlow — A → (B, C) → D with ALL/ANY join
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
A, B, C, D = agent_a, agent_b, agent_c, agent_d
# ALL (default): D runs after both B and C
g_all = (
DiGraphBuilder()
.add_node(A).add_node(B).add_node(C).add_node(D)
.add_edge(A, B).add_edge(A, C)
.add_edge(B, D).add_edge(C, D)
.set_entry_point(A)
).build()
# ANY: D runs when either B or C completes
g_any = (
DiGraphBuilder()
.add_node(A).add_node(B).add_node(C).add_node(D)
.add_edge(A, B).add_edge(A, C)
.add_edge(B, D, activation_group="join_d", activation_condition="any")
.add_edge(C, D, activation_group="join_d", activation_condition="any")
.set_entry_point(A)
).build()
# Agent Framework Workflow — A → (B, C) → aggregator (ALL vs ANY)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
@executor(id="A")
async def start(task: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(f"B:{task}", target_id="B")
await ctx.send_message(f"C:{task}", target_id="C")
@executor(id="B")
async def branch_b(text: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(f"B_done:{text}")
@executor(id="C")
async def branch_c(text: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(f"C_done:{text}")
@executor(id="join_any")
async def join_any(msg: str, ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output(f"First: {msg}") # ANY join (first arrival)
@executor(id="join_all")
async def join_all(msg: str, ctx: WorkflowContext[str, str]) -> None:
state = await ctx.get_executor_state() or {"items": []}
state["items"].append(msg)
await ctx.set_executor_state(state)
if len(state["items"]) >= 2:
await ctx.yield_output(" | ".join(state["items"])) # ALL join
wf_any = (
WorkflowBuilder()
.add_edge(start, branch_b).add_edge(start, branch_c)
.add_edge(branch_b, join_any).add_edge(branch_c, join_any)
.set_start_executor(start)
.build()
)
wf_all = (
WorkflowBuilder()
.add_edge(start, branch_b).add_edge(start, branch_c)
.add_edge(branch_b, join_all).add_edge(branch_c, join_all)
.set_start_executor(start)
.build()
)
3) Gezieltes Routing (keine Übertragung)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
@executor(id="ingest")
async def ingest(task: str, ctx: WorkflowContext[str]) -> None:
# Route selectively using target_id
if task.startswith("image:"):
await ctx.send_message(task.removeprefix("image:"), target_id="vision")
else:
await ctx.send_message(task, target_id="writer")
@executor(id="writer")
async def write(text: str, ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output(f"Draft: {text}")
@executor(id="vision")
async def caption(image_ref: str, ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output(f"Caption: {image_ref}")
workflow = (
WorkflowBuilder()
.add_edge(ingest, write)
.add_edge(ingest, caption)
.set_start_executor(ingest)
.build()
)
# Example usage (async):
# await workflow.run("Summarize the benefits of solar power")
# await workflow.run("image:https://example.com/panel.jpg")
Was zu beachten ist:
- GraphFlow sendet Nachrichten und verwendet bedingte Übergänge. Das Verknüpfungsverhalten wird über zielseitige
activationund pro Edgeactivation_group/activation_conditionkonfiguriert (z. B. gruppieren Sie beide Kanten mit ).join_dactivation_condition="any" - Workflow leitet Daten explizit weiter; wird
target_idverwendet, um nachgeschaltete Executoren auszuwählen. Das Verknüpfungsverhalten befindet sich im empfangenden Vollstreckungsausführer (z. B. ertragen bei der ersten Eingabe vs für alle) oder über Orchestrierungs-Generatoren/Aggregatoren. - Executors in Workflow are free-form: wrap a
ChatAgent, a function, or a sub-workflow and mix them within the same graph.
Zentrale Unterschiede
Die folgende Tabelle fasst die grundlegenden Unterschiede zwischen dem GraphFlow von AutoGen und dem Workflow von Agent Framework zusammen:
| Aspekt | AutoGen GraphFlow | Agent Framework-Workflow |
|---|---|---|
| Flusstyp | Steuerungsfluss (Kanten sind Übergänge) | Datenfluss (Ränder weiterleiten Nachrichten) |
| Knotentypen | Nur Agents | Agents, Funktionen, Unterworkflows |
| Aktivierung | Nachrichtenübertragung | Edgebasierte Aktivierung |
| Typsicherheit | Begrenzt | Starke Eingabe während des gesamten |
| Kompositierbarkeit | Begrenzt | Hochkomposierbar |
Schachtelungsmuster
AutoGen Team Nesting
# Inner team
inner_team = RoundRobinGroupChat(
participants=[specialist1, specialist2],
termination_condition=StopAfterNMessages(3)
)
# Outer team with nested team as participant
outer_team = RoundRobinGroupChat(
participants=[coordinator, inner_team, reviewer], # Team as participant
termination_condition=StopAfterNMessages(10)
)
# Messages are broadcasted to all participants including nested team
result = await outer_team.run("Complex task requiring collaboration")
AutoGen-Schachtelungsmerkmale:
- Geschachteltes Team empfängt alle Nachrichten aus dem äußeren Team
- Geschachtelte Teamnachrichten werden an alle äußeren Teamteilnehmer übertragen.
- Freigegebener Nachrichtenkontext auf allen Ebenen
Schachtelung von Agent-Framework-Workflows
from agent_framework import WorkflowExecutor, WorkflowBuilder
# Assume we have executors from previous examples
# specialist1_executor, specialist2_executor, coordinator_executor, reviewer_executor
# Create sub-workflow
sub_workflow = (WorkflowBuilder()
.add_edge(specialist1_executor, specialist2_executor)
.set_start_executor(specialist1_executor)
.build())
# Wrap as executor
sub_workflow_executor = WorkflowExecutor(
workflow=sub_workflow,
id="sub_process"
)
# Use in parent workflow
parent_workflow = (WorkflowBuilder()
.add_edge(coordinator_executor, sub_workflow_executor)
.add_edge(sub_workflow_executor, reviewer_executor)
.set_start_executor(coordinator_executor)
.build())
Schachtelungsmerkmale des Agent-Frameworks:
- Isolierte Eingabe/Ausgabe über
WorkflowExecutor - Keine Nachrichtenübertragung – Daten fließen über bestimmte Verbindungen
- Unabhängige Zustandsverwaltung für jede Workflowebene
Gruppenchatmuster
Gruppenchatmuster ermöglichen es mehreren Agents, an komplexen Aufgaben zusammenzuarbeiten. Hier erfahren Sie, wie allgemeine Muster zwischen Frameworks übersetzt werden.
RoundRobinGroupChat-Muster
AutoGen-Implementierung:
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import StopAfterNMessages
team = RoundRobinGroupChat(
participants=[agent1, agent2, agent3],
termination_condition=StopAfterNMessages(10)
)
result = await team.run("Discuss this topic")
Agent Framework-Implementierung:
from agent_framework import SequentialBuilder, WorkflowOutputEvent
# Assume we have agent1, agent2, agent3 from previous examples
# Sequential workflow through participants
workflow = SequentialBuilder().participants([agent1, agent2, agent3]).build()
# Example usage (would be in async context)
async def sequential_example():
# Each agent appends to shared conversation
async for event in workflow.run_stream("Discuss this topic"):
if isinstance(event, WorkflowOutputEvent):
conversation_history = event.data # list[ChatMessage]
Ausführliche Orchestrierungsbeispiele finden Sie unter:
- Sequenzielle Agents – Ausführung des Roundrobin-Agents
- Sequenzielle benutzerdefinierte Executors – Benutzerdefinierte Executormuster
Für gleichzeitige Ausführungsmuster bietet Das Agent Framework auch Folgendes:
from agent_framework import ConcurrentBuilder, WorkflowOutputEvent
# Assume we have agent1, agent2, agent3 from previous examples
# Concurrent workflow for parallel processing
workflow = (ConcurrentBuilder()
.participants([agent1, agent2, agent3])
.build())
# Example usage (would be in async context)
async def concurrent_example():
# All agents process the input concurrently
async for event in workflow.run_stream("Process this in parallel"):
if isinstance(event, WorkflowOutputEvent):
results = event.data # Combined results from all agents
Beispiele für gleichzeitige Ausführung finden Sie unter:
- Gleichzeitige Agents – Parallele Agentausführung
- Gleichzeitige benutzerdefinierte Executors – benutzerdefinierte parallele Muster
- Gleichzeitig mit benutzerdefiniertem Aggregator – Ergebnisaggregationsmuster
MagenticOneGroupChat-Muster
AutoGen-Implementierung:
from autogen_agentchat.teams import MagenticOneGroupChat
team = MagenticOneGroupChat(
participants=[researcher, coder, executor],
model_client=coordinator_client,
termination_condition=StopAfterNMessages(20)
)
result = await team.run("Complex research and analysis task")
Agent Framework-Implementierung:
from typing import cast
from agent_framework import (
MAGENTIC_EVENT_TYPE_AGENT_DELTA,
MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
AgentRunUpdateEvent,
ChatAgent,
ChatMessage,
MagenticBuilder,
WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient
# Create a manager agent for orchestration
manager_agent = ChatAgent(
name="MagenticManager",
description="Orchestrator that coordinates the workflow",
instructions="You coordinate a team to complete complex tasks efficiently.",
chat_client=OpenAIChatClient(),
)
workflow = (
MagenticBuilder()
.participants(researcher=researcher, coder=coder)
.with_standard_manager(
agent=manager_agent,
max_round_count=20,
max_stall_count=3,
max_reset_count=2,
)
.build()
)
# Example usage (would be in async context)
async def magentic_example():
output: str | None = None
async for event in workflow.run_stream("Complex research task"):
if isinstance(event, AgentRunUpdateEvent):
props = event.data.additional_properties if event.data else None
event_type = props.get("magentic_event_type") if props else None
if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
text = event.data.text if event.data else ""
print(f"[ORCHESTRATOR]: {text}")
elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
if event.data and event.data.text:
print(f"[{agent_id}]: {event.data.text}", end="")
elif isinstance(event, WorkflowOutputEvent):
output_messages = cast(list[ChatMessage], event.data)
if output_messages:
output = output_messages[-1].text
Anpassungsoptionen des Agent-Frameworks:
Der Magentic-Workflow bietet umfangreiche Anpassungsoptionen:
- Managerkonfiguration: Verwenden eines ChatAgent mit benutzerdefinierten Anweisungen und Modelleinstellungen
-
Round limits:
max_round_count, ,max_stall_countmax_reset_count -
Ereignisstreaming: Verwenden
AgentRunUpdateEventmitmagentic_event_typeMetadaten - Agent-Spezialisierung: Benutzerdefinierte Anweisungen und Tools pro Agent
- Mensch-in-the-Loop: Planen der Überprüfung, Genehmigung von Tools und Einstandsintervention
# Advanced customization example with human-in-the-loop
from typing import cast
from agent_framework import (
MAGENTIC_EVENT_TYPE_AGENT_DELTA,
MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
AgentRunUpdateEvent,
ChatAgent,
MagenticBuilder,
MagenticHumanInterventionDecision,
MagenticHumanInterventionKind,
MagenticHumanInterventionReply,
MagenticHumanInterventionRequest,
RequestInfoEvent,
WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient
# Create manager agent with custom configuration
manager_agent = ChatAgent(
name="MagenticManager",
description="Orchestrator for complex tasks",
instructions="Custom orchestration instructions...",
chat_client=OpenAIChatClient(model_id="gpt-4o"),
)
workflow = (
MagenticBuilder()
.participants(
researcher=researcher_agent,
coder=coder_agent,
analyst=analyst_agent,
)
.with_standard_manager(
agent=manager_agent,
max_round_count=15, # Limit total rounds
max_stall_count=2, # Trigger stall handling
max_reset_count=1, # Allow one reset on failure
)
.with_plan_review() # Enable human plan review
.with_human_input_on_stall() # Enable human intervention on stalls
.build()
)
# Handle human intervention requests during execution
async for event in workflow.run_stream("Complex task"):
if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
req = cast(MagenticHumanInterventionRequest, event.data)
if req.kind == MagenticHumanInterventionKind.PLAN_REVIEW:
# Review and approve the plan
reply = MagenticHumanInterventionReply(
decision=MagenticHumanInterventionDecision.APPROVE
)
async for ev in workflow.send_responses_streaming({event.request_id: reply}):
pass # Handle continuation
Ausführliche Magentische Beispiele finden Sie unter:
- Einfacher Magentic-Workflow – Standard-koordinierter Multi-Agent-Workflow
- Magentisch mit Prüfpunkterstellung – Persistente koordinierte Workflows
- Magentic Human Plan Update - Überprüfung des menschlichen Plans
- Magentische Agentenklärung - Werkzeuggenehmigung zur Agentenklärung
- Magentic Human Replan - Menschliche Intervention auf Stände
Zukünftige Muster
Die Agent Framework-Roadmap enthält mehrere AutoGen-Muster, die sich derzeit in der Entwicklung befinden:
- Schwarmmuster: Handoff-basierte Agentenkoordination
- SelectorGroupChat: LLM-gesteuerte Lautsprecherauswahl
Human-in-the-Loop mit Anforderungsantwort
Ein wichtiges neues Feature im Agent Framework Workflow ist das Konzept der Anforderung und Antwort, mit dem Workflows die Ausführung anhalten und auf externe Eingaben warten können, bevor sie fortfahren. Diese Funktion ist nicht in der Abstraktion von Team AutoGen vorhanden und ermöglicht anspruchsvolle Menschliche-in-the-Loop-Muster.
AutoGen-Einschränkungen
Die Abstraktion von Team AutoGen wird kontinuierlich ausgeführt, sobald sie gestartet wurde, und bietet keine integrierten Mechanismen zum Anhalten der Ausführung für menschliche Eingaben. Für alle Funktionen von Menschen in der Schleife sind benutzerdefinierte Implementierungen außerhalb des Frameworks erforderlich.
Agent Framework-Request-Response-API
Das Agent-Framework bietet integrierte Anforderungsantwortfunktionen, bei denen jeder Verwalter Anforderungen mithilfe ctx.request_info() von Antworten mit dem @response_handler Dekorateur senden und verarbeiten kann.
from agent_framework import (
RequestInfoEvent, WorkflowBuilder, WorkflowContext,
Executor, handler, response_handler
)
from dataclasses import dataclass
# Assume we have agent_executor defined elsewhere
# Define typed request payload
@dataclass
class ApprovalRequest:
"""Request human approval for agent output."""
content: str = ""
agent_name: str = ""
# Workflow executor that requests human approval
class ReviewerExecutor(Executor):
@handler
async def review_content(
self,
agent_response: str,
ctx: WorkflowContext
) -> None:
# Request human input with structured data
approval_request = ApprovalRequest(
content=agent_response,
agent_name="writer_agent"
)
await ctx.request_info(request_data=approval_request, response_type=str)
@response_handler
async def handle_approval_response(
self,
original_request: ApprovalRequest,
decision: str,
ctx: WorkflowContext
) -> None:
decision_lower = decision.strip().lower()
original_content = original_request.content
if decision_lower == "approved":
await ctx.yield_output(f"APPROVED: {original_content}")
else:
await ctx.yield_output(f"REVISION NEEDED: {decision}")
# Build workflow with human-in-the-loop
reviewer = ReviewerExecutor(id="reviewer")
workflow = (WorkflowBuilder()
.add_edge(agent_executor, reviewer)
.set_start_executor(agent_executor)
.build())
Ausführen von Human-in-the-Loop-Workflows
Das Agent Framework stellt Streaming-APIs bereit, um den Pausen-Fortsetzungszyklus zu behandeln:
from agent_framework import RequestInfoEvent, WorkflowOutputEvent
# Assume we have workflow defined from previous examples
async def run_with_human_input():
pending_responses = None
completed = False
while not completed:
# First iteration uses run_stream, subsequent use send_responses_streaming
stream = (
workflow.send_responses_streaming(pending_responses)
if pending_responses
else workflow.run_stream("initial input")
)
events = [event async for event in stream]
pending_responses = None
# Collect human requests and outputs
for event in events:
if isinstance(event, RequestInfoEvent):
# Display request to human and collect response
request_data = event.data # ApprovalRequest instance
print(f"Review needed: {request_data.content}")
human_response = input("Enter 'approved' or revision notes: ")
pending_responses = {event.request_id: human_response}
elif isinstance(event, WorkflowOutputEvent):
print(f"Final result: {event.data}")
completed = True
Beispiele für Benutzer-in-the-Loop-Workflows finden Sie unter:
- Spiel mit menschlicher Eingabe erraten – interaktiver Workflow mit Benutzerfeedback
- Workflow als Agent mit menschlicher Eingabe – Geschachtelte Workflows mit menschlicher Interaktion
Prüfpunkt und Fortsetzen von Workflows
Ein weiterer wichtiger Vorteil der AutoGen-Abstraktion von Workflow Agent Framework Team ist die integrierte Unterstützung für die Prüfpunkt- und Fortsetzungsausführung. Auf diese Weise können Workflows angehalten, beibehalten und später von jedem Prüfpunkt fortgesetzt werden, wodurch Fehlertoleranz bereitgestellt und lange ausgeführte oder asynchrone Workflows aktiviert werden.
AutoGen-Einschränkungen
Die Abstraktion von Team AutoGen bietet keine integrierten Prüfpunktfunktionen. Alle Persistenz- oder Wiederherstellungsmechanismen müssen extern implementiert werden und erfordern häufig komplexe Zustandsverwaltungs- und Serialisierungslogik.
Agent Framework Prüfpunkting
Agent Framework bietet umfassende Prüfpunkte und FileCheckpointStorage die with_checkpointing() Methode für WorkflowBuilder. Prüfpunkteerfassung:
-
Ausführungszustand: Lokaler Zustand für jeden Executor, der verwendet wird
ctx.set_executor_state() -
Freigegebener Zustand: Ausführungsübergreifender Zustand mithilfe von
ctx.set_shared_state() - Nachrichtenwarteschlangen: Ausstehende Nachrichten zwischen Executoren
- Workflowposition: Aktueller Ausführungsfortschritt und nächste Schritte
from agent_framework import (
FileCheckpointStorage, WorkflowBuilder, WorkflowContext,
Executor, handler
)
from typing_extensions import Never
class ProcessingExecutor(Executor):
@handler
async def process(self, data: str, ctx: WorkflowContext[str]) -> None:
# Process the data
result = f"Processed: {data.upper()}"
print(f"Processing: '{data}' -> '{result}'")
# Persist executor-local state
prev_state = await ctx.get_executor_state() or {}
count = prev_state.get("count", 0) + 1
await ctx.set_executor_state({
"count": count,
"last_input": data,
"last_output": result
})
# Persist shared state for other executors
await ctx.set_shared_state("original_input", data)
await ctx.set_shared_state("processed_output", result)
await ctx.send_message(result)
class FinalizeExecutor(Executor):
@handler
async def finalize(self, data: str, ctx: WorkflowContext[Never, str]) -> None:
result = f"Final: {data}"
await ctx.yield_output(result)
# Configure checkpoint storage
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
processing_executor = ProcessingExecutor(id="processing")
finalize_executor = FinalizeExecutor(id="finalize")
# Build workflow with checkpointing enabled
workflow = (WorkflowBuilder()
.add_edge(processing_executor, finalize_executor)
.set_start_executor(processing_executor)
.with_checkpointing(checkpoint_storage=checkpoint_storage) # Enable checkpointing
.build())
# Example usage (would be in async context)
async def checkpoint_example():
# Run workflow - checkpoints are created automatically
async for event in workflow.run_stream("input data"):
print(f"Event: {event}")
Fortsetzen ab Prüfpunkten
Agent Framework stellt APIs bereit, um bestimmte Prüfpunkte auflisten, prüfen und fortsetzen zu können:
from typing_extensions import Never
from agent_framework import (
Executor,
FileCheckpointStorage,
WorkflowContext,
WorkflowBuilder,
get_checkpoint_summary,
handler,
)
class UpperCaseExecutor(Executor):
@handler
async def process(self, text: str, ctx: WorkflowContext[str]) -> None:
result = text.upper()
await ctx.send_message(result)
class ReverseExecutor(Executor):
@handler
async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
result = text[::-1]
await ctx.yield_output(result)
def create_workflow(checkpoint_storage: FileCheckpointStorage):
"""Create a workflow with two executors and checkpointing."""
upper_executor = UpperCaseExecutor(id="upper")
reverse_executor = ReverseExecutor(id="reverse")
return (WorkflowBuilder()
.add_edge(upper_executor, reverse_executor)
.set_start_executor(upper_executor)
.with_checkpointing(checkpoint_storage=checkpoint_storage)
.build())
# Assume we have checkpoint_storage from previous examples
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
async def checkpoint_resume_example():
# List available checkpoints
checkpoints = await checkpoint_storage.list_checkpoints()
# Display checkpoint information
for checkpoint in checkpoints:
summary = get_checkpoint_summary(checkpoint)
print(f"Checkpoint {summary.checkpoint_id}: iteration={summary.iteration_count}")
# Resume from a specific checkpoint
if checkpoints:
chosen_checkpoint_id = checkpoints[0].checkpoint_id
# Create new workflow instance and resume
new_workflow = create_workflow(checkpoint_storage)
async for event in new_workflow.run_stream(
checkpoint_id=chosen_checkpoint_id,
checkpoint_storage=checkpoint_storage
):
print(f"Resumed event: {event}")
Erweiterte Prüfpunktfunktionen
Prüfpunkt mit Human-in-the-Loop-Integration:
Die Prüfpunkterstellung funktioniert nahtlos mit menschlichen In-the-Loop-Workflows, sodass Workflows für menschliche Eingaben angehalten und später fortgesetzt werden können. Beim Fortsetzen eines Prüfpunkts, der ausstehende Anforderungen enthält, werden diese Anforderungen als Ereignisse erneut ausgegeben:
# Assume we have workflow, checkpoint_id, and checkpoint_storage from previous examples
async def resume_with_pending_requests_example():
# Resume from checkpoint - pending requests will be re-emitted
request_info_events = []
async for event in workflow.run_stream(
checkpoint_id=checkpoint_id,
checkpoint_storage=checkpoint_storage
):
if isinstance(event, RequestInfoEvent):
request_info_events.append(event)
# Handle re-emitted pending request
responses = {}
for event in request_info_events:
response = handle_request(event.data)
responses[event.request_id] = response
# Send response back to workflow
async for event in workflow.send_responses_streaming(responses):
print(f"Event: {event}")
Wichtige Vorteile
Im Vergleich zu AutoGen bietet die Prüfpunkterstellung des Agent-Frameworks Folgendes:
- Automatische Persistenz: Keine manuelle Zustandsverwaltung erforderlich
- Granulare Wiederherstellung: Fortsetzen von jeder Oberschrittgrenze
- Statusisolation: Separater ausführungslokaler und gemeinsam genutzter Zustand
- Human-in-the-Loop-Integration: Nahtloses Anhalten mit menschlichem Input
- Fehlertoleranz: Robuste Wiederherstellung von Fehlern oder Unterbrechungen
Praktische Beispiele
Umfassende Prüfpunktbeispiele finden Sie unter:
- Prüfpunkt mit Lebenslauf – Grundlegende Prüfpunkte und interaktive Lebensläufe
- Prüfpunkt mit Human-in-the-Loop – Persistente Workflows mit menschlichen Genehmigungsgaten
- Unterworkflow-Prüfpunkt – Prüfpunkt für verschachtelte Workflows
- Magentischer Prüfpunkt – Kontrollierte Multi-Agent-Workflows
Observierbarkeit
Sowohl AutoGen als auch Agent Framework bieten Observability-Funktionen, aber mit unterschiedlichen Ansätzen und Features.
AutoGen Observability
AutoGen verfügt über systemeigene Unterstützung für OpenTelemetry mit Instrumentierung für:
-
Laufzeitablaufverfolgung:
SingleThreadedAgentRuntimeundGrpcWorkerAgentRuntime -
Toolausführung:
BaseToolumfasstexecute_toolfolgende GenAI-Semantikkonventionen -
Agent-Vorgänge:
BaseChatAgentmitcreate_agentundinvoke_agentüberspannen
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from autogen_core import SingleThreadedAgentRuntime
# Configure OpenTelemetry
tracer_provider = TracerProvider()
trace.set_tracer_provider(tracer_provider)
# Pass to runtime
runtime = SingleThreadedAgentRuntime(tracer_provider=tracer_provider)
Agent Framework Observability
Agent Framework bietet umfassende Observability durch mehrere Ansätze:
- Zero-Code-Setup: Automatische Instrumentierung über Umgebungsvariablen
- Manuelle Konfiguration: Programmgesteuertes Setup mit benutzerdefinierten Parametern
- Umfassende Telemetrie: Agents, Workflows und Toolausführungsverfolgung
- Konsolenausgabe: Integrierte Konsolenprotokollierung und -visualisierung
from agent_framework import ChatAgent
from agent_framework.observability import setup_observability
from agent_framework.openai import OpenAIChatClient
# Zero-code setup via environment variables
# Set ENABLE_OTEL=true
# Set OTLP_ENDPOINT=http://localhost:4317
# Or manual setup
setup_observability(
otlp_endpoint="http://localhost:4317"
)
# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")
async def observability_example():
# Observability is automatically applied to all agents and workflows
agent = ChatAgent(name="assistant", chat_client=client)
result = await agent.run("Hello") # Automatically traced
Wichtige Unterschiede:
- Setupkomplexität: Das Agent-Framework bietet einfachere Zero-Code-Setupoptionen.
- Bereich: Das Agent-Framework bietet eine umfassendere Abdeckung, einschließlich der Beobachtbarkeit auf Workflowebene
- Visualisierung: Agent Framework umfasst integrierte Konsolenausgabe und Entwicklungs-UI
- Konfiguration: Agent Framework bietet flexiblere Konfigurationsoptionen
Ausführliche Beispiele zur Observierbarkeit finden Sie unter:
- Zero-Code-Setup – Umgebungsvariablenkonfiguration
- Manuelles Setup – Programmgesteuerte Konfiguration
- Agent Observability – Telemetrie des einzelnen Agents
- Workflowbeobachtbarkeit – Workflowablaufverfolgung mit mehreren Agents
Conclusion
Dieser Migrationsleitfaden bietet eine umfassende Zuordnung zwischen AutoGen und Microsoft Agent Framework, die alles von der grundlegenden Agenterstellung bis hin zu komplexen Multi-Agent-Workflows abdeckt. Wichtige Voraussetzungen für die Migration:
- Die Migration mit einem einzelnen Agent ist einfach, mit ähnlichen APIs und erweiterten Funktionen im Agent Framework
- Multi-Agent-Muster müssen Ihren Ansatz von ereignisgesteuerten zu datenflussbasierten Architekturen überdenken, aber wenn Sie bereits mit GraphFlow vertraut sind, ist der Übergang einfacher.
- Agent Framework bietet zusätzliche Features wie Middleware, gehostete Tools und eingegebene Workflows.
Weitere Beispiele und detaillierte Anleitungen zur Implementierung finden Sie im Verzeichnis "Agent Framework-Beispiele ".
Zusätzliche Beispielkategorien
Das Agent Framework bietet Beispiele in mehreren anderen wichtigen Bereichen:
- Threads: Threadbeispiele – Verwalten des Unterhaltungszustands und kontexts
- Multimodale Eingabe: Multimodale Beispiele - Arbeiten mit Bildern und anderen Medientypen
- Kontextanbieter: Beispiele für Kontextanbieter – Integrationsmuster für externe Kontexte