Partilhar via


Orquestrações de fluxos de trabalho do Microsoft Agent Framework - Magentic

A orquestração Magentic é projetada com base no sistema Magentic-One inventado pela AutoGen. É um padrão multiagente flexível e de uso geral projetado para tarefas complexas e abertas que exigem colaboração dinâmica. Nesse padrão, um gerente Magentic dedicado coordena uma equipe de agentes especializados, selecionando qual agente deve agir em seguida com base no contexto em evolução, no progresso da tarefa e nas capacidades do agente.

O gerente Magentic mantém um contexto compartilhado, acompanha o progresso e adapta o fluxo de trabalho em tempo real. Isso permite que o sistema quebre problemas complexos, delegue subtarefas e refine iterativamente soluções por meio da colaboração de agentes. A orquestração é especialmente adequada para cenários em que o caminho da solução não é conhecido com antecedência e pode exigir várias rodadas de raciocínio, pesquisa e computação.

Orquestração Magentica

O que você vai aprender

  • Como configurar um gerente Magentic para coordenar vários agentes especializados
  • Como gerir eventos de streaming com AgentRunUpdateEvent
  • Como implementar a revisão de planos com intervenção humana, aprovação de ferramentas e intervenção em caso de bloqueios
  • Como acompanhar a colaboração do agente e o progresso em tarefas complexas

Defina seus agentes especializados

Brevemente...

Na orquestração Magentic, você define agentes especializados que o gerente pode selecionar dinamicamente com base nos requisitos da tarefa:

from agent_framework import ChatAgent, HostedCodeInterpreterTool
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient

researcher_agent = ChatAgent(
    name="ResearcherAgent",
    description="Specialist in research and information gathering",
    instructions=(
        "You are a Researcher. You find information without additional computation or quantitative analysis."
    ),
    # This agent requires the gpt-4o-search-preview model to perform web searches
    chat_client=OpenAIChatClient(model_id="gpt-4o-search-preview"),
)

coder_agent = ChatAgent(
    name="CoderAgent",
    description="A helpful assistant that writes and executes code to process and analyze data.",
    instructions="You solve questions using code. Please provide detailed analysis and computation process.",
    chat_client=OpenAIResponsesClient(),
    tools=HostedCodeInterpreterTool(),
)

# Create a manager agent for orchestration
manager_agent = ChatAgent(
    name="MagenticManager",
    description="Orchestrator that coordinates the research and coding workflow",
    instructions="You coordinate a team to complete complex tasks efficiently.",
    chat_client=OpenAIChatClient(),
)

Crie o fluxo de trabalho Magentic

Use MagenticBuilder para configurar o fluxo de trabalho com um gerenciador padrão:

from agent_framework import MagenticBuilder

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher_agent, coder=coder_agent)
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=10,  # Maximum collaboration rounds
        max_stall_count=3,   # Maximum rounds without progress
        max_reset_count=2,   # Maximum plan resets allowed
    )
    .build()
)

Execute o Fluxo de Trabalho com Streaming de Eventos

Realize uma tarefa complexa e faça a gestão de eventos para saída de streaming e atualizações de orquestração.

from typing import cast
from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    ChatMessage,
    WorkflowOutputEvent,
)

task = (
    "I am preparing a report on the energy efficiency of different machine learning model architectures. "
    "Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
    "on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
    "Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
    "VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
    "per task type (image classification, text classification, and text generation)."
)

# State for streaming callback
last_stream_agent_id: str | None = None
stream_line_open: bool = False
output: str | None = None

async for event in workflow.run_stream(task):
    if isinstance(event, AgentRunUpdateEvent):
        props = event.data.additional_properties if event.data else None
        event_type = props.get("magentic_event_type") if props else None

        if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
            # Manager's planning and coordination messages
            kind = props.get("orchestrator_message_kind", "") if props else ""
            text = event.data.text if event.data else ""
            print(f"\n[ORCH:{kind}]\n\n{text}\n{'-' * 26}")

        elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
            # Streaming tokens from agents
            agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
            if last_stream_agent_id != agent_id or not stream_line_open:
                if stream_line_open:
                    print()
                print(f"\n[STREAM:{agent_id}]: ", end="", flush=True)
                last_stream_agent_id = agent_id
                stream_line_open = True
            if event.data and event.data.text:
                print(event.data.text, end="", flush=True)

        elif event.data and event.data.text:
            print(event.data.text, end="", flush=True)

    elif isinstance(event, WorkflowOutputEvent):
        output_messages = cast(list[ChatMessage], event.data)
        if output_messages:
            output = output_messages[-1].text

if stream_line_open:
    print()

if output is not None:
    print(f"Workflow completed with result:\n\n{output}")

Avançado: Revisão do plano Human-in-the-Loop

Permitir a revisão humana e aprovação do plano do gerente antes da execução:

Configurar revisão do plano

from typing import cast
from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    MagenticHumanInterventionDecision,
    MagenticHumanInterventionKind,
    MagenticHumanInterventionReply,
    MagenticHumanInterventionRequest,
    RequestInfoEvent,
    WorkflowOutputEvent,
)

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher_agent, coder=coder_agent)
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=10,
        max_stall_count=3,
        max_reset_count=2,
    )
    .with_plan_review()  # Enable plan review
    .build()
)

Lidar com solicitações de revisão de plano

pending_request: RequestInfoEvent | None = None
pending_responses: dict[str, MagenticHumanInterventionReply] | None = None
completed = False
workflow_output: str | None = None

while not completed:
    # Use streaming for both initial run and response sending
    if pending_responses is not None:
        stream = workflow.send_responses_streaming(pending_responses)
    else:
        stream = workflow.run_stream(task)

    async for event in stream:
        if isinstance(event, AgentRunUpdateEvent):
            # Handle streaming events as shown above
            pass
        elif isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
            request = cast(MagenticHumanInterventionRequest, event.data)
            if request.kind == MagenticHumanInterventionKind.PLAN_REVIEW:
                pending_request = event
                if request.plan_text:
                    print(f"\n=== PLAN REVIEW REQUEST ===\n{request.plan_text}\n")
        elif isinstance(event, WorkflowOutputEvent):
            workflow_output = str(event.data) if event.data else None
            completed = True

    pending_responses = None

    # Handle pending plan review request
    if pending_request is not None:
        # Collect human decision (approve/reject/modify)
        # For demo, we auto-approve:
        reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE)

        # Or approve with comments:
        # reply = MagenticHumanInterventionReply(
        #     decision=MagenticHumanInterventionDecision.APPROVE,
        #     comments="Looks good, but prioritize efficiency metrics."
        # )

        # Or request revision:
        # reply = MagenticHumanInterventionReply(
        #     decision=MagenticHumanInterventionDecision.REVISE,
        #     comments="Please include a comparison with newer models like LLaMA."
        # )

        pending_responses = {pending_request.request_id: reply}
        pending_request = None

Avançado: Esclarecimento do Agente através da Aprovação da Ferramenta

Os agentes podem colocar perguntas esclarecedoras aos utilizadores durante a execução, utilizando a aprovação da ferramenta. Isto permite interações Human-in-the-Loop (HITL), onde o agente pode solicitar informações adicionais antes de prosseguir.

Defina uma Ferramenta com Aprovação Necessária

from typing import Annotated
from agent_framework import ai_function

@ai_function(approval_mode="always_require")
def ask_user(question: Annotated[str, "The question to ask the user for clarification"]) -> str:
    """Ask the user a clarifying question to gather missing information.

    Use this tool when you need additional information from the user to complete
    your task effectively.
    """
    # This function body is a placeholder - the actual interaction happens via HITL.
    return f"User was asked: {question}"

Crie um Agente com a Ferramenta

onboarding_agent = ChatAgent(
    name="OnboardingAgent",
    description="HR specialist who handles employee onboarding",
    instructions=(
        "You are an HR Onboarding Specialist. Your job is to onboard new employees.\n\n"
        "IMPORTANT: When given an onboarding request, you MUST gather the following "
        "information before proceeding:\n"
        "1. Department (e.g., Engineering, Sales, Marketing)\n"
        "2. Role/Title (e.g., Software Engineer, Account Executive)\n\n"
        "Use the ask_user tool to request ANY missing information."
    ),
    chat_client=OpenAIChatClient(model_id="gpt-4o"),
    tools=[ask_user],
)

Gerir Pedidos de Aprovação de Ferramentas

async for event in workflow.run_stream("Onboard Jessica Smith"):
    if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
        req = cast(MagenticHumanInterventionRequest, event.data)

        if req.kind == MagenticHumanInterventionKind.TOOL_APPROVAL:
            print(f"Agent: {req.agent_id}")
            print(f"Question: {req.prompt}")

            # Get user's answer
            answer = input("> ").strip()

            # Send the answer back - it will be fed to the agent as the function result
            reply = MagenticHumanInterventionReply(
                decision=MagenticHumanInterventionDecision.APPROVE,
                response_text=answer,
            )
            pending_responses = {event.request_id: reply}

            # Continue workflow with response
            async for ev in workflow.send_responses_streaming(pending_responses):
                # Handle continuation events
                pass

Avançado: Intervenção Humana na Estagem

Permitir a intervenção humana quando o fluxo de trabalho detetar que os agentes não estão a fazer progressos:

Configurar a Intervenção em Estol

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher_agent, analyst=analyst_agent)
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=10,
        max_stall_count=1,  # Stall detection after 1 round without progress
        max_reset_count=2,
    )
    .with_human_input_on_stall()  # Request human input when stalled
    .build()
)

Lidar com Pedidos de Intervenção em Interrupções

async for event in workflow.run_stream(task):
    if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
        req = cast(MagenticHumanInterventionRequest, event.data)

        if req.kind == MagenticHumanInterventionKind.STALL:
            print(f"Workflow stalled after {req.stall_count} rounds")
            print(f"Reason: {req.stall_reason}")
            if req.plan_text:
                print(f"Current plan:\n{req.plan_text}")

            # Choose response: CONTINUE, REPLAN, or GUIDANCE
            reply = MagenticHumanInterventionReply(
                decision=MagenticHumanInterventionDecision.GUIDANCE,
                comments="Focus on completing the research step first before moving to analysis.",
            )
            pending_responses = {event.request_id: reply}

Conceitos-chave

  • Coordenação Dinâmica: O gerente Magentic seleciona dinamicamente qual agente deve agir em seguida com base no contexto em evolução
  • Refinamento iterativo: O sistema pode decompor problemas complexos e refinar iterativamente soluções através de várias rodadas
  • Acompanhamento do progresso: mecanismos integrados para detetar interrupções e redefinir o plano, se necessário
  • Colaboração flexível: os agentes podem ser chamados várias vezes em qualquer ordem, conforme determinado pelo gerente
  • Supervisão Humana: Mecanismos opcionais de intervenção do ser humano no ciclo de operações, incluindo revisão de planos, aprovação de ferramentas e intervenção em caso de interrupção
  • Sistema Unificado de Eventos: Usar AgentRunUpdateEvent com magentic_event_type para gerir eventos de streaming de orquestradores e agentes

Fluxo de execução do trabalho

A orquestração Magentic segue este padrão de execução:

  1. Fase de planejamento: O gerente analisa a tarefa e cria um plano inicial
  2. Revisão Opcional do Plano: Se ativado, os humanos podem rever e aprovar/modificar o plano
  3. Seleção do agente: o gerente seleciona o agente mais apropriado para cada subtarefa
  4. Execução: O agente selecionado executa sua parte da tarefa
  5. Avaliação de progresso: O gestor avalia o progresso e atualiza o plano
  6. Deteção de Estalo: Se o progresso estagnar, pode replanear automaticamente ou pedir intervenção humana
  7. Iteração: Os passos 3-6 repetem-se até a tarefa estar concluída ou até serem atingidos limites
  8. Síntese Final: O gerente sintetiza todas as saídas do agente em um resultado final

Exemplo completo

Aqui está um exemplo completo que reúne todos os conceitos:

import asyncio
import logging
from typing import cast

from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    ChatAgent,
    ChatMessage,
    HostedCodeInterpreterTool,
    MagenticBuilder,
    WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)

async def main() -> None:
    # Define specialized agents
    researcher_agent = ChatAgent(
        name="ResearcherAgent",
        description="Specialist in research and information gathering",
        instructions=(
            "You are a Researcher. You find information without additional "
            "computation or quantitative analysis."
        ),
        chat_client=OpenAIChatClient(model_id="gpt-4o-search-preview"),
    )

    coder_agent = ChatAgent(
        name="CoderAgent",
        description="A helpful assistant that writes and executes code to process and analyze data.",
        instructions="You solve questions using code. Please provide detailed analysis and computation process.",
        chat_client=OpenAIResponsesClient(),
        tools=HostedCodeInterpreterTool(),
    )

    # Create a manager agent for orchestration
    manager_agent = ChatAgent(
        name="MagenticManager",
        description="Orchestrator that coordinates the research and coding workflow",
        instructions="You coordinate a team to complete complex tasks efficiently.",
        chat_client=OpenAIChatClient(),
    )

    # State for streaming output
    last_stream_agent_id: str | None = None
    stream_line_open: bool = False

    # Build the workflow
    print("\nBuilding Magentic Workflow...")

    workflow = (
        MagenticBuilder()
        .participants(researcher=researcher_agent, coder=coder_agent)
        .with_standard_manager(
            agent=manager_agent,
            max_round_count=10,
            max_stall_count=3,
            max_reset_count=2,
        )
        .build()
    )

    # Define the task
    task = (
        "I am preparing a report on the energy efficiency of different machine learning model architectures. "
        "Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
        "on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
        "Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
        "VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
        "per task type (image classification, text classification, and text generation)."
    )

    print(f"\nTask: {task}")
    print("\nStarting workflow execution...")

    # Run the workflow
    try:
        output: str | None = None
        async for event in workflow.run_stream(task):
            if isinstance(event, AgentRunUpdateEvent):
                props = event.data.additional_properties if event.data else None
                event_type = props.get("magentic_event_type") if props else None

                if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
                    kind = props.get("orchestrator_message_kind", "") if props else ""
                    text = event.data.text if event.data else ""
                    print(f"\n[ORCH:{kind}]\n\n{text}\n{'-' * 26}")
                elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
                    agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
                    if last_stream_agent_id != agent_id or not stream_line_open:
                        if stream_line_open:
                            print()
                        print(f"\n[STREAM:{agent_id}]: ", end="", flush=True)
                        last_stream_agent_id = agent_id
                        stream_line_open = True
                    if event.data and event.data.text:
                        print(event.data.text, end="", flush=True)
                elif event.data and event.data.text:
                    print(event.data.text, end="", flush=True)
            elif isinstance(event, WorkflowOutputEvent):
                output_messages = cast(list[ChatMessage], event.data)
                if output_messages:
                    output = output_messages[-1].text

        if stream_line_open:
            print()

        if output is not None:
            print(f"Workflow completed with result:\n\n{output}")

    except Exception as e:
        print(f"Workflow execution failed: {e}")
        logger.exception("Workflow exception", exc_info=e)

if __name__ == "__main__":
    asyncio.run(main())

Opções de configuração

Parâmetros do Gestor

  • max_round_count: Número máximo de rodadas de colaboração (padrão: 10)
  • max_stall_count: Máximo de rondas sem progresso antes de ativar o tratamento de paragem (padrão: 3)
  • max_reset_count: Número máximo de redefinições de plano permitidas (padrão: 2)

Tipos de Intervenção Humana

  • PLAN_REVIEW: Revisar e aprovar/revisar o plano inicial
  • TOOL_APPROVAL: Aprovar uma chamada de ferramenta/função (usada para esclarecimento para o agente)
  • STALL: O fluxo de trabalho estagnou e precisa de orientação

Decisões de Intervenção Humana

  • APPROVE: Aceitar o plano ou a ferramenta como estão
  • REVISE: Solicitar revisão com feedback (revisão do plano)
  • REJECT: Rejeitar/negar (aprovação da ferramenta)
  • CONTINUE: Continuar com o estado atual (parar)
  • REPLAN: Acionar o replaneamento (interrupção)
  • GUIDANCE: Fornecer texto de orientação (adiamento, aprovação de ferramenta)

Tipos de Eventos

Os eventos são emitidos via AgentRunUpdateEvent com metadados em additional_properties:

  • magentic_event_type: Ou MAGENTIC_EVENT_TYPE_ORCHESTRATOR ou MAGENTIC_EVENT_TYPE_AGENT_DELTA
  • orchestrator_message_kind: Para eventos orquestradores, indica o tipo de mensagem (por exemplo, "instrução", "aviso", "task_ledger")
  • agent_id: Para eventos do agente delta, identifica o agente de streaming

Saída de amostra

Brevemente...

Próximos passos