Freigeben über


Erstellen eines einfachen parallelen Workflows

In diesem Lernprogramm wird das Erstellen eines gleichzeitigen Workflows mithilfe von Agent Framework veranschaulicht. Sie lernen, Fanout- und Fan-In-Muster zu implementieren, die die parallele Verarbeitung ermöglichen, sodass mehrere Executoren oder Agents gleichzeitig arbeiten und dann ihre Ergebnisse aggregieren können.

Was Sie erstellen werden

Sie erstellen einen Workflow, der:

  • Nimmt eine Frage als Eingabe an (z. B. "Was ist Temperatur?")
  • Sendet die gleiche Frage gleichzeitig an zwei experten KI-Agenten (Physiker und Chemiker)
  • Sammelt und kombiniert Antworten von beiden Agents in einer einzigen Ausgabe.
  • Veranschaulicht die gleichzeitige Ausführung mit KI-Agents mithilfe von Fanout-/Fan-In-Mustern

Behandelte Konzepte

Voraussetzungen

Schritt 1: Installieren von NuGet-Paketen

Installieren Sie zunächst die erforderlichen Pakete für Ihr .NET-Projekt:

dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease

Schritt 2: Einrichten von Abhängigkeiten und Azure OpenAI

Richten Sie zunächst Ihr Projekt mit den erforderlichen NuGet-Paketen und dem Azure OpenAI-Client ein:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;

public static class Program
{
    private static async Task Main()
    {
        // Set up the Azure OpenAI client
        var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
        var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
        var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
            .GetChatClient(deploymentName).AsIChatClient();

Schritt 3: Erstellung von Experten-KI-Agenten

Erstellen Sie zwei spezialisierte KI-Agents, die Expertenperspektiven bieten:

        // Create the AI agents with specialized expertise
        ChatClientAgent physicist = new(
            chatClient,
            name: "Physicist",
            instructions: "You are an expert in physics. You answer questions from a physics perspective."
        );

        ChatClientAgent chemist = new(
            chatClient,
            name: "Chemist",
            instructions: "You are an expert in chemistry. You answer questions from a chemistry perspective."
        );

Schritt 4: Erstellen des Start-Executors

Erstellen Sie einen Executor, der die gleichzeitige Verarbeitung initiiert, indem Sie Eingaben an mehrere Agents senden:

        var startExecutor = new ConcurrentStartExecutor();

Die ConcurrentStartExecutor Implementierung:

/// <summary>
/// Executor that starts the concurrent processing by sending messages to the agents.
/// </summary>
internal sealed class ConcurrentStartExecutor() : Executor<string>("ConcurrentStartExecutor")
{
    /// <summary>
    /// Starts the concurrent processing by sending messages to the agents.
    /// </summary>
    /// <param name="message">The user message to process</param>
    /// <param name="context">Workflow context for accessing workflow services and adding events</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
    /// The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>A task representing the asynchronous operation</returns>
    public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Broadcast the message to all connected agents. Receiving agents will queue
        // the message but will not start processing until they receive a turn token.
        await context.SendMessageAsync(new ChatMessage(ChatRole.User, message), cancellationToken);

        // Broadcast the turn token to kick off the agents.
        await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken);
    }
}

Schritt 5: Erstellen des Aggregation Executors

Erstellen Sie einen Executor, der Antworten von mehreren Agents sammelt und kombiniert:

        var aggregationExecutor = new ConcurrentAggregationExecutor();

Die ConcurrentAggregationExecutor Implementierung:

/// <summary>
/// Executor that aggregates the results from the concurrent agents.
/// </summary>
internal sealed class ConcurrentAggregationExecutor() :
    Executor<List<ChatMessage>>("ConcurrentAggregationExecutor")
{
    private readonly List<ChatMessage> _messages = [];

    /// <summary>
    /// Handles incoming messages from the agents and aggregates their responses.
    /// </summary>
    /// <param name="message">The message from the agent</param>
    /// <param name="context">Workflow context for accessing workflow services and adding events</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
    /// The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>A task representing the asynchronous operation</returns>
    public override async ValueTask HandleAsync(List<ChatMessage> message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        this._messages.AddRange(message);

        if (this._messages.Count == 2)
        {
            var formattedMessages = string.Join(Environment.NewLine,
                this._messages.Select(m => $"{m.AuthorName}: {m.Text}"));
            await context.YieldOutputAsync(formattedMessages, cancellationToken);
        }
    }
}

Schritt 6: Erstellen des Workflows

Verbinden Sie die Ausführenden und die Agenten mithilfe von Fan-Out- und Fan-In-Mustern.

        // Build the workflow by adding executors and connecting them
        var workflow = new WorkflowBuilder(startExecutor)
            .AddFanOutEdge(startExecutor, targets: [physicist, chemist])
            .AddFanInEdge(aggregationExecutor, sources: [physicist, chemist])
            .WithOutputFrom(aggregationExecutor)
            .Build();

Schritt 7: Ausführen des Workflows

Führen Sie den Workflow aus, und erfassen Sie die Streamingausgabe:

        // Execute the workflow in streaming mode
        await using StreamingRun run = await InProcessExecution.StreamAsync(workflow, "What is temperature?");
        await foreach (WorkflowEvent evt in run.WatchStreamAsync())
        {
            if (evt is WorkflowOutputEvent output)
            {
                Console.WriteLine($"Workflow completed with results:\n{output.Data}");
            }
        }
    }
}

Funktionsweise

  1. Fan-Out: Die ConcurrentStartExecutor Eingangsfrage empfängt und der Fan-out-Rand sendet sie gleichzeitig an die Physiker- und Chemiker-Agenten.
  2. Parallele Verarbeitung: Beide KI-Agents verarbeiten die gleiche Frage gleichzeitig, wobei jeder seine Expertenperspektive bietet.
  3. Fan-In: Die ConcurrentAggregationExecutor sammelt ChatMessage Antworten von beiden Agenten.
  4. Aggregation: Sobald beide Antworten empfangen wurden, kombiniert der Aggregator sie in einer formatierten Ausgabe.

Wichtige Konzepte

  • Fan-Out Edges: Verwenden Sie AddFanOutEdge(), um dieselbe Eingabe an mehrere Executoren oder Agenten zu verteilen.
  • Fan-In Edges: Verwenden Sie AddFanInEdge(), um Ergebnisse von mehreren Quellexekutoren zu sammeln.
  • KI-Agent-Integration: KI-Agents können direkt als Executoren in Workflows verwendet werden.
  • Executor-Basisklasse: Benutzerdefinierte Executoren erben von Executor<TInput> und überschreiben die HandleAsync-Methode.
  • Turn Tokens: Verwenden Sie TurnToken, um den Agenten zu signalisieren, mit der Verarbeitung der wartenden Nachrichten zu beginnen.
  • Streamingausführung: Verwenden Sie StreamAsync(), um Echtzeitaktualisierungen zu erhalten, während der Workflow voranschreitet.

Vollständige Implementierung

Die vollständige Implementierung dieses gleichzeitigen Workflows mit KI-Agenten finden Sie im Beispiel „Concurrent/Program.cs“ im Agent Framework-Repository.

In der Python-Implementierung erstellen Sie einen gleichzeitigen Workflow, der Daten über mehrere parallele Executoren verarbeitet und Ergebnisse verschiedener Typen aggregiert. In diesem Beispiel wird veranschaulicht, wie das Framework gemischte Ergebnistypen aus gleichzeitiger Verarbeitung behandelt.

Was Sie erstellen werden

Sie erstellen einen Workflow, der:

  • Verwendet eine Liste von Zahlen als Eingabe
  • Verteilt die Liste auf zwei parallele Executoren (ein berechneter Mittelwert, eine berechnete Summe)
  • Aggregiert die verschiedenen Ergebnistypen (Float und Int) in eine endgültige Ausgabe.
  • Veranschaulicht, wie das Framework verschiedene Ergebnistypen von gleichzeitigen Executoren behandelt

Behandelte Konzepte

Voraussetzungen

  • Python 3.10 oder höher
  • Agent Framework Core installiert: pip install agent-framework-core --pre

Schritt 1: Importieren erforderlicher Abhängigkeiten

Importieren Sie zunächst die erforderlichen Komponenten aus dem Agent Framework:

import asyncio
import random

from agent_framework import Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, handler
from typing_extensions import Never

Schritt 2: Erstellen des Dispatcher-Executors

Der Dispatcher ist für die Verteilung der anfänglichen Eingabe an mehrere parallele Executoren verantwortlich:

class Dispatcher(Executor):
    """
    The sole purpose of this executor is to dispatch the input of the workflow to
    other executors.
    """

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[list[int]]):
        if not numbers:
            raise RuntimeError("Input must be a valid list of integers.")

        await ctx.send_message(numbers)

Schritt 3: Parallele Verarbeitungsexekutoren erstellen

Erstellen Sie zwei Executoren, die die Daten gleichzeitig verarbeiten:

class Average(Executor):
    """Calculate the average of a list of integers."""

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[float]):
        average: float = sum(numbers) / len(numbers)
        await ctx.send_message(average)


class Sum(Executor):
    """Calculate the sum of a list of integers."""

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[int]):
        total: int = sum(numbers)
        await ctx.send_message(total)

Schritt 4: Erstellen des Executor des Aggregators

Der Aggregator sammelt Ergebnisse aus den parallelen Executoren und liefert die endgültige Ausgabe:

class Aggregator(Executor):
    """Aggregate the results from the different tasks and yield the final output."""

    @handler
    async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, list[int | float]]):
        """Receive the results from the source executors.

        The framework will automatically collect messages from the source executors
        and deliver them as a list.

        Args:
            results (list[int | float]): execution results from upstream executors.
                The type annotation must be a list of union types that the upstream
                executors will produce.
            ctx (WorkflowContext[Never, list[int | float]]): A workflow context that can yield the final output.
        """
        await ctx.yield_output(results)

Schritt 5: Erstellen des Workflows

Verbinden Sie die Executoren mithilfe von Lüfter- und Lüfter-In-Kantenmustern:

async def main() -> None:
    # 1) Create the executors
    dispatcher = Dispatcher(id="dispatcher")
    average = Average(id="average")
    summation = Sum(id="summation")
    aggregator = Aggregator(id="aggregator")

    # 2) Build a simple fan out and fan in workflow
    workflow = (
        WorkflowBuilder()
        .set_start_executor(dispatcher)
        .add_fan_out_edges(dispatcher, [average, summation])
        .add_fan_in_edges([average, summation], aggregator)
        .build()
    )

Schritt 6: Ausführen des Workflows

Führen Sie den Workflow mit Beispieldaten aus, und erfassen Sie die Ausgabe:

    # 3) Run the workflow
    output: list[int | float] | None = None
    async for event in workflow.run_stream([random.randint(1, 100) for _ in range(10)]):
        if isinstance(event, WorkflowOutputEvent):
            output = event.data

    if output is not None:
        print(output)

if __name__ == "__main__":
    asyncio.run(main())

Funktionsweise

  1. Fan-Out: Die Dispatcher empfängt die Eingabeliste und sendet sie gleichzeitig an die Average- und Sum-Ausführer.
  2. Parallele Verarbeitung: Beide Executoren verarbeiten die gleiche Eingabe gleichzeitig und erzeugen unterschiedliche Ergebnistypen:
    • Average executor erzeugt ein float Ergebnis
    • Sum executor erzeugt ein int Ergebnis
  3. Fan-In: Die Aggregator empfängt die Ergebnisse von beiden Task-Executoren in Form einer Liste, die beide Typen enthält.
  4. Typbehandlung: Das Framework behandelt automatisch die verschiedenen Ergebnistypen mithilfe von Union-Typen (int | float)

Wichtige Konzepte

  • Fan-Out Edges: Verwenden add_fan_out_edges() , um dieselbe Eingabe an mehrere Executoren zu senden
  • Fan-In Edges: Verwenden Sie add_fan_in_edges(), um Ergebnisse aus mehreren Quellexekutoren zu sammeln.
  • Union-Typen: Behandeln verschiedener Ergebnistypen mithilfe von Typanmerkungen wie list[int | float]
  • Gleichzeitige Ausführung: Mehrere Executoren verarbeiten gleichzeitig Daten, verbessern die Leistung

Vollständige Implementierung

Die vollständige funktionierende Implementierung dieses gleichzeitigen Workflows finden Sie im aggregate_results_of_different_types.py Beispiel im Agent Framework-Repository.

Nächste Schritte