Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
W tym samouczku pokazano, jak utworzyć współbieżny przepływ pracy za pomocą Agent Framework. Dowiesz się, jak zaimplementować wzorce fan-out i fan-in, które umożliwiają przetwarzanie równoległe, umożliwiając jednoczesne działanie wielu funkcji wykonawczych lub agentów, a następnie agregowanie ich wyników.
Co będziesz budować
Utworzysz przepływ pracy, który:
- Przyjmuje pytanie jako dane wejściowe (na przykład "Co to jest temperatura?")
- Wysyła to samo pytanie do dwóch ekspertów agentów sztucznej inteligencji jednocześnie (Fizyk i Chemik)
- Zbiera i łączy odpowiedzi z obu agentów w pojedyncze dane wyjściowe
- Demonstruje współbieżne wykonywanie z agentami AI przy użyciu wzorców rozdzielania-łączenia
Omówione pojęcia
Wymagania wstępne
- Pakiet .NET 8.0 SDK lub nowszy
- Skonfigurowany punkt końcowy usługi Azure OpenAI i wdrożenie
- Zainstalowany i uwierzytelniony interfejs wiersza polecenia platformy Azure(na potrzeby uwierzytelniania poświadczeń platformy Azure)
- Nowa aplikacja konsolowa
Krok 1. Instalowanie pakietów NuGet
Najpierw zainstaluj wymagane pakiety dla projektu .NET:
dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease
Krok 2. Konfigurowanie zależności i interfejsu Azure OpenAI
Zacznij od skonfigurowania projektu przy użyciu wymaganych pakietów NuGet i klienta usługi Azure OpenAI:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
.GetChatClient(deploymentName).AsIChatClient();
Krok 3: Tworzenie ekspertów agentów AI
Utwórz dwóch wyspecjalizowanych agentów sztucznej inteligencji, którzy zapewnią perspektywy ekspertów:
// Create the AI agents with specialized expertise
ChatClientAgent physicist = new(
chatClient,
name: "Physicist",
instructions: "You are an expert in physics. You answer questions from a physics perspective."
);
ChatClientAgent chemist = new(
chatClient,
name: "Chemist",
instructions: "You are an expert in chemistry. You answer questions from a chemistry perspective."
);
Krok 4: Utwórz Startowy Egzekutor
Utwórz funkcję wykonawcczą, która inicjuje przetwarzanie współbieżne, wysyłając dane wejściowe do wielu agentów:
var startExecutor = new ConcurrentStartExecutor();
Implementacja ConcurrentStartExecutor :
/// <summary>
/// Executor that starts the concurrent processing by sending messages to the agents.
/// </summary>
internal sealed class ConcurrentStartExecutor() : Executor<string>("ConcurrentStartExecutor")
{
/// <summary>
/// Starts the concurrent processing by sending messages to the agents.
/// </summary>
/// <param name="message">The user message to process</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the asynchronous operation</returns>
public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Broadcast the message to all connected agents. Receiving agents will queue
// the message but will not start processing until they receive a turn token.
await context.SendMessageAsync(new ChatMessage(ChatRole.User, message), cancellationToken);
// Broadcast the turn token to kick off the agents.
await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken);
}
}
Krok 5. Tworzenie modułu wykonawczego agregacji
Utwórz funkcję wykonawcza, która zbiera i łączy odpowiedzi z wielu agentów:
var aggregationExecutor = new ConcurrentAggregationExecutor();
Implementacja ConcurrentAggregationExecutor :
/// <summary>
/// Executor that aggregates the results from the concurrent agents.
/// </summary>
internal sealed class ConcurrentAggregationExecutor() :
Executor<List<ChatMessage>>("ConcurrentAggregationExecutor")
{
private readonly List<ChatMessage> _messages = [];
/// <summary>
/// Handles incoming messages from the agents and aggregates their responses.
/// </summary>
/// <param name="message">The message from the agent</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the asynchronous operation</returns>
public override async ValueTask HandleAsync(List<ChatMessage> message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._messages.AddRange(message);
if (this._messages.Count == 2)
{
var formattedMessages = string.Join(Environment.NewLine,
this._messages.Select(m => $"{m.AuthorName}: {m.Text}"));
await context.YieldOutputAsync(formattedMessages, cancellationToken);
}
}
}
Krok 6. Kompilowanie przepływu pracy
Połącz procesy wykonawcze z agentami przy użyciu wzorców rozgałęzienia i zbiegania.
// Build the workflow by adding executors and connecting them
var workflow = new WorkflowBuilder(startExecutor)
.AddFanOutEdge(startExecutor, targets: [physicist, chemist])
.AddFanInEdge(aggregationExecutor, sources: [physicist, chemist])
.WithOutputFrom(aggregationExecutor)
.Build();
Krok 7. Wykonywanie przepływu pracy
Uruchom przepływ pracy i przechwyć dane wyjściowe przesyłania strumieniowego:
// Execute the workflow in streaming mode
await using StreamingRun run = await InProcessExecution.StreamAsync(workflow, "What is temperature?");
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent output)
{
Console.WriteLine($"Workflow completed with results:\n{output.Data}");
}
}
}
}
Jak to działa
-
Fan-Out: Odbiera
ConcurrentStartExecutorpytanie wejściowe, a krawędź wentylatora wysyła go zarówno do agentów fizyka, jak i chemika jednocześnie. - Przetwarzanie równoległe: obaj agenci sztucznej inteligencji przetwarzają to samo pytanie jednocześnie, z których każdy zapewnia swoją perspektywę eksperta.
-
Fan-In: Funkcja
ConcurrentAggregationExecutorzbieraChatMessageodpowiedzi od obu agentów. - Agregacja: Po odebraniu obu odpowiedzi agregator łączy je w sformatowane dane wyjściowe.
Kluczowe pojęcia
-
Fan-Out Edges: użyj
AddFanOutEdge()do dystrybuowania tych samych danych wejściowych do wielu wykonawców lub agentów. -
Fan-In Edges: służy
AddFanInEdge()do zbierania wyników z wielu funkcji wykonawczych źródła. - Integracja agenta sztucznej inteligencji: agenci sztucznej inteligencji mogą służyć bezpośrednio jako funkcje wykonawcze w przepływach pracy.
-
Klasa bazowa wykonawcy: Niestandardowi wykonawcy dziedziczą
Executor<TInput>i zastępują metodęHandleAsync. -
Włącz tokeny: użyj polecenia
TurnToken, aby zasygnalizować agentów, aby rozpocząć przetwarzanie komunikatów w kolejce. -
Wykonywanie przesyłania strumieniowego: Użyj
StreamAsync()do pobierania aktualizacji w czasie rzeczywistym w miarę postępu przepływu pracy.
Kompletna implementacja
Aby uzyskać pełną działającą implementację tego współbieżnego przepływu pracy z agentami sztucznej inteligencji, zobacz przykład Concurrent/Program.cs w repozytorium Platformy agentów.
W implementacji języka Python utworzysz współbieżny przepływ pracy, który przetwarza dane za pomocą wielu równoległych funkcji wykonawczych i agreguje wyniki różnych typów. W tym przykładzie pokazano, jak platforma obsługuje mieszane typy wyników z przetwarzania współbieżnego.
Co będziesz budować
Utworzysz przepływ pracy, który:
- Przyjmuje listę liczb jako dane wejściowe
- Dystrybuuje listę do dwóch równoległych modułów wykonawczych (jeden obliczający średnią, drugi obliczający sumę)
- Agreguje różne typy wyników (zmiennoprzecinkowe i int) do końcowych danych wyjściowych
- Pokazuje, jak struktura obsługuje różne typy wyników od współbieżnych funkcji wykonawczych
Omówione pojęcia
Wymagania wstępne
- Środowisko Python w wersji 3.10 lub nowszej
- Zainstalowano program Agent Framework Core:
pip install agent-framework-core --pre
Krok 1. Importowanie wymaganych zależności
Zacznij od zaimportowania niezbędnych składników z programu Agent Framework:
import asyncio
import random
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, handler
from typing_extensions import Never
Krok 2: Utwórz Dispatcher Executor
Dyspozytor jest odpowiedzialny za dystrybucję początkowych danych wejściowych do wielu równoległych funkcji wykonawczych:
class Dispatcher(Executor):
"""
The sole purpose of this executor is to dispatch the input of the workflow to
other executors.
"""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[list[int]]):
if not numbers:
raise RuntimeError("Input must be a valid list of integers.")
await ctx.send_message(numbers)
Krok 3. Tworzenie funkcji wykonawczych przetwarzania równoległego
Utwórz dwie funkcje wykonawcze, które będą przetwarzać dane jednocześnie:
class Average(Executor):
"""Calculate the average of a list of integers."""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[float]):
average: float = sum(numbers) / len(numbers)
await ctx.send_message(average)
class Sum(Executor):
"""Calculate the sum of a list of integers."""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[int]):
total: int = sum(numbers)
await ctx.send_message(total)
Krok 4: Utwórz wykonawcę agregatora
Agregator zbiera wyniki z równoległych funkcji wykonawczych i zwraca końcowe dane wyjściowe:
class Aggregator(Executor):
"""Aggregate the results from the different tasks and yield the final output."""
@handler
async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, list[int | float]]):
"""Receive the results from the source executors.
The framework will automatically collect messages from the source executors
and deliver them as a list.
Args:
results (list[int | float]): execution results from upstream executors.
The type annotation must be a list of union types that the upstream
executors will produce.
ctx (WorkflowContext[Never, list[int | float]]): A workflow context that can yield the final output.
"""
await ctx.yield_output(results)
Krok 5. Tworzenie przepływu pracy
Połącz egzekutory używając wzorców fan-out i fan-in:
async def main() -> None:
# 1) Create the executors
dispatcher = Dispatcher(id="dispatcher")
average = Average(id="average")
summation = Sum(id="summation")
aggregator = Aggregator(id="aggregator")
# 2) Build a simple fan out and fan in workflow
workflow = (
WorkflowBuilder()
.set_start_executor(dispatcher)
.add_fan_out_edges(dispatcher, [average, summation])
.add_fan_in_edges([average, summation], aggregator)
.build()
)
Krok 6. Uruchamianie przepływu pracy
Wykonaj przepływ pracy z przykładowymi danymi i przechwyć dane wyjściowe:
# 3) Run the workflow
output: list[int | float] | None = None
async for event in workflow.run_stream([random.randint(1, 100) for _ in range(10)]):
if isinstance(event, WorkflowOutputEvent):
output = event.data
if output is not None:
print(output)
if __name__ == "__main__":
asyncio.run(main())
Jak to działa
-
Fan-Out: Obiekt
Dispatcherodbiera listę danych wejściowych i wysyła ją jednocześnie do funkcji wykonawczejAverageiSum -
Przetwarzanie równoległe: obie funkcje wykonawcze przetwarzają te same dane wejściowe jednocześnie, generując różne typy wyników:
-
Averagefunkcja wykonawcza generujefloatwynik -
Sumfunkcja wykonawcza generujeintwynik
-
-
Fan-In:
Aggregatorfunkcja odbiera wyniki z obu funkcji wykonawczych jako listę zawierającą oba typy -
Obsługa typów: platforma automatycznie obsługuje różne typy wyników przy użyciu typów unii (
int | float)
Kluczowe pojęcia
-
Fan-Out Edges: użyj
add_fan_out_edges()aby wysłać te same dane wejściowe do wielu egzekutorów. -
Fan-In Edges: służy
add_fan_in_edges()do zbierania wyników z wielu funkcji wykonawczych źródła -
Typy unii: zarządzanie różnymi typami rezultatów przy użyciu adnotacji typu, takich jak
list[int | float] - Współbieżne wykonywanie: wiele funkcji wykonawczych przetwarza dane jednocześnie, zwiększając wydajność
Kompletna implementacja
Aby uzyskać pełną działającą implementację tego współbieżnego przepływu pracy, zobacz przykład aggregate_results_of_different_types.py w repozytorium Platformy agentów.