Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
In diesem Lernprogramm wird das Erstellen eines gleichzeitigen Workflows mithilfe von Agent Framework veranschaulicht. Sie lernen, Fanout- und Fan-In-Muster zu implementieren, die die parallele Verarbeitung ermöglichen, sodass mehrere Executoren oder Agents gleichzeitig arbeiten und dann ihre Ergebnisse aggregieren können.
Was Sie erstellen werden
Sie erstellen einen Workflow, der:
- Nimmt eine Frage als Eingabe an (z. B. "Was ist Temperatur?")
- Sendet die gleiche Frage gleichzeitig an zwei experten KI-Agenten (Physiker und Chemiker)
- Sammelt und kombiniert Antworten von beiden Agents in einer einzigen Ausgabe.
- Veranschaulicht die gleichzeitige Ausführung mit KI-Agents mithilfe von Fanout-/Fan-In-Mustern
Behandelte Konzepte
Voraussetzungen
- .NET 8.0 SDK oder höher
- Azure OpenAI-Dienstendpunkt und -Bereitstellung konfiguriert
- Installierte und authentifizierte Azure CLI (für die Azure-Anmeldeinformationsauthentifizierung)
- Eine neue Konsolenanwendung
Schritt 1: Installieren von NuGet-Paketen
Installieren Sie zunächst die erforderlichen Pakete für Ihr .NET-Projekt:
dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease
Schritt 2: Einrichten von Abhängigkeiten und Azure OpenAI
Richten Sie zunächst Ihr Projekt mit den erforderlichen NuGet-Paketen und dem Azure OpenAI-Client ein:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
.GetChatClient(deploymentName).AsIChatClient();
Schritt 3: Erstellung von Experten-KI-Agenten
Erstellen Sie zwei spezialisierte KI-Agents, die Expertenperspektiven bieten:
// Create the AI agents with specialized expertise
ChatClientAgent physicist = new(
chatClient,
name: "Physicist",
instructions: "You are an expert in physics. You answer questions from a physics perspective."
);
ChatClientAgent chemist = new(
chatClient,
name: "Chemist",
instructions: "You are an expert in chemistry. You answer questions from a chemistry perspective."
);
Schritt 4: Erstellen des Start-Executors
Erstellen Sie einen Executor, der die gleichzeitige Verarbeitung initiiert, indem Sie Eingaben an mehrere Agents senden:
var startExecutor = new ConcurrentStartExecutor();
Die ConcurrentStartExecutor Implementierung:
/// <summary>
/// Executor that starts the concurrent processing by sending messages to the agents.
/// </summary>
internal sealed class ConcurrentStartExecutor() : Executor<string>("ConcurrentStartExecutor")
{
/// <summary>
/// Starts the concurrent processing by sending messages to the agents.
/// </summary>
/// <param name="message">The user message to process</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the asynchronous operation</returns>
public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Broadcast the message to all connected agents. Receiving agents will queue
// the message but will not start processing until they receive a turn token.
await context.SendMessageAsync(new ChatMessage(ChatRole.User, message), cancellationToken);
// Broadcast the turn token to kick off the agents.
await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken);
}
}
Schritt 5: Erstellen des Aggregation Executors
Erstellen Sie einen Executor, der Antworten von mehreren Agents sammelt und kombiniert:
var aggregationExecutor = new ConcurrentAggregationExecutor();
Die ConcurrentAggregationExecutor Implementierung:
/// <summary>
/// Executor that aggregates the results from the concurrent agents.
/// </summary>
internal sealed class ConcurrentAggregationExecutor() :
Executor<List<ChatMessage>>("ConcurrentAggregationExecutor")
{
private readonly List<ChatMessage> _messages = [];
/// <summary>
/// Handles incoming messages from the agents and aggregates their responses.
/// </summary>
/// <param name="message">The message from the agent</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the asynchronous operation</returns>
public override async ValueTask HandleAsync(List<ChatMessage> message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._messages.AddRange(message);
if (this._messages.Count == 2)
{
var formattedMessages = string.Join(Environment.NewLine,
this._messages.Select(m => $"{m.AuthorName}: {m.Text}"));
await context.YieldOutputAsync(formattedMessages, cancellationToken);
}
}
}
Schritt 6: Erstellen des Workflows
Verbinden Sie die Ausführenden und die Agenten mithilfe von Fan-Out- und Fan-In-Mustern.
// Build the workflow by adding executors and connecting them
var workflow = new WorkflowBuilder(startExecutor)
.AddFanOutEdge(startExecutor, targets: [physicist, chemist])
.AddFanInEdge(aggregationExecutor, sources: [physicist, chemist])
.WithOutputFrom(aggregationExecutor)
.Build();
Schritt 7: Ausführen des Workflows
Führen Sie den Workflow aus, und erfassen Sie die Streamingausgabe:
// Execute the workflow in streaming mode
await using StreamingRun run = await InProcessExecution.StreamAsync(workflow, "What is temperature?");
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent output)
{
Console.WriteLine($"Workflow completed with results:\n{output.Data}");
}
}
}
}
Funktionsweise
-
Fan-Out: Die
ConcurrentStartExecutorEingangsfrage empfängt und der Fan-out-Rand sendet sie gleichzeitig an die Physiker- und Chemiker-Agenten. - Parallele Verarbeitung: Beide KI-Agents verarbeiten die gleiche Frage gleichzeitig, wobei jeder seine Expertenperspektive bietet.
-
Fan-In: Die
ConcurrentAggregationExecutorsammeltChatMessageAntworten von beiden Agenten. - Aggregation: Sobald beide Antworten empfangen wurden, kombiniert der Aggregator sie in einer formatierten Ausgabe.
Wichtige Konzepte
-
Fan-Out Edges: Verwenden Sie
AddFanOutEdge(), um dieselbe Eingabe an mehrere Executoren oder Agenten zu verteilen. -
Fan-In Edges: Verwenden Sie
AddFanInEdge(), um Ergebnisse von mehreren Quellexekutoren zu sammeln. - KI-Agent-Integration: KI-Agents können direkt als Executoren in Workflows verwendet werden.
-
Executor-Basisklasse: Benutzerdefinierte Executoren erben von
Executor<TInput>und überschreiben dieHandleAsync-Methode. -
Turn Tokens: Verwenden Sie
TurnToken, um den Agenten zu signalisieren, mit der Verarbeitung der wartenden Nachrichten zu beginnen. -
Streamingausführung: Verwenden Sie
StreamAsync(), um Echtzeitaktualisierungen zu erhalten, während der Workflow voranschreitet.
Vollständige Implementierung
Die vollständige Implementierung dieses gleichzeitigen Workflows mit KI-Agenten finden Sie im Beispiel „Concurrent/Program.cs“ im Agent Framework-Repository.
In der Python-Implementierung erstellen Sie einen gleichzeitigen Workflow, der Daten über mehrere parallele Executoren verarbeitet und Ergebnisse verschiedener Typen aggregiert. In diesem Beispiel wird veranschaulicht, wie das Framework gemischte Ergebnistypen aus gleichzeitiger Verarbeitung behandelt.
Was Sie erstellen werden
Sie erstellen einen Workflow, der:
- Verwendet eine Liste von Zahlen als Eingabe
- Verteilt die Liste auf zwei parallele Executoren (ein berechneter Mittelwert, eine berechnete Summe)
- Aggregiert die verschiedenen Ergebnistypen (Float und Int) in eine endgültige Ausgabe.
- Veranschaulicht, wie das Framework verschiedene Ergebnistypen von gleichzeitigen Executoren behandelt
Behandelte Konzepte
Voraussetzungen
- Python 3.10 oder höher
- Agent Framework Core installiert:
pip install agent-framework-core --pre
Schritt 1: Importieren erforderlicher Abhängigkeiten
Importieren Sie zunächst die erforderlichen Komponenten aus dem Agent Framework:
import asyncio
import random
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, handler
from typing_extensions import Never
Schritt 2: Erstellen des Dispatcher-Executors
Der Dispatcher ist für die Verteilung der anfänglichen Eingabe an mehrere parallele Executoren verantwortlich:
class Dispatcher(Executor):
"""
The sole purpose of this executor is to dispatch the input of the workflow to
other executors.
"""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[list[int]]):
if not numbers:
raise RuntimeError("Input must be a valid list of integers.")
await ctx.send_message(numbers)
Schritt 3: Parallele Verarbeitungsexekutoren erstellen
Erstellen Sie zwei Executoren, die die Daten gleichzeitig verarbeiten:
class Average(Executor):
"""Calculate the average of a list of integers."""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[float]):
average: float = sum(numbers) / len(numbers)
await ctx.send_message(average)
class Sum(Executor):
"""Calculate the sum of a list of integers."""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[int]):
total: int = sum(numbers)
await ctx.send_message(total)
Schritt 4: Erstellen des Executor des Aggregators
Der Aggregator sammelt Ergebnisse aus den parallelen Executoren und liefert die endgültige Ausgabe:
class Aggregator(Executor):
"""Aggregate the results from the different tasks and yield the final output."""
@handler
async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, list[int | float]]):
"""Receive the results from the source executors.
The framework will automatically collect messages from the source executors
and deliver them as a list.
Args:
results (list[int | float]): execution results from upstream executors.
The type annotation must be a list of union types that the upstream
executors will produce.
ctx (WorkflowContext[Never, list[int | float]]): A workflow context that can yield the final output.
"""
await ctx.yield_output(results)
Schritt 5: Erstellen des Workflows
Verbinden Sie die Executoren mithilfe von Lüfter- und Lüfter-In-Kantenmustern:
async def main() -> None:
# 1) Create the executors
dispatcher = Dispatcher(id="dispatcher")
average = Average(id="average")
summation = Sum(id="summation")
aggregator = Aggregator(id="aggregator")
# 2) Build a simple fan out and fan in workflow
workflow = (
WorkflowBuilder()
.set_start_executor(dispatcher)
.add_fan_out_edges(dispatcher, [average, summation])
.add_fan_in_edges([average, summation], aggregator)
.build()
)
Schritt 6: Ausführen des Workflows
Führen Sie den Workflow mit Beispieldaten aus, und erfassen Sie die Ausgabe:
# 3) Run the workflow
output: list[int | float] | None = None
async for event in workflow.run_stream([random.randint(1, 100) for _ in range(10)]):
if isinstance(event, WorkflowOutputEvent):
output = event.data
if output is not None:
print(output)
if __name__ == "__main__":
asyncio.run(main())
Funktionsweise
-
Fan-Out: Die
Dispatcherempfängt die Eingabeliste und sendet sie gleichzeitig an dieAverage- undSum-Ausführer. -
Parallele Verarbeitung: Beide Executoren verarbeiten die gleiche Eingabe gleichzeitig und erzeugen unterschiedliche Ergebnistypen:
-
Averageexecutor erzeugt einfloatErgebnis -
Sumexecutor erzeugt einintErgebnis
-
-
Fan-In: Die
Aggregatorempfängt die Ergebnisse von beiden Task-Executoren in Form einer Liste, die beide Typen enthält. -
Typbehandlung: Das Framework behandelt automatisch die verschiedenen Ergebnistypen mithilfe von Union-Typen (
int | float)
Wichtige Konzepte
-
Fan-Out Edges: Verwenden
add_fan_out_edges(), um dieselbe Eingabe an mehrere Executoren zu senden -
Fan-In Edges: Verwenden Sie
add_fan_in_edges(), um Ergebnisse aus mehreren Quellexekutoren zu sammeln. -
Union-Typen: Behandeln verschiedener Ergebnistypen mithilfe von Typanmerkungen wie
list[int | float] - Gleichzeitige Ausführung: Mehrere Executoren verarbeiten gleichzeitig Daten, verbessern die Leistung
Vollständige Implementierung
Die vollständige funktionierende Implementierung dieses gleichzeitigen Workflows finden Sie im aggregate_results_of_different_types.py Beispiel im Agent Framework-Repository.