本文件概述如何在 Microsoft Agent Framework 中使用 工作流程作為代理程式 。
概觀
有時你已經建立了一個複雜的工作流程,包含多個代理、自訂執行程序和複雜的邏輯,但你希望能像使用其他代理一樣使用它。 這正是工作流程代理能讓你做到的。 透過將你的工作流程包裝成 Agent,你可以透過熟悉的 API 與它互動,就像簡單聊天代理一樣。
主要優點
- 統一介面:使用與簡單代理相同的 API 與複雜工作流程互動
- API 相容性:將工作流程整合至支援代理介面的現有系統
- 可組合性:將工作流程代理作為大型代理系統或其他工作流程的建構組件
- 執行緒管理:利用代理執行緒進行對話狀態、檢查點及恢復
- 串流支援:在工作流程執行時即時獲得更新
運作方式
當你將工作流程轉換成代理程式時:
- 工作流程經過驗證以確保啟動執行程式能接受聊天訊息
- 會建立一個執行緒來管理對話狀態和檢查點
- 輸入訊息會被路由到工作流程的啟動執行器
- 工作流程事件會轉換成代理回應更新
- 外部輸入請求(來自
RequestInfoExecutor)會以函式呼叫的形式呈現
需求
若要將工作流程作為代理使用,該工作流程的啟動執行器必須能作為輸入處理 IEnumerable<ChatMessage> 。 當使用 ChatClientAgent 或其他基於代理的執行者時,這會自動滿足。
建立工作流程代理程式
使用 AsAgent() 擴充方法將任何相容的工作流程轉換為代理:
using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;
// First, build your workflow
var workflow = AgentWorkflowBuilder
.CreateSequentialPipeline(researchAgent, writerAgent, reviewerAgent)
.Build();
// Convert the workflow to an agent
AIAgent workflowAgent = workflow.AsAgent(
id: "content-pipeline",
name: "Content Pipeline Agent",
description: "A multi-agent workflow that researches, writes, and reviews content"
);
AsAgent 參數
| 參數 | 類型 | Description |
|---|---|---|
id |
string? |
代理人可選的唯一識別碼。 若未提供,則為自動生成。 |
name |
string? |
代理人可選的顯示名稱。 |
description |
string? |
代理人目的的可選描述。 |
checkpointManager |
CheckpointManager? |
可選的檢查點管理器,用於在不同會話間保持資料的持久性。 |
executionEnvironment |
IWorkflowExecutionEnvironment? |
可選執行環境。 預設為InProcessExecution.OffThread或InProcessExecution.Concurrent,依據工作流程設定。 |
使用工作流程代理
建立執行緒
每次與工作流程代理的對話都需要一個執行緒來管理狀態:
// Create a new thread for the conversation
AgentThread thread = workflowAgent.GetNewThread();
非串流執行
對於想要完整回應的簡單應用:
var messages = new List<ChatMessage>
{
new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};
AgentRunResponse response = await workflowAgent.RunAsync(messages, thread);
foreach (ChatMessage message in response.Messages)
{
Console.WriteLine($"{message.AuthorName}: {message.Text}");
}
串流執行
為了在工作流程執行期間獲得即時更新:
var messages = new List<ChatMessage>
{
new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};
await foreach (AgentRunResponseUpdate update in workflowAgent.RunStreamingAsync(messages, thread))
{
// Process streaming updates from each agent in the workflow
if (!string.IsNullOrEmpty(update.Text))
{
Console.Write(update.Text);
}
}
處理外部輸入請求
當工作流程包含執行器,並請求外部輸入(使用 RequestInfoExecutor),這些請求會在代理回應中以函式呼叫的形式呈現:
await foreach (AgentRunResponseUpdate update in workflowAgent.RunStreamingAsync(messages, thread))
{
// Check for function call requests
foreach (AIContent content in update.Contents)
{
if (content is FunctionCallContent functionCall)
{
// Handle the external input request
Console.WriteLine($"Workflow requests input: {functionCall.Name}");
Console.WriteLine($"Request data: {functionCall.Arguments}");
// Provide the response in the next message
}
}
}
執行緒序列化與恢復
工作流程代理執行緒可序列化以維持持久性,之後再繼續:
// Serialize the thread state
JsonElement serializedThread = thread.Serialize();
// Store serializedThread to your persistence layer...
// Later, resume the thread
AgentThread resumedThread = workflowAgent.DeserializeThread(serializedThread);
// Continue the conversation
await foreach (var update in workflowAgent.RunStreamingAsync(newMessages, resumedThread))
{
Console.Write(update.Text);
}
使用工作流程代理進行檢查點
啟用檢查點以在程序重新啟動期間持續維持工作流程狀態:
// Create a checkpoint manager with your storage backend
var checkpointManager = new CheckpointManager(new FileCheckpointStorage("./checkpoints"));
// Create workflow agent with checkpointing enabled
AIAgent workflowAgent = workflow.AsAgent(
id: "persistent-workflow",
name: "Persistent Workflow Agent",
checkpointManager: checkpointManager
);
需求
若要將工作流程作為代理使用,該工作流程的啟動執行器必須能作為輸入處理 list[ChatMessage] 。 當使用 ChatAgent 或 AgentExecutor時,這會自動滿足。
建立工作流程代理程式
請在任何相容的工作流程上呼叫 as_agent() 以將其轉換為代理:
from agent_framework import WorkflowBuilder, ChatAgent
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
# Create your chat client and agents
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
researcher = ChatAgent(
name="Researcher",
instructions="Research and gather information on the given topic.",
chat_client=chat_client,
)
writer = ChatAgent(
name="Writer",
instructions="Write clear, engaging content based on research.",
chat_client=chat_client,
)
# Build your workflow
workflow = (
WorkflowBuilder()
.set_start_executor(researcher)
.add_edge(researcher, writer)
.build()
)
# Convert the workflow to an agent
workflow_agent = workflow.as_agent(name="Content Pipeline Agent")
as_agent 參數
| 參數 | 類型 | Description |
|---|---|---|
name |
str | None |
代理人可選的顯示名稱。 若未提供,則為自動生成。 |
使用工作流代理
建立執行緒
每次與工作流程代理的對話都需要一個執行緒來管理狀態:
# Create a new thread for the conversation
thread = workflow_agent.get_new_thread()
非串流執行
對於想要完整回應的簡單應用:
from agent_framework import ChatMessage, Role
messages = [ChatMessage(role=Role.USER, content="Write an article about AI trends")]
response = await workflow_agent.run(messages, thread=thread)
for message in response.messages:
print(f"{message.author_name}: {message.text}")
串流執行
為了在工作流程執行期間獲得即時更新:
messages = [ChatMessage(role=Role.USER, content="Write an article about AI trends")]
async for update in workflow_agent.run_stream(messages, thread=thread):
# Process streaming updates from each agent in the workflow
if update.text:
print(update.text, end="", flush=True)
處理外部輸入請求
當工作流程包含執行器,這些執行器會請求外部輸入(使用 RequestInfoExecutor),這些請求會以函式呼叫的形式呈現。 工作流程代理會追蹤待處理的請求,並在繼續前期待回應:
from agent_framework import (
FunctionCallContent,
FunctionApprovalRequestContent,
FunctionApprovalResponseContent,
)
async for update in workflow_agent.run_stream(messages, thread=thread):
for content in update.contents:
if isinstance(content, FunctionApprovalRequestContent):
# The workflow is requesting external input
request_id = content.id
function_call = content.function_call
print(f"Workflow requests input: {function_call.name}")
print(f"Request data: {function_call.arguments}")
# Store the request_id to provide a response later
# Check for pending requests
if workflow_agent.pending_requests:
print(f"Pending requests: {list(workflow_agent.pending_requests.keys())}")
回覆待處理請求
為了在收到外部系統的輸入請求後繼續執行工作流程:
# Create a response for the pending request
response_content = FunctionApprovalResponseContent(
id=request_id,
function_call=function_call,
approved=True,
)
response_message = ChatMessage(
role=Role.USER,
contents=[response_content],
)
# Continue the workflow with the response
async for update in workflow_agent.run_stream([response_message], thread=thread):
if update.text:
print(update.text, end="", flush=True)
完整範例
這裡有一個完整的範例,展示一個工作流程代理的串流輸出:
import asyncio
from agent_framework import (
ChatAgent,
ChatMessage,
Role,
)
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework._workflows import SequentialBuilder
from azure.identity import AzureCliCredential
async def main():
# Set up the chat client
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
# Create specialized agents
researcher = ChatAgent(
name="Researcher",
instructions="Research the given topic and provide key facts.",
chat_client=chat_client,
)
writer = ChatAgent(
name="Writer",
instructions="Write engaging content based on the research provided.",
chat_client=chat_client,
)
reviewer = ChatAgent(
name="Reviewer",
instructions="Review the content and provide a final polished version.",
chat_client=chat_client,
)
# Build a sequential workflow
workflow = (
SequentialBuilder()
.add_agents([researcher, writer, reviewer])
.build()
)
# Convert to a workflow agent
workflow_agent = workflow.as_agent(name="Content Creation Pipeline")
# Create a thread and run the workflow
thread = workflow_agent.get_new_thread()
messages = [ChatMessage(role=Role.USER, content="Write about quantum computing")]
print("Starting workflow...")
print("=" * 60)
current_author = None
async for update in workflow_agent.run_stream(messages, thread=thread):
# Show when different agents are responding
if update.author_name and update.author_name != current_author:
if current_author:
print("\n" + "-" * 40)
print(f"\n[{update.author_name}]:")
current_author = update.author_name
if update.text:
print(update.text, end="", flush=True)
print("\n" + "=" * 60)
print("Workflow completed!")
if __name__ == "__main__":
asyncio.run(main())
理解事件轉換
當工作流程以代理執行時,工作流程事件會轉換成代理回應。 回應的類型取決於你使用哪種方法:
-
run():完成工作流程後,回傳一個AgentRunResponse,其中包含完整結果。 -
run_stream(): 在工作流程執行時產生AgentRunResponseUpdate物件,提供即時更新
執行過程中,內部工作流程事件會映射到代理回應,具體如下:
| 工作流程事件 | 代理回應 |
|---|---|
AgentRunUpdateEvent |
以 AgentRunResponseUpdate 傳輸(串流)或聚合成 AgentRunResponse (非串流) |
RequestInfoEvent |
轉換為 FunctionCallContent 和 FunctionApprovalRequestContent |
| 其他活動 | 包含於 raw_representation 的可觀察性中 |
此轉換讓您能使用標準客服介面,同時在需要時仍能取得詳細的工作流程資訊。
使用案例
1. 複雜代理管線
將多代理工作流程包裝為單一代理,用於應用程式:
User Request --> [Workflow Agent] --> Final Response
|
+-- Researcher Agent
+-- Writer Agent
+-- Reviewer Agent
2. 代理人組成
在大型系統中將工作流程代理作為元件使用:
- 工作流程代理可以被其他代理當作工具使用
- 多個工作流程代理可以協同協調
- 工作流程代理可以嵌套於其他工作流程中
3. API 整合
透過預期標準代理介面的 API 揭露複雜工作流程,實現:
- 使用複雜後端工作流程的聊天介面
- 與現有代理系統的整合
- 從簡單客服人員逐步遷移到複雜工作流程