Compartir a través de


Orquestaciones de flujos de trabajo de Microsoft Agent Framework: Magentic

La orquestación magentic está diseñada basada en el sistema Magentic-One inventado por AutoGen. Es un patrón multiagente flexible y de uso general diseñado para tareas complejas y abiertas que requieren colaboración dinámica. En este patrón, un administrador de Magentic dedicado coordina un equipo de agentes especializados, seleccionando qué agente debe actuar a continuación en función del contexto en constante evolución, el progreso de las tareas y las funcionalidades del agente.

El administrador de Magentic mantiene un contexto compartido, realiza un seguimiento del progreso y adapta el flujo de trabajo en tiempo real. Esto permite al sistema desglosar problemas complejos, delegar subtareas y refinar soluciones de forma iterativa a través de la colaboración del agente. La orquestación es especialmente adecuada para escenarios en los que la ruta de solución no se conoce de antemano y puede requerir varias rondas de razonamiento, investigación y cálculo.

Orquestación Magnética

Temas que se abordarán

  • Cómo configurar un administrador magentic para coordinar varios agentes especializados
  • Cómo controlar eventos de streaming con AgentRunUpdateEvent
  • Cómo implementar la revisión de planes con humano en el bucle, la aprobación de herramientas y la intervención en caso de interrupción.
  • Cómo realizar un seguimiento de la colaboración del agente y el progreso a través de tareas complejas

Definir los agentes especializados

Próximamente...

En la orquestación magentic, se definen agentes especializados que el administrador puede seleccionar dinámicamente en función de los requisitos de la tarea:

from agent_framework import ChatAgent, HostedCodeInterpreterTool
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient

researcher_agent = ChatAgent(
    name="ResearcherAgent",
    description="Specialist in research and information gathering",
    instructions=(
        "You are a Researcher. You find information without additional computation or quantitative analysis."
    ),
    # This agent requires the gpt-4o-search-preview model to perform web searches
    chat_client=OpenAIChatClient(model_id="gpt-4o-search-preview"),
)

coder_agent = ChatAgent(
    name="CoderAgent",
    description="A helpful assistant that writes and executes code to process and analyze data.",
    instructions="You solve questions using code. Please provide detailed analysis and computation process.",
    chat_client=OpenAIResponsesClient(),
    tools=HostedCodeInterpreterTool(),
)

# Create a manager agent for orchestration
manager_agent = ChatAgent(
    name="MagenticManager",
    description="Orchestrator that coordinates the research and coding workflow",
    instructions="You coordinate a team to complete complex tasks efficiently.",
    chat_client=OpenAIChatClient(),
)

Construir el flujo de trabajo magnético

Use MagenticBuilder para configurar el flujo de trabajo con un administrador estándar:

from agent_framework import MagenticBuilder

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher_agent, coder=coder_agent)
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=10,  # Maximum collaboration rounds
        max_stall_count=3,   # Maximum rounds without progress
        max_reset_count=2,   # Maximum plan resets allowed
    )
    .build()
)

Ejecución del flujo de trabajo con streaming de eventos

Ejecute una tarea compleja y maneje eventos para la salida de streaming y actualizaciones de la orquestación.

from typing import cast
from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    ChatMessage,
    WorkflowOutputEvent,
)

task = (
    "I am preparing a report on the energy efficiency of different machine learning model architectures. "
    "Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
    "on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
    "Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
    "VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
    "per task type (image classification, text classification, and text generation)."
)

# State for streaming callback
last_stream_agent_id: str | None = None
stream_line_open: bool = False
output: str | None = None

async for event in workflow.run_stream(task):
    if isinstance(event, AgentRunUpdateEvent):
        props = event.data.additional_properties if event.data else None
        event_type = props.get("magentic_event_type") if props else None

        if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
            # Manager's planning and coordination messages
            kind = props.get("orchestrator_message_kind", "") if props else ""
            text = event.data.text if event.data else ""
            print(f"\n[ORCH:{kind}]\n\n{text}\n{'-' * 26}")

        elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
            # Streaming tokens from agents
            agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
            if last_stream_agent_id != agent_id or not stream_line_open:
                if stream_line_open:
                    print()
                print(f"\n[STREAM:{agent_id}]: ", end="", flush=True)
                last_stream_agent_id = agent_id
                stream_line_open = True
            if event.data and event.data.text:
                print(event.data.text, end="", flush=True)

        elif event.data and event.data.text:
            print(event.data.text, end="", flush=True)

    elif isinstance(event, WorkflowOutputEvent):
        output_messages = cast(list[ChatMessage], event.data)
        if output_messages:
            output = output_messages[-1].text

if stream_line_open:
    print()

if output is not None:
    print(f"Workflow completed with result:\n\n{output}")

Avanzado: Revisión del plan de human-in-the-loop

Habilite la revisión y aprobación humanas del plan del administrador antes de la ejecución:

Configurar revisión del plan

from typing import cast
from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    MagenticHumanInterventionDecision,
    MagenticHumanInterventionKind,
    MagenticHumanInterventionReply,
    MagenticHumanInterventionRequest,
    RequestInfoEvent,
    WorkflowOutputEvent,
)

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher_agent, coder=coder_agent)
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=10,
        max_stall_count=3,
        max_reset_count=2,
    )
    .with_plan_review()  # Enable plan review
    .build()
)

Gestionar solicitudes de revisión de plan

pending_request: RequestInfoEvent | None = None
pending_responses: dict[str, MagenticHumanInterventionReply] | None = None
completed = False
workflow_output: str | None = None

while not completed:
    # Use streaming for both initial run and response sending
    if pending_responses is not None:
        stream = workflow.send_responses_streaming(pending_responses)
    else:
        stream = workflow.run_stream(task)

    async for event in stream:
        if isinstance(event, AgentRunUpdateEvent):
            # Handle streaming events as shown above
            pass
        elif isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
            request = cast(MagenticHumanInterventionRequest, event.data)
            if request.kind == MagenticHumanInterventionKind.PLAN_REVIEW:
                pending_request = event
                if request.plan_text:
                    print(f"\n=== PLAN REVIEW REQUEST ===\n{request.plan_text}\n")
        elif isinstance(event, WorkflowOutputEvent):
            workflow_output = str(event.data) if event.data else None
            completed = True

    pending_responses = None

    # Handle pending plan review request
    if pending_request is not None:
        # Collect human decision (approve/reject/modify)
        # For demo, we auto-approve:
        reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE)

        # Or approve with comments:
        # reply = MagenticHumanInterventionReply(
        #     decision=MagenticHumanInterventionDecision.APPROVE,
        #     comments="Looks good, but prioritize efficiency metrics."
        # )

        # Or request revision:
        # reply = MagenticHumanInterventionReply(
        #     decision=MagenticHumanInterventionDecision.REVISE,
        #     comments="Please include a comparison with newer models like LLaMA."
        # )

        pending_responses = {pending_request.request_id: reply}
        pending_request = None

Avanzado: Clarificación del agente mediante la aprobación de herramientas

Los agentes pueden formular preguntas aclarantes a los usuarios durante la ejecución mediante la aprobación de herramientas. Esto habilita las interacciones de Human-in-the-Loop (HITL) en las que el agente puede solicitar información adicional antes de continuar.

Definición de una herramienta con aprobación requerida

from typing import Annotated
from agent_framework import ai_function

@ai_function(approval_mode="always_require")
def ask_user(question: Annotated[str, "The question to ask the user for clarification"]) -> str:
    """Ask the user a clarifying question to gather missing information.

    Use this tool when you need additional information from the user to complete
    your task effectively.
    """
    # This function body is a placeholder - the actual interaction happens via HITL.
    return f"User was asked: {question}"

Creación de un agente con la herramienta

onboarding_agent = ChatAgent(
    name="OnboardingAgent",
    description="HR specialist who handles employee onboarding",
    instructions=(
        "You are an HR Onboarding Specialist. Your job is to onboard new employees.\n\n"
        "IMPORTANT: When given an onboarding request, you MUST gather the following "
        "information before proceeding:\n"
        "1. Department (e.g., Engineering, Sales, Marketing)\n"
        "2. Role/Title (e.g., Software Engineer, Account Executive)\n\n"
        "Use the ask_user tool to request ANY missing information."
    ),
    chat_client=OpenAIChatClient(model_id="gpt-4o"),
    tools=[ask_user],
)

Gestionar solicitudes de aprobación de herramientas

async for event in workflow.run_stream("Onboard Jessica Smith"):
    if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
        req = cast(MagenticHumanInterventionRequest, event.data)

        if req.kind == MagenticHumanInterventionKind.TOOL_APPROVAL:
            print(f"Agent: {req.agent_id}")
            print(f"Question: {req.prompt}")

            # Get user's answer
            answer = input("> ").strip()

            # Send the answer back - it will be fed to the agent as the function result
            reply = MagenticHumanInterventionReply(
                decision=MagenticHumanInterventionDecision.APPROVE,
                response_text=answer,
            )
            pending_responses = {event.request_id: reply}

            # Continue workflow with response
            async for ev in workflow.send_responses_streaming(pending_responses):
                # Handle continuation events
                pass

Avanzado: Intervención humana en el puesto

Habilite la intervención humana cuando el flujo de trabajo detecte que los agentes no están progresando:

Configurar la intervención del puesto

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher_agent, analyst=analyst_agent)
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=10,
        max_stall_count=1,  # Stall detection after 1 round without progress
        max_reset_count=2,
    )
    .with_human_input_on_stall()  # Request human input when stalled
    .build()
)

Manejar solicitudes de intervención por estancamiento

async for event in workflow.run_stream(task):
    if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
        req = cast(MagenticHumanInterventionRequest, event.data)

        if req.kind == MagenticHumanInterventionKind.STALL:
            print(f"Workflow stalled after {req.stall_count} rounds")
            print(f"Reason: {req.stall_reason}")
            if req.plan_text:
                print(f"Current plan:\n{req.plan_text}")

            # Choose response: CONTINUE, REPLAN, or GUIDANCE
            reply = MagenticHumanInterventionReply(
                decision=MagenticHumanInterventionDecision.GUIDANCE,
                comments="Focus on completing the research step first before moving to analysis.",
            )
            pending_responses = {event.request_id: reply}

Conceptos clave

  • Coordinación dinámica: el administrador magentic selecciona dinámicamente qué agente debe actuar a continuación en función del contexto en evolución.
  • Refinamiento iterativo: el sistema puede desglosar problemas complejos y refinar soluciones iterativamente a través de varias rondas
  • Seguimiento de progreso: mecanismos integrados para detectar paradas y restablecer el plan si es necesario
  • Colaboración flexible: los agentes se pueden llamar varias veces en cualquier orden determinado por el administrador.
  • Supervisión humana: mecanismos opcionales de intervención humana, incluida la revisión del plan, la aprobación de herramientas y la intervención para la detención
  • Sistema de eventos unificado: use AgentRunUpdateEvent con magentic_event_type para controlar los eventos de streaming del orquestador y del agente.

Flujo de ejecución del flujo de trabajo

La orquestación magnética sigue este patrón de ejecución:

  1. Fase de planeación: el administrador analiza la tarea y crea un plan inicial
  2. Revisión opcional del plan: si está habilitada, los seres humanos pueden revisar y aprobar o modificar el plan.
  3. Selección del agente: el administrador selecciona el agente más adecuado para cada subtarea.
  4. Ejecución: el agente seleccionado ejecuta su parte de la tarea.
  5. Evaluación del progreso: el administrador evalúa el progreso y actualiza el plan.
  6. Detección de puestos: si el progreso se detiene, vuelva a planeación automática o solicite la intervención humana.
  7. Iteración: pasos 3-6 repetir hasta que se complete la tarea o se alcancen los límites
  8. Síntesis final: el administrador sintetiza todas las salidas del agente en un resultado final

Ejemplo completo

Este es un ejemplo completo que reúne todos los conceptos:

import asyncio
import logging
from typing import cast

from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    ChatAgent,
    ChatMessage,
    HostedCodeInterpreterTool,
    MagenticBuilder,
    WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)

async def main() -> None:
    # Define specialized agents
    researcher_agent = ChatAgent(
        name="ResearcherAgent",
        description="Specialist in research and information gathering",
        instructions=(
            "You are a Researcher. You find information without additional "
            "computation or quantitative analysis."
        ),
        chat_client=OpenAIChatClient(model_id="gpt-4o-search-preview"),
    )

    coder_agent = ChatAgent(
        name="CoderAgent",
        description="A helpful assistant that writes and executes code to process and analyze data.",
        instructions="You solve questions using code. Please provide detailed analysis and computation process.",
        chat_client=OpenAIResponsesClient(),
        tools=HostedCodeInterpreterTool(),
    )

    # Create a manager agent for orchestration
    manager_agent = ChatAgent(
        name="MagenticManager",
        description="Orchestrator that coordinates the research and coding workflow",
        instructions="You coordinate a team to complete complex tasks efficiently.",
        chat_client=OpenAIChatClient(),
    )

    # State for streaming output
    last_stream_agent_id: str | None = None
    stream_line_open: bool = False

    # Build the workflow
    print("\nBuilding Magentic Workflow...")

    workflow = (
        MagenticBuilder()
        .participants(researcher=researcher_agent, coder=coder_agent)
        .with_standard_manager(
            agent=manager_agent,
            max_round_count=10,
            max_stall_count=3,
            max_reset_count=2,
        )
        .build()
    )

    # Define the task
    task = (
        "I am preparing a report on the energy efficiency of different machine learning model architectures. "
        "Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
        "on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
        "Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
        "VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
        "per task type (image classification, text classification, and text generation)."
    )

    print(f"\nTask: {task}")
    print("\nStarting workflow execution...")

    # Run the workflow
    try:
        output: str | None = None
        async for event in workflow.run_stream(task):
            if isinstance(event, AgentRunUpdateEvent):
                props = event.data.additional_properties if event.data else None
                event_type = props.get("magentic_event_type") if props else None

                if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
                    kind = props.get("orchestrator_message_kind", "") if props else ""
                    text = event.data.text if event.data else ""
                    print(f"\n[ORCH:{kind}]\n\n{text}\n{'-' * 26}")
                elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
                    agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
                    if last_stream_agent_id != agent_id or not stream_line_open:
                        if stream_line_open:
                            print()
                        print(f"\n[STREAM:{agent_id}]: ", end="", flush=True)
                        last_stream_agent_id = agent_id
                        stream_line_open = True
                    if event.data and event.data.text:
                        print(event.data.text, end="", flush=True)
                elif event.data and event.data.text:
                    print(event.data.text, end="", flush=True)
            elif isinstance(event, WorkflowOutputEvent):
                output_messages = cast(list[ChatMessage], event.data)
                if output_messages:
                    output = output_messages[-1].text

        if stream_line_open:
            print()

        if output is not None:
            print(f"Workflow completed with result:\n\n{output}")

    except Exception as e:
        print(f"Workflow execution failed: {e}")
        logger.exception("Workflow exception", exc_info=e)

if __name__ == "__main__":
    asyncio.run(main())

Opciones de configuración

Parámetros del administrador

  • max_round_count: número máximo de rondas de colaboración (valor predeterminado: 10)
  • max_stall_count: número máximo de rondas sin progreso antes de desencadenar el control del bloqueo (valor predeterminado: 3)
  • max_reset_count: número máximo de restablecimientos de plan permitidos (valor predeterminado: 2)

Tipos de intervención humana

  • PLAN_REVIEW: revisar y aprobar o revisar el plan inicial
  • TOOL_APPROVAL: Aprobar el uso de una herramienta o función (se usa para la aclaración del agente)
  • STALL: el flujo de trabajo se ha detenido y necesita instrucciones

Decisiones de intervención humana

  • APPROVE: acepte el plan o la llamada de herramienta tal cual
  • REVISE: solicitar revisión con comentarios (revisión del plan)
  • REJECT: rechazar/denegar (aprobación de herramientas)
  • CONTINUE: continuar con el estado actual (parada)
  • REPLAN: desencadenar replanificación (bloqueo)
  • GUIDANCE: proporcionar texto de guía (parada, aprobación de herramientas)

Tipos de eventos

Los eventos se emiten a través de AgentRunUpdateEvent con metadatos en additional_properties:

  • magentic_event_type: o MAGENTIC_EVENT_TYPE_ORCHESTRATORMAGENTIC_EVENT_TYPE_AGENT_DELTA
  • orchestrator_message_kind: para los eventos de orquestador, indica el tipo de mensaje (por ejemplo, "instrucción", "aviso", "task_ledger")
  • agent_id: para los eventos delta del agente, identifica el agente de streaming.

Salida de ejemplo

Próximamente...

Pasos siguientes