Compartir a través de


Flujos de trabajo de Microsoft Agent Framework: uso de flujos de trabajo como agentes

En este documento se proporciona información general sobre cómo usar flujos de trabajo como agentes en Microsoft Agent Framework.

Información general

A veces ha creado un flujo de trabajo sofisticado con varios agentes, ejecutores personalizados y lógica compleja, pero quiere usarlo igual que cualquier otro agente. Eso es exactamente lo que los agentes de flujo de trabajo le permiten hacer. Al envolver el flujo de trabajo como un Agent, puede interactuar con él mediante la misma API familiar que se usaría para un agente de chat simple.

Ventajas clave

  • Interfaz unificada: interacción con flujos de trabajo complejos mediante la misma API que agentes simples
  • Compatibilidad de API: integración de flujos de trabajo con sistemas existentes que admiten la interfaz del agente
  • Composición: Use agentes de flujo de trabajo como bloques de construcción en sistemas de agentes más grandes o en otros flujos de trabajo.
  • Administración de subprocesos: aproveche los subprocesos del agente para el estado de conversación, los puntos de control y la reanudación.
  • Compatibilidad con streaming: obtener actualizaciones en tiempo real a medida que se ejecuta el flujo de trabajo

Funcionamiento

Al convertir un flujo de trabajo en un agente:

  1. El flujo de trabajo se valida para asegurarse de que su ejecutor de inicio puede aceptar mensajes de chat.
  2. Se crea un subproceso para administrar el estado de la conversación y los puntos de control.
  3. Los mensajes de entrada se enrutan al ejecutor de inicio del flujo de trabajo.
  4. Los eventos de flujo de trabajo se convierten en actualizaciones de respuesta del agente
  5. Las solicitudes de entrada externas (de RequestInfoExecutor) se muestran como llamadas de función

Requisitos

Para usar un flujo de trabajo como agente, el ejecutor de inicio del flujo de trabajo debe poder manejar IEnumerable<ChatMessage> como entrada. Esto se satisface automáticamente al usar ChatClientAgent u otros ejecutores basados en agente.

Creación de un agente de flujo de trabajo

Use el método de AsAgent() extensión para convertir cualquier flujo de trabajo compatible en un agente:

using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;

// First, build your workflow
var workflow = AgentWorkflowBuilder
    .CreateSequentialPipeline(researchAgent, writerAgent, reviewerAgent)
    .Build();

// Convert the workflow to an agent
AIAgent workflowAgent = workflow.AsAgent(
    id: "content-pipeline",
    name: "Content Pipeline Agent",
    description: "A multi-agent workflow that researches, writes, and reviews content"
);

Parámetros de AsAgent

Parámetro Tipo Description
id string? Identificador único opcional para el agente. Se genera automáticamente si no se proporciona.
name string? Nombre para mostrar opcional para el agente.
description string? Descripción opcional del propósito del agente.
checkpointManager CheckpointManager? Administrador de puntos de comprobación opcional para la persistencia entre sesiones.
executionEnvironment IWorkflowExecutionEnvironment? Entorno de ejecución opcional. El valor predeterminado es InProcessExecution.OffThread o InProcessExecution.Concurrent, según la configuración del flujo de trabajo.

Uso de agentes de flujo de trabajo

Creación de un hilo

Cada conversación con un agente de flujo de trabajo requiere un subproceso para administrar el estado:

// Create a new thread for the conversation
AgentThread thread = workflowAgent.GetNewThread();

Ejecución sin streaming

Para casos de uso sencillos en los que desee la respuesta completa:

var messages = new List<ChatMessage>
{
    new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};

AgentRunResponse response = await workflowAgent.RunAsync(messages, thread);

foreach (ChatMessage message in response.Messages)
{
    Console.WriteLine($"{message.AuthorName}: {message.Text}");
}

Ejecución de streaming

Para las actualizaciones en tiempo real a medida que se ejecuta el flujo de trabajo:

var messages = new List<ChatMessage>
{
    new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};

await foreach (AgentRunResponseUpdate update in workflowAgent.RunStreamingAsync(messages, thread))
{
    // Process streaming updates from each agent in the workflow
    if (!string.IsNullOrEmpty(update.Text))
    {
        Console.Write(update.Text);
    }
}

Control de solicitudes de entrada externas

Cuando un flujo de trabajo contiene ejecutores que solicitan entrada externa (mediante RequestInfoExecutor), estas solicitudes se muestran como llamadas de función en la respuesta del agente:

await foreach (AgentRunResponseUpdate update in workflowAgent.RunStreamingAsync(messages, thread))
{
    // Check for function call requests
    foreach (AIContent content in update.Contents)
    {
        if (content is FunctionCallContent functionCall)
        {
            // Handle the external input request
            Console.WriteLine($"Workflow requests input: {functionCall.Name}");
            Console.WriteLine($"Request data: {functionCall.Arguments}");

            // Provide the response in the next message
        }
    }
}

Serialización y reanudación de hilos

Los subprocesos del agente de flujo de trabajo se pueden serializar para la persistencia y reanudarse más adelante:

// Serialize the thread state
JsonElement serializedThread = thread.Serialize();

// Store serializedThread to your persistence layer...

// Later, resume the thread
AgentThread resumedThread = workflowAgent.DeserializeThread(serializedThread);

// Continue the conversation
await foreach (var update in workflowAgent.RunStreamingAsync(newMessages, resumedThread))
{
    Console.Write(update.Text);
}

Puntos de control con agentes de flujo de trabajo

Habilite los puntos de control para conservar el estado del flujo de trabajo en los reinicios del proceso:

// Create a checkpoint manager with your storage backend
var checkpointManager = new CheckpointManager(new FileCheckpointStorage("./checkpoints"));

// Create workflow agent with checkpointing enabled
AIAgent workflowAgent = workflow.AsAgent(
    id: "persistent-workflow",
    name: "Persistent Workflow Agent",
    checkpointManager: checkpointManager
);

Requisitos

Para usar un flujo de trabajo como agente, el ejecutor de inicio del flujo de trabajo debe poder controlarlo list[ChatMessage] como entrada. Esto se cumple automáticamente cuando se usa ChatAgent o AgentExecutor.

Creación de un agente de flujo de trabajo

Llame a as_agent() en cualquier flujo de trabajo compatible para convertirlo en un agente:

from agent_framework import WorkflowBuilder, ChatAgent
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential

# Create your chat client and agents
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

researcher = ChatAgent(
    name="Researcher",
    instructions="Research and gather information on the given topic.",
    chat_client=chat_client,
)

writer = ChatAgent(
    name="Writer", 
    instructions="Write clear, engaging content based on research.",
    chat_client=chat_client,
)

# Build your workflow
workflow = (
    WorkflowBuilder()
    .set_start_executor(researcher)
    .add_edge(researcher, writer)
    .build()
)

# Convert the workflow to an agent
workflow_agent = workflow.as_agent(name="Content Pipeline Agent")

parámetros de as_agent

Parámetro Tipo Description
name str | None Nombre para mostrar opcional para el agente. Se genera automáticamente si no se proporciona.

Uso de agentes de flujo de trabajo

Creación de un hilo

Cada conversación con un agente de flujo de trabajo requiere un subproceso para administrar el estado:

# Create a new thread for the conversation
thread = workflow_agent.get_new_thread()

Ejecución sin streaming

Para casos de uso sencillos en los que desee la respuesta completa:

from agent_framework import ChatMessage, Role

messages = [ChatMessage(role=Role.USER, content="Write an article about AI trends")]

response = await workflow_agent.run(messages, thread=thread)

for message in response.messages:
    print(f"{message.author_name}: {message.text}")

Ejecución de streaming

Para las actualizaciones en tiempo real a medida que se ejecuta el flujo de trabajo:

messages = [ChatMessage(role=Role.USER, content="Write an article about AI trends")]

async for update in workflow_agent.run_stream(messages, thread=thread):
    # Process streaming updates from each agent in the workflow
    if update.text:
        print(update.text, end="", flush=True)

Control de solicitudes de entrada externas

Cuando un flujo de trabajo contiene ejecutores que solicitan entrada externa (mediante RequestInfoExecutor), estas solicitudes se muestran como llamadas de función. El agente de flujo de trabajo realiza un seguimiento de las solicitudes pendientes y espera respuestas antes de continuar:

from agent_framework import (
    FunctionCallContent,
    FunctionApprovalRequestContent,
    FunctionApprovalResponseContent,
)

async for update in workflow_agent.run_stream(messages, thread=thread):
    for content in update.contents:
        if isinstance(content, FunctionApprovalRequestContent):
            # The workflow is requesting external input
            request_id = content.id
            function_call = content.function_call

            print(f"Workflow requests input: {function_call.name}")
            print(f"Request data: {function_call.arguments}")

            # Store the request_id to provide a response later

# Check for pending requests
if workflow_agent.pending_requests:
    print(f"Pending requests: {list(workflow_agent.pending_requests.keys())}")

Proporcionar respuestas a solicitudes pendientes

Para continuar la ejecución del flujo de trabajo después de una solicitud de entrada externa:

# Create a response for the pending request
response_content = FunctionApprovalResponseContent(
    id=request_id,
    function_call=function_call,
    approved=True,
)

response_message = ChatMessage(
    role=Role.USER,
    contents=[response_content],
)

# Continue the workflow with the response
async for update in workflow_agent.run_stream([response_message], thread=thread):
    if update.text:
        print(update.text, end="", flush=True)

Ejemplo completo

Este es un ejemplo completo que muestra un agente de flujo de trabajo con salida de streaming:

import asyncio
from agent_framework import (
    ChatAgent,
    ChatMessage,
    Role,
)
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework._workflows import SequentialBuilder
from azure.identity import AzureCliCredential


async def main():
    # Set up the chat client
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

    # Create specialized agents
    researcher = ChatAgent(
        name="Researcher",
        instructions="Research the given topic and provide key facts.",
        chat_client=chat_client,
    )

    writer = ChatAgent(
        name="Writer",
        instructions="Write engaging content based on the research provided.",
        chat_client=chat_client,
    )

    reviewer = ChatAgent(
        name="Reviewer",
        instructions="Review the content and provide a final polished version.",
        chat_client=chat_client,
    )

    # Build a sequential workflow
    workflow = (
        SequentialBuilder()
        .add_agents([researcher, writer, reviewer])
        .build()
    )

    # Convert to a workflow agent
    workflow_agent = workflow.as_agent(name="Content Creation Pipeline")

    # Create a thread and run the workflow
    thread = workflow_agent.get_new_thread()
    messages = [ChatMessage(role=Role.USER, content="Write about quantum computing")]

    print("Starting workflow...")
    print("=" * 60)

    current_author = None
    async for update in workflow_agent.run_stream(messages, thread=thread):
        # Show when different agents are responding
        if update.author_name and update.author_name != current_author:
            if current_author:
                print("\n" + "-" * 40)
            print(f"\n[{update.author_name}]:")
            current_author = update.author_name

        if update.text:
            print(update.text, end="", flush=True)

    print("\n" + "=" * 60)
    print("Workflow completed!")


if __name__ == "__main__":
    asyncio.run(main())

Comprensión de la conversión de eventos

Cuando un flujo de trabajo se ejecuta como agente, los eventos de flujo de trabajo se convierten en respuestas del agente. El tipo de respuesta depende del método que use:

  • run(): devuelve un objeto AgentRunResponse que contiene el resultado completo una vez finalizado el flujo de trabajo.
  • run_stream(): produce AgentRunResponseUpdate objetos a medida que se ejecuta el flujo de trabajo, lo que proporciona actualizaciones en tiempo real.

Durante la ejecución, los eventos de flujo de trabajo internos se asignan a las respuestas del agente de la siguiente manera:

Evento de flujo de trabajo Respuesta del agente
AgentRunUpdateEvent Pasado como AgentRunResponseUpdate (streaming) o agregado a AgentRunResponse (sin streaming)
RequestInfoEvent Convertido en FunctionCallContent y FunctionApprovalRequestContent
Otros eventos Incluido en raw_representation para la observabilidad

Esta conversión le permite usar la interfaz del agente estándar mientras sigue teniendo acceso a información detallada del flujo de trabajo cuando sea necesario.

Casos de uso

1. Canalizaciones de agente complejas

Encapsular un flujo de trabajo de varios agentes como un único agente para su uso en aplicaciones:

User Request --> [Workflow Agent] --> Final Response
                      |
                      +-- Researcher Agent
                      +-- Writer Agent  
                      +-- Reviewer Agent

2. Composición del agente

Use agentes de flujo de trabajo como componentes en sistemas más grandes:

  • Otro agente puede usar un agente de flujo de trabajo como herramienta
  • Se pueden organizar varios agentes de flujo de trabajo juntos
  • Los agentes de flujo de trabajo se pueden anidar dentro de otros flujos de trabajo

3. Integración de API

Exponga flujos de trabajo complejos a través de APIs que esperan la interfaz del Agente estándar, lo que permite:

  • Interfaces de chat que usan flujos de trabajo de back-end sofisticados
  • Integración con sistemas existentes basados en agentes
  • Migración gradual de agentes simples a flujos de trabajo complejos

Pasos siguientes