다음을 통해 공유


Microsoft 에이전트 프레임워크 워크플로 - 요청 및 응답

이 페이지에서는 Microsoft 에이전트 프레임워크 워크플로 시스템에서 요청 및 응답 처리가 작동하는 방식에 대한 개요를 제공합니다.

개요

워크플로의 실행기는 워크플로 외부로 요청을 보내고 응답을 기다릴 수 있습니다. 이는 실행기가 휴먼 인 더 루프 상호 작용 또는 기타 비동기 작업과 같은 외부 시스템과 상호 작용해야 하는 시나리오에 유용합니다.

워크플로에서 요청 및 응답 처리 사용

요청 및 응답은 InputPort라는 특수 형식을 통해 처리됩니다.

// Create an input port that receives requests of type CustomRequestType and responses of type CustomResponseType.
var inputPort = InputPort.Create<CustomRequestType, CustomResponseType>("input-port");

워크플로에 입력 포트를 추가합니다.

var executorA = new SomeExecutor();
var workflow = new WorkflowBuilder(inputPort)
    .AddEdge(inputPort, executorA)
    .AddEdge(executorA, inputPort)
    .Build<CustomRequestType>();

이제 워크플로에서 executorAinputPort가 양방향으로 연결되어 있으므로, executorAinputPort를 통해 요청을 보내고 응답을 받을 수 있어야 합니다. 다음은 요청을 보내고 응답을 받기 위해 수행해야 하는 작업 SomeExecutor 입니다.

internal sealed class SomeExecutor() : Executor<CustomResponseType>("SomeExecutor")
{
    public async ValueTask HandleAsync(CustomResponseType message, IWorkflowContext context)
    {
        // Process the response...
        ...
        // Send a request
        await context.SendMessageAsync(new CustomRequestType(...)).ConfigureAwait(false);
    }
}

SomeExecutor 또는 요청 전송 및 응답 처리를 두 개의 처리기로 구분할 수 있습니다.

internal sealed class SomeExecutor() : Executor("SomeExecutor")
{
    protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder)
    {
        return routeBuilder
            .AddHandler<CustomResponseType>(this.HandleCustomResponseAsync)
            .AddHandler<OtherDataType>(this.HandleOtherDataAsync);
    }

    public async ValueTask HandleCustomResponseAsync(CustomResponseType message, IWorkflowContext context)
    {
        // Process the response...
        ...
    }

    public async ValueTask HandleOtherDataAsync(OtherDataType message, IWorkflowContext context)
    {
        // Process the message...
        ...
        // Send a request
        await context.SendMessageAsync(new CustomRequestType(...)).ConfigureAwait(false);
    }
}

실행기는 .를 사용하여 ctx.request_info() 요청을 보내고 응답을 @response_handler처리할 수 있습니다.

from agent_framework import response_handler, WorkflowBuilder

executor_a = SomeExecutor()
executor_b = SomeOtherExecutor()
workflow_builder = WorkflowBuilder()
workflow_builder.set_start_executor(executor_a)
workflow_builder.add_edge(executor_a, executor_b)
workflow = workflow_builder.build()

executor_a 는 기본 제공 기능을 사용하여 요청을 보내고 직접 응답을 받을 수 있습니다.

from agent_framework import (
    Executor,
    WorkflowContext,
    handler,
    response_handler,
)

class SomeExecutor(Executor):

    @handler
    async def handle_data(
        self,
        data: OtherDataType,
        context: WorkflowContext,
    ):
        # Process the message...
        ...
        # Send a request using the API
        await context.request_info(
            request_data=CustomRequestType(...),
            response_type=CustomResponseType
        )

    @response_handler
    async def handle_response(
        self,
        original_request: CustomRequestType,
        response: CustomResponseType,
        context: WorkflowContext,
    ):
        # Process the response...
        ...

데코레이터는 @response_handler 지정된 요청 및 응답 형식에 대한 응답을 처리하는 메서드를 자동으로 등록합니다.

요청 및 응답 처리

요청을 InputPort 받으면 RequestInfoEvent를 내보낸다. 이러한 이벤트를 구독하여 워크플로에서 들어오는 요청을 처리할 수 있습니다. 외부 시스템에서 응답을 받으면 응답 메커니즘을 사용하여 워크플로로 다시 보냅니다. 프레임워크는 원래 요청을 보낸 실행기로 응답을 자동으로 라우팅합니다.

StreamingRun handle = await InProcessExecution.StreamAsync(workflow, input).ConfigureAwait(false);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
{
    switch (evt)
    {
        case RequestInfoEvent requestInputEvt:
            // Handle `RequestInfoEvent` from the workflow
            ExternalResponse response = requestInputEvt.Request.CreateResponse<CustomResponseType>(...);
            await handle.SendResponseAsync(response).ConfigureAwait(false);
            break;

        case WorkflowOutputEvent workflowOutputEvt:
            // The workflow has completed successfully
            Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
            return;
    }
}

실행기는 별도의 구성 요소 없이 요청을 직접 보낼 수 있습니다. 실행기가 ctx.request_info()를 호출하면 워크플로는 RequestInfoEvent를 내보낸다. 이러한 이벤트를 구독하여 워크플로에서 들어오는 요청을 처리할 수 있습니다. 외부 시스템에서 응답을 받으면 응답 메커니즘을 사용하여 워크플로로 다시 보냅니다. 프레임워크는 자동으로 응답을 실행자의 @response_handler 메서드로 라우팅합니다.

from agent_framework import RequestInfoEvent

while True:
    request_info_events : list[RequestInfoEvent] = []
    pending_responses : dict[str, CustomResponseType] = {}

    stream = workflow.run_stream(input) if not pending_responses else workflow.send_responses_streaming(pending_responses)

    async for event in stream:
        if isinstance(event, RequestInfoEvent):
            # Handle `RequestInfoEvent` from the workflow
            request_info_events.append(event)

    if not request_info_events:
        break

    for request_info_event in request_info_events:
        # Handle `RequestInfoEvent` from the workflow
        response = CustomResponseType(...)
        pending_responses[request_info_event.request_id] = response

검사점 및 요청

검사점에 대한 자세한 내용은 이 페이지를 참조하세요.

검사점을 만들 때 보류 중인 요청도 검사점 상태의 일부로 저장됩니다. 검사점에서 복원하면 보류 중인 모든 요청이 RequestInfoEvent 개체로 다시 내보내져서 이를 캡처하고 응답할 수 있습니다. 다시 시작 작업 중에 직접 응답을 제공할 수 없습니다. 대신 다시 내보내는 이벤트를 수신 대기하고 표준 응답 메커니즘을 사용하여 응답해야 합니다.

다음 단계