This tutorial demonstrates how to handle requests and responses in Agent Framework workflows. You will learn how to create interactive workflows that pause execution to request input from an external source (such as a human or another system), and then resume once a response is provided.
Concepts covered
In .NET, human-in-the-loop workflows use a RequestPort and external request handling to pause execution and collect user input. This pattern enables interactive workflows in which the system can request information from external sources during execution.
Prerequisites
- .NET 8.0 SDK or later.
- A configured Azure OpenAI service endpoint and deployment.
- Azure CLI installed and authenticated (for Azure credential authentication).
- Basic understanding of C# and asynchronous programming.
- A new console application.
Install the NuGet package
First, install the required package for your .NET project:
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
Key components
RequestPort and external requests
A RequestPort acts as the bridge between the workflow and external input sources. When the workflow needs input, it emits a RequestInfoEvent for your application to handle.
// Create a RequestPort for handling human input requests
RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
Signal types
Define signal types to communicate the different kinds of requests:
/// <summary>
/// Signals used for communication between guesses and the JudgeExecutor.
/// </summary>
internal enum NumberSignal
{
Init, // Initial guess request
Above, // Previous guess was too high
Below, // Previous guess was too low
}
Workflow executors
Create an executor that processes user input and provides feedback:
/// <summary>
/// Executor that judges the guess and provides feedback.
/// </summary>
internal sealed class JudgeExecutor : Executor<int>
{
private readonly int _targetNumber;
private int _tries;
public JudgeExecutor(int targetNumber) : base("Judge")
{
_targetNumber = targetNumber;
}
public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken)
{
_tries++;
if (message == _targetNumber)
{
await context.YieldOutputAsync($"{_targetNumber} found in {_tries} tries!", cancellationToken)
.ConfigureAwait(false);
}
else if (message < _targetNumber)
{
await context.SendMessageAsync(NumberSignal.Below, cancellationToken).ConfigureAwait(false);
}
else
{
await context.SendMessageAsync(NumberSignal.Above, cancellationToken).ConfigureAwait(false);
}
}
}
Building the workflow
Connect the RequestPort and the executor in a feedback loop:
internal static class WorkflowHelper
{
internal static ValueTask<Workflow<NumberSignal>> GetWorkflowAsync()
{
// Create the executors
RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
JudgeExecutor judgeExecutor = new(42);
// Build the workflow by connecting executors in a loop
return new WorkflowBuilder(numberRequestPort)
.AddEdge(numberRequestPort, judgeExecutor)
.AddEdge(judgeExecutor, numberRequestPort)
.WithOutputFrom(judgeExecutor)
.BuildAsync<NumberSignal>();
}
}
Running the interactive workflow
Handle external requests during workflow execution:
private static async Task Main()
{
// Create the workflow
var workflow = await WorkflowHelper.GetWorkflowAsync().ConfigureAwait(false);
// Execute the workflow
await using StreamingRun handle = await InProcessExecution.StreamAsync(workflow, NumberSignal.Init).ConfigureAwait(false);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle human input request from the workflow
ExternalResponse response = HandleExternalRequest(requestInputEvt.Request);
await handle.SendResponseAsync(response).ConfigureAwait(false);
break;
case WorkflowOutputEvent outputEvt:
// The workflow has yielded output
Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
return;
}
}
}
Request handling
Handle the different kinds of input requests:
private static ExternalResponse HandleExternalRequest(ExternalRequest request)
{
switch (request.DataAs<NumberSignal?>())
{
case NumberSignal.Init:
int initialGuess = ReadIntegerFromConsole("Please provide your initial guess: ");
return request.CreateResponse(initialGuess);
case NumberSignal.Above:
int lowerGuess = ReadIntegerFromConsole("You previously guessed too large. Please provide a new guess: ");
return request.CreateResponse(lowerGuess);
case NumberSignal.Below:
int higherGuess = ReadIntegerFromConsole("You previously guessed too small. Please provide a new guess: ");
return request.CreateResponse(higherGuess);
default:
throw new ArgumentException("Unexpected request type.");
}
}
private static int ReadIntegerFromConsole(string prompt)
{
while (true)
{
Console.Write(prompt);
string? input = Console.ReadLine();
if (int.TryParse(input, out int value))
{
return value;
}
Console.WriteLine("Invalid input. Please enter a valid integer.");
}
}
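The validated-input loop above is a general pattern, not framework code. A Python sketch of the same idea, with the reader and writer injected (hypothetical names) so the loop can be exercised without a console:

```python
def read_integer(prompt: str, reader=input, writer=print) -> int:
    """Keep prompting until the reader yields a parseable integer."""
    while True:
        writer(prompt)
        raw = reader()
        try:
            return int(raw.strip())
        except ValueError:
            writer("Invalid input. Please enter a valid integer.")

# Injected reader/writer make the loop testable: the first answer is
# rejected, the second parses successfully.
answers = iter(["abc", " 42 "])
value = read_integer("Guess: ", reader=lambda: next(answers), writer=lambda *_: None)
```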
Implementation concepts
RequestInfoEvent flow
- Workflow execution: the workflow runs until external input is needed.
- Request generation: the RequestPort emits a RequestInfoEvent containing the request details.
- External handling: your application captures the event and collects user input.
- Response submission: an ExternalResponse is sent back to continue the workflow.
- Workflow resumption: the workflow continues processing with the provided input.
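The pause-and-resume contract described above can be simulated without the framework: a generator plays the role of the workflow, yielding a hint (the "request") and receiving each guess through `send()` (the "response"). This is only an illustration of the control flow, not the framework's API:

```python
def guessing_workflow(target: int):
    """Yields a hint string (the request); receives the next guess via send()."""
    hint = "init"
    tries = 0
    while True:
        guess = yield hint          # workflow pauses here, awaiting external input
        tries += 1
        if guess == target:
            return f"{target} found in {tries} tries!"
        hint = "below" if guess < target else "above"

# Drive it the way an external application would:
wf = guessing_workflow(7)
hint = next(wf)                     # start; workflow pauses at the first request
result = None
guesses = iter([5, 9, 7])           # scripted "human" responses
try:
    while True:
        hint = wf.send(next(guesses))
except StopIteration as stop:
    result = stop.value             # the workflow's yielded output
```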
Workflow lifecycle
- Streaming execution: use StreamAsync to monitor events in real time.
- Event handling: process RequestInfoEvent for input requests and WorkflowOutputEvent for completion.
- Response correlation: match responses to requests through the workflow's response-handling mechanism.
Implementation flow
1. Workflow initialization: the workflow starts by sending NumberSignal.Init to the RequestPort.
2. Request generation: the RequestPort produces a request for the user's initial guess.
3. Workflow pause: the workflow pauses and waits for external input while the application handles the request.
4. Human response: the external application collects user input and sends an ExternalResponse back to the workflow.
5. Processing and feedback: the JudgeExecutor processes the guess and either completes the workflow or sends a new signal (Above/Below) to request another guess.
6. Loop continuation: the process repeats until the correct number is guessed.
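Why does this loop terminate? A guesser that honors the Above/Below hints can binary-search the range, so the number of round trips is logarithmic in the range size. A small framework-free sketch (hypothetical names):

```python
def play(target: int, low: int = 1, high: int = 100) -> int:
    """Count the tries a binary-search guesser needs, given above/below hints."""
    tries = 0
    while True:
        guess = (low + high) // 2
        tries += 1
        if guess == target:
            return tries
        if guess < target:
            low = guess + 1   # hint Below: raise the lower bound
        else:
            high = guess - 1  # hint Above: lower the upper bound

# Over the range 1..100, every target is resolved in at most 7 tries
# (ceil(log2) of the range size).
worst = max(play(t) for t in range(1, 101))
```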
Framework benefits
- Type safety: strong typing ensures the request-response contract is maintained.
- Event driven: a rich event system provides visibility into workflow execution.
- Paused execution: workflows can pause indefinitely while waiting for external input.
- State management: workflow state is preserved across pause-resume cycles.
- Flexible integration: RequestPorts can integrate with any external input source (UI, API, console, and so on).
Complete example
For the full implementation, see the Human-in-the-Loop Basic sample.
This pattern enables sophisticated interactive applications in which users provide input at key decision points within automated workflows.
What you'll build
You'll create an interactive number-guessing game workflow that demonstrates the request-response pattern:
- An AI agent that makes intelligent guesses
- An executor that sends requests directly with the request_info API
- A turn manager that coordinates agent-human interaction using @response_handler
- Interactive console input/output for real-time feedback
Prerequisites
- Python 3.10 or later
- A configured Azure OpenAI deployment
- Azure CLI authentication configured (az login)
- Basic understanding of Python asynchronous programming
Key concepts
Request and response capabilities
Executors have built-in request and response capabilities that enable human-in-the-loop interaction:
- Call ctx.request_info(request_data=request_data, response_type=response_type) to send a request
- Handle responses with the @response_handler decorator
- Define custom request/response types with no inheritance requirements
Request-response flow
Executors send requests directly with ctx.request_info() and handle responses with the @response_handler decorator:
1. The executor calls ctx.request_info(request_data=request_data, response_type=response_type)
2. The workflow emits a RequestInfoEvent containing the request data
3. An external system (a human, an API, and so on) handles the request
4. Responses are sent back via send_responses_streaming()
5. The workflow resumes and delivers the response to the executor's @response_handler method
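Steps 2 through 5 hinge on correlating each response with the id of the request that produced it. A minimal, framework-free sketch of that bookkeeping (the broker and its names are hypothetical, not part of the framework):

```python
import uuid
from dataclasses import dataclass

@dataclass
class PendingRequest:
    request_id: str
    prompt: str

class RequestBroker:
    """Tracks outstanding requests and matches responses back by id."""
    def __init__(self) -> None:
        self._pending: dict[str, PendingRequest] = {}

    def open(self, prompt: str) -> PendingRequest:
        req = PendingRequest(request_id=str(uuid.uuid4()), prompt=prompt)
        self._pending[req.request_id] = req
        return req

    def resolve(self, request_id: str, answer: str) -> tuple[str, str]:
        """Pop the matching request; unknown ids raise KeyError (fail loudly)."""
        req = self._pending.pop(request_id)
        return req.prompt, answer

broker = RequestBroker()
r = broker.open("higher, lower, or correct?")
matched = broker.resolve(r.request_id, "higher")
```

Popping on resolve guarantees each request is answered exactly once, which is the same property the framework's request-id correlation provides.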
Set up your environment
First, install the required packages:
pip install agent-framework-core --pre
pip install azure-identity
Define the request and response models
Start by defining the data structures for request-response communication:
import asyncio
from dataclasses import dataclass
from pydantic import BaseModel
from agent_framework import (
AgentExecutor,
AgentExecutorRequest,
AgentExecutorResponse,
ChatMessage,
Executor,
RequestInfoEvent,
Role,
WorkflowBuilder,
WorkflowContext,
WorkflowOutputEvent,
WorkflowRunState,
WorkflowStatusEvent,
handler,
response_handler,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
@dataclass
class HumanFeedbackRequest:
"""Request message for human feedback in the guessing game."""
prompt: str = ""
guess: int | None = None
class GuessOutput(BaseModel):
"""Structured output from the AI agent with response_format enforcement."""
guess: int
HumanFeedbackRequest is a simple dataclass for the structured request payload. It provides:
- Strong typing for the request payload
- Forward-compatible validation
- Explicit correlation between responses and their semantic context
- Context fields for richer UI prompts (such as the previous guess)
Create the turn manager
The turn manager coordinates the flow between the AI agent and the human:
class TurnManager(Executor):
"""Coordinates turns between the AI agent and human player.
Responsibilities:
- Start the game by requesting the agent's first guess
- Process agent responses and request human feedback
- Handle human feedback and continue the game or finish
"""
def __init__(self, id: str | None = None):
super().__init__(id=id or "turn_manager")
@handler
async def start(self, _: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Start the game by asking the agent for an initial guess."""
user = ChatMessage(Role.USER, text="Start by making your first guess.")
await ctx.send_message(AgentExecutorRequest(messages=[user], should_respond=True))
@handler
async def on_agent_response(
self,
result: AgentExecutorResponse,
ctx: WorkflowContext,
) -> None:
"""Handle the agent's guess and request human guidance."""
# Parse structured model output (defensive default if agent didn't reply)
text = result.agent_run_response.text or ""
last_guess = GuessOutput.model_validate_json(text).guess if text else None
# Craft a clear human prompt that defines higher/lower relative to agent's guess
prompt = (
f"The agent guessed: {last_guess if last_guess is not None else text}. "
"Type one of: higher (your number is higher than this guess), "
"lower (your number is lower than this guess), correct, or exit."
)
# Send a request using the request_info API
await ctx.request_info(
request_data=HumanFeedbackRequest(prompt=prompt, guess=last_guess),
response_type=str
)
@response_handler
async def on_human_feedback(
self,
original_request: HumanFeedbackRequest,
feedback: str,
ctx: WorkflowContext[AgentExecutorRequest, str],
) -> None:
"""Continue the game or finish based on human feedback."""
reply = feedback.strip().lower()
# Use the correlated request's guess to avoid extra state reads
last_guess = original_request.guess
if reply == "correct":
await ctx.yield_output(f"Guessed correctly: {last_guess}")
return
# Provide feedback to the agent for the next guess
user_msg = ChatMessage(
Role.USER,
text=f'Feedback: {reply}. Return ONLY a JSON object matching the schema {{"guess": <int 1..10>}}.',
)
await ctx.send_message(AgentExecutorRequest(messages=[user_msg], should_respond=True))
Build the workflow
Create the main workflow that connects all the components:
async def main() -> None:
# Create the chat agent with structured output enforcement
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
agent = chat_client.create_agent(
instructions=(
"You guess a number between 1 and 10. "
"If the user says 'higher' or 'lower', adjust your next guess. "
'You MUST return ONLY a JSON object exactly matching this schema: {"guess": <integer 1..10>}. '
"No explanations or additional text."
),
response_format=GuessOutput,
)
# Create workflow components
turn_manager = TurnManager(id="turn_manager")
agent_exec = AgentExecutor(agent=agent, id="agent")
# Build the workflow graph
workflow = (
WorkflowBuilder()
.set_start_executor(turn_manager)
.add_edge(turn_manager, agent_exec) # Ask agent to make/adjust a guess
.add_edge(agent_exec, turn_manager) # Agent's response goes back to coordinator
.build()
)
# Execute the interactive workflow
await run_interactive_workflow(workflow)
async def run_interactive_workflow(workflow):
"""Run the workflow with human-in-the-loop interaction."""
pending_responses: dict[str, str] | None = None
completed = False
workflow_output: str | None = None
print("🎯 Number Guessing Game")
print("Think of a number between 1 and 10, and I'll try to guess it!")
print("-" * 50)
while not completed:
# First iteration uses run_stream("start")
# Subsequent iterations use send_responses_streaming with pending responses
stream = (
workflow.send_responses_streaming(pending_responses)
if pending_responses
else workflow.run_stream("start")
)
# Collect events for this turn
events = [event async for event in stream]
pending_responses = None
# Process events to collect requests and detect completion
requests: list[tuple[str, str]] = [] # (request_id, prompt)
for event in events:
if isinstance(event, RequestInfoEvent) and isinstance(event.data, HumanFeedbackRequest):
# RequestInfoEvent for our HumanFeedbackRequest
requests.append((event.request_id, event.data.prompt))
elif isinstance(event, WorkflowOutputEvent):
# Capture workflow output when yielded
workflow_output = str(event.data)
completed = True
# Check workflow status
pending_status = any(
isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS
for e in events
)
idle_with_requests = any(
isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
for e in events
)
if pending_status:
print("🔄 State: IN_PROGRESS_PENDING_REQUESTS (requests outstanding)")
if idle_with_requests:
print("⏸️ State: IDLE_WITH_PENDING_REQUESTS (awaiting human input)")
# Handle human requests if any
if requests and not completed:
responses: dict[str, str] = {}
for req_id, prompt in requests:
print(f"\n🤖 {prompt}")
answer = input("👤 Enter higher/lower/correct/exit: ").lower()
if answer == "exit":
print("👋 Exiting...")
return
responses[req_id] = answer
pending_responses = responses
# Show final result
print(f"\n🎉 {workflow_output}")
Run the sample
For the complete implementation, see the Human-in-the-Loop Guessing Game sample.
How it works
1. Workflow initialization: the workflow starts with the TurnManager requesting an initial guess from the AI agent.
2. Agent response: the AI agent makes a guess and returns structured JSON, which flows back to the TurnManager.
3. Human request: the TurnManager processes the agent's guess and calls ctx.request_info() with a HumanFeedbackRequest.
4. Workflow pause: the workflow emits a RequestInfoEvent, runs until no further progress can be made, and then waits for human input.
5. Human response: the external application collects the human's input and sends the response using send_responses_streaming().
6. Resume and continue: the workflow resumes; the TurnManager's @response_handler method processes the human feedback and either ends the game or sends another request to the agent.
Key benefits
- Structured communication: type-safe request and response models prevent runtime errors
- Correlation: request IDs ensure responses are matched to the correct requests
- Paused execution: workflows can pause indefinitely while waiting for external input
- State preservation: workflow state is maintained across pause-resume cycles
- Event driven: a rich event system provides visibility into workflow states and transitions
This pattern enables building sophisticated interactive applications in which AI agents and humans collaborate seamlessly within structured workflows.