Udostępnij przez


Przechowywanie historii czatów w zewnętrznym przechowywaniu danych

W tym samouczku pokazano, jak przechowywać historię czatów agentów w magazynie zewnętrznym przez zaimplementowanie niestandardowego ChatMessageStore obiektu i użycie go za pomocą elementu ChatClientAgent.

Domyślnie, przy korzystaniu z ChatClientAgent, historia czatów jest przechowywana w pamięci w obiekcie AgentThread lub w podstawowej usłudze wnioskowania, jeżeli ta usługa ją obsługuje.

Jeśli usługi nie wymagają przechowywania historii czatów w usłudze, można udostępnić magazyn niestandardowy do utrwalania historii czatów zamiast polegać na domyślnym zachowaniu w pamięci.

Wymagania wstępne

Aby uzyskać wymagania wstępne, zobacz krok Tworzenie i uruchamianie prostego agenta w tym samouczku.

Instalowanie pakietów NuGet

Aby używać programu Microsoft Agent Framework z usługą Azure OpenAI, należy zainstalować następujące pakiety NuGet:

dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.OpenAI --prerelease

Ponadto będziesz używać magazynu wektorów w pamięci do przechowywania wiadomości czatu.

dotnet add package Microsoft.SemanticKernel.Connectors.InMemory --prerelease

Utwórz niestandardowe repozytorium ChatMessage

Aby utworzyć niestandardowy ChatMessageStoreelement, należy zaimplementować klasę abstrakcyjną ChatMessageStore i udostępnić implementacje dla wymaganych metod.

Metody przechowywania i pobierania komunikatów

Najważniejsze metody implementacji to:

  • AddMessagesAsync — wywoływana w celu dodania nowych komunikatów do sklepu.
  • GetMessagesAsync - wywoływane w celu pobrania komunikatów ze sklepu.

GetMessagesAsync powinna zwrócić komunikaty w kolejności chronologicznej rosnącej. Wszystkie komunikaty zwrócone przez niego będą używane przez ChatClientAgent podczas wykonywania wywołań do bazowego elementu IChatClient. Dlatego ważne jest, aby ta metoda uwzględniała limity modelu bazowego i zwraca tylko tyle komunikatów, ile można obsłużyć przez model.

Przed zwróceniem wiadomości z programu GetMessagesAsyncnależy wykonać dowolną logikę redukcji historii czatów, taką jak podsumowywanie lub przycinanie.

Serializacja

ChatMessageStore wystąpienia są tworzone i dołączane do elementu AgentThread po utworzeniu wątku oraz po wznowieniu wątku ze stanu zserializowanego.

Podczas gdy rzeczywiste wiadomości tworzące historię czatów są przechowywane zewnętrznie, ChatMessageStore wystąpienie może wymagać przechowywania kluczy lub innych danych w celu identyfikacji historii czatu w repozytorium zewnętrznym.

Aby umożliwić utrwalanie wątków, należy zaimplementować metodę SerializeStateAsync klasy ChatMessageStore. Należy również dostarczyć konstruktor przyjmujący parametr JsonElement, który umożliwia deserializację stanu podczas wznawiania wątku.

Przykładowa implementacja chatMessageStore

Poniższa przykładowa implementacja przechowuje wiadomości czatu w sklepie wektorów.

AddMessagesAsync upserts wiadomości do magazynu wektorów z wykorzystaniem unikatowego klucza dla każdej wiadomości.

GetMessagesAsync pobiera komunikaty dla bieżącego wątku z magazynu wektorów, porządkuje je według znacznika czasu i zwraca je w kolejności rosnącej.

Po odebraniu pierwszej wiadomości magazyn generuje unikatowy klucz dla wątku, który jest następnie używany do identyfikowania historii czatów w wektorowym magazynie danych w kolejnych wywołaniach.

Unikatowy klucz jest przechowywany we ThreadDbKey właściwości, która jest serializowana i deserializowana przy użyciu metody SerializeStateAsync oraz konstruktora, który przyjmuje JsonElement. Ten klucz zostanie zatem utrwalony w stanie AgentThread, co umożliwi wznowienie wątku później i kontynuowanie korzystania z tej samej historii czatu.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;
using Microsoft.Extensions.VectorData;
using Microsoft.SemanticKernel.Connectors.InMemory;

internal sealed class VectorChatMessageStore : ChatMessageStore
{
    private readonly VectorStore _vectorStore;

    public VectorChatMessageStore(
        VectorStore vectorStore,
        JsonElement serializedStoreState,
        JsonSerializerOptions? jsonSerializerOptions = null)
    {
        this._vectorStore = vectorStore ?? throw new ArgumentNullException(nameof(vectorStore));
        if (serializedStoreState.ValueKind is JsonValueKind.String)
        {
            this.ThreadDbKey = serializedStoreState.Deserialize<string>();
        }
    }

    public string? ThreadDbKey { get; private set; }

    public override async Task AddMessagesAsync(
        IEnumerable<ChatMessage> messages,
        CancellationToken cancellationToken)
    {
        this.ThreadDbKey ??= Guid.NewGuid().ToString("N");
        var collection = this._vectorStore.GetCollection<string, ChatHistoryItem>("ChatHistory");
        await collection.EnsureCollectionExistsAsync(cancellationToken);
        await collection.UpsertAsync(messages.Select(x => new ChatHistoryItem()
        {
            Key = this.ThreadDbKey + x.MessageId,
            Timestamp = DateTimeOffset.UtcNow,
            ThreadId = this.ThreadDbKey,
            SerializedMessage = JsonSerializer.Serialize(x),
            MessageText = x.Text
        }), cancellationToken);
    }

    public override async Task<IEnumerable<ChatMessage>> GetMessagesAsync(
        CancellationToken cancellationToken)
    {
        var collection = this._vectorStore.GetCollection<string, ChatHistoryItem>("ChatHistory");
        await collection.EnsureCollectionExistsAsync(cancellationToken);
        var records = collection
            .GetAsync(
                x => x.ThreadId == this.ThreadDbKey, 10,
                new() { OrderBy = x => x.Descending(y => y.Timestamp) },
                cancellationToken);

        List<ChatMessage> messages = [];
        await foreach (var record in records)
        {
            messages.Add(JsonSerializer.Deserialize<ChatMessage>(record.SerializedMessage!)!);
        }

        messages.Reverse();
        return messages;
    }

    public override JsonElement Serialize(JsonSerializerOptions? jsonSerializerOptions = null) =>
        // We have to serialize the thread id, so that on deserialization you can retrieve the messages using the same thread id.
        JsonSerializer.SerializeToElement(this.ThreadDbKey);

    private sealed class ChatHistoryItem
    {
        [VectorStoreKey]
        public string? Key { get; set; }
        [VectorStoreData]
        public string? ThreadId { get; set; }
        [VectorStoreData]
        public DateTimeOffset? Timestamp { get; set; }
        [VectorStoreData]
        public string? SerializedMessage { get; set; }
        [VectorStoreData]
        public string? MessageText { get; set; }
    }
}

Używanie niestandardowego obiektu ChatMessageStore z agentem ChatClientAgent

Aby użyć niestandardowego ChatMessageStoreelementu , należy podać element ChatMessageStoreFactory podczas tworzenia agenta. Ta fabryka pozwala agentowi utworzyć nową instancję pożądanego ChatMessageStore dla każdego wątku.

Podczas tworzenia ChatClientAgent obiektu można podać ChatClientAgentOptions obiekt, który umożliwia udostępnianie ChatMessageStoreFactory elementu oprócz wszystkich innych opcji agenta.

using Azure.AI.OpenAI;
using Azure.Identity;
using OpenAI;

AIAgent agent = new AzureOpenAIClient(
    new Uri("https://<myresource>.openai.azure.com"),
    new AzureCliCredential())
     .GetChatClient("gpt-4o-mini")
     .CreateAIAgent(new ChatClientAgentOptions
     {
         Name = "Joker",
         Instructions = "You are good at telling jokes.",
         ChatMessageStoreFactory = ctx =>
         {
             // Create a new chat message store for this agent that stores the messages in a vector store.
             return new VectorChatMessageStore(
                new InMemoryVectorStore(),
                ctx.SerializedState,
                ctx.JsonSerializerOptions);
         }
     });

W tym samouczku pokazano, jak przechowywać historię czatów agentów w magazynie zewnętrznym przez zaimplementowanie niestandardowego ChatMessageStore obiektu i użycie go za pomocą elementu ChatAgent.

Domyślnie, przy korzystaniu z ChatAgent, historia czatów jest przechowywana w pamięci w obiekcie AgentThread lub w podstawowej usłudze wnioskowania, jeżeli ta usługa ją obsługuje.

Jeśli usługi nie wymagają historii czatów lub nie mogą być przechowywane w usłudze, można udostępnić magazyn niestandardowy do utrwalania historii czatów zamiast polegać na domyślnym zachowaniu w pamięci.

Wymagania wstępne

Aby uzyskać wymagania wstępne, zobacz krok Tworzenie i uruchamianie prostego agenta w tym samouczku.

Utwórz niestandardowe repozytorium ChatMessage

Aby utworzyć niestandardowy ChatMessageStore element, należy zaimplementować protokół ChatMessageStore i dostarczyć implementacje dla wymaganych metod.

Metody przechowywania i pobierania komunikatów

Najważniejsze metody implementacji to:

  • add_messages — wywoływana w celu dodania nowych komunikatów do sklepu.
  • list_messages - wywoływane w celu pobrania komunikatów ze sklepu.

list_messages powinna zwrócić komunikaty w kolejności chronologicznej rosnącej. Wszystkie komunikaty zwrócone przez ChatAgent będą używane podczas wykonywania wywołań do bazowego klienta czatu. Dlatego ważne jest, aby ta metoda uwzględniała limity modelu bazowego i zwraca tylko tyle komunikatów, ile można obsłużyć przez model.

Przed zwróceniem wiadomości z programu list_messagesnależy wykonać dowolną logikę redukcji historii czatów, taką jak podsumowywanie lub przycinanie.

Serializacja

ChatMessageStore wystąpienia są tworzone i dołączane do elementu AgentThread po utworzeniu wątku oraz po wznowieniu wątku ze stanu zserializowanego.

Podczas gdy rzeczywiste wiadomości tworzące historię czatów są przechowywane zewnętrznie, ChatMessageStore wystąpienie może wymagać przechowywania kluczy lub innych danych w celu identyfikacji historii czatu w repozytorium zewnętrznym.

Aby umożliwić utrwalanie wątków, należy zaimplementować serialize_state metody deserialize_state i ChatMessageStore protokołu . Te metody umożliwiają utrwalanie i przywracanie stanu sklepu podczas wznawiania wątku.

Przykładowa implementacja chatMessageStore

Poniższa przykładowa implementacja przechowuje wiadomości czatu w usłudze Redis przy użyciu struktury danych Redis Lists.

W add_messages przechowuje komunikaty w Redis, używając RPUSH, aby dołączyć je na końcu listy w kolejności chronologicznej.

list_messages pobiera wiadomości dla bieżącego wątku z Redis przy użyciu polecenia LRANGE i zwraca je w kolejności chronologicznej rosnącej.

Po odebraniu pierwszej wiadomości sklep generuje unikatowy klucz dla wątku, który jest następnie używany do identyfikowania historii czatów w usłudze Redis na potrzeby kolejnych wywołań.

Unikatowy klucz i inne ustawienia są przechowywane i mogą być serializowane oraz deserializowane przy użyciu metod serialize_state i deserialize_state. W związku z tym ten stan zostanie utrwalony jako część stanu AgentThread, co pozwoli później wznowić wątek i dalej używać tej samej historii czatu.

from collections.abc import Sequence
from typing import Any
from uuid import uuid4
from pydantic import BaseModel
import json
import redis.asyncio as redis
from agent_framework import ChatMessage


class RedisStoreState(BaseModel):
    """State model for serializing and deserializing Redis chat message store data."""

    thread_id: str
    redis_url: str | None = None
    key_prefix: str = "chat_messages"
    max_messages: int | None = None


class RedisChatMessageStore:
    """Redis-backed implementation of ChatMessageStore using Redis Lists."""

    def __init__(
        self,
        redis_url: str | None = None,
        thread_id: str | None = None,
        key_prefix: str = "chat_messages",
        max_messages: int | None = None,
    ) -> None:
        """Initialize the Redis chat message store.

        Args:
            redis_url: Redis connection URL (for example, "redis://localhost:6379").
            thread_id: Unique identifier for this conversation thread.
                      If not provided, a UUID will be auto-generated.
            key_prefix: Prefix for Redis keys to namespace different applications.
            max_messages: Maximum number of messages to retain in Redis.
                         When exceeded, oldest messages are automatically trimmed.
        """
        if redis_url is None:
            raise ValueError("redis_url is required for Redis connection")

        self.redis_url = redis_url
        self.thread_id = thread_id or f"thread_{uuid4()}"
        self.key_prefix = key_prefix
        self.max_messages = max_messages

        # Initialize Redis client
        self._redis_client = redis.from_url(redis_url, decode_responses=True)

    @property
    def redis_key(self) -> str:
        """Get the Redis key for this thread's messages."""
        return f"{self.key_prefix}:{self.thread_id}"

    async def add_messages(self, messages: Sequence[ChatMessage]) -> None:
        """Add messages to the Redis store.

        Args:
            messages: Sequence of ChatMessage objects to add to the store.
        """
        if not messages:
            return

        # Serialize messages and add to Redis list
        serialized_messages = [self._serialize_message(msg) for msg in messages]
        await self._redis_client.rpush(self.redis_key, *serialized_messages)

        # Apply message limit if configured
        if self.max_messages is not None:
            current_count = await self._redis_client.llen(self.redis_key)
            if current_count > self.max_messages:
                # Keep only the most recent max_messages using LTRIM
                await self._redis_client.ltrim(self.redis_key, -self.max_messages, -1)

    async def list_messages(self) -> list[ChatMessage]:
        """Get all messages from the store in chronological order.

        Returns:
            List of ChatMessage objects in chronological order (oldest first).
        """
        # Retrieve all messages from Redis list (oldest to newest)
        redis_messages = await self._redis_client.lrange(self.redis_key, 0, -1)

        messages = []
        for serialized_message in redis_messages:
            message = self._deserialize_message(serialized_message)
            messages.append(message)

        return messages

    async def serialize_state(self, **kwargs: Any) -> Any:
        """Serialize the current store state for persistence.

        Returns:
            Dictionary containing serialized store configuration.
        """
        state = RedisStoreState(
            thread_id=self.thread_id,
            redis_url=self.redis_url,
            key_prefix=self.key_prefix,
            max_messages=self.max_messages,
        )
        return state.model_dump(**kwargs)

    async def deserialize_state(self, serialized_store_state: Any, **kwargs: Any) -> None:
        """Deserialize state data into this store instance.

        Args:
            serialized_store_state: Previously serialized state data.
            **kwargs: Additional arguments for deserialization.
        """
        if serialized_store_state:
            state = RedisStoreState.model_validate(serialized_store_state, **kwargs)
            self.thread_id = state.thread_id
            self.key_prefix = state.key_prefix
            self.max_messages = state.max_messages

            # Recreate Redis client if the URL changed
            if state.redis_url and state.redis_url != self.redis_url:
                self.redis_url = state.redis_url
                self._redis_client = redis.from_url(self.redis_url, decode_responses=True)

    def _serialize_message(self, message: ChatMessage) -> str:
        """Serialize a ChatMessage to JSON string."""
        message_dict = message.model_dump()
        return json.dumps(message_dict, separators=(",", ":"))

    def _deserialize_message(self, serialized_message: str) -> ChatMessage:
        """Deserialize a JSON string to ChatMessage."""
        message_dict = json.loads(serialized_message)
        return ChatMessage.model_validate(message_dict)

    async def clear(self) -> None:
        """Remove all messages from the store."""
        await self._redis_client.delete(self.redis_key)

    async def aclose(self) -> None:
        """Close the Redis connection."""
        await self._redis_client.aclose()

Używanie niestandardowego chatMessageStore z agentem ChatAgent

Aby użyć niestandardowego ChatMessageStoreelementu , należy podać element chat_message_store_factory podczas tworzenia agenta. Ta fabryka pozwala agentowi utworzyć nową instancję pożądanego ChatMessageStore dla każdego wątku.

Podczas tworzenia elementu ChatAgentmożna podać chat_message_store_factory parametr oprócz wszystkich innych opcji agenta.

from azure.identity import AzureCliCredential
from agent_framework import ChatAgent
from agent_framework.openai import AzureOpenAIChatClient

# Create the chat agent with custom message store factory
agent = ChatAgent(
    chat_client=AzureOpenAIChatClient(
        endpoint="https://<myresource>.openai.azure.com",
        credential=AzureCliCredential(),
        ai_model_id="gpt-4o-mini"
    ),
    name="Joker",
    instructions="You are good at telling jokes.",
    chat_message_store_factory=lambda: RedisChatMessageStore(
        redis_url="redis://localhost:6379"
    )
)

# Use the agent with persistent chat history
thread = agent.get_new_thread()
response = await agent.run("Tell me a joke about pirates", thread=thread)
print(response.text)

Dalsze kroki