Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Ta strona zawiera omówienie sposobu działania obsługi żądań i odpowiedzi w systemie przepływu pracy programu Microsoft Agent Framework.
Przegląd
Wykonawcy w przepływie pracy mogą wysyłać żądania na zewnątrz przepływu pracy i czekać na odpowiedzi. Jest to przydatne w przypadku scenariuszy, w których funkcja wykonawcza musi wchodzić w interakcje z systemami zewnętrznymi, takimi jak interakcje typu human-in-the-loop lub inne operacje asynchroniczne.
Włączanie obsługi żądań i odpowiedzi w przepływie pracy
Żądania i odpowiedzi są obsługiwane za pośrednictwem specjalnego typu o nazwie InputPort.
// Create an input port that receives requests of type CustomRequestType and responses of type CustomResponseType.
var inputPort = InputPort.Create<CustomRequestType, CustomResponseType>("input-port");
Dodaj port wejściowy do przepływu pracy.
var executorA = new SomeExecutor();
var workflow = new WorkflowBuilder(inputPort)
.AddEdge(inputPort, executorA)
.AddEdge(executorA, inputPort)
.Build<CustomRequestType>();
Teraz, ponieważ w procesie mamy executorA połączony z inputPort w obu kierunkach, executorA musi mieć możliwość wysyłania żądań i odbierania odpowiedzi za pośrednictwem inputPort. Oto, co musimy zrobić, SomeExecutor aby wysłać żądanie i otrzymać odpowiedź.
internal sealed class SomeExecutor() : Executor<CustomResponseType>("SomeExecutor")
{
public async ValueTask HandleAsync(CustomResponseType message, IWorkflowContext context)
{
// Process the response...
...
// Send a request
await context.SendMessageAsync(new CustomRequestType(...)).ConfigureAwait(false);
}
}
Alternatywnie, SomeExecutor można rozdzielić wysyłanie żądań i obsługę odpowiedzi na dwa moduły obsługi.
internal sealed class SomeExecutor() : Executor("SomeExecutor")
{
protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder)
{
return routeBuilder
.AddHandler<CustomResponseType>(this.HandleCustomResponseAsync)
.AddHandler<OtherDataType>(this.HandleOtherDataAsync);
}
public async ValueTask HandleCustomResponseAsync(CustomResponseType message, IWorkflowContext context)
{
// Process the response...
...
}
public async ValueTask HandleOtherDataAsync(OtherDataType message, IWorkflowContext context)
{
// Process the message...
...
// Send a request
await context.SendMessageAsync(new CustomRequestType(...)).ConfigureAwait(false);
}
}
Egzekutorzy mogą wysyłać żądania przy użyciu ctx.request_info() i obsługiwać odpowiedzi za pomocą @response_handler.
from agent_framework import response_handler, WorkflowBuilder
executor_a = SomeExecutor()
executor_b = SomeOtherExecutor()
workflow_builder = WorkflowBuilder()
workflow_builder.set_start_executor(executor_a)
workflow_builder.add_edge(executor_a, executor_b)
workflow = workflow_builder.build()
executor_a może wysyłać żądania i odbierać odpowiedzi bezpośrednio przy użyciu wbudowanych funkcji.
from agent_framework import (
Executor,
WorkflowContext,
handler,
response_handler,
)
class SomeExecutor(Executor):
@handler
async def handle_data(
self,
data: OtherDataType,
context: WorkflowContext,
):
# Process the message...
...
# Send a request using the API
await context.request_info(
request_data=CustomRequestType(...),
response_type=CustomResponseType
)
@response_handler
async def handle_response(
self,
original_request: CustomRequestType,
response: CustomResponseType,
context: WorkflowContext,
):
# Process the response...
...
Dekorator @response_handler automatycznie rejestruje metodę do obsługi odpowiedzi dla określonych typów żądań i odpowiedzi.
Obsługa żądań i odpowiedzi
Element InputPort emituje RequestInfoEvent gdy otrzyma żądanie. Możesz zapisać się na te zdarzenia w celu obsługi żądań przychodzących z procesu pracy. Gdy otrzymasz odpowiedź z systemu zewnętrznego, wyślij ją z powrotem do przepływu pracy przy użyciu mechanizmu odpowiedzi. Platforma automatycznie kieruje odpowiedź do funkcji wykonawczej, która wysłała oryginalne żądanie.
StreamingRun handle = await InProcessExecution.StreamAsync(workflow, input).ConfigureAwait(false);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle `RequestInfoEvent` from the workflow
ExternalResponse response = requestInputEvt.Request.CreateResponse<CustomResponseType>(...);
await handle.SendResponseAsync(response).ConfigureAwait(false);
break;
case WorkflowOutputEvent workflowOutputEvt:
// The workflow has completed successfully
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
return;
}
}
Funkcje wykonawcze mogą wysyłać żądania bezpośrednio bez konieczności użycia oddzielnego składnika. Gdy wykonawca wywołuje ctx.request_info(), przepływ pracy emituje RequestInfoEvent. Możesz zapisać się na te zdarzenia w celu obsługi żądań przychodzących z procesu pracy. Gdy otrzymasz odpowiedź z systemu zewnętrznego, wyślij ją z powrotem do przepływu pracy przy użyciu mechanizmu odpowiedzi. Platforma automatycznie kieruje odpowiedź na metodę wykonawcy @response_handler .
from agent_framework import RequestInfoEvent
while True:
request_info_events : list[RequestInfoEvent] = []
pending_responses : dict[str, CustomResponseType] = {}
stream = workflow.run_stream(input) if not pending_responses else workflow.send_responses_streaming(pending_responses)
async for event in stream:
if isinstance(event, RequestInfoEvent):
# Handle `RequestInfoEvent` from the workflow
request_info_events.append(event)
if not request_info_events:
break
for request_info_event in request_info_events:
# Handle `RequestInfoEvent` from the workflow
response = CustomResponseType(...)
pending_responses[request_info_event.request_id] = response
Punkty kontrolne i żądania
Aby dowiedzieć się więcej na temat punktów kontrolnych, zapoznaj się z tą stroną.
Po utworzeniu punktu kontrolnego oczekujące żądania są również zapisywane jako część stanu punktu kontrolnego. Po przywróceniu z punktu kontrolnego wszystkie oczekujące żądania będą wysyłane ponownie jako RequestInfoEvent obiekty, co umożliwia przechwytywanie i reagowanie na nie. Nie można podać odpowiedzi bezpośrednio podczas operacji wznawiania — zamiast tego należy nasłuchiwać ponownie emitowanych zdarzeń i reagować przy użyciu standardowego mechanizmu odpowiedzi.