順次オーケストレーションでは、エージェントはパイプラインに編成されます。 各エージェントはタスクを順番に処理し、その出力をシーケンス内の次のエージェントに渡します。 これは、ドキュメント レビュー、データ処理パイプライン、マルチステージ推論など、各ステップが前の手順に基づいて構築されるワークフローに最適です。
ここでは、次の内容について学習します
- エージェントのシーケンシャル パイプラインを作成する方法
- 前の出力に基づいて各エージェントをチェーンする方法
- 特殊なタスク用のカスタム Executor とエージェントを混在させる方法
- パイプラインを介して会話フローを追跡する方法
エージェントの定義
順次オーケストレーションでは、各エージェントが順番にタスクを処理し、シーケンス内の次のエージェントに出力を渡すパイプラインにエージェントが編成されます。
Azure OpenAI クライアントを設定する
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;
// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
.GetChatClient(deploymentName)
.AsIChatClient();
順番に動作する特殊なエージェントを作成します。
// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
new(chatClient,
$"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
$"input by outputting the name of the input language and then translating the input to {targetLanguage}.");
// Create translation agents for sequential processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
select GetTranslationAgent(lang, client));
シーケンシャル オーケストレーションを設定する
AgentWorkflowBuilderを使用してワークフローを構築します。
// 3) Build sequential workflow
var workflow = AgentWorkflowBuilder.BuildSequential(translationAgents);
シーケンシャル ワークフローの実行
ワークフローを実行し、イベントを処理します。
// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };
StreamingRun run = await InProcessExecution.StreamAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
List<ChatMessage> result = new();
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is AgentRunUpdateEvent e)
{
Console.WriteLine($"{e.ExecutorId}: {e.Data}");
}
else if (evt is WorkflowOutputEvent outputEvt)
{
result = (List<ChatMessage>)outputEvt.Data!;
break;
}
}
// Display final result
foreach (var message in result)
{
Console.WriteLine($"{message.Role}: {message.Content}");
}
サンプル出力
French_Translation: User: Hello, world!
French_Translation: Assistant: English detected. Bonjour, le monde !
Spanish_Translation: Assistant: French detected. ¡Hola, mundo!
English_Translation: Assistant: Spanish detected. Hello, world!
主要概念
- 順次処理: 各エージェントは、前のエージェントの出力を順番に処理します。
- AgentWorkflowBuilder.BuildSequential(): エージェントのコレクションからパイプライン ワークフローを作成します
- ChatClientAgent: 特定の手順を使用してチャット クライアントによってサポートされるエージェントを表します
- StreamingRun: イベント ストリーミング機能を使用してリアルタイム実行を提供します
-
イベント処理:
AgentRunUpdateEventを通じてエージェントの進行状況を監視し、WorkflowOutputEventを通じて完了を監視する
順次オーケストレーションでは、各エージェントがタスクを順番に処理し、出力は次のタスクに送られます。 まず、2 段階のプロセスのエージェントを定義します。
from agent_framework.azure import AzureChatClient
from azure.identity import AzureCliCredential
# 1) Create agents using AzureChatClient
chat_client = AzureChatClient(credential=AzureCliCredential())
writer = chat_client.create_agent(
instructions=(
"You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."
),
name="writer",
)
reviewer = chat_client.create_agent(
instructions=(
"You are a thoughtful reviewer. Give brief feedback on the previous assistant message."
),
name="reviewer",
)
シーケンシャル オーケストレーションを設定する
SequentialBuilder クラスは、エージェントがタスクを順番に処理するパイプラインを作成します。 各エージェントは、会話の完全な履歴を確認し、応答を追加します。
from agent_framework import SequentialBuilder
# 2) Build sequential workflow: writer -> reviewer
workflow = SequentialBuilder().participants([writer, reviewer]).build()
シーケンシャル ワークフローの実行
ワークフローを実行し、各エージェントの投稿を示す最後の会話を収集します。
from agent_framework import ChatMessage, WorkflowOutputEvent
# 3) Run and print final conversation
output_evt: WorkflowOutputEvent | None = None
async for event in workflow.run_stream("Write a tagline for a budget-friendly eBike."):
if isinstance(event, WorkflowOutputEvent):
output_evt = event
if output_evt:
print("===== Final Conversation =====")
messages: list[ChatMessage] | Any = output_evt.data
for i, msg in enumerate(messages, start=1):
name = msg.author_name or ("assistant" if msg.role == Role.ASSISTANT else "user")
print(f"{'-' * 60}\n{i:02d} [{name}]\n{msg.text}")
サンプル出力
===== Final Conversation =====
------------------------------------------------------------
01 [user]
Write a tagline for a budget-friendly eBike.
------------------------------------------------------------
02 [writer]
Ride farther, spend less—your affordable eBike adventure starts here.
------------------------------------------------------------
03 [reviewer]
This tagline clearly communicates affordability and the benefit of extended travel, making it
appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could
be slightly shorter for more punch. Overall, a strong and effective suggestion!
上級: エージェントとカスタムエグゼキューターの混在
シーケンシャル オーケストレーションでは、特殊な処理のためのカスタム Executor とエージェントの混在がサポートされています。 これは、LLM を必要としないカスタム ロジックが必要な場合に便利です。
カスタム Executor を定義する
from agent_framework import Executor, WorkflowContext, handler
from agent_framework import ChatMessage, Role
class Summarizer(Executor):
"""Simple summarizer: consumes full conversation and appends an assistant summary."""
@handler
async def summarize(
self,
conversation: list[ChatMessage],
ctx: WorkflowContext[list[ChatMessage]]
) -> None:
users = sum(1 for m in conversation if m.role == Role.USER)
assistants = sum(1 for m in conversation if m.role == Role.ASSISTANT)
summary = ChatMessage(
role=Role.ASSISTANT,
text=f"Summary -> users:{users} assistants:{assistants}"
)
await ctx.send_message(list(conversation) + [summary])
混合シーケンシャル ワークフローの作成
# Create a content agent
content = chat_client.create_agent(
instructions="Produce a concise paragraph answering the user's request.",
name="content",
)
# Build sequential workflow: content -> summarizer
summarizer = Summarizer(id="summarizer")
workflow = SequentialBuilder().participants([content, summarizer]).build()
カスタム Executor を使用したサンプル出力
------------------------------------------------------------
01 [user]
Explain the benefits of budget eBikes for commuters.
------------------------------------------------------------
02 [content]
Budget eBikes offer commuters an affordable, eco-friendly alternative to cars and public transport.
Their electric assistance reduces physical strain and allows riders to cover longer distances quickly,
minimizing travel time and fatigue. Budget models are low-cost to maintain and operate, making them accessible
for a wider range of people. Additionally, eBikes help reduce traffic congestion and carbon emissions,
supporting greener urban environments. Overall, budget eBikes provide cost-effective, efficient, and
sustainable transportation for daily commuting needs.
------------------------------------------------------------
03 [assistant]
Summary -> users:1 assistants:1
主要概念
- 共有コンテキスト: 各参加者は、以前のすべてのメッセージを含む完全な会話履歴を受け取ります
-
順序が重要です: エージェントは
participants()リストで指定された順序で厳密に実行します - 柔軟な参加者: エージェントとカスタム Executor を任意の順序で混在させることができます
- 会話フロー: 各エージェント/Executor が会話に追加され、完全なダイアログが作成されます