Freigeben über


KI-Agent-Speicher

Von Bedeutung

Dieses Feature befindet sich in der Public Preview.

Memory ermöglicht es KI-Agents, sich Informationen aus früheren Dialogen oder Gesprächen zu merken. Auf diese Weise können Agents kontextbezogene Antworten bereitstellen und personalisierte Erfahrungen im Laufe der Zeit erstellen. Verwenden Sie Databricks Lakebase, eine vollständig verwaltete Postgres OLTP-Datenbank, um den Konversationsstatus und -historie zu verwalten.

Anforderungen

Kurzzeitgedächtnis vs. Langzeitgedächtnis

Kurzzeitspeicher erfasst Kontext in einer einzelnen Unterhaltungssitzung, während der Langzeitspeicher wichtige Informationen in mehreren Unterhaltungen extrahiert und speichert. Sie können Ihren Agenten mit einem der beiden Speichertypen oder mit beiden erstellen.

Agenten mit kurzfristigem und langfristigem Speicher

Kurzzeitspeicher Langzeitspeicher
Den Kontext in einer einzelnen Unterhaltungssitzung mithilfe von Thread-IDs und Checkpointing erfassen.
Beibehalten des Kontexts für Folgefragen innerhalb einer Sitzung
Debuggen und Testen von Gesprächsflüssen mithilfe von Zeitreisen
Automatisches Extrahieren und Speichern wichtiger Erkenntnisse in mehreren Sitzungen
Personalisieren von Interaktionen basierend auf früheren Einstellungen
Erstellen einer Wissensbasis zu Benutzern, die die Antworten im Laufe der Zeit verbessern

Notebook-Beispiele

Agent mit kurzfristigem Speicher

Notebook abrufen

Agent mit langfristigem Speicher

Notebook abrufen

Fragen Sie Ihren bereitgestellten Agenten ab

Nachdem Sie Ihren Agent auf einem Model Serving-Endpunkt bereitgestellt haben, finden Sie informationen zu Abfrageanweisungen unter Abfragen eines bereitgestellten Mosaik AI-Agents .

Um eine Thread-ID zu übergeben, verwenden Sie den extra_body Param. Das folgende Beispiel zeigt, wie Sie eine Thread-ID an einen ResponsesAgent Endpunkt übergeben:

   response1 = client.responses.create(
    model=endpoint,
    input=[{"role": "user", "content": "What are stateful agents?"}],
    extra_body={
        "custom_inputs": {"thread_id": thread_id}
    }
)

Wenn Sie einen Client verwenden, der ChatContext automatisch übergibt, z. B. die Playground- oder Überprüfungs-App, wird die Unterhaltungs-ID und die Benutzer-ID automatisch für kurzfristige/langfristige Speichernutzungsfälle übergeben.

Zeitreise durch das Kurzzeitgedächtnis

Verwenden Sie für Agenten mit kurzzeitigem Arbeitsspeicher LangGraph Time-Travel, um die Ausführung von Prüfpunkten wieder aufzunehmen. Sie können die Unterhaltung entweder wiedergeben oder ändern, um alternative Pfade zu erkunden. Jedes Mal, wenn Sie eine Sitzung von einem Prüfpunkt aus fortsetzen, erstellt LangGraph eine neue Verzweigung in der aufgezeichneten Unterhaltung, wobei das Original beibehalten wird und gleichzeitig experimentiert werden kann.

  1. Erstellen Sie im Agentcode Funktionen, die den Prüfpunktverlauf abrufen und den Prüfpunktstatus in der LangGraphResponsesAgent Klasse aktualisieren:

    from typing import List, Dict
    def get_checkpoint_history(self, thread_id: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Retrieve checkpoint history for a thread.
    
        Args:
            thread_id: The thread identifier
            limit: Maximum number of checkpoints to return
    
        Returns:
            List of checkpoint information including checkpoint_id, timestamp, and next nodes
        """
        config = {"configurable": {"thread_id": thread_id}}
    
       with CheckpointSaver(instance_name=LAKEBASE_INSTANCE_NAME) as checkpointer:
            graph = self._create_graph(checkpointer)
    
            history = []
            for state in graph.get_state_history(config):
                if len(history) >= limit:
                    break
    
                history.append({
                    "checkpoint_id": state.config["configurable"]["checkpoint_id"],
                    "thread_id": thread_id,
                    "timestamp": state.created_at,
                    "next_nodes": state.next,
                    "message_count": len(state.values.get("messages", [])),
                    # Include last message summary for context
                    "last_message": self._get_last_message_summary(state.values.get("messages", []))
                })
    
            return history
    
    def _get_last_message_summary(self, messages: List[Any]) -> Optional[str]:
        """Get a snippet of the last message for checkpoint identification"""
        return getattr(messages[-1], "content", "")[:100] if messages else None
    
    def update_checkpoint_state(self, thread_id: str, checkpoint_id: str,
                            new_messages: Optional[List[Dict]] = None) -> Dict[str, Any]:
        """Update state at a specific checkpoint (used for modifying conversation history).
    
        Args:
            thread_id: The thread identifier
            checkpoint_id: The checkpoint to update
            new_messages: Optional new messages to set at this checkpoint
    
        Returns:
            New checkpoint configuration including the new checkpoint_id
        """
        config = {
            "configurable": {
                "thread_id": thread_id,
                "checkpoint_id": checkpoint_id
            }
        }
    
        with CheckpointSaver(instance_name=LAKEBASE_INSTANCE_NAME) as checkpointer:
            graph = self._create_graph(checkpointer)
    
            # Prepare the values to update
            values = {}
            if new_messages:
                cc_msgs = self.prep_msgs_for_cc_llm(new_messages)
                values["messages"] = cc_msgs
    
            # Update the state (creates a new checkpoint)
            new_config = graph.update_state(config, values=values)
    
            return {
                "thread_id": thread_id,
                "checkpoint_id": new_config["configurable"]["checkpoint_id"],
                "parent_checkpoint_id": checkpoint_id
            }
    
  2. Aktualisieren Sie die Funktionen predict und predict_stream, um das Übergeben von Prüfpunkten zu unterstützen.

    Predict

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        """Non-streaming prediction"""
        # The same thread_id is used by BOTH predict() and predict_stream()
        ci = dict(request.custom_inputs or {})
        if "thread_id" not in ci:
            ci["thread_id"] = str(uuid.uuid4())
        request.custom_inputs = ci
    
        outputs = [
            event.item
            for event in self.predict_stream(request)
            if event.type == "response.output_item.done"
        ]
    
        # Include thread_id and checkpoint_id in custom outputs
        custom_outputs = {
            "thread_id": ci["thread_id"]
        }
        if "checkpoint_id" in ci:
            custom_outputs["parent_checkpoint_id"] = ci["checkpoint_id"]
    
        try:
            history = self.get_checkpoint_history(ci["thread_id"], limit=1)
            if history:
                custom_outputs["checkpoint_id"] = history[0]["checkpoint_id"]
        except Exception as e:
            logger.warning(f"Could not retrieve new checkpoint_id: {e}")
    
        return ResponsesAgentResponse(output=outputs, custom_outputs=custom_outputs)
    

    Predict_stream

    def predict_stream(
        self,
        request: ResponsesAgentRequest,
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        """Streaming prediction with PostgreSQL checkpoint branching support.
    
        Accepts in custom_inputs:
        - thread_id: Conversation thread identifier for session
        - checkpoint_id (optional): Checkpoint to resume from (for branching)
        """
        # Get thread ID and checkpoint ID from custom inputs
        custom_inputs = request.custom_inputs or {}
        thread_id = custom_inputs.get("thread_id", str(uuid.uuid4()))  # generate new thread ID if one is not passed in
        checkpoint_id = custom_inputs.get("checkpoint_id")  # Optional for branching
    
        # Convert incoming Responses messages to LangChain format
        langchain_msgs = self.prep_msgs_for_cc_llm([i.model_dump() for i in request.input])
    
        # Build checkpoint configuration
        checkpoint_config = {"configurable": {"thread_id": thread_id}}
        # If checkpoint_id is provided, we're branching from that checkpoint
        if checkpoint_id:
            checkpoint_config["configurable"]["checkpoint_id"] = checkpoint_id
            logger.info(f"Branching from checkpoint: {checkpoint_id} in thread: {thread_id}")
    
        # DATABASE CONNECTION POOLING LOGIC FOLLOWS
        # Use connection from pool
    

Testen Sie dann die Verzweigung des Prüfpunkts:

  1. Starten Sie einen Unterhaltungsthread, und fügen Sie einige Nachrichten hinzu:

    from agent import AGENT
    # Initial conversation - starts a new thread
    response1 = AGENT.predict({
        "input": [{"role": "user", "content": "I'm planning for an upcoming trip!"}],
    })
    print(response1.model_dump(exclude_none=True))
    thread_id = response1.custom_outputs["thread_id"]
    
    # Within the same thread, ask a follow-up question - short-term memory will remember previous messages in the same thread/conversation session
    response2 = AGENT.predict({
        "input": [{"role": "user", "content": "I'm headed to SF!"}],
        "custom_inputs": {"thread_id": thread_id}
    })
    print(response2.model_dump(exclude_none=True))
    
    # Within the same thread, ask a follow-up question - short-term memory will remember previous messages in the same thread/conversation session
    response3 = AGENT.predict({
        "input": [{"role": "user", "content": "Where did I say I'm going?"}],
        "custom_inputs": {"thread_id": thread_id}
    })
    print(response3.model_dump(exclude_none=True))
    
    
  2. Abrufen der Checkpoint-Historie und Lenken der Unterhaltung in eine andere Richtung mit einer anderen Nachricht:

    # Get checkpoint history to find branching point
    history = AGENT.get_checkpoint_history(thread_id, 20)
    # Retrieve checkpoint at index - indices count backward from most recent checkpoint
    index = max(1, len(history) - 4)
    branch_checkpoint = history[index]["checkpoint_id"]
    
    # Branch from node with next_node = `('__start__',)` to re-input message to agent at certain part of conversation
    # I want to update the information of which city I am going to
    # Within the same thread, branch from a checkpoint and override it with different context to continue the conversation in a new fork
    response4 = AGENT.predict({
        "input": [{"role": "user", "content": "I'm headed to New York!"}],
        "custom_inputs": {
            "thread_id": thread_id,
            "checkpoint_id": branch_checkpoint # Branch from this checkpoint!
        }
    })
    print(response4.model_dump(exclude_none=True))
    
    # Thread ID stays the same even though it branched from a checkpoint:
    branched_thread_id = response4.custom_outputs["thread_id"]
    print(f"original thread id was {thread_id}")
    print(f"new thread id after branching is the same as original: {branched_thread_id}")
    
    # Continue the conversation in the same thread and it will pick up from the information you tell it in your branch
    response5 = AGENT.predict({
        "input": [{"role": "user", "content": "Where am I going?"}],
        "custom_inputs": {
            "thread_id": thread_id,
        }
    })
    print(response5.model_dump(exclude_none=True))
    

Nächste Schritte