Udostępnij przez


Orkiestracje przepływów pracy programu Microsoft Agent Framework — Magentic

Orkiestracja magentyczna została zaprojektowana na podstawie systemu Magentic-One opracowanego przez AutoGen. Jest to elastyczny wzorzec wieloaplikowy ogólnego przeznaczenia przeznaczony dla złożonych, otwartych zadań wymagających dynamicznej współpracy. W tym wzorcu dedykowany menedżer magentyczny koordynuje zespół wyspecjalizowanych agentów, wybierając agenta, który powinien działać dalej na podstawie zmieniającego się kontekstu, postępu zadań i możliwości agenta.

Menedżer Magentic utrzymuje wspólny kontekst, śledzi postęp i dostosowuje przepływ pracy w czasie rzeczywistym. Dzięki temu system może rozdzielić złożone problemy, delegować podzadania i iteracyjnie uściślić rozwiązania za pośrednictwem współpracy agentów. Orkiestracja jest szczególnie odpowiednia dla scenariuszy, w których ścieżka rozwiązania nie jest znana z wyprzedzeniem i może wymagać wielu rund rozumowania, badań i obliczeń.

Orkiestracja magentyczna

Czego nauczysz się

  • Jak skonfigurować menedżera magentycznego w celu koordynowania wielu wyspecjalizowanych agentów
  • Jak obsługiwać zdarzenia przesyłania strumieniowego za pomocą polecenia AgentRunUpdateEvent
  • Jak wdrożyć przegląd planów z udziałem człowieka, zatwierdzanie narzędzi i interwencję zapobiegającą zatrzymaniu procesu
  • Jak śledzić współpracę i postęp agenta za pośrednictwem złożonych zadań

Definiowanie wyspecjalizowanych agentów

Wkrótce...

W orkiestracji magentycznej definiujesz wyspecjalizowanych agentów, których menedżer może dynamicznie wybierać na podstawie wymagań dotyczących zadań:

from agent_framework import ChatAgent, HostedCodeInterpreterTool
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient

researcher_agent = ChatAgent(
    name="ResearcherAgent",
    description="Specialist in research and information gathering",
    instructions=(
        "You are a Researcher. You find information without additional computation or quantitative analysis."
    ),
    # This agent requires the gpt-4o-search-preview model to perform web searches
    chat_client=OpenAIChatClient(model_id="gpt-4o-search-preview"),
)

coder_agent = ChatAgent(
    name="CoderAgent",
    description="A helpful assistant that writes and executes code to process and analyze data.",
    instructions="You solve questions using code. Please provide detailed analysis and computation process.",
    chat_client=OpenAIResponsesClient(),
    tools=HostedCodeInterpreterTool(),
)

# Create a manager agent for orchestration
manager_agent = ChatAgent(
    name="MagenticManager",
    description="Orchestrator that coordinates the research and coding workflow",
    instructions="You coordinate a team to complete complex tasks efficiently.",
    chat_client=OpenAIChatClient(),
)

Tworzenie przepływu pracy magentycznego

Użyj MagenticBuilder polecenia , aby skonfigurować przepływ pracy za pomocą menedżera standardowego:

from agent_framework import MagenticBuilder

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher_agent, coder=coder_agent)
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=10,  # Maximum collaboration rounds
        max_stall_count=3,   # Maximum rounds without progress
        max_reset_count=2,   # Maximum plan resets allowed
    )
    .build()
)

Uruchamianie przepływu pracy za pomocą przesyłania strumieniowego zdarzeń

Wykonaj złożone zadanie i obsłuż zdarzenia dla strumieniowego przesyłania danych wyjściowych i aktualizacji orkiestracji.

from typing import cast
from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    ChatMessage,
    WorkflowOutputEvent,
)

task = (
    "I am preparing a report on the energy efficiency of different machine learning model architectures. "
    "Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
    "on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
    "Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
    "VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
    "per task type (image classification, text classification, and text generation)."
)

# State for streaming callback
last_stream_agent_id: str | None = None
stream_line_open: bool = False
output: str | None = None

async for event in workflow.run_stream(task):
    if isinstance(event, AgentRunUpdateEvent):
        props = event.data.additional_properties if event.data else None
        event_type = props.get("magentic_event_type") if props else None

        if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
            # Manager's planning and coordination messages
            kind = props.get("orchestrator_message_kind", "") if props else ""
            text = event.data.text if event.data else ""
            print(f"\n[ORCH:{kind}]\n\n{text}\n{'-' * 26}")

        elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
            # Streaming tokens from agents
            agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
            if last_stream_agent_id != agent_id or not stream_line_open:
                if stream_line_open:
                    print()
                print(f"\n[STREAM:{agent_id}]: ", end="", flush=True)
                last_stream_agent_id = agent_id
                stream_line_open = True
            if event.data and event.data.text:
                print(event.data.text, end="", flush=True)

        elif event.data and event.data.text:
            print(event.data.text, end="", flush=True)

    elif isinstance(event, WorkflowOutputEvent):
        output_messages = cast(list[ChatMessage], event.data)
        if output_messages:
            output = output_messages[-1].text

if stream_line_open:
    print()

if output is not None:
    print(f"Workflow completed with result:\n\n{output}")

Zaawansowane: Przegląd planu z udziałem człowieka

Włącz możliwość przeglądu i zatwierdzenia planu przez menedżera przed jego realizacją.

Konfigurowanie przeglądu planu

from typing import cast
from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    MagenticHumanInterventionDecision,
    MagenticHumanInterventionKind,
    MagenticHumanInterventionReply,
    MagenticHumanInterventionRequest,
    RequestInfoEvent,
    WorkflowOutputEvent,
)

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher_agent, coder=coder_agent)
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=10,
        max_stall_count=3,
        max_reset_count=2,
    )
    .with_plan_review()  # Enable plan review
    .build()
)

Obsługa żądań przeglądu planu

pending_request: RequestInfoEvent | None = None
pending_responses: dict[str, MagenticHumanInterventionReply] | None = None
completed = False
workflow_output: str | None = None

while not completed:
    # Use streaming for both initial run and response sending
    if pending_responses is not None:
        stream = workflow.send_responses_streaming(pending_responses)
    else:
        stream = workflow.run_stream(task)

    async for event in stream:
        if isinstance(event, AgentRunUpdateEvent):
            # Handle streaming events as shown above
            pass
        elif isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
            request = cast(MagenticHumanInterventionRequest, event.data)
            if request.kind == MagenticHumanInterventionKind.PLAN_REVIEW:
                pending_request = event
                if request.plan_text:
                    print(f"\n=== PLAN REVIEW REQUEST ===\n{request.plan_text}\n")
        elif isinstance(event, WorkflowOutputEvent):
            workflow_output = str(event.data) if event.data else None
            completed = True

    pending_responses = None

    # Handle pending plan review request
    if pending_request is not None:
        # Collect human decision (approve/reject/modify)
        # For demo, we auto-approve:
        reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE)

        # Or approve with comments:
        # reply = MagenticHumanInterventionReply(
        #     decision=MagenticHumanInterventionDecision.APPROVE,
        #     comments="Looks good, but prioritize efficiency metrics."
        # )

        # Or request revision:
        # reply = MagenticHumanInterventionReply(
        #     decision=MagenticHumanInterventionDecision.REVISE,
        #     comments="Please include a comparison with newer models like LLaMA."
        # )

        pending_responses = {pending_request.request_id: reply}
        pending_request = None

Zaawansowane: Wyjaśnienie agenta przez zatwierdzenie narzędzia

Agenci mogą zadawać użytkownikom pytania wyjaśniające w trakcie realizacji z użyciem zatwierdzania narzędzi. Umożliwia to interakcję Human-in-the-Loop (HITL), w której agent może zażądać dodatkowych informacji przed kontynuowaniem.

Definiowanie narzędzia z wymaganym zatwierdzeniem

from typing import Annotated
from agent_framework import ai_function

@ai_function(approval_mode="always_require")
def ask_user(question: Annotated[str, "The question to ask the user for clarification"]) -> str:
    """Ask the user a clarifying question to gather missing information.

    Use this tool when you need additional information from the user to complete
    your task effectively.
    """
    # This function body is a placeholder - the actual interaction happens via HITL.
    return f"User was asked: {question}"

Tworzenie agenta za pomocą narzędzia

onboarding_agent = ChatAgent(
    name="OnboardingAgent",
    description="HR specialist who handles employee onboarding",
    instructions=(
        "You are an HR Onboarding Specialist. Your job is to onboard new employees.\n\n"
        "IMPORTANT: When given an onboarding request, you MUST gather the following "
        "information before proceeding:\n"
        "1. Department (e.g., Engineering, Sales, Marketing)\n"
        "2. Role/Title (e.g., Software Engineer, Account Executive)\n\n"
        "Use the ask_user tool to request ANY missing information."
    ),
    chat_client=OpenAIChatClient(model_id="gpt-4o"),
    tools=[ask_user],
)

Obsługa żądań zatwierdzania narzędzi

async for event in workflow.run_stream("Onboard Jessica Smith"):
    if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
        req = cast(MagenticHumanInterventionRequest, event.data)

        if req.kind == MagenticHumanInterventionKind.TOOL_APPROVAL:
            print(f"Agent: {req.agent_id}")
            print(f"Question: {req.prompt}")

            # Get user's answer
            answer = input("> ").strip()

            # Send the answer back - it will be fed to the agent as the function result
            reply = MagenticHumanInterventionReply(
                decision=MagenticHumanInterventionDecision.APPROVE,
                response_text=answer,
            )
            pending_responses = {event.request_id: reply}

            # Continue workflow with response
            async for ev in workflow.send_responses_streaming(pending_responses):
                # Handle continuation events
                pass

Zaawansowane: Interwencja człowieka na straganie

Włącz interwencję człowieka, gdy przepływ pracy wykryje, że agenci nie robią postępów:

Konfigurowanie interwencji przeciwdziałającej przeciągnięciu

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher_agent, analyst=analyst_agent)
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=10,
        max_stall_count=1,  # Stall detection after 1 round without progress
        max_reset_count=2,
    )
    .with_human_input_on_stall()  # Request human input when stalled
    .build()
)

Obsługa żądań interwencji w przypadku zatrzymania

async for event in workflow.run_stream(task):
    if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
        req = cast(MagenticHumanInterventionRequest, event.data)

        if req.kind == MagenticHumanInterventionKind.STALL:
            print(f"Workflow stalled after {req.stall_count} rounds")
            print(f"Reason: {req.stall_reason}")
            if req.plan_text:
                print(f"Current plan:\n{req.plan_text}")

            # Choose response: CONTINUE, REPLAN, or GUIDANCE
            reply = MagenticHumanInterventionReply(
                decision=MagenticHumanInterventionDecision.GUIDANCE,
                comments="Focus on completing the research step first before moving to analysis.",
            )
            pending_responses = {event.request_id: reply}

Kluczowe pojęcia

  • Dynamiczna koordynacja: Menedżer magentyczny dynamicznie wybiera agenta, który powinien działać dalej w oparciu o zmieniający się kontekst
  • Uściślenie iteracyjne: system może podzielić złożone problemy i iteracyjnie uściślić rozwiązania za pomocą wielu rund
  • Śledzenie postępu: wbudowane mechanizmy wykrywania zatrzymań i resetowania planu w razie potrzeby
  • Elastyczna współpraca: agenci mogą być wywoływani wiele razy w dowolnej kolejności określonej przez menedżera
  • Nadzór ludzki: opcjonalne mechanizmy uwzględniające człowieka, w tym przegląd planu, zatwierdzanie narzędzi i interwencja w przypadku zatrzymania.
  • Ujednolicony system zdarzeń: służy do AgentRunUpdateEventmagentic_event_type obsługi zdarzeń orkiestratora i agenta przesyłania strumieniowego

Przepływ wykonywania zadań w przepływie pracy

Orkiestracja Magentic jest zgodna z tym wzorcem wykonania:

  1. Faza planowania: Menedżer analizuje zadanie i tworzy plan początkowy
  2. Przegląd planu opcjonalnego: w przypadku włączenia użytkownicy mogą przeglądać i zatwierdzać/modyfikować plan
  3. Wybór agenta: Menedżer wybiera najbardziej odpowiedniego agenta dla każdej podtaki
  4. Wykonanie: wybrany agent wykonuje część zadania
  5. Ocena postępu: menedżer ocenia postęp i aktualizuje plan
  6. Wykrywanie zatrzymania: jeśli postęp zostanie zatrzymany, automatycznie przeplanować lub zażądać interwencji człowieka
  7. Iteracja: kroki 3–6 powtarzają się do momentu zakończenia zadania lub osiągnięcia limitów
  8. Końcowa synteza: Menedżer syntetyzuje wszystkie dane wyjściowe agenta do końcowego wyniku

Kompletny przykład

Oto pełny przykład, który łączy wszystkie koncepcje:

import asyncio
import logging
from typing import cast

from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    ChatAgent,
    ChatMessage,
    HostedCodeInterpreterTool,
    MagenticBuilder,
    WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)

async def main() -> None:
    # Define specialized agents
    researcher_agent = ChatAgent(
        name="ResearcherAgent",
        description="Specialist in research and information gathering",
        instructions=(
            "You are a Researcher. You find information without additional "
            "computation or quantitative analysis."
        ),
        chat_client=OpenAIChatClient(model_id="gpt-4o-search-preview"),
    )

    coder_agent = ChatAgent(
        name="CoderAgent",
        description="A helpful assistant that writes and executes code to process and analyze data.",
        instructions="You solve questions using code. Please provide detailed analysis and computation process.",
        chat_client=OpenAIResponsesClient(),
        tools=HostedCodeInterpreterTool(),
    )

    # Create a manager agent for orchestration
    manager_agent = ChatAgent(
        name="MagenticManager",
        description="Orchestrator that coordinates the research and coding workflow",
        instructions="You coordinate a team to complete complex tasks efficiently.",
        chat_client=OpenAIChatClient(),
    )

    # State for streaming output
    last_stream_agent_id: str | None = None
    stream_line_open: bool = False

    # Build the workflow
    print("\nBuilding Magentic Workflow...")

    workflow = (
        MagenticBuilder()
        .participants(researcher=researcher_agent, coder=coder_agent)
        .with_standard_manager(
            agent=manager_agent,
            max_round_count=10,
            max_stall_count=3,
            max_reset_count=2,
        )
        .build()
    )

    # Define the task
    task = (
        "I am preparing a report on the energy efficiency of different machine learning model architectures. "
        "Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
        "on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
        "Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
        "VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
        "per task type (image classification, text classification, and text generation)."
    )

    print(f"\nTask: {task}")
    print("\nStarting workflow execution...")

    # Run the workflow
    try:
        output: str | None = None
        async for event in workflow.run_stream(task):
            if isinstance(event, AgentRunUpdateEvent):
                props = event.data.additional_properties if event.data else None
                event_type = props.get("magentic_event_type") if props else None

                if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
                    kind = props.get("orchestrator_message_kind", "") if props else ""
                    text = event.data.text if event.data else ""
                    print(f"\n[ORCH:{kind}]\n\n{text}\n{'-' * 26}")
                elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
                    agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
                    if last_stream_agent_id != agent_id or not stream_line_open:
                        if stream_line_open:
                            print()
                        print(f"\n[STREAM:{agent_id}]: ", end="", flush=True)
                        last_stream_agent_id = agent_id
                        stream_line_open = True
                    if event.data and event.data.text:
                        print(event.data.text, end="", flush=True)
                elif event.data and event.data.text:
                    print(event.data.text, end="", flush=True)
            elif isinstance(event, WorkflowOutputEvent):
                output_messages = cast(list[ChatMessage], event.data)
                if output_messages:
                    output = output_messages[-1].text

        if stream_line_open:
            print()

        if output is not None:
            print(f"Workflow completed with result:\n\n{output}")

    except Exception as e:
        print(f"Workflow execution failed: {e}")
        logger.exception("Workflow exception", exc_info=e)

if __name__ == "__main__":
    asyncio.run(main())

Opcje konfiguracji

Parametry menedżera

  • max_round_count: Maksymalna liczba rund współpracy (wartość domyślna: 10)
  • max_stall_count: Maksymalna liczba rund bez postępu przed uruchomieniem obsługiwaniem zastoju (ustawienie domyślne: 3)
  • max_reset_count: Dozwolona maksymalna liczba zresetowań planu (wartość domyślna: 2)

Rodzaje interwencji człowieka

  • PLAN_REVIEW: Przeglądanie i zatwierdzanie/poprawianie początkowego planu
  • TOOL_APPROVAL: Zatwierdź wywołanie narzędzia/funkcji (dla wyjaśnienia agenta)
  • STALL: Przepływ pracy został zatrzymany i wymaga wskazówek

Decyzje dotyczące interwencji człowieka

  • APPROVE: Zaakceptuj wywołanie planu lub narzędzia bez zmian
  • REVISE: Żądanie poprawki z opiniami (przegląd planu)
  • REJECT: Odrzuć/odmów (zatwierdzenie narzędzia)
  • CONTINUE: Kontynuuj z bieżącym stanem (zawieszenie)
  • REPLAN: Wyzwalanie ponownego planowania (zatrzymanie)
  • GUIDANCE: Podaj tekst wskazówek (zatrzymanie, zatwierdzanie narzędzi)

Typy zdarzeń

Zdarzenia są emitowane przez AgentRunUpdateEvent z metadanymi w additional_properties.

  • magentic_event_type MAGENTIC_EVENT_TYPE_ORCHESTRATOR: albo lubMAGENTIC_EVENT_TYPE_AGENT_DELTA
  • orchestrator_message_kind: W przypadku zdarzeń orkiestratora wskazuje typ komunikatu (np. "instrukcja", "powiadomienie", "task_ledger")
  • agent_id: w przypadku zdarzeń różnicowych agenta identyfikuje agenta przesyłania strumieniowego

Przykładowe dane wyjściowe

Wkrótce...

Dalsze kroki