Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
A orquestração magêntica foi projetada com base no sistema Magentic-One inventado pela AutoGen. É um padrão flexível de vários agentes de uso geral projetado para tarefas complexas e abertas que exigem colaboração dinâmica. Nesse padrão, um gerenciador magêntico dedicado coordena uma equipe de agentes especializados, selecionando qual agente deve agir em seguida com base no contexto em evolução, no progresso da tarefa e nas funcionalidades do agente.
O gerenciador do Magentic mantém um contexto compartilhado, acompanha o progresso e adapta o fluxo de trabalho em tempo real. Isso permite que o sistema divida problemas complexos, delegar subtarefas e refinar iterativamente soluções por meio da colaboração do agente. A orquestração é especialmente adequada para cenários em que o caminho da solução não é conhecido com antecedência e pode exigir várias rodadas de raciocínio, pesquisa e computação.
O que você vai aprender
- Como configurar um gerente do Magentic para coordenar vários agentes especializados
- Como lidar com eventos de streaming com
AgentRunUpdateEvent - Como implementar a revisão do plano com interação humana, a aprovação da ferramenta e a intervenção de paralisação
- Como acompanhar a colaboração e o progresso do agente por meio de tarefas complexas
Definir seus agentes especializados
Em breve...
Na orquestração magêntica, você define agentes especializados que o gerente pode selecionar dinamicamente com base nos requisitos da tarefa:
from agent_framework import ChatAgent, HostedCodeInterpreterTool
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient
researcher_agent = ChatAgent(
name="ResearcherAgent",
description="Specialist in research and information gathering",
instructions=(
"You are a Researcher. You find information without additional computation or quantitative analysis."
),
# This agent requires the gpt-4o-search-preview model to perform web searches
chat_client=OpenAIChatClient(model_id="gpt-4o-search-preview"),
)
coder_agent = ChatAgent(
name="CoderAgent",
description="A helpful assistant that writes and executes code to process and analyze data.",
instructions="You solve questions using code. Please provide detailed analysis and computation process.",
chat_client=OpenAIResponsesClient(),
tools=HostedCodeInterpreterTool(),
)
# Create a manager agent for orchestration
manager_agent = ChatAgent(
name="MagenticManager",
description="Orchestrator that coordinates the research and coding workflow",
instructions="You coordinate a team to complete complex tasks efficiently.",
chat_client=OpenAIChatClient(),
)
Criar o fluxo de trabalho magêntico
Use MagenticBuilder para configurar o fluxo de trabalho com um gerenciador padrão:
from agent_framework import MagenticBuilder
workflow = (
MagenticBuilder()
.participants(researcher=researcher_agent, coder=coder_agent)
.with_standard_manager(
agent=manager_agent,
max_round_count=10, # Maximum collaboration rounds
max_stall_count=3, # Maximum rounds without progress
max_reset_count=2, # Maximum plan resets allowed
)
.build()
)
Executar o fluxo de trabalho com streaming de eventos
Execute uma tarefa complexa e gerencie eventos para saídas em streaming e atualizações de orquestração.
from typing import cast
from agent_framework import (
MAGENTIC_EVENT_TYPE_AGENT_DELTA,
MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
AgentRunUpdateEvent,
ChatMessage,
WorkflowOutputEvent,
)
task = (
"I am preparing a report on the energy efficiency of different machine learning model architectures. "
"Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
"on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
"Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
"VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
"per task type (image classification, text classification, and text generation)."
)
# State for streaming callback
last_stream_agent_id: str | None = None
stream_line_open: bool = False
output: str | None = None
async for event in workflow.run_stream(task):
if isinstance(event, AgentRunUpdateEvent):
props = event.data.additional_properties if event.data else None
event_type = props.get("magentic_event_type") if props else None
if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
# Manager's planning and coordination messages
kind = props.get("orchestrator_message_kind", "") if props else ""
text = event.data.text if event.data else ""
print(f"\n[ORCH:{kind}]\n\n{text}\n{'-' * 26}")
elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
# Streaming tokens from agents
agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
if last_stream_agent_id != agent_id or not stream_line_open:
if stream_line_open:
print()
print(f"\n[STREAM:{agent_id}]: ", end="", flush=True)
last_stream_agent_id = agent_id
stream_line_open = True
if event.data and event.data.text:
print(event.data.text, end="", flush=True)
elif event.data and event.data.text:
print(event.data.text, end="", flush=True)
elif isinstance(event, WorkflowOutputEvent):
output_messages = cast(list[ChatMessage], event.data)
if output_messages:
output = output_messages[-1].text
if stream_line_open:
print()
if output is not None:
print(f"Workflow completed with result:\n\n{output}")
Avançado: Revisão de Plano Humano-no-Processo
Habilite a revisão humana e a aprovação do plano do gerente antes da execução:
Configurar revisão de plano
from typing import cast
from agent_framework import (
MAGENTIC_EVENT_TYPE_AGENT_DELTA,
MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
AgentRunUpdateEvent,
MagenticHumanInterventionDecision,
MagenticHumanInterventionKind,
MagenticHumanInterventionReply,
MagenticHumanInterventionRequest,
RequestInfoEvent,
WorkflowOutputEvent,
)
workflow = (
MagenticBuilder()
.participants(researcher=researcher_agent, coder=coder_agent)
.with_standard_manager(
agent=manager_agent,
max_round_count=10,
max_stall_count=3,
max_reset_count=2,
)
.with_plan_review() # Enable plan review
.build()
)
Gerenciar solicitações de revisão de plano
pending_request: RequestInfoEvent | None = None
pending_responses: dict[str, MagenticHumanInterventionReply] | None = None
completed = False
workflow_output: str | None = None
while not completed:
# Use streaming for both initial run and response sending
if pending_responses is not None:
stream = workflow.send_responses_streaming(pending_responses)
else:
stream = workflow.run_stream(task)
async for event in stream:
if isinstance(event, AgentRunUpdateEvent):
# Handle streaming events as shown above
pass
elif isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
request = cast(MagenticHumanInterventionRequest, event.data)
if request.kind == MagenticHumanInterventionKind.PLAN_REVIEW:
pending_request = event
if request.plan_text:
print(f"\n=== PLAN REVIEW REQUEST ===\n{request.plan_text}\n")
elif isinstance(event, WorkflowOutputEvent):
workflow_output = str(event.data) if event.data else None
completed = True
pending_responses = None
# Handle pending plan review request
if pending_request is not None:
# Collect human decision (approve/reject/modify)
# For demo, we auto-approve:
reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE)
# Or approve with comments:
# reply = MagenticHumanInterventionReply(
# decision=MagenticHumanInterventionDecision.APPROVE,
# comments="Looks good, but prioritize efficiency metrics."
# )
# Or request revision:
# reply = MagenticHumanInterventionReply(
# decision=MagenticHumanInterventionDecision.REVISE,
# comments="Please include a comparison with newer models like LLaMA."
# )
pending_responses = {pending_request.request_id: reply}
pending_request = None
Avançado: Esclarecimento do agente por meio da aprovação da ferramenta
Os agentes podem fazer perguntas esclarecedoras aos usuários durante a execução usando a aprovação da ferramenta. Isso permite interações HITL (Human-in-the-Loop), em que o agente pode solicitar informações adicionais antes de continuar.
Definir uma ferramenta com aprovação necessária
from typing import Annotated
from agent_framework import ai_function
@ai_function(approval_mode="always_require")
def ask_user(question: Annotated[str, "The question to ask the user for clarification"]) -> str:
"""Ask the user a clarifying question to gather missing information.
Use this tool when you need additional information from the user to complete
your task effectively.
"""
# This function body is a placeholder - the actual interaction happens via HITL.
return f"User was asked: {question}"
Criar um agente com a ferramenta
onboarding_agent = ChatAgent(
name="OnboardingAgent",
description="HR specialist who handles employee onboarding",
instructions=(
"You are an HR Onboarding Specialist. Your job is to onboard new employees.\n\n"
"IMPORTANT: When given an onboarding request, you MUST gather the following "
"information before proceeding:\n"
"1. Department (e.g., Engineering, Sales, Marketing)\n"
"2. Role/Title (e.g., Software Engineer, Account Executive)\n\n"
"Use the ask_user tool to request ANY missing information."
),
chat_client=OpenAIChatClient(model_id="gpt-4o"),
tools=[ask_user],
)
Gerenciar solicitações de aprovação de ferramentas
async for event in workflow.run_stream("Onboard Jessica Smith"):
if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
req = cast(MagenticHumanInterventionRequest, event.data)
if req.kind == MagenticHumanInterventionKind.TOOL_APPROVAL:
print(f"Agent: {req.agent_id}")
print(f"Question: {req.prompt}")
# Get user's answer
answer = input("> ").strip()
# Send the answer back - it will be fed to the agent as the function result
reply = MagenticHumanInterventionReply(
decision=MagenticHumanInterventionDecision.APPROVE,
response_text=answer,
)
pending_responses = {event.request_id: reply}
# Continue workflow with response
async for ev in workflow.send_responses_streaming(pending_responses):
# Handle continuation events
pass
Avançado: Intervenção humana em stall
Habilite a intervenção humana quando o fluxo de trabalho detectar que os agentes não estão progredindo:
Configurar a intervenção de parada
workflow = (
MagenticBuilder()
.participants(researcher=researcher_agent, analyst=analyst_agent)
.with_standard_manager(
agent=manager_agent,
max_round_count=10,
max_stall_count=1, # Stall detection after 1 round without progress
max_reset_count=2,
)
.with_human_input_on_stall() # Request human input when stalled
.build()
)
Manipular solicitações de intervenção de parada
async for event in workflow.run_stream(task):
if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
req = cast(MagenticHumanInterventionRequest, event.data)
if req.kind == MagenticHumanInterventionKind.STALL:
print(f"Workflow stalled after {req.stall_count} rounds")
print(f"Reason: {req.stall_reason}")
if req.plan_text:
print(f"Current plan:\n{req.plan_text}")
# Choose response: CONTINUE, REPLAN, or GUIDANCE
reply = MagenticHumanInterventionReply(
decision=MagenticHumanInterventionDecision.GUIDANCE,
comments="Focus on completing the research step first before moving to analysis.",
)
pending_responses = {event.request_id: reply}
Conceitos Principais
- Coordenação Dinâmica: o gerente do Magentic seleciona dinamicamente qual agente deve agir em seguida com base no contexto em evolução
- Refinamento iterativo: o sistema pode dividir problemas complexos e refinar soluções iterativamente por meio de várias rodadas
- Acompanhamento de Progresso: mecanismos internos para detectar paralisações e redefinir o plano, se necessário
- Colaboração flexível: os agentes podem ser chamados várias vezes em qualquer ordem, conforme determinado pelo gerente
- Supervisão Humana: mecanismos opcionais de participação humana, incluindo revisão de plano, aprovação de ferramentas e intervenção em casos de paralisação
-
Sistema de Eventos Unificado: use
AgentRunUpdateEventpara lidar commagentic_event_typeeventos de streaming de orquestrador e agente
Fluxo de Execução do Processo de Trabalho
A orquestração magêntica segue este padrão de execução:
- Fase de planejamento: o gerente analisa a tarefa e cria um plano inicial
- Revisão de plano opcional: se habilitado, os humanos podem examinar e aprovar/modificar o plano
- Seleção de Agente: o gerente seleciona o agente mais apropriado para cada subtarefa
- Execução: o agente selecionado executa sua parte da tarefa
- Avaliação de Progresso: o gerente avalia o progresso e atualiza o plano
- Detecção de paralisação: se o progresso parar, replaneje automaticamente ou solicite intervenção humana
- Iteração: as etapas 3 a 6 se repetem até que a tarefa seja concluída ou os limites sejam atingidos
- Síntese Final: o gerenciador sintetiza todas as saídas do agente em um resultado final
Exemplo completo
Aqui está um exemplo completo que reúne todos os conceitos:
import asyncio
import logging
from typing import cast
from agent_framework import (
MAGENTIC_EVENT_TYPE_AGENT_DELTA,
MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
AgentRunUpdateEvent,
ChatAgent,
ChatMessage,
HostedCodeInterpreterTool,
MagenticBuilder,
WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)
async def main() -> None:
# Define specialized agents
researcher_agent = ChatAgent(
name="ResearcherAgent",
description="Specialist in research and information gathering",
instructions=(
"You are a Researcher. You find information without additional "
"computation or quantitative analysis."
),
chat_client=OpenAIChatClient(model_id="gpt-4o-search-preview"),
)
coder_agent = ChatAgent(
name="CoderAgent",
description="A helpful assistant that writes and executes code to process and analyze data.",
instructions="You solve questions using code. Please provide detailed analysis and computation process.",
chat_client=OpenAIResponsesClient(),
tools=HostedCodeInterpreterTool(),
)
# Create a manager agent for orchestration
manager_agent = ChatAgent(
name="MagenticManager",
description="Orchestrator that coordinates the research and coding workflow",
instructions="You coordinate a team to complete complex tasks efficiently.",
chat_client=OpenAIChatClient(),
)
# State for streaming output
last_stream_agent_id: str | None = None
stream_line_open: bool = False
# Build the workflow
print("\nBuilding Magentic Workflow...")
workflow = (
MagenticBuilder()
.participants(researcher=researcher_agent, coder=coder_agent)
.with_standard_manager(
agent=manager_agent,
max_round_count=10,
max_stall_count=3,
max_reset_count=2,
)
.build()
)
# Define the task
task = (
"I am preparing a report on the energy efficiency of different machine learning model architectures. "
"Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
"on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
"Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
"VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
"per task type (image classification, text classification, and text generation)."
)
print(f"\nTask: {task}")
print("\nStarting workflow execution...")
# Run the workflow
try:
output: str | None = None
async for event in workflow.run_stream(task):
if isinstance(event, AgentRunUpdateEvent):
props = event.data.additional_properties if event.data else None
event_type = props.get("magentic_event_type") if props else None
if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
kind = props.get("orchestrator_message_kind", "") if props else ""
text = event.data.text if event.data else ""
print(f"\n[ORCH:{kind}]\n\n{text}\n{'-' * 26}")
elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
if last_stream_agent_id != agent_id or not stream_line_open:
if stream_line_open:
print()
print(f"\n[STREAM:{agent_id}]: ", end="", flush=True)
last_stream_agent_id = agent_id
stream_line_open = True
if event.data and event.data.text:
print(event.data.text, end="", flush=True)
elif event.data and event.data.text:
print(event.data.text, end="", flush=True)
elif isinstance(event, WorkflowOutputEvent):
output_messages = cast(list[ChatMessage], event.data)
if output_messages:
output = output_messages[-1].text
if stream_line_open:
print()
if output is not None:
print(f"Workflow completed with result:\n\n{output}")
except Exception as e:
print(f"Workflow execution failed: {e}")
logger.exception("Workflow exception", exc_info=e)
if __name__ == "__main__":
asyncio.run(main())
Opções de configuração
Parâmetros do Gerenciador
-
max_round_count: número máximo de rodadas de colaboração (padrão: 10) -
max_stall_count: Rodadas máximas sem progresso antes de iniciar o tratamento de estagnação (padrão: 3) -
max_reset_count: número máximo de redefinições de plano permitidas (padrão: 2)
Tipos de intervenção humana
-
PLAN_REVIEW: examine e aprove/revise o plano inicial -
TOOL_APPROVAL: aprovar uma chamada de ferramenta/função (usada para esclarecimento do agente) -
STALL: o fluxo de trabalho estagnou e precisa de orientação
Decisões de intervenção humana
-
APPROVE: aceite o plano ou a chamada de ferramenta como está -
REVISE: solicitar revisão com comentários (revisão de plano) -
REJECT: rejeição/negação (aprovação da ferramenta) -
CONTINUE: continue com o estado atual (parada) -
REPLAN: replanejamento do gatilho (pausa) -
GUIDANCE: Fornecer texto de orientação (parada, aprovação da ferramenta)
Tipos de evento
Os eventos são emitidos via AgentRunUpdateEvent com metadados em additional_properties:
-
magentic_event_type: ouMAGENTIC_EVENT_TYPE_ORCHESTRATORMAGENTIC_EVENT_TYPE_AGENT_DELTA -
orchestrator_message_kind: para eventos de orquestrador, indica o tipo de mensagem (por exemplo, "instrução", "aviso", "task_ledger") -
agent_id: Para eventos do tipo delta do agente, identifica o agente de streaming
Saída de exemplo
Em breve...