마젠틱 오케스트레이션은 AutoGen에서 발명한 Magentic-One 시스템을 기반으로 설계되었습니다. 동적 협업이 필요한 복잡한 개방형 작업을 위해 설계된 유연한 범용 다중 에이전트 패턴입니다. 이 패턴에서 전용 Magentic 관리자는 특수 에이전트 팀을 조정하여 진화하는 컨텍스트, 작업 진행률 및 에이전트 기능에 따라 다음에 조치를 취해야 하는 에이전트를 선택합니다.
Magentic 관리자는 공유 컨텍스트를 유지하고, 진행률을 추적하고, 워크플로를 실시간으로 조정합니다. 이를 통해 시스템은 복잡한 문제를 분석하고, 하위 작업을 위임하고, 에이전트 협업을 통해 솔루션을 반복적으로 구체화할 수 있습니다. 오케스트레이션은 솔루션 경로를 미리 알 수 없으며 여러 라운드의 추론, 연구 및 계산이 필요할 수 있는 시나리오에 특히 적합합니다.
학습 내용
- 여러 특수 에이전트를 조정하도록 Magentic 관리자를 설정하는 방법
- 를 사용하여 스트리밍 이벤트를 처리하는 방법
AgentRunUpdateEvent - 휴먼 인 더 루프 계획 검토, 도구 승인 및 중단 개입을 구현하는 방법
- 복잡한 작업을 통해 에이전트 공동 작업 및 진행률을 추적하는 방법
특수 에이전트 정의
곧 출시 예정...
Magentic 오케스트레이션에서는 관리자가 작업 요구 사항에 따라 동적으로 선택할 수 있는 특수 에이전트를 정의합니다.
from agent_framework import ChatAgent, HostedCodeInterpreterTool
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient
researcher_agent = ChatAgent(
name="ResearcherAgent",
description="Specialist in research and information gathering",
instructions=(
"You are a Researcher. You find information without additional computation or quantitative analysis."
),
# This agent requires the gpt-4o-search-preview model to perform web searches
chat_client=OpenAIChatClient(model_id="gpt-4o-search-preview"),
)
coder_agent = ChatAgent(
name="CoderAgent",
description="A helpful assistant that writes and executes code to process and analyze data.",
instructions="You solve questions using code. Please provide detailed analysis and computation process.",
chat_client=OpenAIResponsesClient(),
tools=HostedCodeInterpreterTool(),
)
# Create a manager agent for orchestration
manager_agent = ChatAgent(
name="MagenticManager",
description="Orchestrator that coordinates the research and coding workflow",
instructions="You coordinate a team to complete complex tasks efficiently.",
chat_client=OpenAIChatClient(),
)
마그네틱 워크플로 구축
표준 관리자를 사용하여 워크플로를 구성하는 데 사용합니다 MagenticBuilder .
from agent_framework import MagenticBuilder
workflow = (
MagenticBuilder()
.participants(researcher=researcher_agent, coder=coder_agent)
.with_standard_manager(
agent=manager_agent,
max_round_count=10, # Maximum collaboration rounds
max_stall_count=3, # Maximum rounds without progress
max_reset_count=2, # Maximum plan resets allowed
)
.build()
)
이벤트 스트리밍을 사용하여 워크플로 실행
복잡한 작업을 실행하고 스트리밍 출력 및 오케스트레이션 업데이트에 대한 이벤트를 처리합니다.
from typing import cast
from agent_framework import (
MAGENTIC_EVENT_TYPE_AGENT_DELTA,
MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
AgentRunUpdateEvent,
ChatMessage,
WorkflowOutputEvent,
)
task = (
"I am preparing a report on the energy efficiency of different machine learning model architectures. "
"Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
"on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
"Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
"VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
"per task type (image classification, text classification, and text generation)."
)
# State for streaming callback
last_stream_agent_id: str | None = None
stream_line_open: bool = False
output: str | None = None
async for event in workflow.run_stream(task):
if isinstance(event, AgentRunUpdateEvent):
props = event.data.additional_properties if event.data else None
event_type = props.get("magentic_event_type") if props else None
if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
# Manager's planning and coordination messages
kind = props.get("orchestrator_message_kind", "") if props else ""
text = event.data.text if event.data else ""
print(f"\n[ORCH:{kind}]\n\n{text}\n{'-' * 26}")
elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
# Streaming tokens from agents
agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
if last_stream_agent_id != agent_id or not stream_line_open:
if stream_line_open:
print()
print(f"\n[STREAM:{agent_id}]: ", end="", flush=True)
last_stream_agent_id = agent_id
stream_line_open = True
if event.data and event.data.text:
print(event.data.text, end="", flush=True)
elif event.data and event.data.text:
print(event.data.text, end="", flush=True)
elif isinstance(event, WorkflowOutputEvent):
output_messages = cast(list[ChatMessage], event.data)
if output_messages:
output = output_messages[-1].text
if stream_line_open:
print()
if output is not None:
print(f"Workflow completed with result:\n\n{output}")
고급: 휴먼 인 더 루프 계획 검토
실행하기 전에 관리자의 계획에 대한 사용자 검토 및 승인을 사용하도록 설정합니다.
계획 검토 설정
from typing import cast
from agent_framework import (
MAGENTIC_EVENT_TYPE_AGENT_DELTA,
MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
AgentRunUpdateEvent,
MagenticHumanInterventionDecision,
MagenticHumanInterventionKind,
MagenticHumanInterventionReply,
MagenticHumanInterventionRequest,
RequestInfoEvent,
WorkflowOutputEvent,
)
workflow = (
MagenticBuilder()
.participants(researcher=researcher_agent, coder=coder_agent)
.with_standard_manager(
agent=manager_agent,
max_round_count=10,
max_stall_count=3,
max_reset_count=2,
)
.with_plan_review() # Enable plan review
.build()
)
계획 검토 요청 처리
pending_request: RequestInfoEvent | None = None
pending_responses: dict[str, MagenticHumanInterventionReply] | None = None
completed = False
workflow_output: str | None = None
while not completed:
# Use streaming for both initial run and response sending
if pending_responses is not None:
stream = workflow.send_responses_streaming(pending_responses)
else:
stream = workflow.run_stream(task)
async for event in stream:
if isinstance(event, AgentRunUpdateEvent):
# Handle streaming events as shown above
pass
elif isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
request = cast(MagenticHumanInterventionRequest, event.data)
if request.kind == MagenticHumanInterventionKind.PLAN_REVIEW:
pending_request = event
if request.plan_text:
print(f"\n=== PLAN REVIEW REQUEST ===\n{request.plan_text}\n")
elif isinstance(event, WorkflowOutputEvent):
workflow_output = str(event.data) if event.data else None
completed = True
pending_responses = None
# Handle pending plan review request
if pending_request is not None:
# Collect human decision (approve/reject/modify)
# For demo, we auto-approve:
reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE)
# Or approve with comments:
# reply = MagenticHumanInterventionReply(
# decision=MagenticHumanInterventionDecision.APPROVE,
# comments="Looks good, but prioritize efficiency metrics."
# )
# Or request revision:
# reply = MagenticHumanInterventionReply(
# decision=MagenticHumanInterventionDecision.REVISE,
# comments="Please include a comparison with newer models like LLaMA."
# )
pending_responses = {pending_request.request_id: reply}
pending_request = None
고급: 도구 승인을 통한 에이전트 명확화
에이전트는 도구 승인을 사용하여 실행하는 동안 사용자에게 명확한 질문을 할 수 있습니다. 이렇게 하면 에이전트가 계속하기 전에 추가 정보를 요청할 수 있는 HITL(Human-in-the-Loop) 상호 작용이 가능합니다.
승인이 필요한 도구 정의
from typing import Annotated
from agent_framework import ai_function
@ai_function(approval_mode="always_require")
def ask_user(question: Annotated[str, "The question to ask the user for clarification"]) -> str:
"""Ask the user a clarifying question to gather missing information.
Use this tool when you need additional information from the user to complete
your task effectively.
"""
# This function body is a placeholder - the actual interaction happens via HITL.
return f"User was asked: {question}"
도구를 사용하여 에이전트 만들기
onboarding_agent = ChatAgent(
name="OnboardingAgent",
description="HR specialist who handles employee onboarding",
instructions=(
"You are an HR Onboarding Specialist. Your job is to onboard new employees.\n\n"
"IMPORTANT: When given an onboarding request, you MUST gather the following "
"information before proceeding:\n"
"1. Department (e.g., Engineering, Sales, Marketing)\n"
"2. Role/Title (e.g., Software Engineer, Account Executive)\n\n"
"Use the ask_user tool to request ANY missing information."
),
chat_client=OpenAIChatClient(model_id="gpt-4o"),
tools=[ask_user],
)
도구 승인 요청 처리
async for event in workflow.run_stream("Onboard Jessica Smith"):
if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
req = cast(MagenticHumanInterventionRequest, event.data)
if req.kind == MagenticHumanInterventionKind.TOOL_APPROVAL:
print(f"Agent: {req.agent_id}")
print(f"Question: {req.prompt}")
# Get user's answer
answer = input("> ").strip()
# Send the answer back - it will be fed to the agent as the function result
reply = MagenticHumanInterventionReply(
decision=MagenticHumanInterventionDecision.APPROVE,
response_text=answer,
)
pending_responses = {event.request_id: reply}
# Continue workflow with response
async for ev in workflow.send_responses_streaming(pending_responses):
# Handle continuation events
pass
고급: 정지에 대한 인간의 개입
워크플로에서 에이전트가 진행되지 않는 것을 감지할 때 사용자 개입을 사용하도록 설정합니다.
중단 조치 구성
workflow = (
MagenticBuilder()
.participants(researcher=researcher_agent, analyst=analyst_agent)
.with_standard_manager(
agent=manager_agent,
max_round_count=10,
max_stall_count=1, # Stall detection after 1 round without progress
max_reset_count=2,
)
.with_human_input_on_stall() # Request human input when stalled
.build()
)
중단 개입 요청 관리
async for event in workflow.run_stream(task):
if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
req = cast(MagenticHumanInterventionRequest, event.data)
if req.kind == MagenticHumanInterventionKind.STALL:
print(f"Workflow stalled after {req.stall_count} rounds")
print(f"Reason: {req.stall_reason}")
if req.plan_text:
print(f"Current plan:\n{req.plan_text}")
# Choose response: CONTINUE, REPLAN, or GUIDANCE
reply = MagenticHumanInterventionReply(
decision=MagenticHumanInterventionDecision.GUIDANCE,
comments="Focus on completing the research step first before moving to analysis.",
)
pending_responses = {event.request_id: reply}
주요 개념
- 동적 조정: Magentic 관리자는 진화하는 컨텍스트에 따라 다음에 작동해야 하는 에이전트를 동적으로 선택합니다.
- 반복적 구체화: 시스템은 복잡한 문제를 분석하고 여러 라운드를 통해 반복적으로 솔루션을 구체화할 수 있습니다.
- 진행률 추적: 중단을 감지하고 필요한 경우 계획을 다시 설정하는 기본 제공 메커니즘
- 유연한 공동 작업: 관리자가 결정한 대로 에이전트를 순서대로 여러 번 호출할 수 있습니다.
- 사용자 감독: 계획 검토, 도구 승인 및 중단 개입을 포함한 선택적 휴먼 인 더 루프 메커니즘
-
통합 이벤트 시스템: 오케스트레이터 및 에이전트 스트리밍 이벤트를 처리하는 데 사용
AgentRunUpdateEventmagentic_event_type
워크플로 실행 흐름
Magentic 오케스트레이션은 다음 실행 패턴을 따릅니다.
- 계획 단계: 관리자는 작업을 분석하고 초기 계획을 만듭니다.
- 선택적 계획 검토: 사용하도록 설정된 경우 사용자는 계획을 검토하고 승인/수정할 수 있습니다.
- 에이전트 선택: 관리자는 각 하위 작업에 가장 적합한 에이전트를 선택합니다.
- 실행: 선택한 에이전트가 작업의 해당 부분을 실행합니다.
- 진행률 평가: 관리자가 진행률을 평가하고 계획을 업데이트합니다.
- 중단 감지: 진행이 중단되면 자동으로 계획을 수정하거나 인간의 개입을 요청합니다.
- 반복: 작업이 완료되거나 제한에 도달할 때까지 3-6단계가 반복됩니다.
- 최종 합성: 관리자는 모든 에이전트 출력을 최종 결과로 합성합니다.
완성된 예시
다음은 모든 개념을 결합하는 전체 예제입니다.
import asyncio
import logging
from typing import cast
from agent_framework import (
MAGENTIC_EVENT_TYPE_AGENT_DELTA,
MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
AgentRunUpdateEvent,
ChatAgent,
ChatMessage,
HostedCodeInterpreterTool,
MagenticBuilder,
WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)
async def main() -> None:
# Define specialized agents
researcher_agent = ChatAgent(
name="ResearcherAgent",
description="Specialist in research and information gathering",
instructions=(
"You are a Researcher. You find information without additional "
"computation or quantitative analysis."
),
chat_client=OpenAIChatClient(model_id="gpt-4o-search-preview"),
)
coder_agent = ChatAgent(
name="CoderAgent",
description="A helpful assistant that writes and executes code to process and analyze data.",
instructions="You solve questions using code. Please provide detailed analysis and computation process.",
chat_client=OpenAIResponsesClient(),
tools=HostedCodeInterpreterTool(),
)
# Create a manager agent for orchestration
manager_agent = ChatAgent(
name="MagenticManager",
description="Orchestrator that coordinates the research and coding workflow",
instructions="You coordinate a team to complete complex tasks efficiently.",
chat_client=OpenAIChatClient(),
)
# State for streaming output
last_stream_agent_id: str | None = None
stream_line_open: bool = False
# Build the workflow
print("\nBuilding Magentic Workflow...")
workflow = (
MagenticBuilder()
.participants(researcher=researcher_agent, coder=coder_agent)
.with_standard_manager(
agent=manager_agent,
max_round_count=10,
max_stall_count=3,
max_reset_count=2,
)
.build()
)
# Define the task
task = (
"I am preparing a report on the energy efficiency of different machine learning model architectures. "
"Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
"on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
"Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
"VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
"per task type (image classification, text classification, and text generation)."
)
print(f"\nTask: {task}")
print("\nStarting workflow execution...")
# Run the workflow
try:
output: str | None = None
async for event in workflow.run_stream(task):
if isinstance(event, AgentRunUpdateEvent):
props = event.data.additional_properties if event.data else None
event_type = props.get("magentic_event_type") if props else None
if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
kind = props.get("orchestrator_message_kind", "") if props else ""
text = event.data.text if event.data else ""
print(f"\n[ORCH:{kind}]\n\n{text}\n{'-' * 26}")
elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
if last_stream_agent_id != agent_id or not stream_line_open:
if stream_line_open:
print()
print(f"\n[STREAM:{agent_id}]: ", end="", flush=True)
last_stream_agent_id = agent_id
stream_line_open = True
if event.data and event.data.text:
print(event.data.text, end="", flush=True)
elif event.data and event.data.text:
print(event.data.text, end="", flush=True)
elif isinstance(event, WorkflowOutputEvent):
output_messages = cast(list[ChatMessage], event.data)
if output_messages:
output = output_messages[-1].text
if stream_line_open:
print()
if output is not None:
print(f"Workflow completed with result:\n\n{output}")
except Exception as e:
print(f"Workflow execution failed: {e}")
logger.exception("Workflow exception", exc_info=e)
if __name__ == "__main__":
asyncio.run(main())
구성 옵션
관리자 매개 변수
-
max_round_count: 최대 공동 작업 라운드 수(기본값: 10) -
max_stall_count: 중단 처리를 트리거하기 전에 진행률 없이 최대 라운드(기본값: 3) -
max_reset_count: 허용되는 최대 계획 재설정 수(기본값: 2)
인간 개입 종류
-
PLAN_REVIEW: 초기 계획 검토 및 승인/수정 -
TOOL_APPROVAL: 도구/함수 호출 승인(에이전트 설명에 사용) -
STALL: 워크플로가 중단되었으며 지침이 필요합니다.
인간 개입 결정
-
APPROVE: 계획이나 도구 호출을 있는 그대로 수락 -
REVISE: 피드백을 사용하여 수정 요청(계획 검토) -
REJECT: 거부/거부(도구 승인) -
CONTINUE: 현재 상태로 계속(중단) -
REPLAN: 재계획 트리거 (정지) -
GUIDANCE: 지침 텍스트 제공(중단, 도구 승인)
이벤트 유형
이벤트는 다음의 메타데이터를 통해 AgentRunUpdateEvent 내보내집니다.additional_properties
-
magentic_event_type: 다음 중 하나MAGENTIC_EVENT_TYPE_ORCHESTRATOR또는MAGENTIC_EVENT_TYPE_AGENT_DELTA -
orchestrator_message_kind: 오케스트레이터 이벤트의 경우 메시지 유형(예: "명령", "알림", "task_ledger")을 나타냅니다. -
agent_id: 에이전트 델타 이벤트의 경우 스트리밍 에이전트를 식별합니다.
샘플 출력
곧 출시 예정...