本页概述了 请求和响应 处理在 Microsoft Agent Framework 工作流系统中的工作原理。
概述
工作流中的执行程序可以将请求发送到工作流外部并等待响应。 这对于执行体需要与外部系统交互(例如人工参与交互或任何其他异步操作)的情况非常有用。
在工作流中启用请求和响应处理
请求和响应通过称为 InputPort的特殊类型进行处理。
// Create an input port that receives requests of type CustomRequestType and responses of type CustomResponseType.
var inputPort = InputPort.Create<CustomRequestType, CustomResponseType>("input-port");
将输入端口添加到工作流。
var executorA = new SomeExecutor();
var workflow = new WorkflowBuilder(inputPort)
.AddEdge(inputPort, executorA)
.AddEdge(executorA, inputPort)
.Build<CustomRequestType>();
现在,由于在工作流中,我们的executorA与inputPort是双向连接的,因此executorA需要能够通过inputPort发送请求和接收响应。 下面是我们需要在SomeExecutor执行的步骤,以发送请求并接收响应。
internal sealed class SomeExecutor() : Executor<CustomResponseType>("SomeExecutor")
{
public async ValueTask HandleAsync(CustomResponseType message, IWorkflowContext context)
{
// Process the response...
...
// Send a request
await context.SendMessageAsync(new CustomRequestType(...)).ConfigureAwait(false);
}
}
或者, SomeExecutor 可以将请求发送和响应处理分为两个处理程序。
internal sealed class SomeExecutor() : Executor("SomeExecutor")
{
protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder)
{
return routeBuilder
.AddHandler<CustomResponseType>(this.HandleCustomResponseAsync)
.AddHandler<OtherDataType>(this.HandleOtherDataAsync);
}
public async ValueTask HandleCustomResponseAsync(CustomResponseType message, IWorkflowContext context)
{
// Process the response...
...
}
public async ValueTask HandleOtherDataAsync(OtherDataType message, IWorkflowContext context)
{
// Process the message...
...
// Send a request
await context.SendMessageAsync(new CustomRequestType(...)).ConfigureAwait(false);
}
}
执行程序可以使用ctx.request_info()发送请求,并使用@response_handler处理响应。
from agent_framework import response_handler, WorkflowBuilder
executor_a = SomeExecutor()
executor_b = SomeOtherExecutor()
workflow_builder = WorkflowBuilder()
workflow_builder.set_start_executor(executor_a)
workflow_builder.add_edge(executor_a, executor_b)
workflow = workflow_builder.build()
executor_a 可以使用内置功能直接发送请求和接收响应。
from agent_framework import (
Executor,
WorkflowContext,
handler,
response_handler,
)
class SomeExecutor(Executor):
@handler
async def handle_data(
self,
data: OtherDataType,
context: WorkflowContext,
):
# Process the message...
...
# Send a request using the API
await context.request_info(
request_data=CustomRequestType(...),
response_type=CustomResponseType
)
@response_handler
async def handle_response(
self,
original_request: CustomRequestType,
response: CustomResponseType,
context: WorkflowContext,
):
# Process the response...
...
@response_handler修饰器会自动注册方法来处理指定请求和响应类型的响应。
处理请求和响应
在收到请求时,InputPort 会发出 RequestInfoEvent。 可以订阅这些事件来处理来自工作流的传入请求。 从外部系统收到响应时,请使用响应机制将其发送回工作流。 框架会自动将响应路由到发送原始请求的执行程序。
StreamingRun handle = await InProcessExecution.StreamAsync(workflow, input).ConfigureAwait(false);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle `RequestInfoEvent` from the workflow
ExternalResponse response = requestInputEvt.Request.CreateResponse<CustomResponseType>(...);
await handle.SendResponseAsync(response).ConfigureAwait(false);
break;
case WorkflowOutputEvent workflowOutputEvt:
// The workflow has completed successfully
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
return;
}
}
执行程序可以直接发送请求,而无需单独的组件。 执行程序调用 ctx.request_info()时,工作流会发出一个 RequestInfoEvent。 可以订阅这些事件来处理来自工作流的传入请求。 从外部系统收到响应时,请使用响应机制将其发送回工作流。 框架自动将响应路由到执行程序 @response_handler 的方法。
from agent_framework import RequestInfoEvent
while True:
request_info_events : list[RequestInfoEvent] = []
pending_responses : dict[str, CustomResponseType] = {}
stream = workflow.run_stream(input) if not pending_responses else workflow.send_responses_streaming(pending_responses)
async for event in stream:
if isinstance(event, RequestInfoEvent):
# Handle `RequestInfoEvent` from the workflow
request_info_events.append(event)
if not request_info_events:
break
for request_info_event in request_info_events:
# Handle `RequestInfoEvent` from the workflow
response = CustomResponseType(...)
pending_responses[request_info_event.request_id] = response
检查点和请求
若要了解有关检查点的详细信息,请参阅 此页面。
创建检查点时,挂起的请求也会作为检查点状态的一部分被保存。 从检查点还原时,系统会将任何挂起的请求重新发送为 RequestInfoEvent 对象,以便更好地捕获和响应这些请求。 不能在恢复操作期间直接提供响应,相反,你必须侦听事件的重新发出,并使用标准响应机制来进行响应。
后续步骤
- 了解如何管理 工作流中的状态。
- 了解如何创建检查点并从中恢复。
- 了解如何监视工作流。
- 了解工作流中的状态隔离。
- 了解如何可视化工作流。