Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Orkiestracja magentyczna została zaprojektowana na podstawie systemu Magentic-One opracowanego przez AutoGen. Jest to elastyczny wzorzec wieloaplikowy ogólnego przeznaczenia przeznaczony dla złożonych, otwartych zadań wymagających dynamicznej współpracy. W tym wzorcu dedykowany menedżer magentyczny koordynuje zespół wyspecjalizowanych agentów, wybierając agenta, który powinien działać dalej na podstawie zmieniającego się kontekstu, postępu zadań i możliwości agenta.
Menedżer Magentic utrzymuje wspólny kontekst, śledzi postęp i dostosowuje przepływ pracy w czasie rzeczywistym. Dzięki temu system może rozdzielić złożone problemy, delegować podzadania i iteracyjnie uściślić rozwiązania za pośrednictwem współpracy agentów. Orkiestracja jest szczególnie odpowiednia dla scenariuszy, w których ścieżka rozwiązania nie jest znana z wyprzedzeniem i może wymagać wielu rund rozumowania, badań i obliczeń.
Czego nauczysz się
- Jak skonfigurować menedżera magentycznego w celu koordynowania wielu wyspecjalizowanych agentów
- Jak obsługiwać zdarzenia przesyłania strumieniowego za pomocą polecenia
AgentRunUpdateEvent - Jak wdrożyć przegląd planów z udziałem człowieka, zatwierdzanie narzędzi i interwencję zapobiegającą zatrzymaniu procesu
- Jak śledzić współpracę i postęp agenta za pośrednictwem złożonych zadań
Definiowanie wyspecjalizowanych agentów
Wkrótce...
W orkiestracji magentycznej definiujesz wyspecjalizowanych agentów, których menedżer może dynamicznie wybierać na podstawie wymagań dotyczących zadań:
from agent_framework import ChatAgent, HostedCodeInterpreterTool
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient
researcher_agent = ChatAgent(
name="ResearcherAgent",
description="Specialist in research and information gathering",
instructions=(
"You are a Researcher. You find information without additional computation or quantitative analysis."
),
# This agent requires the gpt-4o-search-preview model to perform web searches
chat_client=OpenAIChatClient(model_id="gpt-4o-search-preview"),
)
coder_agent = ChatAgent(
name="CoderAgent",
description="A helpful assistant that writes and executes code to process and analyze data.",
instructions="You solve questions using code. Please provide detailed analysis and computation process.",
chat_client=OpenAIResponsesClient(),
tools=HostedCodeInterpreterTool(),
)
# Create a manager agent for orchestration
manager_agent = ChatAgent(
name="MagenticManager",
description="Orchestrator that coordinates the research and coding workflow",
instructions="You coordinate a team to complete complex tasks efficiently.",
chat_client=OpenAIChatClient(),
)
Tworzenie przepływu pracy magentycznego
Użyj MagenticBuilder polecenia , aby skonfigurować przepływ pracy za pomocą menedżera standardowego:
from agent_framework import MagenticBuilder
workflow = (
MagenticBuilder()
.participants(researcher=researcher_agent, coder=coder_agent)
.with_standard_manager(
agent=manager_agent,
max_round_count=10, # Maximum collaboration rounds
max_stall_count=3, # Maximum rounds without progress
max_reset_count=2, # Maximum plan resets allowed
)
.build()
)
Uruchamianie przepływu pracy za pomocą przesyłania strumieniowego zdarzeń
Wykonaj złożone zadanie i obsłuż zdarzenia dla strumieniowego przesyłania danych wyjściowych i aktualizacji orkiestracji.
from typing import cast
from agent_framework import (
MAGENTIC_EVENT_TYPE_AGENT_DELTA,
MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
AgentRunUpdateEvent,
ChatMessage,
WorkflowOutputEvent,
)
task = (
"I am preparing a report on the energy efficiency of different machine learning model architectures. "
"Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
"on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
"Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
"VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
"per task type (image classification, text classification, and text generation)."
)
# State for streaming callback
last_stream_agent_id: str | None = None
stream_line_open: bool = False
output: str | None = None
async for event in workflow.run_stream(task):
if isinstance(event, AgentRunUpdateEvent):
props = event.data.additional_properties if event.data else None
event_type = props.get("magentic_event_type") if props else None
if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
# Manager's planning and coordination messages
kind = props.get("orchestrator_message_kind", "") if props else ""
text = event.data.text if event.data else ""
print(f"\n[ORCH:{kind}]\n\n{text}\n{'-' * 26}")
elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
# Streaming tokens from agents
agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
if last_stream_agent_id != agent_id or not stream_line_open:
if stream_line_open:
print()
print(f"\n[STREAM:{agent_id}]: ", end="", flush=True)
last_stream_agent_id = agent_id
stream_line_open = True
if event.data and event.data.text:
print(event.data.text, end="", flush=True)
elif event.data and event.data.text:
print(event.data.text, end="", flush=True)
elif isinstance(event, WorkflowOutputEvent):
output_messages = cast(list[ChatMessage], event.data)
if output_messages:
output = output_messages[-1].text
if stream_line_open:
print()
if output is not None:
print(f"Workflow completed with result:\n\n{output}")
Zaawansowane: Przegląd planu z udziałem człowieka
Włącz możliwość przeglądu i zatwierdzenia planu przez menedżera przed jego realizacją.
Konfigurowanie przeglądu planu
from typing import cast
from agent_framework import (
MAGENTIC_EVENT_TYPE_AGENT_DELTA,
MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
AgentRunUpdateEvent,
MagenticHumanInterventionDecision,
MagenticHumanInterventionKind,
MagenticHumanInterventionReply,
MagenticHumanInterventionRequest,
RequestInfoEvent,
WorkflowOutputEvent,
)
workflow = (
MagenticBuilder()
.participants(researcher=researcher_agent, coder=coder_agent)
.with_standard_manager(
agent=manager_agent,
max_round_count=10,
max_stall_count=3,
max_reset_count=2,
)
.with_plan_review() # Enable plan review
.build()
)
Obsługa żądań przeglądu planu
pending_request: RequestInfoEvent | None = None
pending_responses: dict[str, MagenticHumanInterventionReply] | None = None
completed = False
workflow_output: str | None = None
while not completed:
# Use streaming for both initial run and response sending
if pending_responses is not None:
stream = workflow.send_responses_streaming(pending_responses)
else:
stream = workflow.run_stream(task)
async for event in stream:
if isinstance(event, AgentRunUpdateEvent):
# Handle streaming events as shown above
pass
elif isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
request = cast(MagenticHumanInterventionRequest, event.data)
if request.kind == MagenticHumanInterventionKind.PLAN_REVIEW:
pending_request = event
if request.plan_text:
print(f"\n=== PLAN REVIEW REQUEST ===\n{request.plan_text}\n")
elif isinstance(event, WorkflowOutputEvent):
workflow_output = str(event.data) if event.data else None
completed = True
pending_responses = None
# Handle pending plan review request
if pending_request is not None:
# Collect human decision (approve/reject/modify)
# For demo, we auto-approve:
reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE)
# Or approve with comments:
# reply = MagenticHumanInterventionReply(
# decision=MagenticHumanInterventionDecision.APPROVE,
# comments="Looks good, but prioritize efficiency metrics."
# )
# Or request revision:
# reply = MagenticHumanInterventionReply(
# decision=MagenticHumanInterventionDecision.REVISE,
# comments="Please include a comparison with newer models like LLaMA."
# )
pending_responses = {pending_request.request_id: reply}
pending_request = None
Zaawansowane: Wyjaśnienie agenta przez zatwierdzenie narzędzia
Agenci mogą zadawać użytkownikom pytania wyjaśniające w trakcie realizacji z użyciem zatwierdzania narzędzi. Umożliwia to interakcję Human-in-the-Loop (HITL), w której agent może zażądać dodatkowych informacji przed kontynuowaniem.
Definiowanie narzędzia z wymaganym zatwierdzeniem
from typing import Annotated
from agent_framework import ai_function
@ai_function(approval_mode="always_require")
def ask_user(question: Annotated[str, "The question to ask the user for clarification"]) -> str:
"""Ask the user a clarifying question to gather missing information.
Use this tool when you need additional information from the user to complete
your task effectively.
"""
# This function body is a placeholder - the actual interaction happens via HITL.
return f"User was asked: {question}"
Tworzenie agenta za pomocą narzędzia
onboarding_agent = ChatAgent(
name="OnboardingAgent",
description="HR specialist who handles employee onboarding",
instructions=(
"You are an HR Onboarding Specialist. Your job is to onboard new employees.\n\n"
"IMPORTANT: When given an onboarding request, you MUST gather the following "
"information before proceeding:\n"
"1. Department (e.g., Engineering, Sales, Marketing)\n"
"2. Role/Title (e.g., Software Engineer, Account Executive)\n\n"
"Use the ask_user tool to request ANY missing information."
),
chat_client=OpenAIChatClient(model_id="gpt-4o"),
tools=[ask_user],
)
Obsługa żądań zatwierdzania narzędzi
async for event in workflow.run_stream("Onboard Jessica Smith"):
if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
req = cast(MagenticHumanInterventionRequest, event.data)
if req.kind == MagenticHumanInterventionKind.TOOL_APPROVAL:
print(f"Agent: {req.agent_id}")
print(f"Question: {req.prompt}")
# Get user's answer
answer = input("> ").strip()
# Send the answer back - it will be fed to the agent as the function result
reply = MagenticHumanInterventionReply(
decision=MagenticHumanInterventionDecision.APPROVE,
response_text=answer,
)
pending_responses = {event.request_id: reply}
# Continue workflow with response
async for ev in workflow.send_responses_streaming(pending_responses):
# Handle continuation events
pass
Zaawansowane: Interwencja człowieka na straganie
Włącz interwencję człowieka, gdy przepływ pracy wykryje, że agenci nie robią postępów:
Konfigurowanie interwencji przeciwdziałającej przeciągnięciu
workflow = (
MagenticBuilder()
.participants(researcher=researcher_agent, analyst=analyst_agent)
.with_standard_manager(
agent=manager_agent,
max_round_count=10,
max_stall_count=1, # Stall detection after 1 round without progress
max_reset_count=2,
)
.with_human_input_on_stall() # Request human input when stalled
.build()
)
Obsługa żądań interwencji w przypadku zatrzymania
async for event in workflow.run_stream(task):
if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
req = cast(MagenticHumanInterventionRequest, event.data)
if req.kind == MagenticHumanInterventionKind.STALL:
print(f"Workflow stalled after {req.stall_count} rounds")
print(f"Reason: {req.stall_reason}")
if req.plan_text:
print(f"Current plan:\n{req.plan_text}")
# Choose response: CONTINUE, REPLAN, or GUIDANCE
reply = MagenticHumanInterventionReply(
decision=MagenticHumanInterventionDecision.GUIDANCE,
comments="Focus on completing the research step first before moving to analysis.",
)
pending_responses = {event.request_id: reply}
Kluczowe pojęcia
- Dynamiczna koordynacja: Menedżer magentyczny dynamicznie wybiera agenta, który powinien działać dalej w oparciu o zmieniający się kontekst
- Uściślenie iteracyjne: system może podzielić złożone problemy i iteracyjnie uściślić rozwiązania za pomocą wielu rund
- Śledzenie postępu: wbudowane mechanizmy wykrywania zatrzymań i resetowania planu w razie potrzeby
- Elastyczna współpraca: agenci mogą być wywoływani wiele razy w dowolnej kolejności określonej przez menedżera
- Nadzór ludzki: opcjonalne mechanizmy uwzględniające człowieka, w tym przegląd planu, zatwierdzanie narzędzi i interwencja w przypadku zatrzymania.
-
Ujednolicony system zdarzeń: służy do
AgentRunUpdateEventmagentic_event_typeobsługi zdarzeń orkiestratora i agenta przesyłania strumieniowego
Przepływ wykonywania zadań w przepływie pracy
Orkiestracja Magentic jest zgodna z tym wzorcem wykonania:
- Faza planowania: Menedżer analizuje zadanie i tworzy plan początkowy
- Przegląd planu opcjonalnego: w przypadku włączenia użytkownicy mogą przeglądać i zatwierdzać/modyfikować plan
- Wybór agenta: Menedżer wybiera najbardziej odpowiedniego agenta dla każdej podtaki
- Wykonanie: wybrany agent wykonuje część zadania
- Ocena postępu: menedżer ocenia postęp i aktualizuje plan
- Wykrywanie zatrzymania: jeśli postęp zostanie zatrzymany, automatycznie przeplanować lub zażądać interwencji człowieka
- Iteracja: kroki 3–6 powtarzają się do momentu zakończenia zadania lub osiągnięcia limitów
- Końcowa synteza: Menedżer syntetyzuje wszystkie dane wyjściowe agenta do końcowego wyniku
Kompletny przykład
Oto pełny przykład, który łączy wszystkie koncepcje:
import asyncio
import logging
from typing import cast
from agent_framework import (
MAGENTIC_EVENT_TYPE_AGENT_DELTA,
MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
AgentRunUpdateEvent,
ChatAgent,
ChatMessage,
HostedCodeInterpreterTool,
MagenticBuilder,
WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)
async def main() -> None:
# Define specialized agents
researcher_agent = ChatAgent(
name="ResearcherAgent",
description="Specialist in research and information gathering",
instructions=(
"You are a Researcher. You find information without additional "
"computation or quantitative analysis."
),
chat_client=OpenAIChatClient(model_id="gpt-4o-search-preview"),
)
coder_agent = ChatAgent(
name="CoderAgent",
description="A helpful assistant that writes and executes code to process and analyze data.",
instructions="You solve questions using code. Please provide detailed analysis and computation process.",
chat_client=OpenAIResponsesClient(),
tools=HostedCodeInterpreterTool(),
)
# Create a manager agent for orchestration
manager_agent = ChatAgent(
name="MagenticManager",
description="Orchestrator that coordinates the research and coding workflow",
instructions="You coordinate a team to complete complex tasks efficiently.",
chat_client=OpenAIChatClient(),
)
# State for streaming output
last_stream_agent_id: str | None = None
stream_line_open: bool = False
# Build the workflow
print("\nBuilding Magentic Workflow...")
workflow = (
MagenticBuilder()
.participants(researcher=researcher_agent, coder=coder_agent)
.with_standard_manager(
agent=manager_agent,
max_round_count=10,
max_stall_count=3,
max_reset_count=2,
)
.build()
)
# Define the task
task = (
"I am preparing a report on the energy efficiency of different machine learning model architectures. "
"Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
"on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
"Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
"VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
"per task type (image classification, text classification, and text generation)."
)
print(f"\nTask: {task}")
print("\nStarting workflow execution...")
# Run the workflow
try:
output: str | None = None
async for event in workflow.run_stream(task):
if isinstance(event, AgentRunUpdateEvent):
props = event.data.additional_properties if event.data else None
event_type = props.get("magentic_event_type") if props else None
if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
kind = props.get("orchestrator_message_kind", "") if props else ""
text = event.data.text if event.data else ""
print(f"\n[ORCH:{kind}]\n\n{text}\n{'-' * 26}")
elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
if last_stream_agent_id != agent_id or not stream_line_open:
if stream_line_open:
print()
print(f"\n[STREAM:{agent_id}]: ", end="", flush=True)
last_stream_agent_id = agent_id
stream_line_open = True
if event.data and event.data.text:
print(event.data.text, end="", flush=True)
elif event.data and event.data.text:
print(event.data.text, end="", flush=True)
elif isinstance(event, WorkflowOutputEvent):
output_messages = cast(list[ChatMessage], event.data)
if output_messages:
output = output_messages[-1].text
if stream_line_open:
print()
if output is not None:
print(f"Workflow completed with result:\n\n{output}")
except Exception as e:
print(f"Workflow execution failed: {e}")
logger.exception("Workflow exception", exc_info=e)
if __name__ == "__main__":
asyncio.run(main())
Opcje konfiguracji
Parametry menedżera
-
max_round_count: Maksymalna liczba rund współpracy (wartość domyślna: 10) -
max_stall_count: Maksymalna liczba rund bez postępu przed uruchomieniem obsługiwaniem zastoju (ustawienie domyślne: 3) -
max_reset_count: Dozwolona maksymalna liczba zresetowań planu (wartość domyślna: 2)
Rodzaje interwencji człowieka
-
PLAN_REVIEW: Przeglądanie i zatwierdzanie/poprawianie początkowego planu -
TOOL_APPROVAL: Zatwierdź wywołanie narzędzia/funkcji (dla wyjaśnienia agenta) -
STALL: Przepływ pracy został zatrzymany i wymaga wskazówek
Decyzje dotyczące interwencji człowieka
-
APPROVE: Zaakceptuj wywołanie planu lub narzędzia bez zmian -
REVISE: Żądanie poprawki z opiniami (przegląd planu) -
REJECT: Odrzuć/odmów (zatwierdzenie narzędzia) -
CONTINUE: Kontynuuj z bieżącym stanem (zawieszenie) -
REPLAN: Wyzwalanie ponownego planowania (zatrzymanie) -
GUIDANCE: Podaj tekst wskazówek (zatrzymanie, zatwierdzanie narzędzi)
Typy zdarzeń
Zdarzenia są emitowane przez AgentRunUpdateEvent z metadanymi w additional_properties.
-
magentic_event_typeMAGENTIC_EVENT_TYPE_ORCHESTRATOR: albo lubMAGENTIC_EVENT_TYPE_AGENT_DELTA -
orchestrator_message_kind: W przypadku zdarzeń orkiestratora wskazuje typ komunikatu (np. "instrukcja", "powiadomienie", "task_ledger") -
agent_id: w przypadku zdarzeń różnicowych agenta identyfikuje agenta przesyłania strumieniowego
Przykładowe dane wyjściowe
Wkrótce...