다음을 통해 공유


워크플로에서 요청 및 응답 처리

이 자습서에서는 에이전트 프레임워크 워크플로를 사용하여 워크플로에서 요청 및 응답을 처리하는 방법을 보여 줍니다. 실행을 일시 중지하여 외부 원본(예: 사용자 또는 다른 시스템)의 입력을 요청한 다음, 응답이 제공되면 다시 시작할 수 있는 대화형 워크플로를 만드는 방법을 알아봅니다.

다루는 개념

.NET에서 휴먼 인더 루프 워크플로는 실행을 일시 중지하고 사용자 입력을 수집하기 위해 외부 요청 처리를 사용합니다 RequestPort . 이 패턴을 사용하면 시스템이 실행 중에 외부 원본의 정보를 요청할 수 있는 대화형 워크플로를 사용할 수 있습니다.

필수 조건

NuGet 패키지 설치

먼저 .NET 프로젝트에 필요한 패키지를 설치합니다.

dotnet add package Microsoft.Agents.AI.Workflows --prerelease

주요 구성 요소

RequestPort 및 외부 요청

워크플로 RequestPort 와 외부 입력 원본 간의 브리지 역할을 합니다. 워크플로에 입력이 필요한 경우 애플리케이션이 처리할 RequestInfoEvent를 생성합니다.

// Create a RequestPort for handling human input requests
RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");

신호 형식

다양한 요청 형식을 통신하도록 신호 형식을 정의합니다.

/// <summary>
/// Signals used for communication between guesses and the JudgeExecutor.
/// </summary>
internal enum NumberSignal
{
    Init,     // Initial guess request
    Above,    // Previous guess was too high
    Below,    // Previous guess was too low
}

워크플로 실행기

사용자 입력을 처리하고 피드백을 제공하는 실행기를 만듭니다.

/// <summary>
/// Executor that judges the guess and provides feedback.
/// </summary>
internal sealed class JudgeExecutor : Executor<int>("Judge")
{
    private readonly int _targetNumber;
    private int _tries;

    public JudgeExecutor(int targetNumber) : this()
    {
        _targetNumber = targetNumber;
    }

    public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken)
    {
        _tries++;
        if (message == _targetNumber)
        {
            await context.YieldOutputAsync($"{_targetNumber} found in {_tries} tries!", cancellationToken)
                         .ConfigureAwait(false);
        }
        else if (message < _targetNumber)
        {
            await context.SendMessageAsync(NumberSignal.Below, cancellationToken).ConfigureAwait(false);
        }
        else
        {
            await context.SendMessageAsync(NumberSignal.Above, cancellationToken).ConfigureAwait(false);
        }
    }
}

워크플로 빌드

피드백 루프에서 RequestPort 및 실행기를 연결합니다.

internal static class WorkflowHelper
{
    internal static ValueTask<Workflow<NumberSignal>> GetWorkflowAsync()
    {
        // Create the executors
        RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
        JudgeExecutor judgeExecutor = new(42);

        // Build the workflow by connecting executors in a loop
        return new WorkflowBuilder(numberRequestPort)
            .AddEdge(numberRequestPort, judgeExecutor)
            .AddEdge(judgeExecutor, numberRequestPort)
            .WithOutputFrom(judgeExecutor)
            .BuildAsync<NumberSignal>();
    }
}

대화형 워크플로 실행

워크플로 실행 중에 외부 요청을 처리합니다.

private static async Task Main()
{
    // Create the workflow
    var workflow = await WorkflowHelper.GetWorkflowAsync().ConfigureAwait(false);

    // Execute the workflow
    await using StreamingRun handle = await InProcessExecution.StreamAsync(workflow, NumberSignal.Init).ConfigureAwait(false);
    await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
    {
        switch (evt)
        {
            case RequestInfoEvent requestInputEvt:
                // Handle human input request from the workflow
                ExternalResponse response = HandleExternalRequest(requestInputEvt.Request);
                await handle.SendResponseAsync(response).ConfigureAwait(false);
                break;

            case WorkflowOutputEvent outputEvt:
                // The workflow has yielded output
                Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
                return;
        }
    }
}

요청 처리

다양한 유형의 입력 요청을 처리합니다.

private static ExternalResponse HandleExternalRequest(ExternalRequest request)
{
    switch (request.DataAs<NumberSignal?>())
    {
        case NumberSignal.Init:
            int initialGuess = ReadIntegerFromConsole("Please provide your initial guess: ");
            return request.CreateResponse(initialGuess);
        case NumberSignal.Above:
            int lowerGuess = ReadIntegerFromConsole("You previously guessed too large. Please provide a new guess: ");
            return request.CreateResponse(lowerGuess);
        case NumberSignal.Below:
            int higherGuess = ReadIntegerFromConsole("You previously guessed too small. Please provide a new guess: ");
            return request.CreateResponse(higherGuess);
        default:
            throw new ArgumentException("Unexpected request type.");
    }
}

private static int ReadIntegerFromConsole(string prompt)
{
    while (true)
    {
        Console.Write(prompt);
        string? input = Console.ReadLine();
        if (int.TryParse(input, out int value))
        {
            return value;
        }
        Console.WriteLine("Invalid input. Please enter a valid integer.");
    }
}

구현 개념

정보 요청 이벤트 흐름

  1. 워크플로 실행: 외부 입력이 필요할 때까지 워크플로가 처리됩니다.
  2. 요청 생성: RequestPort가 요청 세부 정보를 사용하여 RequestInfoEvent 생성합니다.
  3. 외부 처리: 애플리케이션이 이벤트를 catch하고 사용자 입력을 수집합니다.
  4. 응답 제출: 워크플로를 계속하려면 ExternalResponse 반환하십시오
  5. 워크플로 다시 시작: 워크플로가 제공된 입력으로 처리를 계속합니다.

워크플로 수명 주기

  • 스트리밍 실행: 실시간으로 이벤트를 모니터링하는 데 사용 StreamAsync
  • 이벤트 처리: 입력 요청 및 RequestInfoEvent 완료를 위한 프로세스 WorkflowOutputEvent
  • 응답 조정: 워크플로의 응답 처리 메커니즘을 사용하여 요청에 응답 일치

구현 흐름

  1. 워크플로 초기화: 워크플로는 RequestPort에 NumberSignal.Init 보내는 것으로 시작합니다.

  2. 요청 생성: RequestPort는 사용자로부터 초기 추측을 요청하는 요청을 생성합니다 RequestInfoEvent .

  3. 워크플로 일시 중지: 애플리케이션이 요청을 처리하는 동안 워크플로가 일시 중지되고 외부 입력을 기다립니다.

  4. 사용자 응답: 외부 애플리케이션은 사용자 입력을 수집하고 워크플로로 ExternalResponse 다시 보냅니다.

  5. 처리 및 피드백: JudgeExecutor 추측을 처리하고 워크플로를 완료하거나 새 신호(위/아래)를 보내 다른 추측을 요청합니다.

  6. 루프 연속: 올바른 숫자를 추측할 때까지 프로세스가 반복됩니다.

프레임워크 이점

  • 형식 안전성: 강력한 입력을 통해 요청-응답 계약이 유지됩니다.
  • 이벤트 기반: 리치 이벤트 시스템은 워크플로 실행에 대한 가시성을 제공합니다.
  • 일시 중지 가능한 실행: 외부 입력을 기다리는 동안 워크플로가 무기한 일시 중지할 수 있습니다.
  • 상태 관리: 워크플로 상태는 일시 중지-다시 시작 주기에서 유지됩니다.
  • 유연한 통합: RequestPorts는 모든 외부 입력 원본(UI, API, 콘솔 등)과 통합할 수 있습니다.

전체 샘플

전체 작업 구현은 휴먼 인 더 루프 기본 샘플을 참조하세요.

이 패턴을 사용하면 사용자가 자동화된 워크플로 내의 주요 의사 결정 지점에서 입력을 제공할 수 있는 정교한 대화형 애플리케이션을 빌드할 수 있습니다.

만들게 될 것들

요청-응답 패턴을 보여 주는 대화형 숫자 추측 게임 워크플로를 만듭니다.

  • 지능형 추측을 만드는 AI 에이전트
  • API를 사용하여 직접 요청을 보낼 수 있는 request_info 실행기
  • 에이전트와 사용자 상호 작용 간에 @response_handler를 사용하여 조정하는 턴 관리자
  • 실시간 피드백을 위한 대화형 콘솔 입력/출력

필수 조건

  • Python 3.10 이상
  • 구성된 Azure OpenAI 배포
  • 구성된 Azure CLI 인증(az login)
  • Python 비동기 프로그래밍에 대한 기본 이해

주요 개념

요청 및 응답 기능

실행기에는 휴먼 인 더 루프 상호 작용을 가능하게 하는 기본 제공 요청 및 응답 기능이 있습니다.

  • 요청을 보내기 위한 호출 ctx.request_info(request_data=request_data, response_type=response_type)
  • 데코레이터를 @response_handler 사용하여 응답 처리
  • 상속 요구 사항 없이 사용자 지정 요청/응답 형식 정의

Request-Response 플로우

실행기는 ctx.request_info()를 사용하여 직접 요청을 보내고, @response_handler 데코레이터를 사용하여 응답을 처리할 수 있습니다.

  1. 실행기 호출 ctx.request_info(request_data=request_data, response_type=response_type)
  2. 워크플로가 요청 데이터를 사용하여 RequestInfoEvent 내보낸다
  3. 외부 시스템(사람, API 등)이 요청을 처리합니다.
  4. 응답이 send_responses_streaming()을 통해 다시 전송됩니다.
  5. 워크플로가 다시 시작되고 실행기 @response_handler 메서드에 응답을 전달합니다.

환경 설정

먼저 필요한 패키지를 설치합니다.

pip install agent-framework-core --pre
pip install azure-identity

요청 및 응답 모델 정의

먼저 요청-응답 통신을 위한 데이터 구조를 정의합니다.

import asyncio
from dataclasses import dataclass
from pydantic import BaseModel

from agent_framework import (
    AgentExecutor,
    AgentExecutorRequest,
    AgentExecutorResponse,
    ChatMessage,
    Executor,
    RequestInfoEvent,
    Role,
    WorkflowBuilder,
    WorkflowContext,
    WorkflowOutputEvent,
    WorkflowRunState,
    WorkflowStatusEvent,
    handler,
    response_handler,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential

@dataclass
class HumanFeedbackRequest:
    """Request message for human feedback in the guessing game."""
    prompt: str = ""
    guess: int | None = None

class GuessOutput(BaseModel):
    """Structured output from the AI agent with response_format enforcement."""
    guess: int

구조적 HumanFeedbackRequest 요청 페이로드에 대한 간단한 데이터 클래스입니다.

  • 요청 페이로드에 대한 강력한 타이핑
  • 순방향 호환성 유효성 검사
  • 응답과의 상관 관계 의미 체계를 명확하게 하기
  • 상세한 UI 프롬프트에 대한 맥락별 필드(예: 이전 추측)

Turn Manager 만들기

턴 관리자는 AI 에이전트와 사람 간의 흐름을 조정합니다.

class TurnManager(Executor):
    """Coordinates turns between the AI agent and human player.

    Responsibilities:
    - Start the game by requesting the agent's first guess
    - Process agent responses and request human feedback
    - Handle human feedback and continue the game or finish
    """

    def __init__(self, id: str | None = None):
        super().__init__(id=id or "turn_manager")

    @handler
    async def start(self, _: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
        """Start the game by asking the agent for an initial guess."""
        user = ChatMessage(Role.USER, text="Start by making your first guess.")
        await ctx.send_message(AgentExecutorRequest(messages=[user], should_respond=True))

    @handler
    async def on_agent_response(
        self,
        result: AgentExecutorResponse,
        ctx: WorkflowContext,
    ) -> None:
        """Handle the agent's guess and request human guidance."""
        # Parse structured model output (defensive default if agent didn't reply)
        text = result.agent_run_response.text or ""
        last_guess = GuessOutput.model_validate_json(text).guess if text else None

        # Craft a clear human prompt that defines higher/lower relative to agent's guess
        prompt = (
            f"The agent guessed: {last_guess if last_guess is not None else text}. "
            "Type one of: higher (your number is higher than this guess), "
            "lower (your number is lower than this guess), correct, or exit."
        )
        # Send a request using the request_info API
        await ctx.request_info(
            request_data=HumanFeedbackRequest(prompt=prompt, guess=last_guess),
            response_type=str
        )

    @response_handler
    async def on_human_feedback(
        self,
        original_request: HumanFeedbackRequest,
        feedback: str,
        ctx: WorkflowContext[AgentExecutorRequest, str],
    ) -> None:
        """Continue the game or finish based on human feedback."""
        reply = feedback.strip().lower()
        # Use the correlated request's guess to avoid extra state reads
        last_guess = original_request.guess

        if reply == "correct":
            await ctx.yield_output(f"Guessed correctly: {last_guess}")
            return

        # Provide feedback to the agent for the next guess
        user_msg = ChatMessage(
            Role.USER,
            text=f'Feedback: {reply}. Return ONLY a JSON object matching the schema {{"guess": <int 1..10>}}.',
        )
        await ctx.send_message(AgentExecutorRequest(messages=[user_msg], should_respond=True))

워크플로 빌드

모든 구성 요소를 연결하는 기본 워크플로를 만듭니다.

async def main() -> None:
    # Create the chat agent with structured output enforcement
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
    agent = chat_client.create_agent(
        instructions=(
            "You guess a number between 1 and 10. "
            "If the user says 'higher' or 'lower', adjust your next guess. "
            'You MUST return ONLY a JSON object exactly matching this schema: {"guess": <integer 1..10>}. '
            "No explanations or additional text."
        ),
        response_format=GuessOutput,
    )

    # Create workflow components
    turn_manager = TurnManager(id="turn_manager")
    agent_exec = AgentExecutor(agent=agent, id="agent")

    # Build the workflow graph
    workflow = (
        WorkflowBuilder()
        .set_start_executor(turn_manager)
        .add_edge(turn_manager, agent_exec)  # Ask agent to make/adjust a guess
        .add_edge(agent_exec, turn_manager)  # Agent's response goes back to coordinator
        .build()
    )

    # Execute the interactive workflow
    await run_interactive_workflow(workflow)

async def run_interactive_workflow(workflow):
    """Run the workflow with human-in-the-loop interaction."""
    pending_responses: dict[str, str] | None = None
    completed = False
    workflow_output: str | None = None

    print("🎯 Number Guessing Game")
    print("Think of a number between 1 and 10, and I'll try to guess it!")
    print("-" * 50)

    while not completed:
        # First iteration uses run_stream("start")
        # Subsequent iterations use send_responses_streaming with pending responses
        stream = (
            workflow.send_responses_streaming(pending_responses)
            if pending_responses
            else workflow.run_stream("start")
        )

        # Collect events for this turn
        events = [event async for event in stream]
        pending_responses = None

        # Process events to collect requests and detect completion
        requests: list[tuple[str, str]] = []  # (request_id, prompt)
        for event in events:
            if isinstance(event, RequestInfoEvent) and isinstance(event.data, HumanFeedbackRequest):
                # RequestInfoEvent for our HumanFeedbackRequest
                requests.append((event.request_id, event.data.prompt))
            elif isinstance(event, WorkflowOutputEvent):
                # Capture workflow output when yielded
                workflow_output = str(event.data)
                completed = True

        # Check workflow status
        pending_status = any(
            isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS
            for e in events
        )
        idle_with_requests = any(
            isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
            for e in events
        )

        if pending_status:
            print("🔄 State: IN_PROGRESS_PENDING_REQUESTS (requests outstanding)")
        if idle_with_requests:
            print("⏸️  State: IDLE_WITH_PENDING_REQUESTS (awaiting human input)")

        # Handle human requests if any
        if requests and not completed:
            responses: dict[str, str] = {}
            for req_id, prompt in requests:
                print(f"\n🤖 {prompt}")
                answer = input("👤 Enter higher/lower/correct/exit: ").lower()

                if answer == "exit":
                    print("👋 Exiting...")
                    return
                responses[req_id] = answer
            pending_responses = responses

    # Show final result
    print(f"\n🎉 {workflow_output}")

예제 실행

전체 작업 구현은 휴먼 인 더 루프 추측 게임 샘플을 참조하세요.

작동 방식

  1. 워크플로 초기화: 워크플로는 AI 에이전트에서 초기 추측을 요청하는 것으로 시작 TurnManager 합니다.

  2. 에이전트 응답: AI 에이전트가 추측을 수행하고 구조화된 JSON을 반환합니다. 이 JSON은 다시 TurnManager흐릅니다.

  3. 사용자 요청: TurnManager 에이전트의 추측을 처리하고 나서 ctx.request_info()HumanFeedbackRequest으로 호출합니다.

  4. 워크플로 일시 중지: 워크플로 RequestInfoEvent 는 추가 작업을 수행할 수 없을 때까지 내보내고 계속한 다음, 사람의 입력을 기다립니다.

  5. 사용자 응답: 외부 애플리케이션은 사용자 입력을 수집하고 .를 사용하여 send_responses_streaming()응답을 다시 보냅니다.

  6. 다시 시작 및 계속: 워크플로가 다시 시작되고, TurnManager'의 @response_handler 메서드가 사용자 피드백을 처리하고, 게임을 종료하거나 에이전트에 다른 요청을 보냅니다.

주요 이점

  • 구조적 통신: 형식이 안전한 요청 및 응답 모델에서 런타임 오류를 방지합니다.
  • 상관 관계: 요청 ID는 응답이 올바른 요청과 일치하는지 확인합니다.
  • 일시 중지 가능한 실행: 외부 입력을 기다리는 동안 워크플로가 무기한 일시 중지할 수 있습니다.
  • 상태 유지: 워크플로 상태는 일시 중지-다시 시작 주기에서 유지 관리됩니다.
  • 이벤트 기반: 리치 이벤트 시스템은 워크플로 상태 및 전환에 대한 가시성을 제공합니다.

이 패턴을 사용하면 AI 에이전트와 인간이 구조화된 워크플로 내에서 원활하게 협업하는 정교한 대화형 애플리케이션을 빌드할 수 있습니다.

다음 단계