マゼンティック オーケストレーションは、AutoGen によって発明された Magentic-One システムに基づいて設計されています。 これは、動的なコラボレーションを必要とする複雑で自由なタスク用に設計された、柔軟で汎用のマルチエージェント パターンです。 このパターンでは、専用の Magentic マネージャーが専門エージェントのチームを調整し、進化するコンテキスト、タスクの進行状況、およびエージェントの機能に基づいて次に対応するエージェントを選択します。
Magentic マネージャーは、共有コンテキストを維持し、進行状況を追跡し、ワークフローをリアルタイムで調整します。 これにより、システムは複雑な問題を分解し、サブタスクを委任し、エージェントのコラボレーションを通じてソリューションを繰り返し絞り込むことができます。 オーケストレーションは、ソリューション パスが事前に不明で、複数の推論、調査、計算が必要になる場合があるシナリオに特に適しています。
ここでは、次の内容について学習します
- 複数の特殊なエージェントを調整するように Magentic マネージャーを設定する方法
- ストリーミング イベントを処理する方法
AgentRunUpdateEvent - ヒューマンインザループの計画レビュー、ツール承認、スタール介入を実装する方法
- 複雑なタスクを通じてエージェントのコラボレーションと進行状況を追跡する方法
特殊なエージェントを定義する
もうすぐです。。。
マゼンティック オーケストレーションでは、マネージャーがタスクの要件に基づいて動的に選択できる特殊なエージェントを定義します。
from agent_framework import ChatAgent, HostedCodeInterpreterTool
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient
researcher_agent = ChatAgent(
name="ResearcherAgent",
description="Specialist in research and information gathering",
instructions=(
"You are a Researcher. You find information without additional computation or quantitative analysis."
),
# This agent requires the gpt-4o-search-preview model to perform web searches
chat_client=OpenAIChatClient(model_id="gpt-4o-search-preview"),
)
coder_agent = ChatAgent(
name="CoderAgent",
description="A helpful assistant that writes and executes code to process and analyze data.",
instructions="You solve questions using code. Please provide detailed analysis and computation process.",
chat_client=OpenAIResponsesClient(),
tools=HostedCodeInterpreterTool(),
)
# Create a manager agent for orchestration
manager_agent = ChatAgent(
name="MagenticManager",
description="Orchestrator that coordinates the research and coding workflow",
instructions="You coordinate a team to complete complex tasks efficiently.",
chat_client=OpenAIChatClient(),
)
マゼンティック ワークフローを構築する
MagenticBuilderを使用して、標準マネージャーでワークフローを構成します。
from agent_framework import MagenticBuilder
workflow = (
MagenticBuilder()
.participants(researcher=researcher_agent, coder=coder_agent)
.with_standard_manager(
agent=manager_agent,
max_round_count=10, # Maximum collaboration rounds
max_stall_count=3, # Maximum rounds without progress
max_reset_count=2, # Maximum plan resets allowed
)
.build()
)
イベント ストリーミングを使用してワークフローを実行する
複雑なタスクを実行し、ストリーミング出力とオーケストレーションの更新のイベントを処理します。
from typing import cast
from agent_framework import (
MAGENTIC_EVENT_TYPE_AGENT_DELTA,
MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
AgentRunUpdateEvent,
ChatMessage,
WorkflowOutputEvent,
)
task = (
"I am preparing a report on the energy efficiency of different machine learning model architectures. "
"Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
"on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
"Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
"VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
"per task type (image classification, text classification, and text generation)."
)
# State for streaming callback
last_stream_agent_id: str | None = None
stream_line_open: bool = False
output: str | None = None
async for event in workflow.run_stream(task):
if isinstance(event, AgentRunUpdateEvent):
props = event.data.additional_properties if event.data else None
event_type = props.get("magentic_event_type") if props else None
if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
# Manager's planning and coordination messages
kind = props.get("orchestrator_message_kind", "") if props else ""
text = event.data.text if event.data else ""
print(f"\n[ORCH:{kind}]\n\n{text}\n{'-' * 26}")
elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
# Streaming tokens from agents
agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
if last_stream_agent_id != agent_id or not stream_line_open:
if stream_line_open:
print()
print(f"\n[STREAM:{agent_id}]: ", end="", flush=True)
last_stream_agent_id = agent_id
stream_line_open = True
if event.data and event.data.text:
print(event.data.text, end="", flush=True)
elif event.data and event.data.text:
print(event.data.text, end="", flush=True)
elif isinstance(event, WorkflowOutputEvent):
output_messages = cast(list[ChatMessage], event.data)
if output_messages:
output = output_messages[-1].text
if stream_line_open:
print()
if output is not None:
print(f"Workflow completed with result:\n\n{output}")
高度: ヒューマンインザループ計画レビュー
実行前にマネージャーの計画の人間によるレビューと承認を有効にします。
プラン レビューの構成
from typing import cast
from agent_framework import (
MAGENTIC_EVENT_TYPE_AGENT_DELTA,
MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
AgentRunUpdateEvent,
MagenticHumanInterventionDecision,
MagenticHumanInterventionKind,
MagenticHumanInterventionReply,
MagenticHumanInterventionRequest,
RequestInfoEvent,
WorkflowOutputEvent,
)
workflow = (
MagenticBuilder()
.participants(researcher=researcher_agent, coder=coder_agent)
.with_standard_manager(
agent=manager_agent,
max_round_count=10,
max_stall_count=3,
max_reset_count=2,
)
.with_plan_review() # Enable plan review
.build()
)
プランのレビュー要求を処理する
pending_request: RequestInfoEvent | None = None
pending_responses: dict[str, MagenticHumanInterventionReply] | None = None
completed = False
workflow_output: str | None = None
while not completed:
# Use streaming for both initial run and response sending
if pending_responses is not None:
stream = workflow.send_responses_streaming(pending_responses)
else:
stream = workflow.run_stream(task)
async for event in stream:
if isinstance(event, AgentRunUpdateEvent):
# Handle streaming events as shown above
pass
elif isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
request = cast(MagenticHumanInterventionRequest, event.data)
if request.kind == MagenticHumanInterventionKind.PLAN_REVIEW:
pending_request = event
if request.plan_text:
print(f"\n=== PLAN REVIEW REQUEST ===\n{request.plan_text}\n")
elif isinstance(event, WorkflowOutputEvent):
workflow_output = str(event.data) if event.data else None
completed = True
pending_responses = None
# Handle pending plan review request
if pending_request is not None:
# Collect human decision (approve/reject/modify)
# For demo, we auto-approve:
reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE)
# Or approve with comments:
# reply = MagenticHumanInterventionReply(
# decision=MagenticHumanInterventionDecision.APPROVE,
# comments="Looks good, but prioritize efficiency metrics."
# )
# Or request revision:
# reply = MagenticHumanInterventionReply(
# decision=MagenticHumanInterventionDecision.REVISE,
# comments="Please include a comparison with newer models like LLaMA."
# )
pending_responses = {pending_request.request_id: reply}
pending_request = None
詳細: ツール承認によるエージェントの明確化
エージェントは、ツールの承認を使用して、実行中にユーザーに明確な質問をすることができます。 これにより、Human-in-the-Loop (HITL) の対話が可能になり、エージェントは続行する前に追加情報を要求できます。
承認が必要なツールを定義する
from typing import Annotated
from agent_framework import ai_function
@ai_function(approval_mode="always_require")
def ask_user(question: Annotated[str, "The question to ask the user for clarification"]) -> str:
"""Ask the user a clarifying question to gather missing information.
Use this tool when you need additional information from the user to complete
your task effectively.
"""
# This function body is a placeholder - the actual interaction happens via HITL.
return f"User was asked: {question}"
ツールを使用してエージェントを作成する
onboarding_agent = ChatAgent(
name="OnboardingAgent",
description="HR specialist who handles employee onboarding",
instructions=(
"You are an HR Onboarding Specialist. Your job is to onboard new employees.\n\n"
"IMPORTANT: When given an onboarding request, you MUST gather the following "
"information before proceeding:\n"
"1. Department (e.g., Engineering, Sales, Marketing)\n"
"2. Role/Title (e.g., Software Engineer, Account Executive)\n\n"
"Use the ask_user tool to request ANY missing information."
),
chat_client=OpenAIChatClient(model_id="gpt-4o"),
tools=[ask_user],
)
ツール承認要求の処理
async for event in workflow.run_stream("Onboard Jessica Smith"):
if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
req = cast(MagenticHumanInterventionRequest, event.data)
if req.kind == MagenticHumanInterventionKind.TOOL_APPROVAL:
print(f"Agent: {req.agent_id}")
print(f"Question: {req.prompt}")
# Get user's answer
answer = input("> ").strip()
# Send the answer back - it will be fed to the agent as the function result
reply = MagenticHumanInterventionReply(
decision=MagenticHumanInterventionDecision.APPROVE,
response_text=answer,
)
pending_responses = {event.request_id: reply}
# Continue workflow with response
async for ev in workflow.send_responses_streaming(pending_responses):
# Handle continuation events
pass
上級: 失速現象に対する人間の介入
ワークフローがエージェントの進捗がないことを検出したとき、人間による介入を有効にします。
ストール介入の設定
workflow = (
MagenticBuilder()
.participants(researcher=researcher_agent, analyst=analyst_agent)
.with_standard_manager(
agent=manager_agent,
max_round_count=10,
max_stall_count=1, # Stall detection after 1 round without progress
max_reset_count=2,
)
.with_human_input_on_stall() # Request human input when stalled
.build()
)
ストール介入要求の処理
async for event in workflow.run_stream(task):
if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
req = cast(MagenticHumanInterventionRequest, event.data)
if req.kind == MagenticHumanInterventionKind.STALL:
print(f"Workflow stalled after {req.stall_count} rounds")
print(f"Reason: {req.stall_reason}")
if req.plan_text:
print(f"Current plan:\n{req.plan_text}")
# Choose response: CONTINUE, REPLAN, or GUIDANCE
reply = MagenticHumanInterventionReply(
decision=MagenticHumanInterventionDecision.GUIDANCE,
comments="Focus on completing the research step first before moving to analysis.",
)
pending_responses = {event.request_id: reply}
主要概念
- 動的調整: Magentic マネージャーは、進化するコンテキストに基づいて次に動作するエージェントを動的に選択します
- 反復的な絞り込み: システムは複雑な問題を分割し、複数のラウンドを通じてソリューションを反復的に調整できます
- 進行状況の追跡: ストールを検出し、必要に応じてプランをリセットするための組み込みのメカニズム
- 柔軟なコラボレーション: エージェントは、マネージャーによって決定された順序で複数回呼び出すことができます
- ヒューマン・オーバーサイト:計画レビュー、ツール承認、ストール介入を含む、オプションのヒューマン・イン・ザ・ループメカニズム
-
統合イベント システム:
AgentRunUpdateEventでmagentic_event_typeを使用してオーケストレーターおよびエージェント ストリーミング イベントを処理する
ワークフロー実行フロー
マゼンティック オーケストレーションは、次の実行パターンに従います。
- 計画フェーズ: マネージャーはタスクを分析し、初期計画を作成します
- オプションのプラン レビュー: 有効にした場合、人間はプランを確認および承認/変更できます
- エージェントの選択: マネージャーは、各サブタスクに最適なエージェントを選択します
- 実行: 選択したエージェントがタスクの一部を実行します
- 進行状況の評価: マネージャーは進行状況を評価し、計画を更新します
- 停止検出: 進行状況が停止している場合は、自動で再計画するか、人間の介入を要求します
- イテレーション: タスクが完了するか、制限に達するまで手順 3 から 6 を繰り返します
- 最終的な合成: マネージャーは、すべてのエージェント出力を最終的な結果に合成します
コード例全体
すべての概念をまとめる完全な例を次に示します。
import asyncio
import logging
from typing import cast
from agent_framework import (
MAGENTIC_EVENT_TYPE_AGENT_DELTA,
MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
AgentRunUpdateEvent,
ChatAgent,
ChatMessage,
HostedCodeInterpreterTool,
MagenticBuilder,
WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)
async def main() -> None:
# Define specialized agents
researcher_agent = ChatAgent(
name="ResearcherAgent",
description="Specialist in research and information gathering",
instructions=(
"You are a Researcher. You find information without additional "
"computation or quantitative analysis."
),
chat_client=OpenAIChatClient(model_id="gpt-4o-search-preview"),
)
coder_agent = ChatAgent(
name="CoderAgent",
description="A helpful assistant that writes and executes code to process and analyze data.",
instructions="You solve questions using code. Please provide detailed analysis and computation process.",
chat_client=OpenAIResponsesClient(),
tools=HostedCodeInterpreterTool(),
)
# Create a manager agent for orchestration
manager_agent = ChatAgent(
name="MagenticManager",
description="Orchestrator that coordinates the research and coding workflow",
instructions="You coordinate a team to complete complex tasks efficiently.",
chat_client=OpenAIChatClient(),
)
# State for streaming output
last_stream_agent_id: str | None = None
stream_line_open: bool = False
# Build the workflow
print("\nBuilding Magentic Workflow...")
workflow = (
MagenticBuilder()
.participants(researcher=researcher_agent, coder=coder_agent)
.with_standard_manager(
agent=manager_agent,
max_round_count=10,
max_stall_count=3,
max_reset_count=2,
)
.build()
)
# Define the task
task = (
"I am preparing a report on the energy efficiency of different machine learning model architectures. "
"Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
"on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
"Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
"VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
"per task type (image classification, text classification, and text generation)."
)
print(f"\nTask: {task}")
print("\nStarting workflow execution...")
# Run the workflow
try:
output: str | None = None
async for event in workflow.run_stream(task):
if isinstance(event, AgentRunUpdateEvent):
props = event.data.additional_properties if event.data else None
event_type = props.get("magentic_event_type") if props else None
if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
kind = props.get("orchestrator_message_kind", "") if props else ""
text = event.data.text if event.data else ""
print(f"\n[ORCH:{kind}]\n\n{text}\n{'-' * 26}")
elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
if last_stream_agent_id != agent_id or not stream_line_open:
if stream_line_open:
print()
print(f"\n[STREAM:{agent_id}]: ", end="", flush=True)
last_stream_agent_id = agent_id
stream_line_open = True
if event.data and event.data.text:
print(event.data.text, end="", flush=True)
elif event.data and event.data.text:
print(event.data.text, end="", flush=True)
elif isinstance(event, WorkflowOutputEvent):
output_messages = cast(list[ChatMessage], event.data)
if output_messages:
output = output_messages[-1].text
if stream_line_open:
print()
if output is not None:
print(f"Workflow completed with result:\n\n{output}")
except Exception as e:
print(f"Workflow execution failed: {e}")
logger.exception("Workflow exception", exc_info=e)
if __name__ == "__main__":
asyncio.run(main())
構成オプション
マネージャー パラメーター
-
max_round_count: コラボレーション ラウンドの最大数 (既定値: 10) -
max_stall_count: 停滞処理を起動する前に進行がない最大サイクル数 (既定値: 3) -
max_reset_count: 許容されるプランのリセットの最大数 (既定値: 2)
人間の介入の種類
-
PLAN_REVIEW: 初期計画の確認と承認/修正 -
TOOL_APPROVAL: ツール/関数呼び出しを承認する (エージェントの明確化に使用) -
STALL: ワークフローがストールし、ガイダンスが必要です
人間の介入の決定
-
APPROVE: プランまたはツールの呼び出し as-is を受け入れる -
REVISE: フィードバックを考慮したリビジョンの依頼 (計画レビュー) -
REJECT: 拒否/却下 (ツールの承認) -
CONTINUE: 現在の状態で続行する (停止) -
REPLAN: リプランニングの再トリガー (停止) -
GUIDANCE: ガイダンステキストを提供する(プロセスの停止、ツールの承認)
イベントの種類
イベントは、AgentRunUpdateEventのメタデータを含むadditional_propertiesを介して生成されます。
-
magentic_event_type:MAGENTIC_EVENT_TYPE_ORCHESTRATORまたはMAGENTIC_EVENT_TYPE_AGENT_DELTA -
orchestrator_message_kind: オーケストレーター イベントの場合は、メッセージの種類 (例: "instruction"、"notice"、"task_ledger") を示します。 -
agent_id: エージェントデルタイベントの場合、ストリーミングエージェントを識別します。
サンプル出力
もうすぐです。。。