Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
In diesem Lernprogramm wird veranschaulicht, wie Anforderungen und Antworten in Workflows mithilfe von Agent-Framework-Workflows behandelt werden. Sie erfahren, wie Sie interaktive Workflows erstellen, die die Ausführung anhalten können, um Eingaben aus externen Quellen (z. B. Menschen oder anderen Systemen) anzufordern und dann nach der Bereitstellung einer Antwort fortzusetzen.
Behandelte Konzepte
In .NET nutzen Mensch-in-der-Schleife-Workflows die Bearbeitung externer Anfragen, um die Ausführung anzuhalten und Benutzereingaben zu sammeln. Dieses Muster ermöglicht interaktive Workflows, bei denen das System Während der Ausführung Informationen aus externen Quellen anfordern kann.
Voraussetzungen
- .NET 8.0 SDK oder höher.
- Azure OpenAI-Dienstendpunkt und -Bereitstellung konfiguriert.
- Azure CLI installiert und authentifiziert (für die Azure-Anmeldeinformationsauthentifizierung).
- Grundlegendes Verständnis der C#- und asynchronen Programmierung.
- Eine neue Konsolenanwendung.
Installieren von NuGet-Paketen
Installieren Sie zunächst die erforderlichen Pakete für Ihr .NET-Projekt:
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
Schlüsselkomponenten
RequestPort und externe Anforderungen
A RequestPort fungiert als Brücke zwischen dem Workflow und externen Eingabequellen. Wenn der Arbeitsablauf eine Input-Anforderung benötigt, generiert er ein RequestInfoEvent, das Ihre Anwendung verarbeitet.
// Create a RequestPort for handling human input requests
RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
Signaltypen
Definieren Sie Signaltypen für die Kommunikation verschiedener Anforderungstypen:
/// <summary>
/// Signals used for communication between guesses and the JudgeExecutor.
/// </summary>
internal enum NumberSignal
{
Init, // Initial guess request
Above, // Previous guess was too high
Below, // Previous guess was too low
}
Workflowausführer
Erstellen Sie Executoren, die Benutzereingaben verarbeiten und Feedback geben:
/// <summary>
/// Executor that judges the guess and provides feedback.
/// </summary>
internal sealed class JudgeExecutor : Executor<int>("Judge")
{
private readonly int _targetNumber;
private int _tries;
public JudgeExecutor(int targetNumber) : this()
{
_targetNumber = targetNumber;
}
public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken)
{
_tries++;
if (message == _targetNumber)
{
await context.YieldOutputAsync($"{_targetNumber} found in {_tries} tries!", cancellationToken)
.ConfigureAwait(false);
}
else if (message < _targetNumber)
{
await context.SendMessageAsync(NumberSignal.Below, cancellationToken).ConfigureAwait(false);
}
else
{
await context.SendMessageAsync(NumberSignal.Above, cancellationToken).ConfigureAwait(false);
}
}
}
Erstellen des Workflows
Verbinden sie den RequestPort und den Executor in einer Feedbackschleife:
internal static class WorkflowHelper
{
internal static ValueTask<Workflow<NumberSignal>> GetWorkflowAsync()
{
// Create the executors
RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
JudgeExecutor judgeExecutor = new(42);
// Build the workflow by connecting executors in a loop
return new WorkflowBuilder(numberRequestPort)
.AddEdge(numberRequestPort, judgeExecutor)
.AddEdge(judgeExecutor, numberRequestPort)
.WithOutputFrom(judgeExecutor)
.BuildAsync<NumberSignal>();
}
}
Ausführen des interaktiven Workflows
Behandeln externer Anforderungen während der Workflowausführung:
private static async Task Main()
{
// Create the workflow
var workflow = await WorkflowHelper.GetWorkflowAsync().ConfigureAwait(false);
// Execute the workflow
await using StreamingRun handle = await InProcessExecution.StreamAsync(workflow, NumberSignal.Init).ConfigureAwait(false);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle human input request from the workflow
ExternalResponse response = HandleExternalRequest(requestInputEvt.Request);
await handle.SendResponseAsync(response).ConfigureAwait(false);
break;
case WorkflowOutputEvent outputEvt:
// The workflow has yielded output
Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
return;
}
}
}
Anforderungsverarbeitung
Verarbeiten verschiedener Arten von Eingabeanforderungen:
private static ExternalResponse HandleExternalRequest(ExternalRequest request)
{
switch (request.DataAs<NumberSignal?>())
{
case NumberSignal.Init:
int initialGuess = ReadIntegerFromConsole("Please provide your initial guess: ");
return request.CreateResponse(initialGuess);
case NumberSignal.Above:
int lowerGuess = ReadIntegerFromConsole("You previously guessed too large. Please provide a new guess: ");
return request.CreateResponse(lowerGuess);
case NumberSignal.Below:
int higherGuess = ReadIntegerFromConsole("You previously guessed too small. Please provide a new guess: ");
return request.CreateResponse(higherGuess);
default:
throw new ArgumentException("Unexpected request type.");
}
}
private static int ReadIntegerFromConsole(string prompt)
{
while (true)
{
Console.Write(prompt);
string? input = Console.ReadLine();
if (int.TryParse(input, out int value))
{
return value;
}
Console.WriteLine("Invalid input. Please enter a valid integer.");
}
}
Implementierungskonzepte
RequestInfoEvent-Ereignisablauf
- Workflowausführung: Der Workflow verarbeitet, bis externe Eingaben erforderlich sind.
-
Anforderungsgenerierung: RequestPort generiert eine
RequestInfoEventmit den Anforderungsdetails - Externe Behandlung: Ihre Anwendung fängt das Ereignis ab und sammelt Benutzereingaben.
-
Antwortübermittlung: Inhalte zurücksenden
ExternalResponse, um den Workflow fortzusetzen - Workflow-Wiederaufnahme: Der Workflow wird weiterhin mit der bereitgestellten Eingabe verarbeitet.
Workflow-Lebenszyklus
-
Streamingausführung: Verwenden Sie
StreamAsync, um Ereignisse in Echtzeit zu überwachen -
Ereignisbehandlung: Prozess
RequestInfoEventfür Eingabeanforderungen undWorkflowOutputEventabschluss - Antwortkoordination: Abgleichen von Antworten auf Anforderungen mithilfe des Antwortbehandlungsmechanismus des Workflows
Implementierungsfluss
Workflowinitialisierung: Der Workflow beginnt, indem er einen
NumberSignal.Initan den RequestPort sendet.Anforderungsgenerierung: Der RequestPort generiert eine Anforderung eines
RequestInfoEventanfänglichen Schätzwerts vom Benutzer.Workflow pause: Der Workflow hält an und wartet auf externe Eingaben, während die Anwendung die Anforderung verarbeitet.
Menschliche Reaktion: Die externe Anwendung sammelt Benutzereingaben und sendet einen
ExternalResponseZurück an den Workflow.Verarbeitung und Feedback: Der
JudgeExecutorProzess verarbeitet die Vermutung und schließt entweder den Workflow ab oder sendet ein neues Signal (Oben/Unten), um eine weitere Vermutung anzufordern.Fortsetzung der Schleife: Der Prozess wird wiederholt, bis die richtige Zahl erraten wird.
Framework-Vorteile
- Typsicherheit: Durch starke Typisierung wird sichergestellt, dass Anforderungsantwortverträge beibehalten werden.
- Ereignisgesteuert: Rich-Ereignissystem bietet Einblicke in die Workflowausführung
- Pausierbare Ausführung: Workflows können unbegrenzt angehalten werden, während sie auf externe Eingaben warten.
- Zustandsverwaltung: Der Workflowstatus wird über Pausen-Fortsetzungszyklen beibehalten.
- Flexible Integration: RequestPorts können in jede externe Eingabequelle (UI, API, Konsole usw.) integriert werden.
Vollständiges Beispiel
Die vollständige Arbeitsimplementierung finden Sie im Beispiel "Human-in-the-Loop Basic".
Dieses Muster ermöglicht das Erstellen anspruchsvoller interaktiver Anwendungen, in denen Benutzer Eingaben an wichtigen Entscheidungspunkten innerhalb automatisierter Workflows bereitstellen können.
Was Sie erstellen werden
Sie erstellen einen interaktiven Spielablauf zum Erraten von Nummern, der Anforderungsantwortmuster veranschaulicht:
- Ein KI-Agent, der intelligente Erraten macht
- Executors, die Anforderungen direkt über die
request_infoAPI senden können - Ein Turn-Manager, der die Koordination zwischen dem Agenten und menschlichen Interaktionen übernimmt.
@response_handler - Interaktive Konsoleneingabe/Ausgabe für Echtzeitfeedback
Voraussetzungen
- Python 3.10 oder höher
- Azure OpenAI-Bereitstellung konfiguriert
- Azure CLI-Authentifizierung konfiguriert (
az login) - Grundlegendes Verständnis der asynchronen Python-Programmierung
Wichtige Konzepte
Funktionen für Anforderungen und Antworten
Executors verfügen über integrierte Funktionen für Anforderungen und Antworten, die Mensch-in-der-Schleife-Interaktionen ermöglichen.
- Anruf
ctx.request_info(request_data=request_data, response_type=response_type)zum Senden von Anforderungen - Verwenden Sie den
@response_handler-Dekorator, um Antworten zu behandeln - Definieren von benutzerdefinierten Anforderungs-/Antworttypen ohne Vererbungsanforderungen
Request-Response Ablauf
Executors können Anforderungen direkt mithilfe von ctx.request_info() senden und Antworten mithilfe des @response_handler Dekorators verarbeiten.
- Executor-Aufrufe
ctx.request_info(request_data=request_data, response_type=response_type) - Workflow gibt eine
RequestInfoEventmit den Anforderungsdaten aus. - Externes System (Mensch, API usw.) verarbeitet die Anforderung
- Die Antwort wird über
send_responses_streaming() - Workflow wird fortgesetzt und liefert die Antwort auf die Methode des
@response_handlerExecutors.
Einrichten der Umgebung
Installieren Sie zunächst die erforderlichen Pakete:
pip install agent-framework-core --pre
pip install azure-identity
Definieren von Anforderungs- und Antwortmodellen
Definieren Sie zunächst die Datenstrukturen für die Anforderungsantwortkommunikation:
import asyncio
from dataclasses import dataclass
from pydantic import BaseModel
from agent_framework import (
AgentExecutor,
AgentExecutorRequest,
AgentExecutorResponse,
ChatMessage,
Executor,
RequestInfoEvent,
Role,
WorkflowBuilder,
WorkflowContext,
WorkflowOutputEvent,
WorkflowRunState,
WorkflowStatusEvent,
handler,
response_handler,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
@dataclass
class HumanFeedbackRequest:
"""Request message for human feedback in the guessing game."""
prompt: str = ""
guess: int | None = None
class GuessOutput(BaseModel):
"""Structured output from the AI agent with response_format enforcement."""
guess: int
Dies HumanFeedbackRequest ist eine einfache Datenklasse für strukturierte Anforderungsnutzlasten:
- Starke Typisierung für Anforderungs-Payloads
- Zukünftskompatible Validierung
- Klare Korrelationsemantik mit Antworten
- Kontextfelder (wie der vorherige Schätzwert) für ansprechende Benutzeroberflächen-Eingabeaufforderungen
Erstellen des Turn-Managers
Der Turn-Manager koordiniert den Fluss zwischen dem KI-Agenten und dem Menschen:
class TurnManager(Executor):
"""Coordinates turns between the AI agent and human player.
Responsibilities:
- Start the game by requesting the agent's first guess
- Process agent responses and request human feedback
- Handle human feedback and continue the game or finish
"""
def __init__(self, id: str | None = None):
super().__init__(id=id or "turn_manager")
@handler
async def start(self, _: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Start the game by asking the agent for an initial guess."""
user = ChatMessage(Role.USER, text="Start by making your first guess.")
await ctx.send_message(AgentExecutorRequest(messages=[user], should_respond=True))
@handler
async def on_agent_response(
self,
result: AgentExecutorResponse,
ctx: WorkflowContext,
) -> None:
"""Handle the agent's guess and request human guidance."""
# Parse structured model output (defensive default if agent didn't reply)
text = result.agent_run_response.text or ""
last_guess = GuessOutput.model_validate_json(text).guess if text else None
# Craft a clear human prompt that defines higher/lower relative to agent's guess
prompt = (
f"The agent guessed: {last_guess if last_guess is not None else text}. "
"Type one of: higher (your number is higher than this guess), "
"lower (your number is lower than this guess), correct, or exit."
)
# Send a request using the request_info API
await ctx.request_info(
request_data=HumanFeedbackRequest(prompt=prompt, guess=last_guess),
response_type=str
)
@response_handler
async def on_human_feedback(
self,
original_request: HumanFeedbackRequest,
feedback: str,
ctx: WorkflowContext[AgentExecutorRequest, str],
) -> None:
"""Continue the game or finish based on human feedback."""
reply = feedback.strip().lower()
# Use the correlated request's guess to avoid extra state reads
last_guess = original_request.guess
if reply == "correct":
await ctx.yield_output(f"Guessed correctly: {last_guess}")
return
# Provide feedback to the agent for the next guess
user_msg = ChatMessage(
Role.USER,
text=f'Feedback: {reply}. Return ONLY a JSON object matching the schema {{"guess": <int 1..10>}}.',
)
await ctx.send_message(AgentExecutorRequest(messages=[user_msg], should_respond=True))
Erstellen des Workflows
Erstellen Sie den Hauptworkflow, der alle Komponenten verbindet:
async def main() -> None:
# Create the chat agent with structured output enforcement
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
agent = chat_client.create_agent(
instructions=(
"You guess a number between 1 and 10. "
"If the user says 'higher' or 'lower', adjust your next guess. "
'You MUST return ONLY a JSON object exactly matching this schema: {"guess": <integer 1..10>}. '
"No explanations or additional text."
),
response_format=GuessOutput,
)
# Create workflow components
turn_manager = TurnManager(id="turn_manager")
agent_exec = AgentExecutor(agent=agent, id="agent")
# Build the workflow graph
workflow = (
WorkflowBuilder()
.set_start_executor(turn_manager)
.add_edge(turn_manager, agent_exec) # Ask agent to make/adjust a guess
.add_edge(agent_exec, turn_manager) # Agent's response goes back to coordinator
.build()
)
# Execute the interactive workflow
await run_interactive_workflow(workflow)
async def run_interactive_workflow(workflow):
"""Run the workflow with human-in-the-loop interaction."""
pending_responses: dict[str, str] | None = None
completed = False
workflow_output: str | None = None
print("🎯 Number Guessing Game")
print("Think of a number between 1 and 10, and I'll try to guess it!")
print("-" * 50)
while not completed:
# First iteration uses run_stream("start")
# Subsequent iterations use send_responses_streaming with pending responses
stream = (
workflow.send_responses_streaming(pending_responses)
if pending_responses
else workflow.run_stream("start")
)
# Collect events for this turn
events = [event async for event in stream]
pending_responses = None
# Process events to collect requests and detect completion
requests: list[tuple[str, str]] = [] # (request_id, prompt)
for event in events:
if isinstance(event, RequestInfoEvent) and isinstance(event.data, HumanFeedbackRequest):
# RequestInfoEvent for our HumanFeedbackRequest
requests.append((event.request_id, event.data.prompt))
elif isinstance(event, WorkflowOutputEvent):
# Capture workflow output when yielded
workflow_output = str(event.data)
completed = True
# Check workflow status
pending_status = any(
isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS
for e in events
)
idle_with_requests = any(
isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
for e in events
)
if pending_status:
print("🔄 State: IN_PROGRESS_PENDING_REQUESTS (requests outstanding)")
if idle_with_requests:
print("⏸️ State: IDLE_WITH_PENDING_REQUESTS (awaiting human input)")
# Handle human requests if any
if requests and not completed:
responses: dict[str, str] = {}
for req_id, prompt in requests:
print(f"\n🤖 {prompt}")
answer = input("👤 Enter higher/lower/correct/exit: ").lower()
if answer == "exit":
print("👋 Exiting...")
return
responses[req_id] = answer
pending_responses = responses
# Show final result
print(f"\n🎉 {workflow_output}")
Ausführen des Beispiels
Die vollständige Arbeitsimplementierung finden Sie im Beispiel "Human-in-the-Loop Guessing Game".
Funktionsweise
Workflowinitialisierung: Der Workflow beginnt mit dem
TurnManagerAnfordern eines anfänglichen Schätzwerts vom KI-Agent.Agentenantwort: Der KI-Agent macht eine Vermutung und gibt eine strukturierte JSON zurück, die zum
TurnManagerzurückfließt.Menschliche Anforderung: Das
TurnManagerbearbeitet die Vermutung des Agenten und ruftctx.request_info()mit einemHumanFeedbackRequestauf.Workflow-Pause: Der Workflow gibt eine
RequestInfoEventund wird fortgesetzt, bis keine weiteren Aktionen ausgeführt werden können, und wartet dann auf menschliche Eingaben.Menschliche Reaktion: Die externe Anwendung sammelt menschliche Eingaben und sendet Antworten mithilfe von
send_responses_streaming().Fortsetzen: Der Workflow wird fortgesetzt, die Methode von
TurnManager@response_handlerverarbeitet das Feedback und beendet entweder das Spiel oder sendet eine weitere Anfrage an den Agent.
Wichtige Vorteile
- Strukturierte Kommunikation: Typsichere Anforderungs- und Antwortmodelle verhindern Laufzeitfehler
- Korrelation: Anforderungs-IDs stellen sicher, dass Antworten den richtigen Anforderungen entsprechen
- Pausierbare Ausführung: Workflows können unbegrenzt angehalten werden, während sie auf externe Eingaben warten.
- Zustandserhaltung: Workflowstatus wird über Pausen-Fortsetzungszyklen hinweg beibehalten.
- Ereignisgesteuert: Rich-Ereignissystem bietet Einblicke in Workflowstatus und Übergänge
Dieses Muster ermöglicht das Erstellen anspruchsvoller interaktiver Anwendungen, bei denen KI-Agents und Menschen nahtlos in strukturierten Workflows zusammenarbeiten.