Freigeben über


Verarbeiten von Anforderungen und Antworten in Workflows

In diesem Lernprogramm wird veranschaulicht, wie Anforderungen und Antworten in Workflows mithilfe von Agent-Framework-Workflows behandelt werden. Sie erfahren, wie Sie interaktive Workflows erstellen, die die Ausführung anhalten können, um Eingaben aus externen Quellen (z. B. Menschen oder anderen Systemen) anzufordern und dann nach der Bereitstellung einer Antwort fortzusetzen.

Behandelte Konzepte

In .NET nutzen Mensch-in-der-Schleife-Workflows die Bearbeitung externer Anfragen, um die Ausführung anzuhalten und Benutzereingaben zu sammeln. Dieses Muster ermöglicht interaktive Workflows, bei denen das System Während der Ausführung Informationen aus externen Quellen anfordern kann.

Voraussetzungen

Installieren von NuGet-Paketen

Installieren Sie zunächst die erforderlichen Pakete für Ihr .NET-Projekt:

dotnet add package Microsoft.Agents.AI.Workflows --prerelease

Schlüsselkomponenten

RequestPort und externe Anforderungen

A RequestPort fungiert als Brücke zwischen dem Workflow und externen Eingabequellen. Wenn der Arbeitsablauf eine Input-Anforderung benötigt, generiert er ein RequestInfoEvent, das Ihre Anwendung verarbeitet.

// Create a RequestPort for handling human input requests
RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");

Signaltypen

Definieren Sie Signaltypen für die Kommunikation verschiedener Anforderungstypen:

/// <summary>
/// Signals used for communication between guesses and the JudgeExecutor.
/// </summary>
internal enum NumberSignal
{
    Init,     // Initial guess request
    Above,    // Previous guess was too high
    Below,    // Previous guess was too low
}

Workflowausführer

Erstellen Sie Executoren, die Benutzereingaben verarbeiten und Feedback geben:

/// <summary>
/// Executor that judges the guess and provides feedback.
/// </summary>
internal sealed class JudgeExecutor : Executor<int>("Judge")
{
    private readonly int _targetNumber;
    private int _tries;

    public JudgeExecutor(int targetNumber) : this()
    {
        _targetNumber = targetNumber;
    }

    public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken)
    {
        _tries++;
        if (message == _targetNumber)
        {
            await context.YieldOutputAsync($"{_targetNumber} found in {_tries} tries!", cancellationToken)
                         .ConfigureAwait(false);
        }
        else if (message < _targetNumber)
        {
            await context.SendMessageAsync(NumberSignal.Below, cancellationToken).ConfigureAwait(false);
        }
        else
        {
            await context.SendMessageAsync(NumberSignal.Above, cancellationToken).ConfigureAwait(false);
        }
    }
}

Erstellen des Workflows

Verbinden sie den RequestPort und den Executor in einer Feedbackschleife:

internal static class WorkflowHelper
{
    internal static ValueTask<Workflow<NumberSignal>> GetWorkflowAsync()
    {
        // Create the executors
        RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
        JudgeExecutor judgeExecutor = new(42);

        // Build the workflow by connecting executors in a loop
        return new WorkflowBuilder(numberRequestPort)
            .AddEdge(numberRequestPort, judgeExecutor)
            .AddEdge(judgeExecutor, numberRequestPort)
            .WithOutputFrom(judgeExecutor)
            .BuildAsync<NumberSignal>();
    }
}

Ausführen des interaktiven Workflows

Behandeln externer Anforderungen während der Workflowausführung:

private static async Task Main()
{
    // Create the workflow
    var workflow = await WorkflowHelper.GetWorkflowAsync().ConfigureAwait(false);

    // Execute the workflow
    await using StreamingRun handle = await InProcessExecution.StreamAsync(workflow, NumberSignal.Init).ConfigureAwait(false);
    await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
    {
        switch (evt)
        {
            case RequestInfoEvent requestInputEvt:
                // Handle human input request from the workflow
                ExternalResponse response = HandleExternalRequest(requestInputEvt.Request);
                await handle.SendResponseAsync(response).ConfigureAwait(false);
                break;

            case WorkflowOutputEvent outputEvt:
                // The workflow has yielded output
                Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
                return;
        }
    }
}

Anforderungsverarbeitung

Verarbeiten verschiedener Arten von Eingabeanforderungen:

private static ExternalResponse HandleExternalRequest(ExternalRequest request)
{
    switch (request.DataAs<NumberSignal?>())
    {
        case NumberSignal.Init:
            int initialGuess = ReadIntegerFromConsole("Please provide your initial guess: ");
            return request.CreateResponse(initialGuess);
        case NumberSignal.Above:
            int lowerGuess = ReadIntegerFromConsole("You previously guessed too large. Please provide a new guess: ");
            return request.CreateResponse(lowerGuess);
        case NumberSignal.Below:
            int higherGuess = ReadIntegerFromConsole("You previously guessed too small. Please provide a new guess: ");
            return request.CreateResponse(higherGuess);
        default:
            throw new ArgumentException("Unexpected request type.");
    }
}

private static int ReadIntegerFromConsole(string prompt)
{
    while (true)
    {
        Console.Write(prompt);
        string? input = Console.ReadLine();
        if (int.TryParse(input, out int value))
        {
            return value;
        }
        Console.WriteLine("Invalid input. Please enter a valid integer.");
    }
}

Implementierungskonzepte

RequestInfoEvent-Ereignisablauf

  1. Workflowausführung: Der Workflow verarbeitet, bis externe Eingaben erforderlich sind.
  2. Anforderungsgenerierung: RequestPort generiert eine RequestInfoEvent mit den Anforderungsdetails
  3. Externe Behandlung: Ihre Anwendung fängt das Ereignis ab und sammelt Benutzereingaben.
  4. Antwortübermittlung: Inhalte zurücksenden ExternalResponse, um den Workflow fortzusetzen
  5. Workflow-Wiederaufnahme: Der Workflow wird weiterhin mit der bereitgestellten Eingabe verarbeitet.

Workflow-Lebenszyklus

  • Streamingausführung: Verwenden Sie StreamAsync, um Ereignisse in Echtzeit zu überwachen
  • Ereignisbehandlung: Prozess RequestInfoEvent für Eingabeanforderungen und WorkflowOutputEvent abschluss
  • Antwortkoordination: Abgleichen von Antworten auf Anforderungen mithilfe des Antwortbehandlungsmechanismus des Workflows

Implementierungsfluss

  1. Workflowinitialisierung: Der Workflow beginnt, indem er einen NumberSignal.Init an den RequestPort sendet.

  2. Anforderungsgenerierung: Der RequestPort generiert eine Anforderung eines RequestInfoEvent anfänglichen Schätzwerts vom Benutzer.

  3. Workflow pause: Der Workflow hält an und wartet auf externe Eingaben, während die Anwendung die Anforderung verarbeitet.

  4. Menschliche Reaktion: Die externe Anwendung sammelt Benutzereingaben und sendet einen ExternalResponse Zurück an den Workflow.

  5. Verarbeitung und Feedback: Der JudgeExecutor Prozess verarbeitet die Vermutung und schließt entweder den Workflow ab oder sendet ein neues Signal (Oben/Unten), um eine weitere Vermutung anzufordern.

  6. Fortsetzung der Schleife: Der Prozess wird wiederholt, bis die richtige Zahl erraten wird.

Framework-Vorteile

  • Typsicherheit: Durch starke Typisierung wird sichergestellt, dass Anforderungsantwortverträge beibehalten werden.
  • Ereignisgesteuert: Rich-Ereignissystem bietet Einblicke in die Workflowausführung
  • Pausierbare Ausführung: Workflows können unbegrenzt angehalten werden, während sie auf externe Eingaben warten.
  • Zustandsverwaltung: Der Workflowstatus wird über Pausen-Fortsetzungszyklen beibehalten.
  • Flexible Integration: RequestPorts können in jede externe Eingabequelle (UI, API, Konsole usw.) integriert werden.

Vollständiges Beispiel

Die vollständige Arbeitsimplementierung finden Sie im Beispiel "Human-in-the-Loop Basic".

Dieses Muster ermöglicht das Erstellen anspruchsvoller interaktiver Anwendungen, in denen Benutzer Eingaben an wichtigen Entscheidungspunkten innerhalb automatisierter Workflows bereitstellen können.

Was Sie erstellen werden

Sie erstellen einen interaktiven Spielablauf zum Erraten von Nummern, der Anforderungsantwortmuster veranschaulicht:

  • Ein KI-Agent, der intelligente Erraten macht
  • Executors, die Anforderungen direkt über die request_info API senden können
  • Ein Turn-Manager, der die Koordination zwischen dem Agenten und menschlichen Interaktionen übernimmt. @response_handler
  • Interaktive Konsoleneingabe/Ausgabe für Echtzeitfeedback

Voraussetzungen

  • Python 3.10 oder höher
  • Azure OpenAI-Bereitstellung konfiguriert
  • Azure CLI-Authentifizierung konfiguriert (az login)
  • Grundlegendes Verständnis der asynchronen Python-Programmierung

Wichtige Konzepte

Funktionen für Anforderungen und Antworten

Executors verfügen über integrierte Funktionen für Anforderungen und Antworten, die Mensch-in-der-Schleife-Interaktionen ermöglichen.

  • Anruf ctx.request_info(request_data=request_data, response_type=response_type) zum Senden von Anforderungen
  • Verwenden Sie den @response_handler-Dekorator, um Antworten zu behandeln
  • Definieren von benutzerdefinierten Anforderungs-/Antworttypen ohne Vererbungsanforderungen

Request-Response Ablauf

Executors können Anforderungen direkt mithilfe von ctx.request_info() senden und Antworten mithilfe des @response_handler Dekorators verarbeiten.

  1. Executor-Aufrufe ctx.request_info(request_data=request_data, response_type=response_type)
  2. Workflow gibt eine RequestInfoEvent mit den Anforderungsdaten aus.
  3. Externes System (Mensch, API usw.) verarbeitet die Anforderung
  4. Die Antwort wird über send_responses_streaming()
  5. Workflow wird fortgesetzt und liefert die Antwort auf die Methode des @response_handler Executors.

Einrichten der Umgebung

Installieren Sie zunächst die erforderlichen Pakete:

pip install agent-framework-core --pre
pip install azure-identity

Definieren von Anforderungs- und Antwortmodellen

Definieren Sie zunächst die Datenstrukturen für die Anforderungsantwortkommunikation:

import asyncio
from dataclasses import dataclass
from pydantic import BaseModel

from agent_framework import (
    AgentExecutor,
    AgentExecutorRequest,
    AgentExecutorResponse,
    ChatMessage,
    Executor,
    RequestInfoEvent,
    Role,
    WorkflowBuilder,
    WorkflowContext,
    WorkflowOutputEvent,
    WorkflowRunState,
    WorkflowStatusEvent,
    handler,
    response_handler,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential

@dataclass
class HumanFeedbackRequest:
    """Request message for human feedback in the guessing game."""
    prompt: str = ""
    guess: int | None = None

class GuessOutput(BaseModel):
    """Structured output from the AI agent with response_format enforcement."""
    guess: int

Dies HumanFeedbackRequest ist eine einfache Datenklasse für strukturierte Anforderungsnutzlasten:

  • Starke Typisierung für Anforderungs-Payloads
  • Zukünftskompatible Validierung
  • Klare Korrelationsemantik mit Antworten
  • Kontextfelder (wie der vorherige Schätzwert) für ansprechende Benutzeroberflächen-Eingabeaufforderungen

Erstellen des Turn-Managers

Der Turn-Manager koordiniert den Fluss zwischen dem KI-Agenten und dem Menschen:

class TurnManager(Executor):
    """Coordinates turns between the AI agent and human player.

    Responsibilities:
    - Start the game by requesting the agent's first guess
    - Process agent responses and request human feedback
    - Handle human feedback and continue the game or finish
    """

    def __init__(self, id: str | None = None):
        super().__init__(id=id or "turn_manager")

    @handler
    async def start(self, _: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
        """Start the game by asking the agent for an initial guess."""
        user = ChatMessage(Role.USER, text="Start by making your first guess.")
        await ctx.send_message(AgentExecutorRequest(messages=[user], should_respond=True))

    @handler
    async def on_agent_response(
        self,
        result: AgentExecutorResponse,
        ctx: WorkflowContext,
    ) -> None:
        """Handle the agent's guess and request human guidance."""
        # Parse structured model output (defensive default if agent didn't reply)
        text = result.agent_run_response.text or ""
        last_guess = GuessOutput.model_validate_json(text).guess if text else None

        # Craft a clear human prompt that defines higher/lower relative to agent's guess
        prompt = (
            f"The agent guessed: {last_guess if last_guess is not None else text}. "
            "Type one of: higher (your number is higher than this guess), "
            "lower (your number is lower than this guess), correct, or exit."
        )
        # Send a request using the request_info API
        await ctx.request_info(
            request_data=HumanFeedbackRequest(prompt=prompt, guess=last_guess),
            response_type=str
        )

    @response_handler
    async def on_human_feedback(
        self,
        original_request: HumanFeedbackRequest,
        feedback: str,
        ctx: WorkflowContext[AgentExecutorRequest, str],
    ) -> None:
        """Continue the game or finish based on human feedback."""
        reply = feedback.strip().lower()
        # Use the correlated request's guess to avoid extra state reads
        last_guess = original_request.guess

        if reply == "correct":
            await ctx.yield_output(f"Guessed correctly: {last_guess}")
            return

        # Provide feedback to the agent for the next guess
        user_msg = ChatMessage(
            Role.USER,
            text=f'Feedback: {reply}. Return ONLY a JSON object matching the schema {{"guess": <int 1..10>}}.',
        )
        await ctx.send_message(AgentExecutorRequest(messages=[user_msg], should_respond=True))

Erstellen des Workflows

Erstellen Sie den Hauptworkflow, der alle Komponenten verbindet:

async def main() -> None:
    # Create the chat agent with structured output enforcement
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
    agent = chat_client.create_agent(
        instructions=(
            "You guess a number between 1 and 10. "
            "If the user says 'higher' or 'lower', adjust your next guess. "
            'You MUST return ONLY a JSON object exactly matching this schema: {"guess": <integer 1..10>}. '
            "No explanations or additional text."
        ),
        response_format=GuessOutput,
    )

    # Create workflow components
    turn_manager = TurnManager(id="turn_manager")
    agent_exec = AgentExecutor(agent=agent, id="agent")

    # Build the workflow graph
    workflow = (
        WorkflowBuilder()
        .set_start_executor(turn_manager)
        .add_edge(turn_manager, agent_exec)  # Ask agent to make/adjust a guess
        .add_edge(agent_exec, turn_manager)  # Agent's response goes back to coordinator
        .build()
    )

    # Execute the interactive workflow
    await run_interactive_workflow(workflow)

async def run_interactive_workflow(workflow):
    """Run the workflow with human-in-the-loop interaction."""
    pending_responses: dict[str, str] | None = None
    completed = False
    workflow_output: str | None = None

    print("🎯 Number Guessing Game")
    print("Think of a number between 1 and 10, and I'll try to guess it!")
    print("-" * 50)

    while not completed:
        # First iteration uses run_stream("start")
        # Subsequent iterations use send_responses_streaming with pending responses
        stream = (
            workflow.send_responses_streaming(pending_responses)
            if pending_responses
            else workflow.run_stream("start")
        )

        # Collect events for this turn
        events = [event async for event in stream]
        pending_responses = None

        # Process events to collect requests and detect completion
        requests: list[tuple[str, str]] = []  # (request_id, prompt)
        for event in events:
            if isinstance(event, RequestInfoEvent) and isinstance(event.data, HumanFeedbackRequest):
                # RequestInfoEvent for our HumanFeedbackRequest
                requests.append((event.request_id, event.data.prompt))
            elif isinstance(event, WorkflowOutputEvent):
                # Capture workflow output when yielded
                workflow_output = str(event.data)
                completed = True

        # Check workflow status
        pending_status = any(
            isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS
            for e in events
        )
        idle_with_requests = any(
            isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
            for e in events
        )

        if pending_status:
            print("🔄 State: IN_PROGRESS_PENDING_REQUESTS (requests outstanding)")
        if idle_with_requests:
            print("⏸️  State: IDLE_WITH_PENDING_REQUESTS (awaiting human input)")

        # Handle human requests if any
        if requests and not completed:
            responses: dict[str, str] = {}
            for req_id, prompt in requests:
                print(f"\n🤖 {prompt}")
                answer = input("👤 Enter higher/lower/correct/exit: ").lower()

                if answer == "exit":
                    print("👋 Exiting...")
                    return
                responses[req_id] = answer
            pending_responses = responses

    # Show final result
    print(f"\n🎉 {workflow_output}")

Ausführen des Beispiels

Die vollständige Arbeitsimplementierung finden Sie im Beispiel "Human-in-the-Loop Guessing Game".

Funktionsweise

  1. Workflowinitialisierung: Der Workflow beginnt mit dem TurnManager Anfordern eines anfänglichen Schätzwerts vom KI-Agent.

  2. Agentenantwort: Der KI-Agent macht eine Vermutung und gibt eine strukturierte JSON zurück, die zum TurnManager zurückfließt.

  3. Menschliche Anforderung: Das TurnManager bearbeitet die Vermutung des Agenten und ruft ctx.request_info() mit einem HumanFeedbackRequest auf.

  4. Workflow-Pause: Der Workflow gibt eine RequestInfoEvent und wird fortgesetzt, bis keine weiteren Aktionen ausgeführt werden können, und wartet dann auf menschliche Eingaben.

  5. Menschliche Reaktion: Die externe Anwendung sammelt menschliche Eingaben und sendet Antworten mithilfe von send_responses_streaming().

  6. Fortsetzen: Der Workflow wird fortgesetzt, die Methode von TurnManager@response_handler verarbeitet das Feedback und beendet entweder das Spiel oder sendet eine weitere Anfrage an den Agent.

Wichtige Vorteile

  • Strukturierte Kommunikation: Typsichere Anforderungs- und Antwortmodelle verhindern Laufzeitfehler
  • Korrelation: Anforderungs-IDs stellen sicher, dass Antworten den richtigen Anforderungen entsprechen
  • Pausierbare Ausführung: Workflows können unbegrenzt angehalten werden, während sie auf externe Eingaben warten.
  • Zustandserhaltung: Workflowstatus wird über Pausen-Fortsetzungszyklen hinweg beibehalten.
  • Ereignisgesteuert: Rich-Ereignissystem bietet Einblicke in Workflowstatus und Übergänge

Dieses Muster ermöglicht das Erstellen anspruchsvoller interaktiver Anwendungen, bei denen KI-Agents und Menschen nahtlos in strukturierten Workflows zusammenarbeiten.

Nächste Schritte