Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Cette page montre comment créer un agent IA en Python à l’aide de Mosaïque AI Agent Framework et des bibliothèques de création d’agents populaires telles que LangGraph et OpenAI.
Spécifications
Conseil / Astuce
Databricks recommande d’installer la dernière version du client Python MLflow lors du développement d’agents.
Pour créer et déployer des agents à l’aide de l’approche de cette page, installez les éléments suivants :
-
databricks-agents1.2.0 ou version ultérieure -
mlflow3.1.3 ou version ultérieure - Python 3.10 ou version ultérieure.
- Utilisez le calcul serverless ou Databricks Runtime 13.3 LTS ou version ultérieure pour répondre à cette exigence.
%pip install -U -qqqq databricks-agents mlflow
Databricks recommande également d’installer des packages d’intégration Databricks AI Bridge pour créer des agents. Ces packages d’intégration fournissent une couche partagée d’API qui interagissent avec les fonctionnalités d’IA Databricks, telles que Databricks AI/BI Genie et Recherche vectorielle, entre les frameworks de création d’agents et les kits SDK.
OpenAI
%pip install -U -qqqq databricks-openai
LangChain/LangGraph
%pip install -U -qqqq databricks-langchain
DSPy
%pip install -U -qqqq databricks-dspy
Agents Pure Python
%pip install -U -qqqq databricks-ai-bridge
Utiliser ResponsesAgent pour créer des agents
Databricks recommande l’interface ResponsesAgent MLflow pour créer des agents de niveau production.
ResponsesAgent vous permet de créer des agents avec n’importe quelle infrastructure tierce, puis de l’intégrer à des fonctionnalités d’IA Databricks pour des fonctionnalités de journalisation, de suivi, d’évaluation, de déploiement et de supervision robustes.
Le ResponsesAgent schéma est compatible avec le schéma OpenAI Responses . Pour en savoir plus sur OpenAI, consultez OpenAI Responses: Réponses et ChatCompletion.
Remarque
L’ancienne ChatAgent interface est toujours prise en charge sur Databricks. Toutefois, pour les nouveaux agents, Databricks recommande d’utiliser la dernière version de MLflow et l’interface ResponsesAgent .
Consultez le schéma de l’agent d’entrée et de sortie hérité.
ResponsesAgent offre les avantages suivants :
Fonctionnalités avancées de l’agent
- Prise en charge multi-agent
- Sortie de streaming : diffusez en continu la sortie en blocs plus petits.
- Historique complet des messages appelant les outils : retournez plusieurs messages, y compris les messages d’appel intermédiaires, pour améliorer la qualité et la gestion des conversations.
- Support pour la confirmation des invocations d’outils
- Prise en charge des outils de longue durée
Développement, déploiement et supervision simplifiés
-
Créer des agents à l’aide de n’importe quelle infrastructure : encapsulez n’importe quel agent existant à l’aide de l’interface pour obtenir une compatibilité prête à l’emploi
ResponsesAgentavec AI Playground, Agent Evaluation et Agent Monitoring. - Interfaces de développement typées: écrire du code d'agent à l’aide de classes Python typées, en bénéficiant de l’auto-complétion de l’IDE et du notebook.
-
Inférence de signature automatique : MLflow déduit automatiquement les
ResponsesAgentsignatures lors de la journalisation d’un agent, ce qui simplifie l’inscription et le déploiement. Consultez Déduire la signature du modèle pendant l’enregistrement. -
Suivi automatique : MLflow trace automatiquement vos fonctions et
predictvospredict_streamfonctions, agrégeant les réponses diffusées en continu pour faciliter l’évaluation et l’affichage. - tables d’inférence améliorées par la passerelle AI: les tables d’inférence ai Gateway sont automatiquement activées pour les agents déployés, ce qui permet d’accéder aux métadonnées détaillées du journal des requêtes.
-
Créer des agents à l’aide de n’importe quelle infrastructure : encapsulez n’importe quel agent existant à l’aide de l’interface pour obtenir une compatibilité prête à l’emploi
Pour savoir comment créer un ResponsesAgent, consultez les exemples de la section suivante et de la documentation MLflow - ResponsesAgent for Model Serving.
ResponsesAgent Exemples
Les notebooks suivants montrent comment créer la diffusion en continu et la non-diffusion en continu ResponsesAgent à l’aide de bibliothèques populaires. Pour savoir comment développer les fonctionnalités de ces agents, consultez les outils de l’agent IA.
OpenAI
Agent de conversation simple OpenAI à l’aide de modèles hébergés par Databricks
Agent d’appel d’outils OpenAI à l’aide de modèles hébergés par Databricks
Agent d’appel d’outils OpenAI à l’aide de modèles hébergés par OpenAI
LangGraph
Agent d’appel de l’outil LangGraph
DSPy
Agent d’appel d’outil à tour unique DSPy
Exemple multiagent
Pour savoir comment créer un système multi-agent, consultez Utiliser Genie dans les systèmes multi-agents.
Exemple d’agent avec état
Pour savoir comment créer des agents avec état avec une mémoire à court terme et à long terme à l’aide de Lakebase comme magasin de mémoire, consultez la mémoire de l’agent IA.
Exemple d’agent non conversationnel
Contrairement aux agents conversationnels qui gèrent les dialogues multitours, les agents non conversationnels se concentrent sur l’exécution efficace de tâches bien définies. Cette architecture simplifiée permet un débit plus élevé pour les demandes indépendantes.
Pour savoir comment créer un agent non conversationnel, consultez les agents IA non conversationnels à l’aide de MLflow.
Que se passe-t-il si j’ai déjà un agent ?
Si vous disposez déjà d’un agent créé avec LangChain, LangGraph ou une infrastructure similaire, vous n’avez pas besoin de réécrire votre agent pour l’utiliser sur Databricks. Au lieu de cela, il vous suffit d’encapsuler votre agent existant avec l’interface MLflow ResponsesAgent :
Écrire une classe wrapper Python qui hérite de
mlflow.pyfunc.ResponsesAgent.Dans la classe wrapper, référencez l’agent existant en tant qu’attribut
self.agent = your_existing_agent.La
ResponsesAgentclasse nécessite l’implémentation d’unepredictméthode qui retourne unResponsesAgentResponsepour gérer les demandes de non-diffusion en continu. Voici un exemple deResponsesAgentResponsesschéma :import uuid # input as a dict {"input": [{"role": "user", "content": "What did the data scientist say when their Spark job finally completed?"}]} # output example ResponsesAgentResponse( output=[ { "type": "message", "id": str(uuid.uuid4()), "content": [{"type": "output_text", "text": "Well, that really sparked joy!"}], "role": "assistant", }, ] )Dans la
predictfonction, convertissez les messages entrants dansResponsesAgentRequestle format attendu par l’agent. Une fois que l’agent a généré une réponse, convertissez sa sortie en objetResponsesAgentResponse.
Consultez les exemples de code suivants pour voir comment convertir des agents existants en ResponsesAgent:
Conversion de base
Pour les agents non de diffusion en continu, convertissez les entrées et les sorties dans la predict fonction.
from uuid import uuid4
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
)
class MyWrappedAgent(ResponsesAgent):
def __init__(self, agent):
# Reference your existing agent
self.agent = agent
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
# Convert incoming messages to your agent's format
# prep_msgs_for_llm is a function you write to convert the incoming messages
messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
# Call your existing agent (non-streaming)
agent_response = self.agent.invoke(messages)
# Convert your agent's output to ResponsesAgent format, assuming agent_response is a str
output_item = (self.create_text_output_item(text=agent_response, id=str(uuid4())),)
# Return the response
return ResponsesAgentResponse(output=[output_item])
Diffusion en continu avec le code réutilisateur
Pour les agents de streaming, vous pouvez être intelligent et réutiliser une logique pour éviter de dupliquer le code qui convertit les messages :
from typing import Generator
from uuid import uuid4
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
ResponsesAgentStreamEvent,
)
class MyWrappedStreamingAgent(ResponsesAgent):
def __init__(self, agent):
# Reference your existing agent
self.agent = agent
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
"""Non-streaming predict: collects all streaming chunks into a single response."""
# Reuse the streaming logic and collect all output items
output_items = []
for stream_event in self.predict_stream(request):
if stream_event.type == "response.output_item.done":
output_items.append(stream_event.item)
# Return all collected items as a single response
return ResponsesAgentResponse(output=output_items)
def predict_stream(
self, request: ResponsesAgentRequest
) -> Generator[ResponsesAgentStreamEvent, None, None]:
"""Streaming predict: the core logic that both methods use."""
# Convert incoming messages to your agent's format
# prep_msgs_for_llm is a function you write to convert the incoming messages, included in full examples linked below
messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
# Stream from your existing agent
item_id = str(uuid4())
aggregated_stream = ""
for chunk in self.agent.stream(messages):
# Convert each chunk to ResponsesAgent format
yield self.create_text_delta(delta=chunk, item_id=item_id)
aggregated_stream += chunk
# Emit an aggregated output_item for all the text deltas with id=item_id
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_text_output_item(text=aggregated_stream, id=item_id),
)
Migrer à partir de ChatCompletions
Si votre agent existant utilise l’API ChatCompletions OpenAI, vous pouvez la ResponsesAgent migrer sans réécrire sa logique principale. Ajoutez un wrapper qui :
- Convertit les messages entrants
ResponsesAgentRequestauChatCompletionsformat attendu par votre agent. - Convertit les
ChatCompletionssorties dans leResponsesAgentResponseschéma. - Vous pouvez éventuellement prendre en charge la diffusion en continu en mappant des deltas incrémentiels d’objets
ChatCompletionsResponsesAgentStreamEvent.
from typing import Generator
from uuid import uuid4
from databricks.sdk import WorkspaceClient
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
ResponsesAgentStreamEvent,
)
# Legacy agent that outputs ChatCompletions objects
class LegacyAgent:
def __init__(self):
self.w = WorkspaceClient()
self.OpenAI = self.w.serving_endpoints.get_open_ai_client()
def stream(self, messages):
for chunk in self.OpenAI.chat.completions.create(
model="databricks-claude-sonnet-4-5",
messages=messages,
stream=True,
):
yield chunk.to_dict()
# Wrapper that converts the legacy agent to a ResponsesAgent
class MyWrappedStreamingAgent(ResponsesAgent):
def __init__(self, agent):
# `agent` is your existing ChatCompletions agent
self.agent = agent
def prep_msgs_for_llm(self, messages):
# dummy example of prep_msgs_for_llm
# real example of prep_msgs_for_llm included in full examples linked below
return [{"role": "user", "content": "Hello, how are you?"}]
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
"""Non-streaming predict: collects all streaming chunks into a single response."""
# Reuse the streaming logic and collect all output items
output_items = []
for stream_event in self.predict_stream(request):
if stream_event.type == "response.output_item.done":
output_items.append(stream_event.item)
# Return all collected items as a single response
return ResponsesAgentResponse(output=output_items)
def predict_stream(
self, request: ResponsesAgentRequest
) -> Generator[ResponsesAgentStreamEvent, None, None]:
"""Streaming predict: the core logic that both methods use."""
# Convert incoming messages to your agent's format
messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
# process the ChatCompletion output stream
agent_content = ""
tool_calls = []
msg_id = None
for chunk in self.agent.stream(messages): # call the underlying agent's stream method
delta = chunk["choices"][0]["delta"]
msg_id = chunk.get("id", None)
content = delta.get("content", None)
if tc := delta.get("tool_calls"):
if not tool_calls: # only accommodate for single tool call right now
tool_calls = tc
else:
tool_calls[0]["function"]["arguments"] += tc[0]["function"]["arguments"]
elif content is not None:
agent_content += content
yield ResponsesAgentStreamEvent(**self.create_text_delta(content, item_id=msg_id))
# aggregate the streamed text content
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_text_output_item(agent_content, msg_id),
)
for tool_call in tool_calls:
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_function_call_item(
str(uuid4()),
tool_call["id"],
tool_call["function"]["name"],
tool_call["function"]["arguments"],
),
)
agent = MyWrappedStreamingAgent(LegacyAgent())
for chunk in agent.predict_stream(
ResponsesAgentRequest(input=[{"role": "user", "content": "Hello, how are you?"}])
):
print(chunk)
Pour obtenir des exemples complets, consultez ResponsesAgent des exemples.
Réponses de diffusion en continu
Le streaming permet aux agents d’envoyer des réponses en segments en temps réel au lieu d’attendre la réponse complète. Pour implémenter la diffusion en continu avec ResponsesAgent, émettez une série d’événements delta suivis d’un événement d’achèvement final :
-
Émettre des événements delta : envoyez plusieurs
output_text.deltaévénements avec le mêmeitem_idpour diffuser en continu des blocs de texte en temps réel. -
Terminer avec l’événement terminé : envoyez un événement final
response.output_item.doneavec le mêmeitem_idque les événements delta contenant le texte de sortie final complet.
Chaque événement delta diffuse un bloc de texte vers le client. L’événement terminé final contient le texte de réponse complet et signale Databricks pour effectuer les opérations suivantes :
- Tracer la sortie de votre agent avec le suivi MLflow
- Agréger les réponses diffusées en continu dans les tables d’inférence de passerelle AI
- Afficher la sortie complète dans l’interface utilisateur AI Playground
Propagation d'erreurs de streaming
Mosaïque IA propage toutes les erreurs rencontrées lors de la diffusion en continu avec le dernier jeton sous databricks_output.error. Il incombe au client appelant de gérer correctement et de faire apparaître cette erreur.
{
"delta": …,
"databricks_output": {
"trace": {...},
"error": {
"error_code": BAD_REQUEST,
"message": "TimeoutException: Tool XYZ failed to execute."
}
}
}
Fonctionnalités avancées
entrées et sorties personnalisées
Certains scénarios peuvent nécessiter des entrées d’agent supplémentaires, telles que client_type et session_id, ou des sorties telles que des liens sources de récupération qui ne doivent pas être inclus dans l’historique des conversations pour les interactions futures.
Pour ces scénarios, MLflow ResponsesAgent prend en charge en mode natif les champs custom_inputs et custom_outputs. Vous pouvez accéder aux entrées personnalisées via request.custom_inputs tous les exemples liés ci-dessus dans Les exemples ResponsesAgent.
Avertissement
L’application d’évaluation de l’agent ne prend pas en charge le rendu des traces pour les agents avec des champs d’entrée supplémentaires.
Consultez les blocs-notes suivants pour savoir comment définir des entrées et sorties personnalisées.
Fournir custom_inputs dans l’ia Playground et examiner l’application
Si votre agent accepte des entrées supplémentaires à l’aide du custom_inputs champ, vous pouvez fournir manuellement ces entrées dans l’aire de jeu IA et l’application de révision.
Dans AI Playground ou l’application Agent Review, sélectionnez l'icône rouage
.
Activez custom_inputs.
Fournissez un objet JSON qui correspond au schéma d’entrée défini de votre agent.
Spécifier des schémas de récupérateur personnalisés
Les agents IA utilisent couramment des récupérateurs pour rechercher et interroger des données non structurées à partir d’index de recherche vectorielle. Pour obtenir des exemples d'outils de récupération, consultez Outils de création et de traçage pour des données non structurées.
Tracez ces extracteurs au sein de votre agent avec des intervalles MLflow RETRIEVER pour activer les fonctionnalités de produit Databricks, notamment :
- Affichage automatique des liens vers des documents sources récupérés dans l’interface utilisateur AI Playground
- Exécution automatique du fondement de l’extraction et des jugements de pertinence dans l’évaluation des agents
Remarque
Databricks recommande d’utiliser les outils de récupération fournis par les packages Databricks AI Bridge comme databricks_langchain.VectorSearchRetrieverTool et databricks_openai.VectorSearchRetrieverTool, car ils sont déjà conformes au schéma du récupérateur MLflow. Consultez Développer localement des outils d’extraction de recherche vectorielle avec AI Bridge.
Si votre agent inclut des étendues de récupérateur avec un schéma personnalisé, appelez mlflow.models.set_retriever_schema quand vous définissez votre agent dans le code. Cela mappe les colonnes de sortie de votre récupérateur aux champs attendus de MLflow (primary_key, text_column, doc_uri).
import mlflow
# Define the retriever's schema by providing your column names
# For example, the following call specifies the schema of a retriever that returns a list of objects like
# [
# {
# 'document_id': '9a8292da3a9d4005a988bf0bfdd0024c',
# 'chunk_text': 'MLflow is an open-source platform, purpose-built to assist machine learning practitioners...',
# 'doc_uri': 'https://mlflow.org/docs/latest/index.html',
# 'title': 'MLflow: A Tool for Managing the Machine Learning Lifecycle'
# },
# {
# 'document_id': '7537fe93c97f4fdb9867412e9c1f9e5b',
# 'chunk_text': 'A great way to get started with MLflow is to use the autologging feature. Autologging automatically logs your model...',
# 'doc_uri': 'https://mlflow.org/docs/latest/getting-started/',
# 'title': 'Getting Started with MLflow'
# },
# ...
# ]
mlflow.models.set_retriever_schema(
# Specify the name of your retriever span
name="mlflow_docs_vector_search",
# Specify the output column name to treat as the primary key (ID) of each retrieved document
primary_key="document_id",
# Specify the output column name to treat as the text content (page content) of each retrieved document
text_column="chunk_text",
# Specify the output column name to treat as the document URI of each retrieved document
doc_uri="doc_uri",
# Specify any other columns returned by the retriever
other_columns=["title"],
)
Remarque
La colonne doc_uri est particulièrement importante lors de l’évaluation des performances du récupérateur.
doc_uri est l’identificateur principal des documents retournés par l’extracteur, ce qui vous permet de les comparer aux jeux d’évaluation de la vérité terrain. Consultez les jeux d’évaluation (MLflow 2).
Points à prendre en considération pour le déploiement
Préparer le service de modèle Databricks
Databricks déploie s ResponsesAgentdans un environnement distribué sur Databricks Model Serve. Cela signifie que pendant une conversation à plusieurs tour, le même réplica de service peut ne pas gérer toutes les requêtes. Soyez attentifs aux implications suivantes pour la gestion de l’état de l’agent :
Évitez la mise en cache locale : lors du déploiement d’un
ResponsesAgentréplica, ne supposez pas que le même réplica gère toutes les requêtes d’une conversation multitour. Reconstruire l’état interne en utilisant un schéma de dictionnaireResponsesAgentRequestpour chaque étape.État thread-safe : concevez l’état de l’agent pour qu’il soit thread-safe, ce qui empêche les conflits dans les environnements à threads multiples.
Initialiser l’état dans la fonction
predict: initialisez l’état chaque fois que la fonctionpredictest appelée, et non pendant l’initialisationResponsesAgent. Le stockage de l’état au niveauResponsesAgentpeut divulguer des informations entre les conversations et provoquer des conflits, car une seule répliqueResponsesAgentpeut traiter les requêtes de plusieurs conversations.
Paramétrer le code pour le déploiement dans les environnements
Paramétrez le code de l’agent pour réutiliser le même code d’agent dans différents environnements.
Les paramètres sont des paires clé-valeur que vous définissez dans un dictionnaire Python ou un fichier .yaml.
Pour configurer le code, créez un ModelConfig dictionnaire Python ou un fichier à l’aide d’un .yaml fichier.
ModelConfig est un ensemble de paramètres clé-valeur qui permet une gestion flexible de la configuration. Par exemple, vous pouvez utiliser un dictionnaire pendant le développement, puis le convertir en fichier .yaml pour le déploiement de production et CI/CD.
Voici un exemple ModelConfig :
llm_parameters:
max_tokens: 500
temperature: 0.01
model_serving_endpoint: databricks-meta-llama-3-3-70b-instruct
vector_search_index: ml.docs.databricks_docs_index
prompt_template: 'You are a hello world bot. Respond with a reply to the user''s
question that indicates your prompt template came from a YAML file. Your response
must use the word "YAML" somewhere. User''s question: {question}'
prompt_template_input_vars:
- question
Dans votre code d’agent, vous pouvez référencer une configuration par défaut (développement) à partir du fichier ou du dictionnaire .yaml :
import mlflow
# Example for loading from a .yml file
config_file = "configs/hello_world_config.yml"
model_config = mlflow.models.ModelConfig(development_config=config_file)
# Example of using a dictionary
config_dict = {
"prompt_template": "You are a hello world bot. Respond with a reply to the user's question that is fun and interesting to the user. User's question: {question}",
"prompt_template_input_vars": ["question"],
"model_serving_endpoint": "databricks-meta-llama-3-3-70b-instruct",
"llm_parameters": {"temperature": 0.01, "max_tokens": 500},
}
model_config = mlflow.models.ModelConfig(development_config=config_dict)
# Use model_config.get() to retrieve a parameter value
# You can also use model_config.to_dict() to convert the loaded config object
# into a dictionary
value = model_config.get('sample_param')
Ensuite, lors de la journalisation de votre agent, spécifiez le paramètre model_config à log_model pour spécifier un ensemble personnalisé de paramètres à utiliser lors du chargement de l’agent journalisé. Consultez la documentation de MLflow - ModelConfig.
Utiliser du code synchrone ou des modèles de rappel
Pour garantir la stabilité et la compatibilité, utilisez du code synchrone ou des modèles basés sur un rappel dans votre implémentation de l’agent.
Azure Databricks gère automatiquement la communication asynchrone pour fournir une concurrence et des performances optimales lorsque vous déployez un agent. L’introduction de boucles d’événements personnalisées ou d’infrastructures asynchrones peut entraîner des erreurs telles que RuntimeError: This event loop is already running and caused unpredictable behavior.
Azure Databricks recommande d’éviter la programmation asynchrone, comme l’utilisation d’asyncio ou la création de boucles d’événements personnalisées, lors du développement d’agents.