Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Esta página mostra como criar um agente de IA no Python usando o Mosaic AI Agent Framework e bibliotecas populares de criação de agente, como LangGraph e OpenAI.
Requisitos
Dica
O Databricks recomenda instalar a versão mais recente do cliente Python do MLflow ao desenvolver agentes.
Para criar e implantar agentes usando a abordagem nesta página, instale o seguinte:
-
databricks-agents1.2.0 ou superior -
mlflow3.1.3 ou superior - Python 3.10 ou superior.
- Use a computação sem servidor ou o Databricks Runtime 13.3 LTS ou superior para atender a esse requisito.
%pip install -U -qqqq databricks-agents mlflow
O Databricks também recomenda a instalação de pacotes de integração da Ponte de IA do Databricks para criar agentes. Esses pacotes de integração fornecem uma camada compartilhada de APIs que interagem com os recursos de IA do Databricks, como o Genie de IA/BI do Databricks e o Vector Search, entre estruturas de criação de agente e SDKs.
OpenAI
%pip install -U -qqqq databricks-openai
LangChain/LangGraph
%pip install -U -qqqq databricks-langchain
DSPy
%pip install -U -qqqq databricks-dspy
Agentes puros do Python
%pip install -U -qqqq databricks-ai-bridge
Usar ResponsesAgent para criar agentes
O Databricks recomenda a interface ResponsesAgent MLflow para criar agentes de nível de produção.
ResponsesAgent permite criar agentes com qualquer estrutura de terceiros e integrá-la aos recursos de IA do Databricks para recursos robustos de registro em log, rastreamento, avaliação, implantação e monitoramento.
O ResponsesAgent esquema é compatível com o esquema OpenAI Responses . Para saber mais sobre o OpenAI Responses, consulte OpenAI: Responses vs. ChatCompletion.
Nota
A interface mais antiga ChatAgent ainda tem suporte no Databricks. No entanto, para novos agentes, o Databricks recomenda usar a versão mais recente do MLflow e da ResponsesAgent interface.
ResponsesAgent fornece os seguintes benefícios:
Funcionalidades avançadas do agente
- Suporte a vários agentes
- Saída de streaming: transmita a saída em partes menores.
- Histórico abrangente de chamada de mensagens de ferramentas: Retorne várias mensagens, incluindo as intermediárias de chamada de ferramentas, para melhorar a qualidade e o gerenciamento da conversa.
- Suporte à confirmação de chamada de ferramenta
- Suporte à ferramenta de execução longa
Desenvolvimento, implantação e monitoramento simplificados
-
Agentes de criação usando qualquer estrutura: encapsular qualquer agente existente usando a
ResponsesAgentinterface para obter compatibilidade pronta para uso com o Playground de IA, a Avaliação do Agente e o Monitoramento do Agente. - Interfaces de criação tipadas: escrever código do agente usando classes Python tipadas, beneficiando-se do preenchimento automático do IDE e do notebook.
-
Inferência automática de assinatura: o MLflow infere automaticamente assinaturas
ResponsesAgentao registrar um agente em log, simplificando o registro e a implantação. Confira Inferir assinatura de modelo durante o registro em log. -
Rastreamento automático: o MLflow rastreia automaticamente suas
predictfunções epredict_streamfunções, agregando respostas transmitidas para facilitar a avaliação e a exibição. - tabelas de inferência aprimoradas pelo Gateway de IA: as tabelas de inferência do Gateway de IA são habilitadas automaticamente para agentes implantados, fornecendo acesso a metadados detalhados do log de solicitações.
-
Agentes de criação usando qualquer estrutura: encapsular qualquer agente existente usando a
Para saber como criar um ResponsesAgent, consulte os exemplos na seção a seguir e a documentação do MLflow – ResponsesAgent for Model Serving.
ResponsesAgent Exemplos
Os blocos de anotações a seguir mostram como criar streaming e não streaming ResponsesAgent usando bibliotecas populares. Para saber como expandir os recursos desses agentes, consulte as ferramentas do agente de IA.
OpenAI
Agente de chat simples do OpenAI usando modelos hospedados pelo Databricks
Agente de chamada de ferramentas openAI usando modelos hospedados pelo Databricks
Agente de chamada de ferramenta OpenAI usando modelos hospedados pela OpenAI
LangGraph
Agente de chamada de ferramentas do LangGraph
DSPy
Agente de chamada de ferramenta de ciclo único DSPy
Exemplo de vários agentes
Para saber como criar um sistema de vários agentes, consulte Usar o Genie em sistemas de vários agentes.
Exemplo de agente com estado
Para saber como criar agentes com estado com memória de curto e longo prazo usando o Lakebase como um repositório de memória, consulte a memória do agente de IA.
Exemplo de agente não conversacional
Ao contrário dos agentes de conversação que gerenciam diálogos de vários turnos, os agentes não conversacionais se concentram na execução de tarefas bem definidas com eficiência. Essa arquitetura simplificada permite maior taxa de transferência para solicitações independentes.
Para saber como criar um agente não conversacional, consulte agentes de IA não conversacionais usando o MLflow.
E se eu já tiver um agente?
Se você já tiver um agente criado com LangChain, LangGraph ou uma estrutura semelhante, você não precisará reescrever seu agente para usá-lo no Databricks. Em vez disso, basta encapsular seu agente existente com a interface MLflow ResponsesAgent :
Escreva uma classe de wrapper do Python que herda de
mlflow.pyfunc.ResponsesAgent.Dentro da classe wrapper, faça referência ao agente existente como um atributo
self.agent = your_existing_agent.A
ResponsesAgentclasse requer a implementação de umpredictmétodo que retorna umResponsesAgentResponsepara lidar com solicitações que não são de streaming. Veja a seguir um exemplo doResponsesAgentResponsesesquema:import uuid # input as a dict {"input": [{"role": "user", "content": "What did the data scientist say when their Spark job finally completed?"}]} # output example ResponsesAgentResponse( output=[ { "type": "message", "id": str(uuid.uuid4()), "content": [{"type": "output_text", "text": "Well, that really sparked joy!"}], "role": "assistant", }, ] )predictNa função, converta as mensagens de entrada noResponsesAgentRequestformato que o agente espera. Depois que o agente gerar uma resposta, converta sua saída em umResponsesAgentResponseobjeto.
Confira os seguintes exemplos de código para ver como converter agentes existentes em ResponsesAgent:
Conversão básica
Para agentes que não são de streaming, converta entradas e saídas na predict função.
from uuid import uuid4
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
)
class MyWrappedAgent(ResponsesAgent):
def __init__(self, agent):
# Reference your existing agent
self.agent = agent
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
# Convert incoming messages to your agent's format
# prep_msgs_for_llm is a function you write to convert the incoming messages
messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
# Call your existing agent (non-streaming)
agent_response = self.agent.invoke(messages)
# Convert your agent's output to ResponsesAgent format, assuming agent_response is a str
output_item = (self.create_text_output_item(text=agent_response, id=str(uuid4())),)
# Return the response
return ResponsesAgentResponse(output=[output_item])
Streaming com reutilção de código
Para agentes de streaming, você pode ser inteligente e reutilizar a lógica para evitar duplicar o código que converte mensagens:
from typing import Generator
from uuid import uuid4
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
ResponsesAgentStreamEvent,
)
class MyWrappedStreamingAgent(ResponsesAgent):
def __init__(self, agent):
# Reference your existing agent
self.agent = agent
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
"""Non-streaming predict: collects all streaming chunks into a single response."""
# Reuse the streaming logic and collect all output items
output_items = []
for stream_event in self.predict_stream(request):
if stream_event.type == "response.output_item.done":
output_items.append(stream_event.item)
# Return all collected items as a single response
return ResponsesAgentResponse(output=output_items)
def predict_stream(
self, request: ResponsesAgentRequest
) -> Generator[ResponsesAgentStreamEvent, None, None]:
"""Streaming predict: the core logic that both methods use."""
# Convert incoming messages to your agent's format
# prep_msgs_for_llm is a function you write to convert the incoming messages, included in full examples linked below
messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
# Stream from your existing agent
item_id = str(uuid4())
aggregated_stream = ""
for chunk in self.agent.stream(messages):
# Convert each chunk to ResponsesAgent format
yield self.create_text_delta(delta=chunk, item_id=item_id)
aggregated_stream += chunk
# Emit an aggregated output_item for all the text deltas with id=item_id
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_text_output_item(text=aggregated_stream, id=item_id),
)
Migrar de ChatCompletions
Se o agente existente usar a API de ChatCompletions do OpenAI, você poderá migrá-la ResponsesAgent sem reescrever sua lógica principal. Adicione um wrapper que:
- Converte mensagens de entrada
ResponsesAgentRequestnoChatCompletionsformato esperado pelo agente. - Converte saídas
ChatCompletionsnoResponsesAgentResponseesquema. - Opcionalmente, dá suporte ao streaming mapeando deltas incrementais de
ChatCompletionsemResponsesAgentStreamEventobjetos.
from typing import Generator
from uuid import uuid4
from databricks.sdk import WorkspaceClient
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
ResponsesAgentStreamEvent,
)
# Legacy agent that outputs ChatCompletions objects
class LegacyAgent:
def __init__(self):
self.w = WorkspaceClient()
self.OpenAI = self.w.serving_endpoints.get_open_ai_client()
def stream(self, messages):
for chunk in self.OpenAI.chat.completions.create(
model="databricks-claude-sonnet-4-5",
messages=messages,
stream=True,
):
yield chunk.to_dict()
# Wrapper that converts the legacy agent to a ResponsesAgent
class MyWrappedStreamingAgent(ResponsesAgent):
def __init__(self, agent):
# `agent` is your existing ChatCompletions agent
self.agent = agent
def prep_msgs_for_llm(self, messages):
# dummy example of prep_msgs_for_llm
# real example of prep_msgs_for_llm included in full examples linked below
return [{"role": "user", "content": "Hello, how are you?"}]
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
"""Non-streaming predict: collects all streaming chunks into a single response."""
# Reuse the streaming logic and collect all output items
output_items = []
for stream_event in self.predict_stream(request):
if stream_event.type == "response.output_item.done":
output_items.append(stream_event.item)
# Return all collected items as a single response
return ResponsesAgentResponse(output=output_items)
def predict_stream(
self, request: ResponsesAgentRequest
) -> Generator[ResponsesAgentStreamEvent, None, None]:
"""Streaming predict: the core logic that both methods use."""
# Convert incoming messages to your agent's format
messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
# process the ChatCompletion output stream
agent_content = ""
tool_calls = []
msg_id = None
for chunk in self.agent.stream(messages): # call the underlying agent's stream method
delta = chunk["choices"][0]["delta"]
msg_id = chunk.get("id", None)
content = delta.get("content", None)
if tc := delta.get("tool_calls"):
if not tool_calls: # only accommodate for single tool call right now
tool_calls = tc
else:
tool_calls[0]["function"]["arguments"] += tc[0]["function"]["arguments"]
elif content is not None:
agent_content += content
yield ResponsesAgentStreamEvent(**self.create_text_delta(content, item_id=msg_id))
# aggregate the streamed text content
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_text_output_item(agent_content, msg_id),
)
for tool_call in tool_calls:
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_function_call_item(
str(uuid4()),
tool_call["id"],
tool_call["function"]["name"],
tool_call["function"]["arguments"],
),
)
agent = MyWrappedStreamingAgent(LegacyAgent())
for chunk in agent.predict_stream(
ResponsesAgentRequest(input=[{"role": "user", "content": "Hello, how are you?"}])
):
print(chunk)
Para obter exemplos completos, consulte ResponsesAgent exemplos.
Respostas de streaming
O streaming permite que os agentes enviem respostas em partes em tempo real em vez de aguardar a resposta completa. Para implementar o streaming com ResponsesAgent, emita uma série de eventos delta seguidos por um evento de conclusão final:
-
Emitir eventos delta: enviar vários
output_text.deltaeventos com o mesmoitem_idpara transmitir partes de texto em tempo real. -
Concluir com o evento concluído: enviar um evento final
response.output_item.donecom o mesmoitem_idque os eventos delta que contêm o texto de saída final completo.
Cada evento delta transmite uma parte do texto para o cliente. O evento final concluído contém o texto de resposta completo e sinaliza o Databricks para fazer o seguinte:
- Rastrear a saída do agente com o rastreamento do MLflow
- Agregar respostas transmitidas em tabelas de inferência do Gateway de IA
- Mostrar a saída completa na interface do usuário do AI Playground
Propagação de erro na transmissão
A IA do Mosaico propaga todos os erros encontrados durante o streaming com o último token em databricks_output.error. Cabe ao cliente de chamada tratar e exibir corretamente esse erro.
{
"delta": …,
"databricks_output": {
"trace": {...},
"error": {
"error_code": BAD_REQUEST,
"message": "TimeoutException: Tool XYZ failed to execute."
}
}
}
Recursos avançados
entradas e saídas personalizadas
Alguns cenários podem exigir entradas adicionais do agente, como e client_typeou saídas, como session_id links de origem de recuperação que não devem ser incluídos no histórico de chat para interações futuras.
Para esses cenários, o MLflow ResponsesAgent dá suporte nativo aos campos custom_inputs e custom_outputs. Você pode acessar as entradas personalizadas por meio request.custom_inputs de todos os exemplos vinculados acima em Exemplos responsesAgent.
Aviso
O aplicativo de revisão de Avaliação do Agente não dá suporte à renderização de rastreamentos para agentes com campos de entrada adicionais.
Consulte os blocos de anotações a seguir para saber como definir entradas e saídas personalizadas.
Fornecer custom_inputs no Playground de IA e examinar o aplicativo
Se o agente aceitar entradas adicionais usando o custom_inputs campo, você poderá fornecer manualmente essas entradas no AI Playground e no aplicativo de revisão.
No Playground de IA ou no Aplicativo de Revisão do Agente, selecione o ícone de engrenagem
.
Habilitar custom_inputs.
Forneça um objeto JSON que corresponda ao esquema de entrada definido do agente.
Especificar esquemas de recuperador personalizados
Os agentes de IA geralmente usam recuperadores para localizar e consultar dados não estruturados de índices de pesquisa de vetor. Por exemplo, veja ferramentas de recuperação; consulte Construção e rastreamento de ferramentas de recuperação para dados não estruturados.
Rastreie esses recuperadores em seu agente com intervalos MLflow RETRIEVER para habilitar recursos de produtos do Databricks, incluindo:
- Exibir automaticamente links para documentos de origem recuperados na interface do Playground de IA
- Executar automaticamente juízes de base e relevância de recuperação na Avaliação do Agente
Nota
O Databricks recomenda o uso de ferramentas de recuperação fornecidas por pacotes Databricks AI Bridge, como databricks_langchain.VectorSearchRetrieverTool e databricks_openai.VectorSearchRetrieverTool, pois elas já estão em conformidade com o esquema de recuperação do MLflow. Consulte Desenvolver localmente as ferramentas de recuperação da busca em vetores com a Ponte de IA.
Se o agente incluir intervalos de recuperação com um esquema personalizado, chame mlflow.models.set_retriever_schema quando você definir seu agente no código. Isso mapeia as colunas de saída do extrator para os campos esperados do MLflow (primary_key, text_column, doc_uri).
import mlflow
# Define the retriever's schema by providing your column names
# For example, the following call specifies the schema of a retriever that returns a list of objects like
# [
# {
# 'document_id': '9a8292da3a9d4005a988bf0bfdd0024c',
# 'chunk_text': 'MLflow is an open-source platform, purpose-built to assist machine learning practitioners...',
# 'doc_uri': 'https://mlflow.org/docs/latest/index.html',
# 'title': 'MLflow: A Tool for Managing the Machine Learning Lifecycle'
# },
# {
# 'document_id': '7537fe93c97f4fdb9867412e9c1f9e5b',
# 'chunk_text': 'A great way to get started with MLflow is to use the autologging feature. Autologging automatically logs your model...',
# 'doc_uri': 'https://mlflow.org/docs/latest/getting-started/',
# 'title': 'Getting Started with MLflow'
# },
# ...
# ]
mlflow.models.set_retriever_schema(
# Specify the name of your retriever span
name="mlflow_docs_vector_search",
# Specify the output column name to treat as the primary key (ID) of each retrieved document
primary_key="document_id",
# Specify the output column name to treat as the text content (page content) of each retrieved document
text_column="chunk_text",
# Specify the output column name to treat as the document URI of each retrieved document
doc_uri="doc_uri",
# Specify any other columns returned by the retriever
other_columns=["title"],
)
Nota
A coluna doc_uri é importante principalmente ao avaliar o desempenho do recuperador. O doc_uri é o principal identificador de documentos retornados pelo mecanismo de recuperação, permitindo compará-los com conjuntos de avaliação de verdade fundamentada. Consulte Conjuntos de Avaliação (MLflow 2).
Considerações de implantação
Preparar-se para o Serviço de Modelo do Databricks
O Databricks implanta ResponsesAgents em um ambiente distribuído no Databricks Model Serving. Isso significa que, durante uma conversa de vários turnos, a mesma réplica de serviço pode não lidar com todas as solicitações. Preste atenção às seguintes implicações para gerenciar o estado do agente:
Evite o cache local: ao implantar um
ResponsesAgent, não suponha que a mesma réplica manipule todas as solicitações em uma conversa de vários turnos. Reconstrua o estado interno usando um esquema de dicionárioResponsesAgentRequestpara cada turno.Estado thread-safe: o estado do agente de design é thread-safe, evitando conflitos em ambientes com vários threads.
Inicialize o estado na função
predict: Inicialize o estado sempre que a funçãopredictfor chamada, não durante a inicialização deResponsesAgent. Armazenar o estado no nívelResponsesAgentpode vazar informações entre conversas e causar conflitos porque uma única réplicaResponsesAgentpode lidar com solicitações de várias conversas.
Parametrizar código para implantação em ambientes
Parametrize o código do agente para reutilizar o mesmo código de agente em ambientes diferentes.
Parâmetros são pares chave-valor que você define em um dicionário Python ou um arquivo .yaml.
Para configurar o código, crie um ModelConfig usando um dicionário Python ou um .yaml arquivo.
ModelConfig é um conjunto de parâmetros chave-valor que permite o gerenciamento de configuração flexível. Por exemplo, você pode usar um dicionário durante o desenvolvimento e convertê-lo em um arquivo .yaml para implantação de produção e CI/CD.
Um exemplo ModelConfig é mostrado abaixo:
llm_parameters:
max_tokens: 500
temperature: 0.01
model_serving_endpoint: databricks-meta-llama-3-3-70b-instruct
vector_search_index: ml.docs.databricks_docs_index
prompt_template: 'You are a hello world bot. Respond with a reply to the user''s
question that indicates your prompt template came from a YAML file. Your response
must use the word "YAML" somewhere. User''s question: {question}'
prompt_template_input_vars:
- question
No código do agente, você pode referenciar uma configuração padrão (desenvolvimento) do arquivo .yaml ou dicionário:
import mlflow
# Example for loading from a .yml file
config_file = "configs/hello_world_config.yml"
model_config = mlflow.models.ModelConfig(development_config=config_file)
# Example of using a dictionary
config_dict = {
"prompt_template": "You are a hello world bot. Respond with a reply to the user's question that is fun and interesting to the user. User's question: {question}",
"prompt_template_input_vars": ["question"],
"model_serving_endpoint": "databricks-meta-llama-3-3-70b-instruct",
"llm_parameters": {"temperature": 0.01, "max_tokens": 500},
}
model_config = mlflow.models.ModelConfig(development_config=config_dict)
# Use model_config.get() to retrieve a parameter value
# You can also use model_config.to_dict() to convert the loaded config object
# into a dictionary
value = model_config.get('sample_param')
Em seguida, ao fazer o log do seu agente, especifique o parâmetro model_config para log_model, a fim de definir um conjunto personalizado de parâmetros a serem usados ao carregar o agente registrado. Consulte a documentação do MLflow - ModelConfig.
Usar padrões de código síncrono ou de callback
Para garantir a estabilidade e a compatibilidade, use o código síncrono ou padrões baseados em retorno de chamada na implementação do agente.
O Azure Databricks gerencia automaticamente a comunicação assíncrona para fornecer simultaneidade e desempenho ideais ao implantar um agente. A introdução de loops de eventos personalizados ou estruturas assíncronas pode levar a erros como RuntimeError: This event loop is already running and caused unpredictable behavior.
O Azure Databricks recomenda evitar programação assíncrona, como usar asyncio ou criar loops de eventos personalizados, ao desenvolver agentes.