Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Ten dokument zawiera omówienie sposobu używania przepływów pracy jako agentów w programie Microsoft Agent Framework.
Przegląd
Czasami utworzono zaawansowany przepływ pracy z wieloma agentami, niestandardowymi funkcjami wykonawczych i złożoną logiką — ale chcesz używać go tak samo jak każdy inny agent. Dokładnie to, co umożliwiają agenci przepływu pracy. Zawijając przepływ pracy jako Agentelement, możesz wchodzić z nim w interakcje za pomocą tego samego znanego interfejsu API, którego używasz dla prostego agenta czatu.
Najważniejsze korzyści
- Ujednolicony interfejs: interakcja ze złożonymi przepływami pracy przy użyciu tego samego interfejsu API co prostych agentów
- Zgodność interfejsu API: integrowanie przepływów pracy z istniejącymi systemami obsługującymi interfejs agenta
- Komponowalność: używanie agentów przepływu pracy jako bloków konstrukcyjnych w większych systemach agentów lub innych przepływach pracy
- Zarządzanie wątkami: wykorzystanie wątków agenta do obsługi stanu konwersacji, punktów kontrolnych i wznowienia
- Obsługa przesyłania strumieniowego: Otrzymuj aktualizacje w czasie rzeczywistym podczas realizacji przepływu pracy
Jak to działa
Gdy konwertujesz przepływ pracy na agenta:
- Przepływ pracy jest weryfikowany, aby upewnić się, że jego komponent wykonawczy startu może akceptować komunikaty czatu.
- Wątek jest tworzony w celu zarządzania stanem konwersacji i punktami kontrolnymi
- Komunikaty wejściowe są kierowane do wykonawcy początkowego przepływu pracy
- Zdarzenia przepływu pracy są konwertowane na aktualizacje odpowiedzi agenta
- Żądania zewnętrznych danych wejściowych (z
RequestInfoExecutor) są udostępniane jako wywołania funkcji
Requirements
Aby użyć przepływu pracy jako agenta, funkcja wykonawcza uruchamiania przepływu pracy musi być w stanie obsłużyć IEnumerable<ChatMessage> jako dane wejściowe. Jest to automatycznie spełnione w przypadku używania ChatClientAgent lub innych funkcji wykonawczych opartych na agencie.
Tworzenie agenta przepływu pracy
AsAgent() Użyj metody rozszerzenia, aby przekonwertować dowolny zgodny przepływ pracy na agenta:
using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;
// First, build your workflow
var workflow = AgentWorkflowBuilder
.CreateSequentialPipeline(researchAgent, writerAgent, reviewerAgent)
.Build();
// Convert the workflow to an agent
AIAgent workflowAgent = workflow.AsAgent(
id: "content-pipeline",
name: "Content Pipeline Agent",
description: "A multi-agent workflow that researches, writes, and reviews content"
);
Parametry AsAgent
| Parameter | Typ | Description |
|---|---|---|
id |
string? |
Opcjonalny unikatowy identyfikator agenta. Generowane automatycznie, jeśli nie podano. |
name |
string? |
Opcjonalna nazwa wyświetlana dla agenta. |
description |
string? |
Opcjonalny opis przeznaczenia agenta. |
checkpointManager |
CheckpointManager? |
Opcjonalny menedżer punktów kontrolnych na potrzeby trwałości między sesjami. |
executionEnvironment |
IWorkflowExecutionEnvironment? |
Opcjonalne środowisko wykonywania. Domyślnie ustawiane jako InProcessExecution.OffThread lub InProcessExecution.Concurrent, w zależności od konfiguracji przepływu pracy. |
Korzystanie z agentów przepływu pracy
Tworzenie wątku
Każda konwersacja z agentem przepływu pracy wymaga wątku do zarządzania stanem:
// Create a new thread for the conversation
AgentThread thread = workflowAgent.GetNewThread();
Wykonywanie bez przesyłania strumieniowego
W przypadku prostych przypadków użycia, w których chcesz uzyskać pełną odpowiedź:
var messages = new List<ChatMessage>
{
new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};
AgentRunResponse response = await workflowAgent.RunAsync(messages, thread);
foreach (ChatMessage message in response.Messages)
{
Console.WriteLine($"{message.AuthorName}: {message.Text}");
}
Wykonywanie przesyłania strumieniowego
Aby uzyskać aktualizacje w czasie rzeczywistym podczas wykonywania przepływu pracy:
var messages = new List<ChatMessage>
{
new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};
await foreach (AgentRunResponseUpdate update in workflowAgent.RunStreamingAsync(messages, thread))
{
// Process streaming updates from each agent in the workflow
if (!string.IsNullOrEmpty(update.Text))
{
Console.Write(update.Text);
}
}
Obsługa żądań zewnętrznych danych wejściowych
Gdy przepływ pracy zawiera funkcje wykonawcze, które żądają zewnętrznych danych wejściowych (przy użyciu RequestInfoExecutor), te żądania są udostępniane jako wywołania funkcji w odpowiedzi agenta:
await foreach (AgentRunResponseUpdate update in workflowAgent.RunStreamingAsync(messages, thread))
{
// Check for function call requests
foreach (AIContent content in update.Contents)
{
if (content is FunctionCallContent functionCall)
{
// Handle the external input request
Console.WriteLine($"Workflow requests input: {functionCall.Name}");
Console.WriteLine($"Request data: {functionCall.Arguments}");
// Provide the response in the next message
}
}
}
Serializacja wątków i wznowienie
Wątki agenta przepływu pracy można serializować w celu przechowywania i wznowienia później.
// Serialize the thread state
JsonElement serializedThread = thread.Serialize();
// Store serializedThread to your persistence layer...
// Later, resume the thread
AgentThread resumedThread = workflowAgent.DeserializeThread(serializedThread);
// Continue the conversation
await foreach (var update in workflowAgent.RunStreamingAsync(newMessages, resumedThread))
{
Console.Write(update.Text);
}
Tworzenie punktów kontrolnych z agentami przepływu pracy
Włącz tworzenie punktów kontrolnych, aby stan przepływu pracy był zachowany po ponownym uruchomieniu procesu.
// Create a checkpoint manager with your storage backend
var checkpointManager = new CheckpointManager(new FileCheckpointStorage("./checkpoints"));
// Create workflow agent with checkpointing enabled
AIAgent workflowAgent = workflow.AsAgent(
id: "persistent-workflow",
name: "Persistent Workflow Agent",
checkpointManager: checkpointManager
);
Requirements
Aby użyć przepływu pracy jako agenta, funkcja wykonawcza uruchamiania przepływu pracy musi być w stanie obsłużyć list[ChatMessage] jako dane wejściowe. To jest automatycznie spełniane podczas korzystania z ChatAgent lub AgentExecutor.
Tworzenie agenta przepływu pracy
Wywołaj as_agent() dowolny zgodny przepływ pracy, aby przekonwertować go na agenta:
from agent_framework import WorkflowBuilder, ChatAgent
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
# Create your chat client and agents
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
researcher = ChatAgent(
name="Researcher",
instructions="Research and gather information on the given topic.",
chat_client=chat_client,
)
writer = ChatAgent(
name="Writer",
instructions="Write clear, engaging content based on research.",
chat_client=chat_client,
)
# Build your workflow
workflow = (
WorkflowBuilder()
.set_start_executor(researcher)
.add_edge(researcher, writer)
.build()
)
# Convert the workflow to an agent
workflow_agent = workflow.as_agent(name="Content Pipeline Agent")
parametry as_agent
| Parameter | Typ | Description |
|---|---|---|
name |
str | None |
Opcjonalna wyświetlana nazwa agenta. Generowane automatycznie, jeśli nie podano. |
Korzystanie z agentów przepływu pracy
Tworzenie wątku
Każda konwersacja z agentem przepływu pracy wymaga wątku do zarządzania stanem:
# Create a new thread for the conversation
thread = workflow_agent.get_new_thread()
Wykonywanie bez przesyłania strumieniowego
W przypadku prostych przypadków użycia, w których chcesz uzyskać pełną odpowiedź:
from agent_framework import ChatMessage, Role
messages = [ChatMessage(role=Role.USER, content="Write an article about AI trends")]
response = await workflow_agent.run(messages, thread=thread)
for message in response.messages:
print(f"{message.author_name}: {message.text}")
Wykonywanie przesyłania strumieniowego
Aby uzyskać aktualizacje w czasie rzeczywistym podczas wykonywania przepływu pracy:
messages = [ChatMessage(role=Role.USER, content="Write an article about AI trends")]
async for update in workflow_agent.run_stream(messages, thread=thread):
# Process streaming updates from each agent in the workflow
if update.text:
print(update.text, end="", flush=True)
Obsługa żądań zewnętrznych danych wejściowych
Gdy przepływ pracy zawiera funkcje wykonawcze, które żądają zewnętrznych danych wejściowych (przy użyciu RequestInfoExecutor), te żądania są udostępniane jako wywołania funkcji. Agent przepływu pracy monitoruje oczekujące żądania i wymaga odpowiedzi przed kontynuowaniem.
from agent_framework import (
FunctionCallContent,
FunctionApprovalRequestContent,
FunctionApprovalResponseContent,
)
async for update in workflow_agent.run_stream(messages, thread=thread):
for content in update.contents:
if isinstance(content, FunctionApprovalRequestContent):
# The workflow is requesting external input
request_id = content.id
function_call = content.function_call
print(f"Workflow requests input: {function_call.name}")
print(f"Request data: {function_call.arguments}")
# Store the request_id to provide a response later
# Check for pending requests
if workflow_agent.pending_requests:
print(f"Pending requests: {list(workflow_agent.pending_requests.keys())}")
Dostarczanie odpowiedzi na oczekujące żądania
Aby kontynuować wykonywanie przepływu pracy po zewnętrznym żądaniu danych wejściowych:
# Create a response for the pending request
response_content = FunctionApprovalResponseContent(
id=request_id,
function_call=function_call,
approved=True,
)
response_message = ChatMessage(
role=Role.USER,
contents=[response_content],
)
# Continue the workflow with the response
async for update in workflow_agent.run_stream([response_message], thread=thread):
if update.text:
print(update.text, end="", flush=True)
Kompletny przykład
Oto kompletny przykład demonstracyjny pokazujący agenta procesu roboczego z wyjściem strumieniowym.
import asyncio
from agent_framework import (
ChatAgent,
ChatMessage,
Role,
)
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework._workflows import SequentialBuilder
from azure.identity import AzureCliCredential
async def main():
# Set up the chat client
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
# Create specialized agents
researcher = ChatAgent(
name="Researcher",
instructions="Research the given topic and provide key facts.",
chat_client=chat_client,
)
writer = ChatAgent(
name="Writer",
instructions="Write engaging content based on the research provided.",
chat_client=chat_client,
)
reviewer = ChatAgent(
name="Reviewer",
instructions="Review the content and provide a final polished version.",
chat_client=chat_client,
)
# Build a sequential workflow
workflow = (
SequentialBuilder()
.add_agents([researcher, writer, reviewer])
.build()
)
# Convert to a workflow agent
workflow_agent = workflow.as_agent(name="Content Creation Pipeline")
# Create a thread and run the workflow
thread = workflow_agent.get_new_thread()
messages = [ChatMessage(role=Role.USER, content="Write about quantum computing")]
print("Starting workflow...")
print("=" * 60)
current_author = None
async for update in workflow_agent.run_stream(messages, thread=thread):
# Show when different agents are responding
if update.author_name and update.author_name != current_author:
if current_author:
print("\n" + "-" * 40)
print(f"\n[{update.author_name}]:")
current_author = update.author_name
if update.text:
print(update.text, end="", flush=True)
print("\n" + "=" * 60)
print("Workflow completed!")
if __name__ == "__main__":
asyncio.run(main())
Opis konwersji zdarzeń
Gdy przepływ pracy działa jako agent, zdarzenia przepływu pracy są konwertowane na odpowiedzi agenta. Typ odpowiedzi zależy od używanej metody:
-
run(): zwracaAgentRunResponsepełny wynik po zakończeniu przepływu pracy -
run_stream(): generujeAgentRunResponseUpdateobiekty w trakcie realizacji przepływu pracy, dostarczając aktualizacje w czasie rzeczywistym
Podczas wykonywania zdarzenia wewnętrznego przepływu pracy są mapowane na odpowiedzi agenta w następujący sposób:
| Zdarzenie przepływu pracy | Odpowiedź agenta |
|---|---|
AgentRunUpdateEvent |
Przekazywane jako AgentRunResponseUpdate (przesyłanie strumieniowe) lub agregowane do AgentRunResponse (bez przesyłania strumieniowego) |
RequestInfoEvent |
Przekonwertowane na FunctionCallContent i FunctionApprovalRequestContent |
| Inne zdarzenia | Dołączone do raw_representation dla widoczności i monitorowania |
Ta konwersja umożliwia korzystanie ze standardowego interfejsu agenta, jednocześnie mając dostęp do szczegółowych informacji o przepływie pracy w razie potrzeby.
Przypadki użycia
1. Złożone potoki agentów
Zawijanie przepływu pracy z wieloma agentami jako jednego agenta do użycia w aplikacjach:
User Request --> [Workflow Agent] --> Final Response
|
+-- Researcher Agent
+-- Writer Agent
+-- Reviewer Agent
2. Skład agenta
Użyj agentów przepływu pracy jako składników w większych systemach:
- Agent przepływu pracy może służyć jako narzędzie przez innego agenta
- Wiele agentów przepływu pracy można organizować razem
- Agenci przepływu pracy mogą być zagnieżdżani w innych przepływach pracy
3. Integracja interfejsu API
Ujawnianie złożonych przepływów pracy za pośrednictwem interfejsów API oczekujących standardowego interfejsu agenta, umożliwiając:
- Interfejsy rozmów korzystające z zaawansowanych przepływów pracy zaplecza
- Integracja z istniejącymi systemami opartymi na agentach
- Stopniowa migracja od prostych agentów do złożonych przepływów pracy
Dalsze kroki
- Dowiedz się, jak obsługiwać żądania i odpowiedzi w przepływach pracy
- Dowiedz się, jak zarządzać stanem w przepływach pracy
- Dowiedz się, jak tworzyć punkty kontrolne i wznawiać pracę z nich
- Dowiedz się, jak monitorować przepływy pracy
- Dowiedz się więcej o izolacji stanu w przepływach pracy
- Dowiedz się, jak wizualizować przepływy pracy