次の方法で共有


Microsoft Agent Framework ワークフロー オーケストレーション - Magentic

マゼンティック オーケストレーションは、AutoGen によって発明された Magentic-One システムに基づいて設計されています。 これは、動的なコラボレーションを必要とする複雑で自由なタスク用に設計された、柔軟で汎用のマルチエージェント パターンです。 このパターンでは、専用の Magentic マネージャーが専門エージェントのチームを調整し、進化するコンテキスト、タスクの進行状況、およびエージェントの機能に基づいて次に対応するエージェントを選択します。

Magentic マネージャーは、共有コンテキストを維持し、進行状況を追跡し、ワークフローをリアルタイムで調整します。 これにより、システムは複雑な問題を分解し、サブタスクを委任し、エージェントのコラボレーションを通じてソリューションを繰り返し絞り込むことができます。 オーケストレーションは、ソリューション パスが事前に不明で、複数の推論、調査、計算が必要になる場合があるシナリオに特に適しています。

マゼンティック オーケストレーション

ここでは、次の内容について学習します

  • 複数の特殊なエージェントを調整するように Magentic マネージャーを設定する方法
  • ストリーミング イベントを処理する方法 AgentRunUpdateEvent
  • ヒューマンインザループの計画レビュー、ツール承認、スタール介入を実装する方法
  • 複雑なタスクを通じてエージェントのコラボレーションと進行状況を追跡する方法

特殊なエージェントを定義する

もうすぐです。。。

マゼンティック オーケストレーションでは、マネージャーがタスクの要件に基づいて動的に選択できる特殊なエージェントを定義します。

from agent_framework import ChatAgent, HostedCodeInterpreterTool
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient

researcher_agent = ChatAgent(
    name="ResearcherAgent",
    description="Specialist in research and information gathering",
    instructions=(
        "You are a Researcher. You find information without additional computation or quantitative analysis."
    ),
    # This agent requires the gpt-4o-search-preview model to perform web searches
    chat_client=OpenAIChatClient(model_id="gpt-4o-search-preview"),
)

coder_agent = ChatAgent(
    name="CoderAgent",
    description="A helpful assistant that writes and executes code to process and analyze data.",
    instructions="You solve questions using code. Please provide detailed analysis and computation process.",
    chat_client=OpenAIResponsesClient(),
    tools=HostedCodeInterpreterTool(),
)

# Create a manager agent for orchestration
manager_agent = ChatAgent(
    name="MagenticManager",
    description="Orchestrator that coordinates the research and coding workflow",
    instructions="You coordinate a team to complete complex tasks efficiently.",
    chat_client=OpenAIChatClient(),
)

マゼンティック ワークフローを構築する

MagenticBuilderを使用して、標準マネージャーでワークフローを構成します。

from agent_framework import MagenticBuilder

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher_agent, coder=coder_agent)
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=10,  # Maximum collaboration rounds
        max_stall_count=3,   # Maximum rounds without progress
        max_reset_count=2,   # Maximum plan resets allowed
    )
    .build()
)

イベント ストリーミングを使用してワークフローを実行する

複雑なタスクを実行し、ストリーミング出力とオーケストレーションの更新のイベントを処理します。

from typing import cast
from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    ChatMessage,
    WorkflowOutputEvent,
)

task = (
    "I am preparing a report on the energy efficiency of different machine learning model architectures. "
    "Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
    "on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
    "Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
    "VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
    "per task type (image classification, text classification, and text generation)."
)

# State for streaming callback
last_stream_agent_id: str | None = None
stream_line_open: bool = False
output: str | None = None

async for event in workflow.run_stream(task):
    if isinstance(event, AgentRunUpdateEvent):
        props = event.data.additional_properties if event.data else None
        event_type = props.get("magentic_event_type") if props else None

        if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
            # Manager's planning and coordination messages
            kind = props.get("orchestrator_message_kind", "") if props else ""
            text = event.data.text if event.data else ""
            print(f"\n[ORCH:{kind}]\n\n{text}\n{'-' * 26}")

        elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
            # Streaming tokens from agents
            agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
            if last_stream_agent_id != agent_id or not stream_line_open:
                if stream_line_open:
                    print()
                print(f"\n[STREAM:{agent_id}]: ", end="", flush=True)
                last_stream_agent_id = agent_id
                stream_line_open = True
            if event.data and event.data.text:
                print(event.data.text, end="", flush=True)

        elif event.data and event.data.text:
            print(event.data.text, end="", flush=True)

    elif isinstance(event, WorkflowOutputEvent):
        output_messages = cast(list[ChatMessage], event.data)
        if output_messages:
            output = output_messages[-1].text

if stream_line_open:
    print()

if output is not None:
    print(f"Workflow completed with result:\n\n{output}")

高度: ヒューマンインザループ計画レビュー

実行前にマネージャーの計画の人間によるレビューと承認を有効にします。

プラン レビューの構成

from typing import cast
from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    MagenticHumanInterventionDecision,
    MagenticHumanInterventionKind,
    MagenticHumanInterventionReply,
    MagenticHumanInterventionRequest,
    RequestInfoEvent,
    WorkflowOutputEvent,
)

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher_agent, coder=coder_agent)
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=10,
        max_stall_count=3,
        max_reset_count=2,
    )
    .with_plan_review()  # Enable plan review
    .build()
)

プランのレビュー要求を処理する

pending_request: RequestInfoEvent | None = None
pending_responses: dict[str, MagenticHumanInterventionReply] | None = None
completed = False
workflow_output: str | None = None

while not completed:
    # Use streaming for both initial run and response sending
    if pending_responses is not None:
        stream = workflow.send_responses_streaming(pending_responses)
    else:
        stream = workflow.run_stream(task)

    async for event in stream:
        if isinstance(event, AgentRunUpdateEvent):
            # Handle streaming events as shown above
            pass
        elif isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
            request = cast(MagenticHumanInterventionRequest, event.data)
            if request.kind == MagenticHumanInterventionKind.PLAN_REVIEW:
                pending_request = event
                if request.plan_text:
                    print(f"\n=== PLAN REVIEW REQUEST ===\n{request.plan_text}\n")
        elif isinstance(event, WorkflowOutputEvent):
            workflow_output = str(event.data) if event.data else None
            completed = True

    pending_responses = None

    # Handle pending plan review request
    if pending_request is not None:
        # Collect human decision (approve/reject/modify)
        # For demo, we auto-approve:
        reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE)

        # Or approve with comments:
        # reply = MagenticHumanInterventionReply(
        #     decision=MagenticHumanInterventionDecision.APPROVE,
        #     comments="Looks good, but prioritize efficiency metrics."
        # )

        # Or request revision:
        # reply = MagenticHumanInterventionReply(
        #     decision=MagenticHumanInterventionDecision.REVISE,
        #     comments="Please include a comparison with newer models like LLaMA."
        # )

        pending_responses = {pending_request.request_id: reply}
        pending_request = None

詳細: ツール承認によるエージェントの明確化

エージェントは、ツールの承認を使用して、実行中にユーザーに明確な質問をすることができます。 これにより、Human-in-the-Loop (HITL) の対話が可能になり、エージェントは続行する前に追加情報を要求できます。

承認が必要なツールを定義する

from typing import Annotated
from agent_framework import ai_function

@ai_function(approval_mode="always_require")
def ask_user(question: Annotated[str, "The question to ask the user for clarification"]) -> str:
    """Ask the user a clarifying question to gather missing information.

    Use this tool when you need additional information from the user to complete
    your task effectively.
    """
    # This function body is a placeholder - the actual interaction happens via HITL.
    return f"User was asked: {question}"

ツールを使用してエージェントを作成する

onboarding_agent = ChatAgent(
    name="OnboardingAgent",
    description="HR specialist who handles employee onboarding",
    instructions=(
        "You are an HR Onboarding Specialist. Your job is to onboard new employees.\n\n"
        "IMPORTANT: When given an onboarding request, you MUST gather the following "
        "information before proceeding:\n"
        "1. Department (e.g., Engineering, Sales, Marketing)\n"
        "2. Role/Title (e.g., Software Engineer, Account Executive)\n\n"
        "Use the ask_user tool to request ANY missing information."
    ),
    chat_client=OpenAIChatClient(model_id="gpt-4o"),
    tools=[ask_user],
)

ツール承認要求の処理

async for event in workflow.run_stream("Onboard Jessica Smith"):
    if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
        req = cast(MagenticHumanInterventionRequest, event.data)

        if req.kind == MagenticHumanInterventionKind.TOOL_APPROVAL:
            print(f"Agent: {req.agent_id}")
            print(f"Question: {req.prompt}")

            # Get user's answer
            answer = input("> ").strip()

            # Send the answer back - it will be fed to the agent as the function result
            reply = MagenticHumanInterventionReply(
                decision=MagenticHumanInterventionDecision.APPROVE,
                response_text=answer,
            )
            pending_responses = {event.request_id: reply}

            # Continue workflow with response
            async for ev in workflow.send_responses_streaming(pending_responses):
                # Handle continuation events
                pass

上級: 失速現象に対する人間の介入

ワークフローがエージェントの進捗がないことを検出したとき、人間による介入を有効にします。

ストール介入の設定

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher_agent, analyst=analyst_agent)
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=10,
        max_stall_count=1,  # Stall detection after 1 round without progress
        max_reset_count=2,
    )
    .with_human_input_on_stall()  # Request human input when stalled
    .build()
)

ストール介入要求の処理

async for event in workflow.run_stream(task):
    if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
        req = cast(MagenticHumanInterventionRequest, event.data)

        if req.kind == MagenticHumanInterventionKind.STALL:
            print(f"Workflow stalled after {req.stall_count} rounds")
            print(f"Reason: {req.stall_reason}")
            if req.plan_text:
                print(f"Current plan:\n{req.plan_text}")

            # Choose response: CONTINUE, REPLAN, or GUIDANCE
            reply = MagenticHumanInterventionReply(
                decision=MagenticHumanInterventionDecision.GUIDANCE,
                comments="Focus on completing the research step first before moving to analysis.",
            )
            pending_responses = {event.request_id: reply}

主要概念

  • 動的調整: Magentic マネージャーは、進化するコンテキストに基づいて次に動作するエージェントを動的に選択します
  • 反復的な絞り込み: システムは複雑な問題を分割し、複数のラウンドを通じてソリューションを反復的に調整できます
  • 進行状況の追跡: ストールを検出し、必要に応じてプランをリセットするための組み込みのメカニズム
  • 柔軟なコラボレーション: エージェントは、マネージャーによって決定された順序で複数回呼び出すことができます
  • ヒューマン・オーバーサイト:計画レビュー、ツール承認、ストール介入を含む、オプションのヒューマン・イン・ザ・ループメカニズム
  • 統合イベント システム: AgentRunUpdateEventmagentic_event_typeを使用してオーケストレーターおよびエージェント ストリーミング イベントを処理する

ワークフロー実行フロー

マゼンティック オーケストレーションは、次の実行パターンに従います。

  1. 計画フェーズ: マネージャーはタスクを分析し、初期計画を作成します
  2. オプションのプラン レビュー: 有効にした場合、人間はプランを確認および承認/変更できます
  3. エージェントの選択: マネージャーは、各サブタスクに最適なエージェントを選択します
  4. 実行: 選択したエージェントがタスクの一部を実行します
  5. 進行状況の評価: マネージャーは進行状況を評価し、計画を更新します
  6. 停止検出: 進行状況が停止している場合は、自動で再計画するか、人間の介入を要求します
  7. イテレーション: タスクが完了するか、制限に達するまで手順 3 から 6 を繰り返します
  8. 最終的な合成: マネージャーは、すべてのエージェント出力を最終的な結果に合成します

コード例全体

すべての概念をまとめる完全な例を次に示します。

import asyncio
import logging
from typing import cast

from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    ChatAgent,
    ChatMessage,
    HostedCodeInterpreterTool,
    MagenticBuilder,
    WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)

async def main() -> None:
    # Define specialized agents
    researcher_agent = ChatAgent(
        name="ResearcherAgent",
        description="Specialist in research and information gathering",
        instructions=(
            "You are a Researcher. You find information without additional "
            "computation or quantitative analysis."
        ),
        chat_client=OpenAIChatClient(model_id="gpt-4o-search-preview"),
    )

    coder_agent = ChatAgent(
        name="CoderAgent",
        description="A helpful assistant that writes and executes code to process and analyze data.",
        instructions="You solve questions using code. Please provide detailed analysis and computation process.",
        chat_client=OpenAIResponsesClient(),
        tools=HostedCodeInterpreterTool(),
    )

    # Create a manager agent for orchestration
    manager_agent = ChatAgent(
        name="MagenticManager",
        description="Orchestrator that coordinates the research and coding workflow",
        instructions="You coordinate a team to complete complex tasks efficiently.",
        chat_client=OpenAIChatClient(),
    )

    # State for streaming output
    last_stream_agent_id: str | None = None
    stream_line_open: bool = False

    # Build the workflow
    print("\nBuilding Magentic Workflow...")

    workflow = (
        MagenticBuilder()
        .participants(researcher=researcher_agent, coder=coder_agent)
        .with_standard_manager(
            agent=manager_agent,
            max_round_count=10,
            max_stall_count=3,
            max_reset_count=2,
        )
        .build()
    )

    # Define the task
    task = (
        "I am preparing a report on the energy efficiency of different machine learning model architectures. "
        "Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
        "on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
        "Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
        "VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
        "per task type (image classification, text classification, and text generation)."
    )

    print(f"\nTask: {task}")
    print("\nStarting workflow execution...")

    # Run the workflow
    try:
        output: str | None = None
        async for event in workflow.run_stream(task):
            if isinstance(event, AgentRunUpdateEvent):
                props = event.data.additional_properties if event.data else None
                event_type = props.get("magentic_event_type") if props else None

                if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
                    kind = props.get("orchestrator_message_kind", "") if props else ""
                    text = event.data.text if event.data else ""
                    print(f"\n[ORCH:{kind}]\n\n{text}\n{'-' * 26}")
                elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
                    agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
                    if last_stream_agent_id != agent_id or not stream_line_open:
                        if stream_line_open:
                            print()
                        print(f"\n[STREAM:{agent_id}]: ", end="", flush=True)
                        last_stream_agent_id = agent_id
                        stream_line_open = True
                    if event.data and event.data.text:
                        print(event.data.text, end="", flush=True)
                elif event.data and event.data.text:
                    print(event.data.text, end="", flush=True)
            elif isinstance(event, WorkflowOutputEvent):
                output_messages = cast(list[ChatMessage], event.data)
                if output_messages:
                    output = output_messages[-1].text

        if stream_line_open:
            print()

        if output is not None:
            print(f"Workflow completed with result:\n\n{output}")

    except Exception as e:
        print(f"Workflow execution failed: {e}")
        logger.exception("Workflow exception", exc_info=e)

if __name__ == "__main__":
    asyncio.run(main())

構成オプション

マネージャー パラメーター

  • max_round_count: コラボレーション ラウンドの最大数 (既定値: 10)
  • max_stall_count: 停滞処理を起動する前に進行がない最大サイクル数 (既定値: 3)
  • max_reset_count: 許容されるプランのリセットの最大数 (既定値: 2)

人間の介入の種類

  • PLAN_REVIEW: 初期計画の確認と承認/修正
  • TOOL_APPROVAL: ツール/関数呼び出しを承認する (エージェントの明確化に使用)
  • STALL: ワークフローがストールし、ガイダンスが必要です

人間の介入の決定

  • APPROVE: プランまたはツールの呼び出し as-is を受け入れる
  • REVISE: フィードバックを考慮したリビジョンの依頼 (計画レビュー)
  • REJECT: 拒否/却下 (ツールの承認)
  • CONTINUE: 現在の状態で続行する (停止)
  • REPLAN: リプランニングの再トリガー (停止)
  • GUIDANCE: ガイダンステキストを提供する(プロセスの停止、ツールの承認)

イベントの種類

イベントは、AgentRunUpdateEventのメタデータを含むadditional_propertiesを介して生成されます。

  • magentic_event_type: MAGENTIC_EVENT_TYPE_ORCHESTRATOR または MAGENTIC_EVENT_TYPE_AGENT_DELTA
  • orchestrator_message_kind: オーケストレーター イベントの場合は、メッセージの種類 (例: "instruction"、"notice"、"task_ledger") を示します。
  • agent_id: エージェントデルタイベントの場合、ストリーミングエージェントを識別します。

サンプル出力

もうすぐです。。。

次のステップ