Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Diese Seite bietet eine Übersicht darüber, wie die Anforderungs- und Antwortbehandlung im Microsoft Agent Framework-Workflowsystem funktioniert.
Überblick
Ausführende in einem Workflow können Anfragen an Stellen außerhalb des Workflows senden und auf Antworten warten. Dies ist nützlich für Szenarien, in denen ein Executor mit externen Systemen interagieren muss, z. B. menschliche Interaktionen oder andere asynchrone Operationen.
Aktivieren der Anforderungs- und Antwortbehandlung in einem Workflow
Anfragen und Antworten werden über einen speziellen Typ namens InputPortverarbeitet.
// Create an input port that receives requests of type CustomRequestType and responses of type CustomResponseType.
var inputPort = InputPort.Create<CustomRequestType, CustomResponseType>("input-port");
Fügen Sie den Eingabeport zu einem Workflow hinzu.
var executorA = new SomeExecutor();
var workflow = new WorkflowBuilder(inputPort)
.AddEdge(inputPort, executorA)
.AddEdge(executorA, inputPort)
.Build<CustomRequestType>();
Da wir nun im Workflow in beide Richtungen executorA mit inputPort verbunden haben, muss executorA in der Lage sein, Anfragen zu senden und Antworten über die inputPort zu empfangen. Hier ist, was wir in SomeExecutor tun müssen, um eine Anfrage zu senden und eine Antwort zu erhalten.
internal sealed class SomeExecutor() : Executor<CustomResponseType>("SomeExecutor")
{
public async ValueTask HandleAsync(CustomResponseType message, IWorkflowContext context)
{
// Process the response...
...
// Send a request
await context.SendMessageAsync(new CustomRequestType(...)).ConfigureAwait(false);
}
}
Alternativ können Sie das Senden von Anforderungen und die Verarbeitung von Antworten in zwei Handlern trennen.
internal sealed class SomeExecutor() : Executor("SomeExecutor")
{
protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder)
{
return routeBuilder
.AddHandler<CustomResponseType>(this.HandleCustomResponseAsync)
.AddHandler<OtherDataType>(this.HandleOtherDataAsync);
}
public async ValueTask HandleCustomResponseAsync(CustomResponseType message, IWorkflowContext context)
{
// Process the response...
...
}
public async ValueTask HandleOtherDataAsync(OtherDataType message, IWorkflowContext context)
{
// Process the message...
...
// Send a request
await context.SendMessageAsync(new CustomRequestType(...)).ConfigureAwait(false);
}
}
Executors können mithilfe von ctx.request_info() Anforderungen senden und mit @response_handler Antworten verarbeiten.
from agent_framework import response_handler, WorkflowBuilder
executor_a = SomeExecutor()
executor_b = SomeOtherExecutor()
workflow_builder = WorkflowBuilder()
workflow_builder.set_start_executor(executor_a)
workflow_builder.add_edge(executor_a, executor_b)
workflow = workflow_builder.build()
executor_a kann Anforderungen senden und Antworten direkt mithilfe integrierter Funktionen empfangen.
from agent_framework import (
Executor,
WorkflowContext,
handler,
response_handler,
)
class SomeExecutor(Executor):
@handler
async def handle_data(
self,
data: OtherDataType,
context: WorkflowContext,
):
# Process the message...
...
# Send a request using the API
await context.request_info(
request_data=CustomRequestType(...),
response_type=CustomResponseType
)
@response_handler
async def handle_response(
self,
original_request: CustomRequestType,
response: CustomResponseType,
context: WorkflowContext,
):
# Process the response...
...
Der @response_handler Dekorateur registriert die Methode automatisch, um Antworten für die spezifizierten Anfrage- und Antworttypen zu verarbeiten.
Behandeln von Anforderungen und Antworten
Ein InputPort emittiert ein RequestInfoEvent, wenn es eine Anforderung empfängt. Sie können diese Ereignisse abonnieren, um eingehende Anforderungen aus dem Workflow zu behandeln. Wenn Sie eine Antwort von einem externen System erhalten, senden Sie sie mithilfe des Antwortmechanismus an den Workflow zurück. Das Framework leitet die Antwort automatisch an den Executor weiter, der die ursprüngliche Anforderung gesendet hat.
StreamingRun handle = await InProcessExecution.StreamAsync(workflow, input).ConfigureAwait(false);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle `RequestInfoEvent` from the workflow
ExternalResponse response = requestInputEvt.Request.CreateResponse<CustomResponseType>(...);
await handle.SendResponseAsync(response).ConfigureAwait(false);
break;
case WorkflowOutputEvent workflowOutputEvt:
// The workflow has completed successfully
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
return;
}
}
Executors können Anforderungen direkt senden, ohne eine separate Komponente zu benötigen. Wenn ein Executor ctx.request_info() aufruft, gibt der Workflow eine RequestInfoEvent aus. Sie können diese Ereignisse abonnieren, um eingehende Anforderungen aus dem Workflow zu behandeln. Wenn Sie eine Antwort von einem externen System erhalten, senden Sie sie mithilfe des Antwortmechanismus an den Workflow zurück. Das Framework leitet die Antwort automatisch an die Methode des @response_handler Executors weiter.
from agent_framework import RequestInfoEvent
while True:
request_info_events : list[RequestInfoEvent] = []
pending_responses : dict[str, CustomResponseType] = {}
stream = workflow.run_stream(input) if not pending_responses else workflow.send_responses_streaming(pending_responses)
async for event in stream:
if isinstance(event, RequestInfoEvent):
# Handle `RequestInfoEvent` from the workflow
request_info_events.append(event)
if not request_info_events:
break
for request_info_event in request_info_events:
# Handle `RequestInfoEvent` from the workflow
response = CustomResponseType(...)
pending_responses[request_info_event.request_id] = response
Prüfpunkte und Anforderungen
Weitere Informationen zu Prüfpunkten finden Sie auf dieser Seite.
Wenn ein Prüfpunkt erstellt wird, werden ausstehende Anforderungen auch als Teil des Prüfpunktzustands gespeichert. Wenn Sie von einem Wiederherstellungspunkt aus wiederherstellen, werden alle ausstehenden Anforderungen als RequestInfoEvent Objekte erneut ausgegeben, sodass Sie sie erfassen und darauf eingehen können. Sie können während des Fortsetzungsprozesses keine Antworten direkt geben. Stattdessen müssen Sie auf die erneut gesendeten Ereignisse lauschen und mithilfe des standardisierten Antwortmechanismus reagieren.