Freigeben über


Microsoft Agent Framework Workflows Orchestrations - Magnetic

Magentic Orchestration basiert auf dem Magentic-One System , das von AutoGen erfunden wurde. Es handelt sich um ein flexibles, allgemeines mehrstufiges Muster, das für komplexe, offene Aufgaben entwickelt wurde, die eine dynamische Zusammenarbeit erfordern. In diesem Muster koordiniert ein dedizierter Magentic-Manager ein Team von spezialisierten Agents, wobei ausgewählt wird, welcher Agent auf der Grundlage des sich entwickelnden Kontexts, des Aufgabenfortschritts und der Agent-Funktionen als nächstes agieren soll.

Der Magentic-Manager verwaltet einen freigegebenen Kontext, verfolgt den Fortschritt und passt den Workflow in Echtzeit an. Auf diese Weise kann das System komplexe Probleme aufschlüsseln, Teilvorgänge delegieren und Lösungen durch die Agentzusammenarbeit iterativ verfeinern. Die Orchestrierung eignet sich besonders gut für Szenarien, in denen der Lösungspfad im Voraus nicht bekannt ist und möglicherweise mehrere Runden von Gründen, Forschung und Berechnung erforderlich ist.

Magentische Orchestrierung

Sie lernen Folgendes

  • So richten Sie einen Magentic-Manager ein, um mehrere spezialisierte Agents zu koordinieren
  • So bearbeiten Sie Streaming-Ereignisse mit AgentRunUpdateEvent
  • So implementieren Sie die Überprüfung des Human-in-the-Loop-Prozesses, die Genehmigung von Tools und das Eingreifen bei Verzögerungen.
  • Wie man die Zusammenarbeit von Agenten und den Fortschritt bei komplexen Aufgaben nachverfolgt

Definieren Sie Ihre spezialisierten Agenten

Demnächst...

In Magentic Orchestration definieren Sie spezialisierte Agents, die der Vorgesetzte basierend auf den Aufgabenanforderungen dynamisch auswählen kann:

from agent_framework import ChatAgent, HostedCodeInterpreterTool
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient

researcher_agent = ChatAgent(
    name="ResearcherAgent",
    description="Specialist in research and information gathering",
    instructions=(
        "You are a Researcher. You find information without additional computation or quantitative analysis."
    ),
    # This agent requires the gpt-4o-search-preview model to perform web searches
    chat_client=OpenAIChatClient(model_id="gpt-4o-search-preview"),
)

coder_agent = ChatAgent(
    name="CoderAgent",
    description="A helpful assistant that writes and executes code to process and analyze data.",
    instructions="You solve questions using code. Please provide detailed analysis and computation process.",
    chat_client=OpenAIResponsesClient(),
    tools=HostedCodeInterpreterTool(),
)

# Create a manager agent for orchestration
manager_agent = ChatAgent(
    name="MagenticManager",
    description="Orchestrator that coordinates the research and coding workflow",
    instructions="You coordinate a team to complete complex tasks efficiently.",
    chat_client=OpenAIChatClient(),
)

Erstellen des Magentischen Workflows

Verwenden Sie MagenticBuilder, um den Workflow mit einem Standard-Manager zu konfigurieren.

from agent_framework import MagenticBuilder

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher_agent, coder=coder_agent)
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=10,  # Maximum collaboration rounds
        max_stall_count=3,   # Maximum rounds without progress
        max_reset_count=2,   # Maximum plan resets allowed
    )
    .build()
)

Ausführen des Workflows mit Ereignisstreaming

Führen Sie eine komplexe Aufgabe aus und behandeln Sie Ereignisse für Streaming-Ausgabe und Orchestrierungs-Updates:

from typing import cast
from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    ChatMessage,
    WorkflowOutputEvent,
)

task = (
    "I am preparing a report on the energy efficiency of different machine learning model architectures. "
    "Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
    "on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
    "Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
    "VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
    "per task type (image classification, text classification, and text generation)."
)

# State for streaming callback
last_stream_agent_id: str | None = None
stream_line_open: bool = False
output: str | None = None

async for event in workflow.run_stream(task):
    if isinstance(event, AgentRunUpdateEvent):
        props = event.data.additional_properties if event.data else None
        event_type = props.get("magentic_event_type") if props else None

        if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
            # Manager's planning and coordination messages
            kind = props.get("orchestrator_message_kind", "") if props else ""
            text = event.data.text if event.data else ""
            print(f"\n[ORCH:{kind}]\n\n{text}\n{'-' * 26}")

        elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
            # Streaming tokens from agents
            agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
            if last_stream_agent_id != agent_id or not stream_line_open:
                if stream_line_open:
                    print()
                print(f"\n[STREAM:{agent_id}]: ", end="", flush=True)
                last_stream_agent_id = agent_id
                stream_line_open = True
            if event.data and event.data.text:
                print(event.data.text, end="", flush=True)

        elif event.data and event.data.text:
            print(event.data.text, end="", flush=True)

    elif isinstance(event, WorkflowOutputEvent):
        output_messages = cast(list[ChatMessage], event.data)
        if output_messages:
            output = output_messages[-1].text

if stream_line_open:
    print()

if output is not None:
    print(f"Workflow completed with result:\n\n{output}")

Erweitert: Überprüfung des Plans Human-in-the-Loop

Aktivieren Sie die menschliche Überprüfung und Genehmigung des Vorgesetztenplans vor der Ausführung:

Planüberprüfung konfigurieren

from typing import cast
from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    MagenticHumanInterventionDecision,
    MagenticHumanInterventionKind,
    MagenticHumanInterventionReply,
    MagenticHumanInterventionRequest,
    RequestInfoEvent,
    WorkflowOutputEvent,
)

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher_agent, coder=coder_agent)
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=10,
        max_stall_count=3,
        max_reset_count=2,
    )
    .with_plan_review()  # Enable plan review
    .build()
)

Behandeln von Planüberprüfungsanforderungen

pending_request: RequestInfoEvent | None = None
pending_responses: dict[str, MagenticHumanInterventionReply] | None = None
completed = False
workflow_output: str | None = None

while not completed:
    # Use streaming for both initial run and response sending
    if pending_responses is not None:
        stream = workflow.send_responses_streaming(pending_responses)
    else:
        stream = workflow.run_stream(task)

    async for event in stream:
        if isinstance(event, AgentRunUpdateEvent):
            # Handle streaming events as shown above
            pass
        elif isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
            request = cast(MagenticHumanInterventionRequest, event.data)
            if request.kind == MagenticHumanInterventionKind.PLAN_REVIEW:
                pending_request = event
                if request.plan_text:
                    print(f"\n=== PLAN REVIEW REQUEST ===\n{request.plan_text}\n")
        elif isinstance(event, WorkflowOutputEvent):
            workflow_output = str(event.data) if event.data else None
            completed = True

    pending_responses = None

    # Handle pending plan review request
    if pending_request is not None:
        # Collect human decision (approve/reject/modify)
        # For demo, we auto-approve:
        reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE)

        # Or approve with comments:
        # reply = MagenticHumanInterventionReply(
        #     decision=MagenticHumanInterventionDecision.APPROVE,
        #     comments="Looks good, but prioritize efficiency metrics."
        # )

        # Or request revision:
        # reply = MagenticHumanInterventionReply(
        #     decision=MagenticHumanInterventionDecision.REVISE,
        #     comments="Please include a comparison with newer models like LLaMA."
        # )

        pending_responses = {pending_request.request_id: reply}
        pending_request = None

Erweitert: Agent-Klarstellung über die Toolgenehmigung

Agenten können während der Ausführung mit der Toolfreigabe Benutzern klärende Fragen stellen. Auf diese Weise können Human-in-the-Loop (HITL)-Interaktionen aktiviert werden, bei denen der Agent zusätzliche Informationen anfordern kann, bevor er fortfahren kann.

Tool definieren, Genehmigung erforderlich

from typing import Annotated
from agent_framework import ai_function

@ai_function(approval_mode="always_require")
def ask_user(question: Annotated[str, "The question to ask the user for clarification"]) -> str:
    """Ask the user a clarifying question to gather missing information.

    Use this tool when you need additional information from the user to complete
    your task effectively.
    """
    # This function body is a placeholder - the actual interaction happens via HITL.
    return f"User was asked: {question}"

Erstellen eines Agents mit dem Tool

onboarding_agent = ChatAgent(
    name="OnboardingAgent",
    description="HR specialist who handles employee onboarding",
    instructions=(
        "You are an HR Onboarding Specialist. Your job is to onboard new employees.\n\n"
        "IMPORTANT: When given an onboarding request, you MUST gather the following "
        "information before proceeding:\n"
        "1. Department (e.g., Engineering, Sales, Marketing)\n"
        "2. Role/Title (e.g., Software Engineer, Account Executive)\n\n"
        "Use the ask_user tool to request ANY missing information."
    ),
    chat_client=OpenAIChatClient(model_id="gpt-4o"),
    tools=[ask_user],
)

Behandeln von Toolgenehmigungsanforderungen

async for event in workflow.run_stream("Onboard Jessica Smith"):
    if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
        req = cast(MagenticHumanInterventionRequest, event.data)

        if req.kind == MagenticHumanInterventionKind.TOOL_APPROVAL:
            print(f"Agent: {req.agent_id}")
            print(f"Question: {req.prompt}")

            # Get user's answer
            answer = input("> ").strip()

            # Send the answer back - it will be fed to the agent as the function result
            reply = MagenticHumanInterventionReply(
                decision=MagenticHumanInterventionDecision.APPROVE,
                response_text=answer,
            )
            pending_responses = {event.request_id: reply}

            # Continue workflow with response
            async for ev in workflow.send_responses_streaming(pending_responses):
                # Handle continuation events
                pass

Fortgeschritten: Menschliche Intervention am Stall

Aktivieren Sie den menschlichen Eingriff, wenn der Workflow erkennt, dass Agents keine Fortschritte machen:

Konfigurieren der Stallintervention

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher_agent, analyst=analyst_agent)
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=10,
        max_stall_count=1,  # Stall detection after 1 round without progress
        max_reset_count=2,
    )
    .with_human_input_on_stall()  # Request human input when stalled
    .build()
)

Bearbeitung von Stall-Interventionsanforderungen

async for event in workflow.run_stream(task):
    if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
        req = cast(MagenticHumanInterventionRequest, event.data)

        if req.kind == MagenticHumanInterventionKind.STALL:
            print(f"Workflow stalled after {req.stall_count} rounds")
            print(f"Reason: {req.stall_reason}")
            if req.plan_text:
                print(f"Current plan:\n{req.plan_text}")

            # Choose response: CONTINUE, REPLAN, or GUIDANCE
            reply = MagenticHumanInterventionReply(
                decision=MagenticHumanInterventionDecision.GUIDANCE,
                comments="Focus on completing the research step first before moving to analysis.",
            )
            pending_responses = {event.request_id: reply}

Wichtige Konzepte

  • Dynamische Koordination: Der Magentische Manager wählt dynamisch aus, welcher Agent basierend auf dem sich entwickelnden Kontext als nächstes handeln soll.
  • Iterative Verfeinerung: Das System kann komplexe Probleme zerlegen und Lösungen durch mehrere Runden iterativ verfeinern.
  • Fortschrittsverfolgung: Integrierte Mechanismen zum Erkennen von Stillständen und Zurücksetzen des Plans bei Bedarf
  • Flexible Zusammenarbeit: Agents können mehrmals in beliebiger Reihenfolge aufgerufen werden, wie vom Vorgesetzten festgelegt.
  • Menschliche Aufsicht: Optionale Human-in-the-Loop-Mechanismen einschließlich Planüberprüfung, Werkzeuggenehmigung und Intervention bei Störungen
  • Unified Event System: Verwenden Sie AgentRunUpdateEvent mit magentic_event_type, um Orchestrator- und Agent-Streamingereignisse zu bearbeiten

Ablauf der Workflow-Ausführung

Die Magentische Orchestrierung folgt diesem Ausführungsmuster:

  1. Planungsphase: Der Vorgesetzte analysiert die Aufgabe und erstellt einen anfänglichen Plan.
  2. Optionale Planüberprüfung: Wenn aktiviert, kann der Mensch den Plan überprüfen und genehmigen/ändern.
  3. Agentauswahl: Der Manager wählt den am besten geeigneten Agent für jeden Teilvorgang aus.
  4. Ausführung: Der ausgewählte Agent führt seinen Teil der Aufgabe aus.
  5. Statusbewertung: Der Vorgesetzte wertet den Fortschritt aus und aktualisiert den Plan.
  6. Stillstandserkennung: Wenn der Fortschritt stagniert, entweder automatisch neu planen oder menschliches Eingreifen anfordern
  7. Iteration: Die Schritte 3 bis 6 wiederholen, bis die Aufgabe abgeschlossen ist oder Grenzwerte erreicht sind
  8. Endgültige Synthese: Der Manager synthetisiert alle Agent-Ausgaben in ein Endergebnis.

Vollständiges Beispiel

Hier ist ein vollständiges Beispiel, das alle Konzepte zusammenbringt:

import asyncio
import logging
from typing import cast

from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    ChatAgent,
    ChatMessage,
    HostedCodeInterpreterTool,
    MagenticBuilder,
    WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)

async def main() -> None:
    # Define specialized agents
    researcher_agent = ChatAgent(
        name="ResearcherAgent",
        description="Specialist in research and information gathering",
        instructions=(
            "You are a Researcher. You find information without additional "
            "computation or quantitative analysis."
        ),
        chat_client=OpenAIChatClient(model_id="gpt-4o-search-preview"),
    )

    coder_agent = ChatAgent(
        name="CoderAgent",
        description="A helpful assistant that writes and executes code to process and analyze data.",
        instructions="You solve questions using code. Please provide detailed analysis and computation process.",
        chat_client=OpenAIResponsesClient(),
        tools=HostedCodeInterpreterTool(),
    )

    # Create a manager agent for orchestration
    manager_agent = ChatAgent(
        name="MagenticManager",
        description="Orchestrator that coordinates the research and coding workflow",
        instructions="You coordinate a team to complete complex tasks efficiently.",
        chat_client=OpenAIChatClient(),
    )

    # State for streaming output
    last_stream_agent_id: str | None = None
    stream_line_open: bool = False

    # Build the workflow
    print("\nBuilding Magentic Workflow...")

    workflow = (
        MagenticBuilder()
        .participants(researcher=researcher_agent, coder=coder_agent)
        .with_standard_manager(
            agent=manager_agent,
            max_round_count=10,
            max_stall_count=3,
            max_reset_count=2,
        )
        .build()
    )

    # Define the task
    task = (
        "I am preparing a report on the energy efficiency of different machine learning model architectures. "
        "Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
        "on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
        "Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
        "VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
        "per task type (image classification, text classification, and text generation)."
    )

    print(f"\nTask: {task}")
    print("\nStarting workflow execution...")

    # Run the workflow
    try:
        output: str | None = None
        async for event in workflow.run_stream(task):
            if isinstance(event, AgentRunUpdateEvent):
                props = event.data.additional_properties if event.data else None
                event_type = props.get("magentic_event_type") if props else None

                if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
                    kind = props.get("orchestrator_message_kind", "") if props else ""
                    text = event.data.text if event.data else ""
                    print(f"\n[ORCH:{kind}]\n\n{text}\n{'-' * 26}")
                elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
                    agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
                    if last_stream_agent_id != agent_id or not stream_line_open:
                        if stream_line_open:
                            print()
                        print(f"\n[STREAM:{agent_id}]: ", end="", flush=True)
                        last_stream_agent_id = agent_id
                        stream_line_open = True
                    if event.data and event.data.text:
                        print(event.data.text, end="", flush=True)
                elif event.data and event.data.text:
                    print(event.data.text, end="", flush=True)
            elif isinstance(event, WorkflowOutputEvent):
                output_messages = cast(list[ChatMessage], event.data)
                if output_messages:
                    output = output_messages[-1].text

        if stream_line_open:
            print()

        if output is not None:
            print(f"Workflow completed with result:\n\n{output}")

    except Exception as e:
        print(f"Workflow execution failed: {e}")
        logger.exception("Workflow exception", exc_info=e)

if __name__ == "__main__":
    asyncio.run(main())

Konfigurationsoptionen

Managerparameter

  • max_round_count: Maximale Anzahl von Zusammenarbeitsrunden (Standard: 10)
  • max_stall_count: Maximale Runden ohne Fortschritt, bevor die Stillstandbehandlung ausgelöst wird (Standard: 3)
  • max_reset_count: Maximale Anzahl der zulässigen Planzurücksetzungen (Standard: 2)

Interventionsarten des Menschen

  • PLAN_REVIEW: Überprüfen und Genehmigen/Überarbeiten des ursprünglichen Plans
  • TOOL_APPROVAL: Genehmigen eines Tool-/Funktionsaufrufs (wird für die Agentklärung verwendet)
  • STALL: Workflow hat angehalten und benötigt Anleitungen

Entscheidungen über menschliche Interventionen

  • APPROVE: Akzeptieren Sie den Plan- oder Toolaufruf unverändert
  • REVISE: Überarbeitung mit Feedback anfordern (Planüberprüfung)
  • REJECT: Ablehnen/Verweigern (Toolgenehmigung)
  • CONTINUE: Weiter mit dem aktuellen Zustand (Stand)
  • REPLAN: Neuplanen auslösen (Stall)
  • GUIDANCE: Anleitungstext bereitstellen (Hemmung, Toolgenehmigung)

Ereignistypen

Ereignisse werden über AgentRunUpdateEvent mit Metadaten in additional_properties ausgegeben.

  • magentic_event_type: Entweder MAGENTIC_EVENT_TYPE_ORCHESTRATOR oder MAGENTIC_EVENT_TYPE_AGENT_DELTA
  • orchestrator_message_kind: Gibt für Orchestratorereignisse den Nachrichtentyp an (z. B. "Anweisung", "Hinweis", "task_ledger")
  • agent_id: Identifiziert für Agent-Delta-Ereignisse den Streaming-Agent.

Beispielausgabe

Demnächst...

Nächste Schritte