다음을 통해 공유


단순 동시 워크플로 만들기

이 자습서에서는 Agent Framework를 사용하여 동시 워크플로를 만드는 방법을 보여 줍니다. 여러 실행기 또는 에이전트가 동시에 작업한 다음 결과를 집계할 수 있도록 병렬 처리를 가능하게 하는 팬아웃 및 팬인 패턴을 구현하는 방법을 알아봅니다.

빌드할 내용

다음과 같은 워크플로를 만듭니다.

  • 질문을 입력으로 사용합니다(예: "온도란?").
  • 두 명의 전문 AI 에이전트(물리학자 및 화학자)에게 동일한 질문을 동시에 보냅니다.
  • 두 에이전트의 응답을 수집하고 단일 출력으로 결합합니다.
  • 팬아웃/팬인 패턴을 통해 AI 에이전트와의 동시 실행을 구현합니다.

다루는 개념

필수 조건

1단계: NuGet 패키지 설치

먼저 .NET 프로젝트에 필요한 패키지를 설치합니다.

dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease

2단계: 종속성 및 Azure OpenAI 설정

먼저 필요한 NuGet 패키지 및 Azure OpenAI 클라이언트를 사용하여 프로젝트를 설정합니다.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;

public static class Program
{
    private static async Task Main()
    {
        // Set up the Azure OpenAI client
        var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
        var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
        var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
            .GetChatClient(deploymentName).AsIChatClient();

3단계: 전문가 AI 에이전트 만들기

전문가의 관점을 제공하는 두 개의 특수한 AI 에이전트를 만듭니다.

        // Create the AI agents with specialized expertise
        ChatClientAgent physicist = new(
            chatClient,
            name: "Physicist",
            instructions: "You are an expert in physics. You answer questions from a physics perspective."
        );

        ChatClientAgent chemist = new(
            chatClient,
            name: "Chemist",
            instructions: "You are an expert in chemistry. You answer questions from a chemistry perspective."
        );

4단계: 시작 실행기 만들기

여러 에이전트에 입력을 전송하여 동시 처리를 시작하는 실행기를 만듭니다.

        var startExecutor = new ConcurrentStartExecutor();

구현:ConcurrentStartExecutor

/// <summary>
/// Executor that starts the concurrent processing by sending messages to the agents.
/// </summary>
internal sealed class ConcurrentStartExecutor() : Executor<string>("ConcurrentStartExecutor")
{
    /// <summary>
    /// Starts the concurrent processing by sending messages to the agents.
    /// </summary>
    /// <param name="message">The user message to process</param>
    /// <param name="context">Workflow context for accessing workflow services and adding events</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
    /// The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>A task representing the asynchronous operation</returns>
    public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Broadcast the message to all connected agents. Receiving agents will queue
        // the message but will not start processing until they receive a turn token.
        await context.SendMessageAsync(new ChatMessage(ChatRole.User, message), cancellationToken);

        // Broadcast the turn token to kick off the agents.
        await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken);
    }
}

5단계: 집계 실행기 만들기

여러 에이전트의 응답을 수집하고 결합하는 실행기를 만듭니다.

        var aggregationExecutor = new ConcurrentAggregationExecutor();

구현:ConcurrentAggregationExecutor

/// <summary>
/// Executor that aggregates the results from the concurrent agents.
/// </summary>
internal sealed class ConcurrentAggregationExecutor() :
    Executor<List<ChatMessage>>("ConcurrentAggregationExecutor")
{
    private readonly List<ChatMessage> _messages = [];

    /// <summary>
    /// Handles incoming messages from the agents and aggregates their responses.
    /// </summary>
    /// <param name="message">The message from the agent</param>
    /// <param name="context">Workflow context for accessing workflow services and adding events</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
    /// The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>A task representing the asynchronous operation</returns>
    public override async ValueTask HandleAsync(List<ChatMessage> message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        this._messages.AddRange(message);

        if (this._messages.Count == 2)
        {
            var formattedMessages = string.Join(Environment.NewLine,
                this._messages.Select(m => $"{m.AuthorName}: {m.Text}"));
            await context.YieldOutputAsync(formattedMessages, cancellationToken);
        }
    }
}

6단계: 워크플로 빌드

팬아웃 및 팬인 에지 패턴을 사용하여 실행기 및 에이전트를 연결합니다.

        // Build the workflow by adding executors and connecting them
        var workflow = new WorkflowBuilder(startExecutor)
            .AddFanOutEdge(startExecutor, targets: [physicist, chemist])
            .AddFanInEdge(aggregationExecutor, sources: [physicist, chemist])
            .WithOutputFrom(aggregationExecutor)
            .Build();

7단계: 워크플로 실행

워크플로를 실행하고 스트리밍 출력을 캡처합니다.

        // Execute the workflow in streaming mode
        await using StreamingRun run = await InProcessExecution.StreamAsync(workflow, "What is temperature?");
        await foreach (WorkflowEvent evt in run.WatchStreamAsync())
        {
            if (evt is WorkflowOutputEvent output)
            {
                Console.WriteLine($"Workflow completed with results:\n{output.Data}");
            }
        }
    }
}

작동 방식

  1. 팬 아웃: ConcurrentStartExecutor 입력 질문을 수신하고 팬 아웃 에지는 동시에 물리학자와 화학자 에이전트 모두에게 보냅니다.
  2. 병렬 처리: 두 AI 에이전트는 각각 전문가의 관점을 제공하여 동일한 질문을 동시에 처리합니다.
  3. 팬인: ConcurrentAggregationExecutor 두 에이전트에서 응답을 수집 ChatMessage 합니다.
  4. 집계: 두 응답이 모두 수신되면 집계기는 형식이 지정된 출력으로 결합합니다.

주요 개념

  • Fan-Out Edges: AddFanOutEdge() 동일한 입력을 여러 실행기 또는 에이전트에 배포하는 데 사용합니다.
  • Fan-In Edge: AddFanInEdge() 여러 소스 실행기에서 결과를 수집하는 데 사용합니다.
  • AI 에이전트 통합: AI 에이전트는 워크플로에서 실행자로 직접 사용할 수 있습니다.
  • 실행기 기본 클래스: 사용자 지정 실행기는 Executor<TInput>을 상속하고 HandleAsync 메서드를 재정의합니다.
  • 토큰 전환: TurnToken 에이전트에 신호를 전송하여 대기 중인 메시지 처리를 시작합니다.
  • 스트리밍 실행: 워크플로가 진행됨에 따라 실시간 업데이트를 가져오는 데 사용합니다 StreamAsync() .

전체 구현

AI 에이전트와 이 동시 워크플로의 전체 작업 구현은 Agent Framework 리포지토리의 동시/Program.cs 샘플을 참조하세요.

Python 구현에서는 여러 병렬 실행기를 통해 데이터를 처리하고 다양한 형식의 결과를 집계하는 동시 워크플로를 빌드합니다. 이 예제에서는 프레임워크가 동시 처리에서 혼합된 결과 형식을 처리하는 방법을 보여 줍니다.

빌드할 내용

다음과 같은 워크플로를 만듭니다.

  • 숫자 목록을 입력으로 사용합니다.
  • 목록을 두 개의 병렬 실행기(계산 평균 1개, 계산 합계 1개)로 분산합니다.
  • 다른 결과 형식(float 및 int)을 최종 출력으로 집계합니다.
  • 프레임워크가 동시 실행기에서 다양한 결과 형식을 처리하는 방법을 보여 줍니다.

다루는 개념

필수 조건

  • Python 3.10 이상
  • Agent Framework Core가 설치됨: pip install agent-framework-core --pre

1단계: 필요한 종속성 가져오기

먼저 에이전트 프레임워크에서 필요한 구성 요소를 가져옵니다.

import asyncio
import random

from agent_framework import Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, handler
from typing_extensions import Never

2단계: 디스패처 실행기 만들기

디스패처는 초기 입력을 여러 병렬 실행기에 배포합니다.

class Dispatcher(Executor):
    """
    The sole purpose of this executor is to dispatch the input of the workflow to
    other executors.
    """

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[list[int]]):
        if not numbers:
            raise RuntimeError("Input must be a valid list of integers.")

        await ctx.send_message(numbers)

3단계: 병렬 처리 실행기 만들기

데이터를 동시에 처리하는 두 개의 실행기를 만듭니다.

class Average(Executor):
    """Calculate the average of a list of integers."""

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[float]):
        average: float = sum(numbers) / len(numbers)
        await ctx.send_message(average)


class Sum(Executor):
    """Calculate the sum of a list of integers."""

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[int]):
        total: int = sum(numbers)
        await ctx.send_message(total)

4단계: Aggregator 실행기 만들기

집계기는 병렬 실행기에서 결과를 수집하고 최종 출력을 생성합니다.

class Aggregator(Executor):
    """Aggregate the results from the different tasks and yield the final output."""

    @handler
    async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, list[int | float]]):
        """Receive the results from the source executors.

        The framework will automatically collect messages from the source executors
        and deliver them as a list.

        Args:
            results (list[int | float]): execution results from upstream executors.
                The type annotation must be a list of union types that the upstream
                executors will produce.
            ctx (WorkflowContext[Never, list[int | float]]): A workflow context that can yield the final output.
        """
        await ctx.yield_output(results)

5단계: 워크플로 빌드

팬아웃 및 팬인 에지 패턴을 사용하여 실행기를 연결합니다.

async def main() -> None:
    # 1) Create the executors
    dispatcher = Dispatcher(id="dispatcher")
    average = Average(id="average")
    summation = Sum(id="summation")
    aggregator = Aggregator(id="aggregator")

    # 2) Build a simple fan out and fan in workflow
    workflow = (
        WorkflowBuilder()
        .set_start_executor(dispatcher)
        .add_fan_out_edges(dispatcher, [average, summation])
        .add_fan_in_edges([average, summation], aggregator)
        .build()
    )

6단계: 워크플로 실행

샘플 데이터로 워크플로를 실행하고 출력을 캡처합니다.

    # 3) Run the workflow
    output: list[int | float] | None = None
    async for event in workflow.run_stream([random.randint(1, 100) for _ in range(10)]):
        if isinstance(event, WorkflowOutputEvent):
            output = event.data

    if output is not None:
        print(output)

if __name__ == "__main__":
    asyncio.run(main())

작동 방식

  1. 팬아웃: Dispatcher 입력 목록을 수신하고 이를 AverageSum 실행기에 동시에 보냅니다.
  2. 병렬 처리: 두 실행기는 동일한 입력을 동시에 처리하여 다른 결과 형식을 생성합니다.
    • Average 실행기에서 float 결과를 생성합니다.
    • Sum실행기가 결과를 생성합니다.int
  3. Fan-In: Aggregator 두 실행기에서 두 형식을 모두 포함하는 목록으로 결과를 받습니다.
  4. 형식 처리: 프레임워크는 공용 구조체 형식(int | float)을 사용하여 다양한 결과 형식을 자동으로 처리합니다.

주요 개념

  • Fan-Out Edge: 여러 실행기에 동일한 입력을 보내는 데 사용 add_fan_out_edges()
  • Fan-In Edge: 여러 원본 실행기에서 결과를 수집하는 데 사용 add_fan_in_edges()
  • 공용 구조체 형식: 다음과 같은 형식 주석을 사용하여 다양한 결과 형식 처리 list[int | float]
  • 동시 실행: 여러 실행기가 동시에 데이터를 처리하여 성능 향상

전체 구현

이 동시 워크플로의 전체 작업 구현은 Agent Framework 리포지토리의 aggregate_results_of_different_types.py 샘플을 참조하세요.

다음 단계