Compartilhar via


Programar agentes de IA em código

Esta página mostra como criar um agente de IA no Python usando o Mosaic AI Agent Framework e bibliotecas populares de criação de agente, como LangGraph e OpenAI.

Requisitos

Dica

O Databricks recomenda instalar a versão mais recente do cliente Python do MLflow ao desenvolver agentes.

Para criar e implantar agentes usando a abordagem nesta página, instale o seguinte:

  • databricks-agents 1.2.0 ou superior
  • mlflow 3.1.3 ou superior
  • Python 3.10 ou superior.
    • Use a computação sem servidor ou o Databricks Runtime 13.3 LTS ou superior para atender a esse requisito.
%pip install -U -qqqq databricks-agents mlflow

O Databricks também recomenda a instalação de pacotes de integração da Ponte de IA do Databricks para criar agentes. Esses pacotes de integração fornecem uma camada compartilhada de APIs que interagem com os recursos de IA do Databricks, como o Genie de IA/BI do Databricks e o Vector Search, entre estruturas de criação de agente e SDKs.

OpenAI

%pip install -U -qqqq databricks-openai

LangChain/LangGraph

%pip install -U -qqqq databricks-langchain

DSPy

%pip install -U -qqqq databricks-dspy

Agentes puros do Python

%pip install -U -qqqq databricks-ai-bridge

Usar ResponsesAgent para criar agentes

O Databricks recomenda a interface ResponsesAgent MLflow para criar agentes de nível de produção. ResponsesAgent permite criar agentes com qualquer estrutura de terceiros e integrá-la aos recursos de IA do Databricks para recursos robustos de registro em log, rastreamento, avaliação, implantação e monitoramento.

O ResponsesAgent esquema é compatível com o esquema OpenAI Responses . Para saber mais sobre o OpenAI Responses, consulte OpenAI: Responses vs. ChatCompletion.

Nota

A interface mais antiga ChatAgent ainda tem suporte no Databricks. No entanto, para novos agentes, o Databricks recomenda usar a versão mais recente do MLflow e da ResponsesAgent interface.

Consulte o esquema de agente de entrada e saída herdado.

ResponsesAgent encapsula facilmente agentes existentes para compatibilidade do Databricks.

ResponsesAgent fornece os seguintes benefícios:

  • Funcionalidades avançadas do agente

    • Suporte a vários agentes
    • Saída de streaming: transmita a saída em partes menores.
    • Histórico abrangente de chamada de mensagens de ferramentas: Retorne várias mensagens, incluindo as intermediárias de chamada de ferramentas, para melhorar a qualidade e o gerenciamento da conversa.
    • Suporte à confirmação de chamada de ferramenta
    • Suporte à ferramenta de execução longa
  • Desenvolvimento, implantação e monitoramento simplificados

    • Agentes de criação usando qualquer estrutura: encapsular qualquer agente existente usando a ResponsesAgent interface para obter compatibilidade pronta para uso com o Playground de IA, a Avaliação do Agente e o Monitoramento do Agente.
    • Interfaces de criação tipadas: escrever código do agente usando classes Python tipadas, beneficiando-se do preenchimento automático do IDE e do notebook.
    • Inferência automática de assinatura: o MLflow infere automaticamente assinaturas ResponsesAgent ao registrar um agente em log, simplificando o registro e a implantação. Confira Inferir assinatura de modelo durante o registro em log.
    • Rastreamento automático: o MLflow rastreia automaticamente suas predict funções e predict_stream funções, agregando respostas transmitidas para facilitar a avaliação e a exibição.
    • tabelas de inferência aprimoradas pelo Gateway de IA: as tabelas de inferência do Gateway de IA são habilitadas automaticamente para agentes implantados, fornecendo acesso a metadados detalhados do log de solicitações.

Para saber como criar um ResponsesAgent, consulte os exemplos na seção a seguir e a documentação do MLflow – ResponsesAgent for Model Serving.

ResponsesAgent Exemplos

Os blocos de anotações a seguir mostram como criar streaming e não streaming ResponsesAgent usando bibliotecas populares. Para saber como expandir os recursos desses agentes, consulte as ferramentas do agente de IA.

OpenAI

Agente de chat simples do OpenAI usando modelos hospedados pelo Databricks

Obter notebook

Agente de chamada de ferramentas openAI usando modelos hospedados pelo Databricks

Obter notebook

Agente de chamada de ferramenta OpenAI usando modelos hospedados pela OpenAI

Obter notebook

LangGraph

Agente de chamada de ferramentas do LangGraph

Obter notebook

DSPy

Agente de chamada de ferramenta de ciclo único DSPy

Obter notebook

Exemplo de vários agentes

Para saber como criar um sistema de vários agentes, consulte Usar o Genie em sistemas de vários agentes.

Exemplo de agente com estado

Para saber como criar agentes com estado com memória de curto e longo prazo usando o Lakebase como um repositório de memória, consulte a memória do agente de IA.

Exemplo de agente não conversacional

Ao contrário dos agentes de conversação que gerenciam diálogos de vários turnos, os agentes não conversacionais se concentram na execução de tarefas bem definidas com eficiência. Essa arquitetura simplificada permite maior taxa de transferência para solicitações independentes.

Para saber como criar um agente não conversacional, consulte agentes de IA não conversacionais usando o MLflow.

E se eu já tiver um agente?

Se você já tiver um agente criado com LangChain, LangGraph ou uma estrutura semelhante, você não precisará reescrever seu agente para usá-lo no Databricks. Em vez disso, basta encapsular seu agente existente com a interface MLflow ResponsesAgent :

  1. Escreva uma classe de wrapper do Python que herda de mlflow.pyfunc.ResponsesAgent.

    Dentro da classe wrapper, faça referência ao agente existente como um atributo self.agent = your_existing_agent.

  2. A ResponsesAgent classe requer a implementação de um predict método que retorna um ResponsesAgentResponse para lidar com solicitações que não são de streaming. Veja a seguir um exemplo do ResponsesAgentResponses esquema:

    import uuid
    # input as a dict
    {"input": [{"role": "user", "content": "What did the data scientist say when their Spark job finally completed?"}]}
    
    # output example
    ResponsesAgentResponse(
        output=[
            {
                "type": "message",
                "id": str(uuid.uuid4()),
                "content": [{"type": "output_text", "text": "Well, that really sparked joy!"}],
                "role": "assistant",
            },
        ]
    )
    
  3. predict Na função, converta as mensagens de entrada no ResponsesAgentRequest formato que o agente espera. Depois que o agente gerar uma resposta, converta sua saída em um ResponsesAgentResponse objeto.

Confira os seguintes exemplos de código para ver como converter agentes existentes em ResponsesAgent:

Conversão básica

Para agentes que não são de streaming, converta entradas e saídas na predict função.

from uuid import uuid4

from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
)


class MyWrappedAgent(ResponsesAgent):
    def __init__(self, agent):
        # Reference your existing agent
        self.agent = agent

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        # Convert incoming messages to your agent's format
        # prep_msgs_for_llm is a function you write to convert the incoming messages
        messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])

        # Call your existing agent (non-streaming)
        agent_response = self.agent.invoke(messages)

        # Convert your agent's output to ResponsesAgent format, assuming agent_response is a str
        output_item = (self.create_text_output_item(text=agent_response, id=str(uuid4())),)

        # Return the response
        return ResponsesAgentResponse(output=[output_item])

Streaming com reutilção de código

Para agentes de streaming, você pode ser inteligente e reutilizar a lógica para evitar duplicar o código que converte mensagens:

from typing import Generator
from uuid import uuid4

from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)


class MyWrappedStreamingAgent(ResponsesAgent):
    def __init__(self, agent):
        # Reference your existing agent
        self.agent = agent

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        """Non-streaming predict: collects all streaming chunks into a single response."""
        # Reuse the streaming logic and collect all output items
        output_items = []
        for stream_event in self.predict_stream(request):
            if stream_event.type == "response.output_item.done":
                output_items.append(stream_event.item)

        # Return all collected items as a single response
        return ResponsesAgentResponse(output=output_items)

    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        """Streaming predict: the core logic that both methods use."""
        # Convert incoming messages to your agent's format
        # prep_msgs_for_llm is a function you write to convert the incoming messages, included in full examples linked below
        messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])

        # Stream from your existing agent
        item_id = str(uuid4())
        aggregated_stream = ""
        for chunk in self.agent.stream(messages):
            # Convert each chunk to ResponsesAgent format
            yield self.create_text_delta(delta=chunk, item_id=item_id)
            aggregated_stream += chunk

        # Emit an aggregated output_item for all the text deltas with id=item_id
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_text_output_item(text=aggregated_stream, id=item_id),
        )

Migrar de ChatCompletions

Se o agente existente usar a API de ChatCompletions do OpenAI, você poderá migrá-la ResponsesAgent sem reescrever sua lógica principal. Adicione um wrapper que:

  1. Converte mensagens de entrada ResponsesAgentRequest no ChatCompletions formato esperado pelo agente.
  2. Converte saídas ChatCompletions no ResponsesAgentResponse esquema.
  3. Opcionalmente, dá suporte ao streaming mapeando deltas incrementais de ChatCompletions em ResponsesAgentStreamEvent objetos.
from typing import Generator
from uuid import uuid4

from databricks.sdk import WorkspaceClient
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)


# Legacy agent that outputs ChatCompletions objects
class LegacyAgent:
    def __init__(self):
        self.w = WorkspaceClient()
        self.OpenAI = self.w.serving_endpoints.get_open_ai_client()

    def stream(self, messages):
        for chunk in self.OpenAI.chat.completions.create(
            model="databricks-claude-sonnet-4-5",
            messages=messages,
            stream=True,
        ):
            yield chunk.to_dict()


# Wrapper that converts the legacy agent to a ResponsesAgent
class MyWrappedStreamingAgent(ResponsesAgent):
    def __init__(self, agent):
        # `agent` is your existing ChatCompletions agent
        self.agent = agent

    def prep_msgs_for_llm(self, messages):
        # dummy example of prep_msgs_for_llm
        # real example of prep_msgs_for_llm included in full examples linked below
        return [{"role": "user", "content": "Hello, how are you?"}]

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        """Non-streaming predict: collects all streaming chunks into a single response."""
        # Reuse the streaming logic and collect all output items
        output_items = []
        for stream_event in self.predict_stream(request):
            if stream_event.type == "response.output_item.done":
                output_items.append(stream_event.item)

        # Return all collected items as a single response
        return ResponsesAgentResponse(output=output_items)

    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        """Streaming predict: the core logic that both methods use."""
        # Convert incoming messages to your agent's format
        messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])

        # process the ChatCompletion output stream
        agent_content = ""
        tool_calls = []
        msg_id = None
        for chunk in self.agent.stream(messages):  # call the underlying agent's stream method
            delta = chunk["choices"][0]["delta"]
            msg_id = chunk.get("id", None)
            content = delta.get("content", None)
            if tc := delta.get("tool_calls"):
                if not tool_calls:  # only accommodate for single tool call right now
                    tool_calls = tc
                else:
                    tool_calls[0]["function"]["arguments"] += tc[0]["function"]["arguments"]
            elif content is not None:
                agent_content += content
                yield ResponsesAgentStreamEvent(**self.create_text_delta(content, item_id=msg_id))

        # aggregate the streamed text content
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_text_output_item(agent_content, msg_id),
        )

        for tool_call in tool_calls:
            yield ResponsesAgentStreamEvent(
                type="response.output_item.done",
                item=self.create_function_call_item(
                    str(uuid4()),
                    tool_call["id"],
                    tool_call["function"]["name"],
                    tool_call["function"]["arguments"],
                ),
            )


agent = MyWrappedStreamingAgent(LegacyAgent())

for chunk in agent.predict_stream(
    ResponsesAgentRequest(input=[{"role": "user", "content": "Hello, how are you?"}])
):
    print(chunk)

Para obter exemplos completos, consulte ResponsesAgent exemplos.

Respostas de streaming

O streaming permite que os agentes enviem respostas em partes em tempo real em vez de aguardar a resposta completa. Para implementar o streaming com ResponsesAgent, emita uma série de eventos delta seguidos por um evento de conclusão final:

  1. Emitir eventos delta: enviar vários output_text.delta eventos com o mesmo item_id para transmitir partes de texto em tempo real.
  2. Concluir com o evento concluído: enviar um evento final response.output_item.done com o mesmo item_id que os eventos delta que contêm o texto de saída final completo.

Cada evento delta transmite uma parte do texto para o cliente. O evento final concluído contém o texto de resposta completo e sinaliza o Databricks para fazer o seguinte:

  • Rastrear a saída do agente com o rastreamento do MLflow
  • Agregar respostas transmitidas em tabelas de inferência do Gateway de IA
  • Mostrar a saída completa na interface do usuário do AI Playground

Propagação de erro na transmissão

A IA do Mosaico propaga todos os erros encontrados durante o streaming com o último token em databricks_output.error. Cabe ao cliente de chamada tratar e exibir corretamente esse erro.

{
  "delta": …,
  "databricks_output": {
    "trace": {...},
    "error": {
      "error_code": BAD_REQUEST,
      "message": "TimeoutException: Tool XYZ failed to execute."
    }
  }
}

Recursos avançados

entradas e saídas personalizadas

Alguns cenários podem exigir entradas adicionais do agente, como e client_typeou saídas, como session_id links de origem de recuperação que não devem ser incluídos no histórico de chat para interações futuras.

Para esses cenários, o MLflow ResponsesAgent dá suporte nativo aos campos custom_inputs e custom_outputs. Você pode acessar as entradas personalizadas por meio request.custom_inputs de todos os exemplos vinculados acima em Exemplos responsesAgent.

Aviso

O aplicativo de revisão de Avaliação do Agente não dá suporte à renderização de rastreamentos para agentes com campos de entrada adicionais.

Consulte os blocos de anotações a seguir para saber como definir entradas e saídas personalizadas.

Fornecer custom_inputs no Playground de IA e examinar o aplicativo

Se o agente aceitar entradas adicionais usando o custom_inputs campo, você poderá fornecer manualmente essas entradas no AI Playground e no aplicativo de revisão.

  1. No Playground de IA ou no Aplicativo de Revisão do Agente, selecione o ícone de engrenagem Ícone de engrenagem..

  2. Habilitar custom_inputs.

  3. Forneça um objeto JSON que corresponda ao esquema de entrada definido do agente.

    Fornecer custom_inputs no playground de IA.

Especificar esquemas de recuperador personalizados

Os agentes de IA geralmente usam recuperadores para localizar e consultar dados não estruturados de índices de pesquisa de vetor. Por exemplo, veja ferramentas de recuperação; consulte Construção e rastreamento de ferramentas de recuperação para dados não estruturados.

Rastreie esses recuperadores em seu agente com intervalos MLflow RETRIEVER para habilitar recursos de produtos do Databricks, incluindo:

  • Exibir automaticamente links para documentos de origem recuperados na interface do Playground de IA
  • Executar automaticamente juízes de base e relevância de recuperação na Avaliação do Agente

Nota

O Databricks recomenda o uso de ferramentas de recuperação fornecidas por pacotes Databricks AI Bridge, como databricks_langchain.VectorSearchRetrieverTool e databricks_openai.VectorSearchRetrieverTool, pois elas já estão em conformidade com o esquema de recuperação do MLflow. Consulte Desenvolver localmente as ferramentas de recuperação da busca em vetores com a Ponte de IA.

Se o agente incluir intervalos de recuperação com um esquema personalizado, chame mlflow.models.set_retriever_schema quando você definir seu agente no código. Isso mapeia as colunas de saída do extrator para os campos esperados do MLflow (primary_key, text_column, doc_uri).

import mlflow
# Define the retriever's schema by providing your column names
# For example, the following call specifies the schema of a retriever that returns a list of objects like
# [
#     {
#         'document_id': '9a8292da3a9d4005a988bf0bfdd0024c',
#         'chunk_text': 'MLflow is an open-source platform, purpose-built to assist machine learning practitioners...',
#         'doc_uri': 'https://mlflow.org/docs/latest/index.html',
#         'title': 'MLflow: A Tool for Managing the Machine Learning Lifecycle'
#     },
#     {
#         'document_id': '7537fe93c97f4fdb9867412e9c1f9e5b',
#         'chunk_text': 'A great way to get started with MLflow is to use the autologging feature. Autologging automatically logs your model...',
#         'doc_uri': 'https://mlflow.org/docs/latest/getting-started/',
#         'title': 'Getting Started with MLflow'
#     },
# ...
# ]
mlflow.models.set_retriever_schema(
    # Specify the name of your retriever span
    name="mlflow_docs_vector_search",
    # Specify the output column name to treat as the primary key (ID) of each retrieved document
    primary_key="document_id",
    # Specify the output column name to treat as the text content (page content) of each retrieved document
    text_column="chunk_text",
    # Specify the output column name to treat as the document URI of each retrieved document
    doc_uri="doc_uri",
    # Specify any other columns returned by the retriever
    other_columns=["title"],
)

Nota

A coluna doc_uri é importante principalmente ao avaliar o desempenho do recuperador. O doc_uri é o principal identificador de documentos retornados pelo mecanismo de recuperação, permitindo compará-los com conjuntos de avaliação de verdade fundamentada. Consulte Conjuntos de Avaliação (MLflow 2).

Considerações de implantação

Preparar-se para o Serviço de Modelo do Databricks

O Databricks implanta ResponsesAgents em um ambiente distribuído no Databricks Model Serving. Isso significa que, durante uma conversa de vários turnos, a mesma réplica de serviço pode não lidar com todas as solicitações. Preste atenção às seguintes implicações para gerenciar o estado do agente:

  • Evite o cache local: ao implantar um ResponsesAgent, não suponha que a mesma réplica manipule todas as solicitações em uma conversa de vários turnos. Reconstrua o estado interno usando um esquema de dicionário ResponsesAgentRequest para cada turno.

  • Estado thread-safe: o estado do agente de design é thread-safe, evitando conflitos em ambientes com vários threads.

  • Inicialize o estado na função predict: Inicialize o estado sempre que a função predict for chamada, não durante a inicialização de ResponsesAgent. Armazenar o estado no nível ResponsesAgent pode vazar informações entre conversas e causar conflitos porque uma única réplica ResponsesAgent pode lidar com solicitações de várias conversas.

Parametrizar código para implantação em ambientes

Parametrize o código do agente para reutilizar o mesmo código de agente em ambientes diferentes.

Parâmetros são pares chave-valor que você define em um dicionário Python ou um arquivo .yaml.

Para configurar o código, crie um ModelConfig usando um dicionário Python ou um .yaml arquivo. ModelConfig é um conjunto de parâmetros chave-valor que permite o gerenciamento de configuração flexível. Por exemplo, você pode usar um dicionário durante o desenvolvimento e convertê-lo em um arquivo .yaml para implantação de produção e CI/CD.

Um exemplo ModelConfig é mostrado abaixo:

llm_parameters:
  max_tokens: 500
  temperature: 0.01
model_serving_endpoint: databricks-meta-llama-3-3-70b-instruct
vector_search_index: ml.docs.databricks_docs_index
prompt_template: 'You are a hello world bot. Respond with a reply to the user''s
  question that indicates your prompt template came from a YAML file. Your response
  must use the word "YAML" somewhere. User''s question: {question}'
prompt_template_input_vars:
  - question

No código do agente, você pode referenciar uma configuração padrão (desenvolvimento) do arquivo .yaml ou dicionário:

import mlflow
# Example for loading from a .yml file
config_file = "configs/hello_world_config.yml"
model_config = mlflow.models.ModelConfig(development_config=config_file)

# Example of using a dictionary
config_dict = {
    "prompt_template": "You are a hello world bot. Respond with a reply to the user's question that is fun and interesting to the user. User's question: {question}",
    "prompt_template_input_vars": ["question"],
    "model_serving_endpoint": "databricks-meta-llama-3-3-70b-instruct",
    "llm_parameters": {"temperature": 0.01, "max_tokens": 500},
}

model_config = mlflow.models.ModelConfig(development_config=config_dict)

# Use model_config.get() to retrieve a parameter value
# You can also use model_config.to_dict() to convert the loaded config object
# into a dictionary
value = model_config.get('sample_param')

Em seguida, ao fazer o log do seu agente, especifique o parâmetro model_config para log_model, a fim de definir um conjunto personalizado de parâmetros a serem usados ao carregar o agente registrado. Consulte a documentação do MLflow - ModelConfig.

Usar padrões de código síncrono ou de callback

Para garantir a estabilidade e a compatibilidade, use o código síncrono ou padrões baseados em retorno de chamada na implementação do agente.

O Azure Databricks gerencia automaticamente a comunicação assíncrona para fornecer simultaneidade e desempenho ideais ao implantar um agente. A introdução de loops de eventos personalizados ou estruturas assíncronas pode levar a erros como RuntimeError: This event loop is already running and caused unpredictable behavior.

O Azure Databricks recomenda evitar programação assíncrona, como usar asyncio ou criar loops de eventos personalizados, ao desenvolver agentes.

Próximas etapas