次の方法で共有


Microsoft Agent Framework ワークフロー オーケストレーション - シーケンシャル

順次オーケストレーションでは、エージェントはパイプラインに編成されます。 各エージェントはタスクを順番に処理し、その出力をシーケンス内の次のエージェントに渡します。 これは、ドキュメント レビュー、データ処理パイプライン、マルチステージ推論など、各ステップが前の手順に基づいて構築されるワークフローに最適です。

シーケンシャル オーケストレーション

ここでは、次の内容について学習します

  • エージェントのシーケンシャル パイプラインを作成する方法
  • 前の出力に基づいて各エージェントをチェーンする方法
  • 特殊なタスク用のカスタム Executor とエージェントを混在させる方法
  • パイプラインを介して会話フローを追跡する方法

エージェントの定義

順次オーケストレーションでは、各エージェントが順番にタスクを処理し、シーケンス内の次のエージェントに出力を渡すパイプラインにエージェントが編成されます。

Azure OpenAI クライアントを設定する

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;

// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
    throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
    .GetChatClient(deploymentName)
    .AsIChatClient();

順番に動作する特殊なエージェントを作成します。

// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
    new(chatClient,
        $"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
        $"input by outputting the name of the input language and then translating the input to {targetLanguage}.");

// Create translation agents for sequential processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
                         select GetTranslationAgent(lang, client));

シーケンシャル オーケストレーションを設定する

AgentWorkflowBuilderを使用してワークフローを構築します。

// 3) Build sequential workflow
var workflow = AgentWorkflowBuilder.BuildSequential(translationAgents);

シーケンシャル ワークフローの実行

ワークフローを実行し、イベントを処理します。

// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };

StreamingRun run = await InProcessExecution.StreamAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

List<ChatMessage> result = new();
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is AgentRunUpdateEvent e)
    {
        Console.WriteLine($"{e.ExecutorId}: {e.Data}");
    }
    else if (evt is WorkflowOutputEvent outputEvt)
    {
        result = (List<ChatMessage>)outputEvt.Data!;
        break;
    }
}

// Display final result
foreach (var message in result)
{
    Console.WriteLine($"{message.Role}: {message.Content}");
}

サンプル出力

French_Translation: User: Hello, world!
French_Translation: Assistant: English detected. Bonjour, le monde !
Spanish_Translation: Assistant: French detected. ¡Hola, mundo!
English_Translation: Assistant: Spanish detected. Hello, world!

主要概念

  • 順次処理: 各エージェントは、前のエージェントの出力を順番に処理します。
  • AgentWorkflowBuilder.BuildSequential(): エージェントのコレクションからパイプライン ワークフローを作成します
  • ChatClientAgent: 特定の手順を使用してチャット クライアントによってサポートされるエージェントを表します
  • StreamingRun: イベント ストリーミング機能を使用してリアルタイム実行を提供します
  • イベント処理: AgentRunUpdateEvent を通じてエージェントの進行状況を監視し、WorkflowOutputEvent を通じて完了を監視する

順次オーケストレーションでは、各エージェントがタスクを順番に処理し、出力は次のタスクに送られます。 まず、2 段階のプロセスのエージェントを定義します。

from agent_framework.azure import AzureChatClient
from azure.identity import AzureCliCredential

# 1) Create agents using AzureChatClient
chat_client = AzureChatClient(credential=AzureCliCredential())

writer = chat_client.create_agent(
    instructions=(
        "You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."
    ),
    name="writer",
)

reviewer = chat_client.create_agent(
    instructions=(
        "You are a thoughtful reviewer. Give brief feedback on the previous assistant message."
    ),
    name="reviewer",
)

シーケンシャル オーケストレーションを設定する

SequentialBuilder クラスは、エージェントがタスクを順番に処理するパイプラインを作成します。 各エージェントは、会話の完全な履歴を確認し、応答を追加します。

from agent_framework import SequentialBuilder

# 2) Build sequential workflow: writer -> reviewer
workflow = SequentialBuilder().participants([writer, reviewer]).build()

シーケンシャル ワークフローの実行

ワークフローを実行し、各エージェントの投稿を示す最後の会話を収集します。

from agent_framework import ChatMessage, WorkflowOutputEvent

# 3) Run and print final conversation
output_evt: WorkflowOutputEvent | None = None
async for event in workflow.run_stream("Write a tagline for a budget-friendly eBike."):
    if isinstance(event, WorkflowOutputEvent):
        output_evt = event

if output_evt:
    print("===== Final Conversation =====")
    messages: list[ChatMessage] | Any = output_evt.data
    for i, msg in enumerate(messages, start=1):
        name = msg.author_name or ("assistant" if msg.role == Role.ASSISTANT else "user")
        print(f"{'-' * 60}\n{i:02d} [{name}]\n{msg.text}")

サンプル出力

===== Final Conversation =====
------------------------------------------------------------
01 [user]
Write a tagline for a budget-friendly eBike.
------------------------------------------------------------
02 [writer]
Ride farther, spend less—your affordable eBike adventure starts here.
------------------------------------------------------------
03 [reviewer]
This tagline clearly communicates affordability and the benefit of extended travel, making it
appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could
be slightly shorter for more punch. Overall, a strong and effective suggestion!

上級: エージェントとカスタムエグゼキューターの混在

シーケンシャル オーケストレーションでは、特殊な処理のためのカスタム Executor とエージェントの混在がサポートされています。 これは、LLM を必要としないカスタム ロジックが必要な場合に便利です。

カスタム Executor を定義する

from agent_framework import Executor, WorkflowContext, handler
from agent_framework import ChatMessage, Role

class Summarizer(Executor):
    """Simple summarizer: consumes full conversation and appends an assistant summary."""

    @handler
    async def summarize(
        self,
        conversation: list[ChatMessage],
        ctx: WorkflowContext[list[ChatMessage]]
    ) -> None:
        users = sum(1 for m in conversation if m.role == Role.USER)
        assistants = sum(1 for m in conversation if m.role == Role.ASSISTANT)
        summary = ChatMessage(
            role=Role.ASSISTANT,
            text=f"Summary -> users:{users} assistants:{assistants}"
        )
        await ctx.send_message(list(conversation) + [summary])

混合シーケンシャル ワークフローの作成

# Create a content agent
content = chat_client.create_agent(
    instructions="Produce a concise paragraph answering the user's request.",
    name="content",
)

# Build sequential workflow: content -> summarizer
summarizer = Summarizer(id="summarizer")
workflow = SequentialBuilder().participants([content, summarizer]).build()

カスタム Executor を使用したサンプル出力

------------------------------------------------------------
01 [user]
Explain the benefits of budget eBikes for commuters.
------------------------------------------------------------
02 [content]
Budget eBikes offer commuters an affordable, eco-friendly alternative to cars and public transport.
Their electric assistance reduces physical strain and allows riders to cover longer distances quickly,
minimizing travel time and fatigue. Budget models are low-cost to maintain and operate, making them accessible
for a wider range of people. Additionally, eBikes help reduce traffic congestion and carbon emissions,
supporting greener urban environments. Overall, budget eBikes provide cost-effective, efficient, and
sustainable transportation for daily commuting needs.
------------------------------------------------------------
03 [assistant]
Summary -> users:1 assistants:1

主要概念

  • 共有コンテキスト: 各参加者は、以前のすべてのメッセージを含む完全な会話履歴を受け取ります
  • 順序が重要です: エージェントは participants() リストで指定された順序で厳密に実行します
  • 柔軟な参加者: エージェントとカスタム Executor を任意の順序で混在させることができます
  • 会話フロー: 各エージェント/Executor が会話に追加され、完全なダイアログが作成されます

次のステップ