Compartir a través de


Almacenamiento del historial de chats en almacenamiento de terceros

En este tutorial se muestra cómo almacenar el historial de chat del agente en almacenamiento externo mediante la implementación de un elemento personalizado ChatMessageStore y su uso con un ChatClientAgent.

De forma predeterminada, cuando se usa ChatClientAgent, el historial de chat se almacena en memoria en el AgentThread objeto o en el servicio de inferencia subyacente, si el servicio lo admite.

Cuando los servicios no requieren que el historial de chat se almacene en el servicio, es posible proporcionar un almacén personalizado para conservar el historial de chat en lugar de confiar en el comportamiento predeterminado en memoria.

Prerrequisitos

Para conocer los requisitos previos, consulte el paso Crear y ejecutar un agente sencillo en este tutorial.

Instalación de paquetes NuGet

Para usar Microsoft Agent Framework con Azure OpenAI, debe instalar los siguientes paquetes NuGet:

dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.OpenAI --prerelease

Además, usará el almacén de vectores en memoria para almacenar los mensajes de chat.

dotnet add package Microsoft.SemanticKernel.Connectors.InMemory --prerelease

Creación de un almacén de ChatMessage personalizado

Para crear un personalizado ChatMessageStore, debe implementar la clase abstracta ChatMessageStore y proporcionar implementaciones para los métodos necesarios.

Métodos de recuperación y almacenamiento de mensajes

Los métodos más importantes para implementar son:

  • AddMessagesAsync: se llama para agregar nuevos mensajes al almacenamiento.
  • GetMessagesAsync - se llama para recuperar los mensajes del almacén.

GetMessagesAsync debe devolver los mensajes en orden cronológico ascendente. Todos los mensajes devueltos por ChatClientAgent se usarán al realizar llamadas al IChatClient subyacente. Por lo tanto, es importante que este método tenga en cuenta los límites del modelo subyacente y solo devuelve tantos mensajes como pueda controlar el modelo.

Cualquier lógica de reducción del historial de chat, como el resumen o el recorte, debe realizarse antes de devolver mensajes de GetMessagesAsync.

Serialización

ChatMessageStore Las instancias se crean y se adjuntan a un AgentThread cuando se crea el subproceso y cuando se reanuda un subproceso desde un estado serializado.

Aunque los mensajes reales que componen el historial de chat se almacenan externamente, es posible que la ChatMessageStore instancia tenga que almacenar claves u otro estado para identificar el historial de chat en el almacén externo.

Para permitir hilos persistentes, debe implementar el método de la clase SerializeStateAsyncChatMessageStore. También debe proporcionar un constructor que tome un parámetro JsonElement, que se puede usar para deserializar el estado al reanudar un hilo de ejecución.

Implementación de ChatMessageStore de ejemplo

La siguiente implementación de ejemplo almacena mensajes de chat en un almacén de vectores.

AddMessagesAsync inserta o actualiza mensajes en el almacenamiento vectorial, utilizando una clave única para cada mensaje.

GetMessagesAsync recupera los mensajes del hilo actual del almacén de vectores, los ordena por marca de tiempo y luego los devuelve en orden ascendente.

Cuando se recibe el primer mensaje, el almacén genera una clave única para el hilo, que luego se usa para identificar el historial de chat en el almacenamiento de vectores para las llamadas posteriores.

La clave única se almacena en la ThreadDbKey propiedad, que se serializa y deserializa mediante el SerializeStateAsync método y el constructor que toma un JsonElement. Por lo tanto, esta clave se conservará como parte del AgentThread estado, lo que permite que el subproceso se reanude más adelante y siga usando el mismo historial de chat.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;
using Microsoft.Extensions.VectorData;
using Microsoft.SemanticKernel.Connectors.InMemory;

internal sealed class VectorChatMessageStore : ChatMessageStore
{
    private readonly VectorStore _vectorStore;

    public VectorChatMessageStore(
        VectorStore vectorStore,
        JsonElement serializedStoreState,
        JsonSerializerOptions? jsonSerializerOptions = null)
    {
        this._vectorStore = vectorStore ?? throw new ArgumentNullException(nameof(vectorStore));
        if (serializedStoreState.ValueKind is JsonValueKind.String)
        {
            this.ThreadDbKey = serializedStoreState.Deserialize<string>();
        }
    }

    public string? ThreadDbKey { get; private set; }

    public override async Task AddMessagesAsync(
        IEnumerable<ChatMessage> messages,
        CancellationToken cancellationToken)
    {
        this.ThreadDbKey ??= Guid.NewGuid().ToString("N");
        var collection = this._vectorStore.GetCollection<string, ChatHistoryItem>("ChatHistory");
        await collection.EnsureCollectionExistsAsync(cancellationToken);
        await collection.UpsertAsync(messages.Select(x => new ChatHistoryItem()
        {
            Key = this.ThreadDbKey + x.MessageId,
            Timestamp = DateTimeOffset.UtcNow,
            ThreadId = this.ThreadDbKey,
            SerializedMessage = JsonSerializer.Serialize(x),
            MessageText = x.Text
        }), cancellationToken);
    }

    public override async Task<IEnumerable<ChatMessage>> GetMessagesAsync(
        CancellationToken cancellationToken)
    {
        var collection = this._vectorStore.GetCollection<string, ChatHistoryItem>("ChatHistory");
        await collection.EnsureCollectionExistsAsync(cancellationToken);
        var records = collection
            .GetAsync(
                x => x.ThreadId == this.ThreadDbKey, 10,
                new() { OrderBy = x => x.Descending(y => y.Timestamp) },
                cancellationToken);

        List<ChatMessage> messages = [];
        await foreach (var record in records)
        {
            messages.Add(JsonSerializer.Deserialize<ChatMessage>(record.SerializedMessage!)!);
        }

        messages.Reverse();
        return messages;
    }

    public override JsonElement Serialize(JsonSerializerOptions? jsonSerializerOptions = null) =>
        // We have to serialize the thread id, so that on deserialization you can retrieve the messages using the same thread id.
        JsonSerializer.SerializeToElement(this.ThreadDbKey);

    private sealed class ChatHistoryItem
    {
        [VectorStoreKey]
        public string? Key { get; set; }
        [VectorStoreData]
        public string? ThreadId { get; set; }
        [VectorStoreData]
        public DateTimeOffset? Timestamp { get; set; }
        [VectorStoreData]
        public string? SerializedMessage { get; set; }
        [VectorStoreData]
        public string? MessageText { get; set; }
    }
}

Uso del ChatMessageStore personalizado con ChatClientAgent

Para usar la configuración personalizada ChatMessageStore, debe proporcionar un ChatMessageStoreFactory al crear el agente. Esta factoría permite al agente crear una nueva instancia del ChatMessageStore deseado para cada subproceso.

Al crear un ChatClientAgent, es posible proporcionar un objeto ChatClientAgentOptions que permite incluir ChatMessageStoreFactory además de todas las demás opciones del agente.

using Azure.AI.OpenAI;
using Azure.Identity;
using OpenAI;

AIAgent agent = new AzureOpenAIClient(
    new Uri("https://<myresource>.openai.azure.com"),
    new AzureCliCredential())
     .GetChatClient("gpt-4o-mini")
     .CreateAIAgent(new ChatClientAgentOptions
     {
         Name = "Joker",
         Instructions = "You are good at telling jokes.",
         ChatMessageStoreFactory = ctx =>
         {
             // Create a new chat message store for this agent that stores the messages in a vector store.
             return new VectorChatMessageStore(
                new InMemoryVectorStore(),
                ctx.SerializedState,
                ctx.JsonSerializerOptions);
         }
     });

En este tutorial se muestra cómo almacenar el historial de chat del agente en almacenamiento externo mediante la implementación de un elemento personalizado ChatMessageStore y su uso con un ChatAgent.

De forma predeterminada, cuando se usa ChatAgent, el historial de chat se almacena en memoria en el AgentThread objeto o en el servicio de inferencia subyacente, si el servicio lo admite.

Cuando los servicios no requieren o no son capaces de almacenar el historial de chat en el servicio, es posible proporcionar un almacén personalizado para conservar el historial de chat en lugar de confiar en el comportamiento predeterminado en memoria.

Prerrequisitos

Para conocer los requisitos previos, consulte el paso Crear y ejecutar un agente sencillo en este tutorial.

Creación de un almacén de ChatMessage personalizado

Para crear un personalizado ChatMessageStore, debe implementar el ChatMessageStore protocolo y proporcionar implementaciones para los métodos necesarios.

Métodos de recuperación y almacenamiento de mensajes

Los métodos más importantes para implementar son:

  • add_messages: se llama para agregar nuevos mensajes al almacenamiento.
  • list_messages - se llama para recuperar los mensajes del almacén.

list_messages debe devolver los mensajes en orden cronológico ascendente. Todos los mensajes devueltos por ChatAgent se usarán al realizar llamadas al cliente de chat subyacente. Por lo tanto, es importante que este método tenga en cuenta los límites del modelo subyacente y solo devuelve tantos mensajes como pueda controlar el modelo.

Cualquier lógica de reducción del historial de chat, como el resumen o el recorte, debe realizarse antes de devolver mensajes de list_messages.

Serialización

ChatMessageStore Las instancias se crean y se adjuntan a un AgentThread cuando se crea el subproceso y cuando se reanuda un subproceso desde un estado serializado.

Aunque los mensajes reales que componen el historial de chat se almacenan externamente, es posible que la ChatMessageStore instancia tenga que almacenar claves u otro estado para identificar el historial de chat en el almacén externo.

Para permitir subprocesos persistentes, debe implementar los métodos serialize_state y deserialize_state del protocolo ChatMessageStore. Estos métodos permiten que el estado de almacenamiento se conserve y restaure cuando se reanuda un subproceso.

Implementación de ChatMessageStore de ejemplo

La siguiente implementación de ejemplo almacena mensajes de chat en Redis mediante la estructura de datos Listas de Redis.

En add_messages, almacena los mensajes en Redis mediante RPUSH para anexarlos al final de la lista en orden cronológico.

list_messages recupera los mensajes del subproceso actual de Redis mediante LRANGE y los devuelve en orden cronológico ascendente.

Cuando se recibe el primer mensaje, la tienda genera una clave única para el hilo, que luego se utiliza para identificar el historial de chat en Redis en las siguientes llamadas.

La clave única y otras configuraciones se almacenan y se pueden serializar y deserializar mediante los métodos serialize_state y deserialize_state. Por lo tanto, este estado se conservará como parte del AgentThread estado, lo que permite que el subproceso se reanude más adelante y siga usando el mismo historial de chat.

from collections.abc import Sequence
from typing import Any
from uuid import uuid4
from pydantic import BaseModel
import json
import redis.asyncio as redis
from agent_framework import ChatMessage


class RedisStoreState(BaseModel):
    """State model for serializing and deserializing Redis chat message store data."""

    thread_id: str
    redis_url: str | None = None
    key_prefix: str = "chat_messages"
    max_messages: int | None = None


class RedisChatMessageStore:
    """Redis-backed implementation of ChatMessageStore using Redis Lists."""

    def __init__(
        self,
        redis_url: str | None = None,
        thread_id: str | None = None,
        key_prefix: str = "chat_messages",
        max_messages: int | None = None,
    ) -> None:
        """Initialize the Redis chat message store.

        Args:
            redis_url: Redis connection URL (for example, "redis://localhost:6379").
            thread_id: Unique identifier for this conversation thread.
                      If not provided, a UUID will be auto-generated.
            key_prefix: Prefix for Redis keys to namespace different applications.
            max_messages: Maximum number of messages to retain in Redis.
                         When exceeded, oldest messages are automatically trimmed.
        """
        if redis_url is None:
            raise ValueError("redis_url is required for Redis connection")

        self.redis_url = redis_url
        self.thread_id = thread_id or f"thread_{uuid4()}"
        self.key_prefix = key_prefix
        self.max_messages = max_messages

        # Initialize Redis client
        self._redis_client = redis.from_url(redis_url, decode_responses=True)

    @property
    def redis_key(self) -> str:
        """Get the Redis key for this thread's messages."""
        return f"{self.key_prefix}:{self.thread_id}"

    async def add_messages(self, messages: Sequence[ChatMessage]) -> None:
        """Add messages to the Redis store.

        Args:
            messages: Sequence of ChatMessage objects to add to the store.
        """
        if not messages:
            return

        # Serialize messages and add to Redis list
        serialized_messages = [self._serialize_message(msg) for msg in messages]
        await self._redis_client.rpush(self.redis_key, *serialized_messages)

        # Apply message limit if configured
        if self.max_messages is not None:
            current_count = await self._redis_client.llen(self.redis_key)
            if current_count > self.max_messages:
                # Keep only the most recent max_messages using LTRIM
                await self._redis_client.ltrim(self.redis_key, -self.max_messages, -1)

    async def list_messages(self) -> list[ChatMessage]:
        """Get all messages from the store in chronological order.

        Returns:
            List of ChatMessage objects in chronological order (oldest first).
        """
        # Retrieve all messages from Redis list (oldest to newest)
        redis_messages = await self._redis_client.lrange(self.redis_key, 0, -1)

        messages = []
        for serialized_message in redis_messages:
            message = self._deserialize_message(serialized_message)
            messages.append(message)

        return messages

    async def serialize_state(self, **kwargs: Any) -> Any:
        """Serialize the current store state for persistence.

        Returns:
            Dictionary containing serialized store configuration.
        """
        state = RedisStoreState(
            thread_id=self.thread_id,
            redis_url=self.redis_url,
            key_prefix=self.key_prefix,
            max_messages=self.max_messages,
        )
        return state.model_dump(**kwargs)

    async def deserialize_state(self, serialized_store_state: Any, **kwargs: Any) -> None:
        """Deserialize state data into this store instance.

        Args:
            serialized_store_state: Previously serialized state data.
            **kwargs: Additional arguments for deserialization.
        """
        if serialized_store_state:
            state = RedisStoreState.model_validate(serialized_store_state, **kwargs)
            self.thread_id = state.thread_id
            self.key_prefix = state.key_prefix
            self.max_messages = state.max_messages

            # Recreate Redis client if the URL changed
            if state.redis_url and state.redis_url != self.redis_url:
                self.redis_url = state.redis_url
                self._redis_client = redis.from_url(self.redis_url, decode_responses=True)

    def _serialize_message(self, message: ChatMessage) -> str:
        """Serialize a ChatMessage to JSON string."""
        message_dict = message.model_dump()
        return json.dumps(message_dict, separators=(",", ":"))

    def _deserialize_message(self, serialized_message: str) -> ChatMessage:
        """Deserialize a JSON string to ChatMessage."""
        message_dict = json.loads(serialized_message)
        return ChatMessage.model_validate(message_dict)

    async def clear(self) -> None:
        """Remove all messages from the store."""
        await self._redis_client.delete(self.redis_key)

    async def aclose(self) -> None:
        """Close the Redis connection."""
        await self._redis_client.aclose()

Uso de ChatMessageStore personalizado con un ChatAgent

Para usar la configuración personalizada ChatMessageStore, debe proporcionar un chat_message_store_factory al crear el agente. Esta factoría permite al agente crear una nueva instancia del ChatMessageStore deseado para cada subproceso.

Al crear un ChatAgent, puede proporcionar el chat_message_store_factory parámetro además de todas las demás opciones del agente.

from azure.identity import AzureCliCredential
from agent_framework import ChatAgent
from agent_framework.openai import AzureOpenAIChatClient

# Create the chat agent with custom message store factory
agent = ChatAgent(
    chat_client=AzureOpenAIChatClient(
        endpoint="https://<myresource>.openai.azure.com",
        credential=AzureCliCredential(),
        ai_model_id="gpt-4o-mini"
    ),
    name="Joker",
    instructions="You are good at telling jokes.",
    chat_message_store_factory=lambda: RedisChatMessageStore(
        redis_url="redis://localhost:6379"
    )
)

# Use the agent with persistent chat history
thread = agent.get_new_thread()
response = await agent.run("Tell me a joke about pirates", thread=thread)
print(response.text)

Pasos siguientes