このページでは、Microsoft Agent Framework ワークフロー内で エージェント を使用する方法の概要について説明します。
概要
ワークフローにインテリジェンスを追加するには、ワークフロー実行の一部として AI エージェントを活用できます。 AI エージェントをワークフローに簡単に統合できるため、以前は実現することが困難だった複雑でインテリジェントなソリューションを作成できます。
エージェントをワークフローに直接追加する
エッジを使用して、ワークフローにエージェントを追加できます。
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;
// Create the agents first
AIAgent agentA = new ChatClientAgent(chatClient, instructions);
AIAgent agentB = new ChatClientAgent(chatClient, instructions);
// Build a workflow with the agents
WorkflowBuilder builder = new(agentA);
builder.AddEdge(agentA, agentB);
Workflow<ChatMessage> workflow = builder.Build<ChatMessage>();
ワークフローの実行
上記で作成したワークフロー内では、エージェントは実際には、ワークフローの他の部分とのエージェントの通信を処理する Executor 内にラップされます。 Executor は、次の 3 種類のメッセージを処理できます。
-
ChatMessage: 1 つのチャット メッセージ -
List<ChatMessage>: チャット メッセージの一覧 -
TurnToken: 新しいターンの開始を通知するターン トークン
Executor は、 TurnTokenを受け取るまでエージェントの応答をトリガーしません。
TurnTokenの前に受信したメッセージはすべて、TurnTokenの受信時にバッファーに格納され、エージェントに送信されます。
StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, "Hello World!"));
// Must send the turn token to trigger the agents. The agents are wrapped as executors.
// When they receive messages, they will cache the messages and only start processing
// when they receive a TurnToken. The turn token will be passed from one agent to the next.
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
// The agents will run in streaming mode and an AgentRunUpdateEvent
// will be emitted as new chunks are generated.
if (evt is AgentRunUpdateEvent agentRunUpdate)
{
Console.WriteLine($"{agentRunUpdate.ExecutorId}: {agentRunUpdate.Data}");
}
}
組み込みのエージェント Executor の使用
エッジを使用して、ワークフローにエージェントを追加できます。
from agent_framework import WorkflowBuilder
from agent_framework.azure import AzureChatClient
from azure.identity import AzureCliCredential
# Create the agents first
chat_client = AzureChatClient(credential=AzureCliCredential())
writer_agent: ChatAgent = chat_client.create_agent(
instructions=(
"You are an excellent content writer. You create new content and edit contents based on the feedback."
),
name="writer_agent",
)
reviewer_agent = chat_client.create_agent(
instructions=(
"You are an excellent content reviewer."
"Provide actionable feedback to the writer about the provided content."
"Provide the feedback in the most concise manner possible."
),
name="reviewer_agent",
)
# Build a workflow with the agents
builder = WorkflowBuilder()
builder.set_start_executor(writer_agent)
builder.add_edge(writer_agent, reviewer_agent)
workflow = builder.build()
ワークフローの実行
上記で作成したワークフロー内では、エージェントは実際には、ワークフローの他の部分とのエージェントの通信を処理する Executor 内にラップされます。 Executor は、次の 3 種類のメッセージを処理できます。
-
str: 文字列形式の 1 つのチャット メッセージ -
ChatMessage: 1 つのチャット メッセージ -
List<ChatMessage>: チャット メッセージの一覧
Executor は、これらの種類のいずれかのメッセージを受信するたびに、エージェントが応答をトリガーし、応答の種類が AgentExecutorResponse オブジェクトになります。 このクラスには、次のようなエージェントの応答に関する有用な情報が含まれています。
-
executor_id: この応答を生成した Executor の ID -
agent_run_response: エージェントからの完全な応答 -
full_conversation: この時点までの完全な会話履歴
ワークフローの実行時に、エージェントの応答に関連する 2 つのイベントの種類を生成できます。
-
AgentRunUpdateEventは、ストリーミング モードで生成されるエージェントの応答のチャンクを含みます。 -
AgentRunEventには、エージェントからの完全な応答が非ストリーミング モードで含まれています。
既定では、エージェントはストリーミング モードで実行される Executor にラップされます。 カスタム Executor を作成することで、この動作をカスタマイズできます。 詳細については、次のセクションを参照してください。
last_executor_id = None
async for event in workflow.run_streaming("Write a short blog post about AI agents."):
if isinstance(event, AgentRunUpdateEvent):
if event.executor_id != last_executor_id:
if last_executor_id is not None:
print()
print(f"{event.executor_id}:", end=" ", flush=True)
last_executor_id = event.executor_id
print(event.data, end="", flush=True)
カスタム エージェント「エグゼキューター」の使用
AI エージェントをワークフローに統合する方法をカスタマイズしたい場合があります。 これを実現するには、カスタム Executor を作成します。 これにより、次の操作を制御できます。
- エージェントの呼び出し: ストリーミングまたは非ストリーミング
- エージェントが処理するメッセージの種類 (カスタム メッセージの種類を含む)
- 初期化とクリーンアップを含むエージェントのライフ サイクル
- エージェント スレッドとその他のリソースの使用
- カスタム イベントを含む、エージェントの実行中に生成される追加のイベント
- 共有状態や要求/応答など、他のワークフロー機能との統合
internal sealed class CustomAgentExecutor : Executor<CustomInput, CustomOutput>("CustomAgentExecutor")
{
private readonly AIAgent _agent;
/// <summary>
/// Creates a new instance of the <see cref="CustomAgentExecutor"/> class.
/// </summary>
/// <param name="agent">The AI agent used for custom processing</param>
public CustomAgentExecutor(AIAgent agent) : base("CustomAgentExecutor")
{
this._agent = agent;
}
public async ValueTask<CustomOutput> HandleAsync(CustomInput message, IWorkflowContext context)
{
// Retrieve any shared states if needed
var sharedState = await context.ReadStateAsync<SharedStateType>("sharedStateId", scopeName: "SharedStateScope");
// Render the input for the agent
var agentInput = RenderInput(message, sharedState);
// Invoke the agent
// Assume the agent is configured with structured outputs with type `CustomOutput`
var response = await this._agent.RunAsync(agentInput);
var customOutput = JsonSerializer.Deserialize<CustomOutput>(response.Text);
return customOutput;
}
}
from agent_framework import (
ChatAgent,
ChatMessage,
Executor,
WorkflowContext,
handler
)
class Writer(Executor):
agent: ChatAgent
def __init__(self, chat_client: AzureChatClient, id: str = "writer"):
# Create a domain specific agent using your configured AzureChatClient.
agent = chat_client.create_agent(
instructions=(
"You are an excellent content writer. You create new content and edit contents based on the feedback."
),
)
# Associate the agent with this executor node. The base Executor stores it on self.agent.
super().__init__(agent=agent, id=id)
@handler
async def handle(self, message: ChatMessage, ctx: WorkflowContext[list[ChatMessage]]) -> None:
"""Handles a single chat message and forwards the accumulated messages to the next executor in the workflow."""
# Invoke the agent with the incoming message and get the response
messages: list[ChatMessage] = [message]
response = await self.agent.run(messages)
# Accumulate messages and send them to the next executor in the workflow.
messages.extend(response.messages)
await ctx.send_message(messages)