Udostępnij przez


Przepływy pracy programu Microsoft Agent Framework — żądanie i odpowiedź

Ta strona zawiera omówienie sposobu działania obsługi żądań i odpowiedzi w systemie przepływu pracy programu Microsoft Agent Framework.

Przegląd

Wykonawcy w przepływie pracy mogą wysyłać żądania na zewnątrz przepływu pracy i czekać na odpowiedzi. Jest to przydatne w przypadku scenariuszy, w których funkcja wykonawcza musi wchodzić w interakcje z systemami zewnętrznymi, takimi jak interakcje typu human-in-the-loop lub inne operacje asynchroniczne.

Włączanie obsługi żądań i odpowiedzi w przepływie pracy

Żądania i odpowiedzi są obsługiwane za pośrednictwem specjalnego typu o nazwie InputPort.

// Create an input port that receives requests of type CustomRequestType and responses of type CustomResponseType.
var inputPort = InputPort.Create<CustomRequestType, CustomResponseType>("input-port");

Dodaj port wejściowy do przepływu pracy.

var executorA = new SomeExecutor();
var workflow = new WorkflowBuilder(inputPort)
    .AddEdge(inputPort, executorA)
    .AddEdge(executorA, inputPort)
    .Build<CustomRequestType>();

Teraz, ponieważ w procesie mamy executorA połączony z inputPort w obu kierunkach, executorA musi mieć możliwość wysyłania żądań i odbierania odpowiedzi za pośrednictwem inputPort. Oto, co musimy zrobić, SomeExecutor aby wysłać żądanie i otrzymać odpowiedź.

internal sealed class SomeExecutor() : Executor<CustomResponseType>("SomeExecutor")
{
    public async ValueTask HandleAsync(CustomResponseType message, IWorkflowContext context)
    {
        // Process the response...
        ...
        // Send a request
        await context.SendMessageAsync(new CustomRequestType(...)).ConfigureAwait(false);
    }
}

Alternatywnie, SomeExecutor można rozdzielić wysyłanie żądań i obsługę odpowiedzi na dwa moduły obsługi.

internal sealed class SomeExecutor() : Executor("SomeExecutor")
{
    protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder)
    {
        return routeBuilder
            .AddHandler<CustomResponseType>(this.HandleCustomResponseAsync)
            .AddHandler<OtherDataType>(this.HandleOtherDataAsync);
    }

    public async ValueTask HandleCustomResponseAsync(CustomResponseType message, IWorkflowContext context)
    {
        // Process the response...
        ...
    }

    public async ValueTask HandleOtherDataAsync(OtherDataType message, IWorkflowContext context)
    {
        // Process the message...
        ...
        // Send a request
        await context.SendMessageAsync(new CustomRequestType(...)).ConfigureAwait(false);
    }
}

Egzekutorzy mogą wysyłać żądania przy użyciu ctx.request_info() i obsługiwać odpowiedzi za pomocą @response_handler.

from agent_framework import response_handler, WorkflowBuilder

executor_a = SomeExecutor()
executor_b = SomeOtherExecutor()
workflow_builder = WorkflowBuilder()
workflow_builder.set_start_executor(executor_a)
workflow_builder.add_edge(executor_a, executor_b)
workflow = workflow_builder.build()

executor_a może wysyłać żądania i odbierać odpowiedzi bezpośrednio przy użyciu wbudowanych funkcji.

from agent_framework import (
    Executor,
    WorkflowContext,
    handler,
    response_handler,
)

class SomeExecutor(Executor):

    @handler
    async def handle_data(
        self,
        data: OtherDataType,
        context: WorkflowContext,
    ):
        # Process the message...
        ...
        # Send a request using the API
        await context.request_info(
            request_data=CustomRequestType(...),
            response_type=CustomResponseType
        )

    @response_handler
    async def handle_response(
        self,
        original_request: CustomRequestType,
        response: CustomResponseType,
        context: WorkflowContext,
    ):
        # Process the response...
        ...

Dekorator @response_handler automatycznie rejestruje metodę do obsługi odpowiedzi dla określonych typów żądań i odpowiedzi.

Obsługa żądań i odpowiedzi

Element InputPort emituje RequestInfoEvent gdy otrzyma żądanie. Możesz zapisać się na te zdarzenia w celu obsługi żądań przychodzących z procesu pracy. Gdy otrzymasz odpowiedź z systemu zewnętrznego, wyślij ją z powrotem do przepływu pracy przy użyciu mechanizmu odpowiedzi. Platforma automatycznie kieruje odpowiedź do funkcji wykonawczej, która wysłała oryginalne żądanie.

StreamingRun handle = await InProcessExecution.StreamAsync(workflow, input).ConfigureAwait(false);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
{
    switch (evt)
    {
        case RequestInfoEvent requestInputEvt:
            // Handle `RequestInfoEvent` from the workflow
            ExternalResponse response = requestInputEvt.Request.CreateResponse<CustomResponseType>(...);
            await handle.SendResponseAsync(response).ConfigureAwait(false);
            break;

        case WorkflowOutputEvent workflowOutputEvt:
            // The workflow has completed successfully
            Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
            return;
    }
}

Funkcje wykonawcze mogą wysyłać żądania bezpośrednio bez konieczności użycia oddzielnego składnika. Gdy wykonawca wywołuje ctx.request_info(), przepływ pracy emituje RequestInfoEvent. Możesz zapisać się na te zdarzenia w celu obsługi żądań przychodzących z procesu pracy. Gdy otrzymasz odpowiedź z systemu zewnętrznego, wyślij ją z powrotem do przepływu pracy przy użyciu mechanizmu odpowiedzi. Platforma automatycznie kieruje odpowiedź na metodę wykonawcy @response_handler .

from agent_framework import RequestInfoEvent

while True:
    request_info_events : list[RequestInfoEvent] = []
    pending_responses : dict[str, CustomResponseType] = {}

    stream = workflow.run_stream(input) if not pending_responses else workflow.send_responses_streaming(pending_responses)

    async for event in stream:
        if isinstance(event, RequestInfoEvent):
            # Handle `RequestInfoEvent` from the workflow
            request_info_events.append(event)

    if not request_info_events:
        break

    for request_info_event in request_info_events:
        # Handle `RequestInfoEvent` from the workflow
        response = CustomResponseType(...)
        pending_responses[request_info_event.request_id] = response

Punkty kontrolne i żądania

Aby dowiedzieć się więcej na temat punktów kontrolnych, zapoznaj się z tą stroną.

Po utworzeniu punktu kontrolnego oczekujące żądania są również zapisywane jako część stanu punktu kontrolnego. Po przywróceniu z punktu kontrolnego wszystkie oczekujące żądania będą wysyłane ponownie jako RequestInfoEvent obiekty, co umożliwia przechwytywanie i reagowanie na nie. Nie można podać odpowiedzi bezpośrednio podczas operacji wznawiania — zamiast tego należy nasłuchiwać ponownie emitowanych zdarzeń i reagować przy użyciu standardowego mechanizmu odpowiedzi.

Dalsze kroki