Use Databricks managed MCP servers

Important

This feature is in Beta.

Databricks managed MCP servers are ready-to-use servers that connect AI agents to your data in Unity Catalog, Databricks Vector Search indexes, Genie spaces, and custom functions.

Available managed servers

To see the MCP servers and their endpoint URLs, go to your workspace > Agents > MCP Servers.

The MCP Servers tab on the Agents page

Databricks provides the following out-of-the-box MCP servers.

| MCP server | Description | URL pattern |
| --- | --- | --- |
| Vector Search | Query Vector Search indexes to find relevant documents. Only indexes with Databricks-managed embeddings are supported. | https://<workspace-hostname>/api/2.0/mcp/vector-search/{catalog}/{schema} |
| Genie spaces | Query Genie spaces in natural language to analyze structured data. Use the Genie MCP server when connecting external AI assistants (such as Claude or ChatGPT) to your data for read-only operations. Note: The managed MCP server for Genie invokes Genie as an MCP tool, which means no conversation history is passed when the Genie API is called. Alternatively, you can use Genie in a multi-agent system. | https://<workspace-hostname>/api/2.0/mcp/genie/{genie_space_id} |
| Unity Catalog functions | Run predefined SQL queries using Unity Catalog functions. | https://<workspace-hostname>/api/2.0/mcp/functions/{catalog}/{schema} |
| DBSQL | Run AI-generated SQL. Use the DBSQL MCP server to author data pipelines with AI coding tools (Claude Code, Cursor, Codex, and so on). For read-only data retrieval and chatbot integrations, use Genie instead. | https://<workspace-hostname>/api/2.0/mcp/sql |

Example scenario

Consider a custom agent for customer support. You can connect it to multiple managed MCP servers:

  • Vector Search: https://<workspace-hostname>/api/2.0/mcp/vector-search/prod/customer_support
    • Search support tickets and documentation
  • Genie space: https://<workspace-hostname>/api/2.0/mcp/genie/{billing_space_id}
    • Query billing data and customer information
  • UC functions: https://<workspace-hostname>/api/2.0/mcp/functions/prod/billing
    • Run custom functions for account lookups and updates

This gives the agent access to unstructured data (support tickets), structured data (billing tables), and custom business logic, as sketched below.
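As a sketch of what this wiring looks like in code, the snippet below uses the databricks-mcp client (introduced later on this page) to list the tools each of those servers exposes; the prod catalog, schema names, and the Genie space ID are the illustrative values from the list above, not real workspace objects:

from databricks.sdk import WorkspaceClient
from databricks_mcp import DatabricksMCPClient

workspace_client = WorkspaceClient()  # assumes ambient Databricks authentication
host = workspace_client.config.host

billing_space_id = "<your-genie-space-id>"  # hypothetical placeholder

# The three servers from the scenario above
server_urls = [
    f"{host}/api/2.0/mcp/vector-search/prod/customer_support",
    f"{host}/api/2.0/mcp/genie/{billing_space_id}",
    f"{host}/api/2.0/mcp/functions/prod/billing",
]

# List the tools each server exposes to the agent
for url in server_urls:
    client = DatabricksMCPClient(server_url=url, workspace_client=workspace_client)
    print(url, [tool.name for tool in client.list_tools()])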

Example notebooks: Build agents with Databricks MCP servers

The following notebooks show how to author LangGraph and OpenAI agents that call MCP tools using Databricks managed MCP servers.

LangGraph MCP tool-calling agent

Get notebook

OpenAI MCP tool-calling agent

Get notebook

Build agents from your local environment

Connecting to an MCP server on Databricks is similar to connecting to any other remote MCP server. You can connect using standard SDKs, such as the MCP Python SDK. The main difference is that Databricks MCP servers are secure by default and require clients to specify authentication.

The databricks-mcp Python library helps simplify authentication in custom agent code.
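For example, here is a minimal sketch of connecting with the standard MCP Python SDK while delegating authentication to databricks-mcp; it assumes your installed databricks-mcp version provides the DatabricksOAuthClientProvider helper:

import asyncio

from databricks.sdk import WorkspaceClient
from databricks_mcp import DatabricksOAuthClientProvider  # assumed auth helper
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client


async def list_tools(server_url: str) -> list[str]:
    ws = WorkspaceClient()  # uses your default Databricks authentication
    # The provider attaches workspace OAuth credentials to every MCP request
    async with streamablehttp_client(
        server_url, auth=DatabricksOAuthClientProvider(ws)
    ) as (reader, writer, _):
        async with ClientSession(reader, writer) as session:
            await session.initialize()
            return [t.name for t in (await session.list_tools()).tools]


if __name__ == "__main__":
    host = WorkspaceClient().config.host
    print(asyncio.run(list_tools(f"{host}/api/2.0/mcp/functions/system/ai")))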

The simplest way to develop agent code is to run it locally and authenticate to your workspace. Follow the steps below to build an AI agent that connects to a Databricks MCP server.

Configure your environment

  1. Use OAuth to authenticate to your workspace. Run the following in a local terminal:

    databricks auth login --host https://<your-workspace-hostname>
    
  2. When prompted, enter a profile name and note it for later use. The default profile name is DEFAULT. (The sketch after these steps shows the resulting profile entry.)

  3. Make sure your local environment runs Python 3.12 or above, then install the dependencies:

    pip install -U "mcp>=1.9" "databricks-sdk[openai]" "mlflow>=3.1.0" "databricks-agents>=1.0.0" "databricks-mcp"
    
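After a successful login, the CLI stores the profile in ~/.databrickscfg. The resulting entry looks roughly like this (exact fields vary with your authentication setup):

[DEFAULT]
host      = https://<your-workspace-hostname>
auth_type = databricks-cli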

Test the connection from your local environment

Serverless compute must be enabled in your workspace to run this snippet.

Validate your connection to the MCP server by listing Unity Catalog tools and running the built-in Python code interpreter tool.

  1. Run the following code to validate your connection to the MCP server:
from databricks_mcp import DatabricksMCPClient
from databricks.sdk import WorkspaceClient

# TODO: Update to the Databricks CLI profile name you specified when
# configuring authentication to the workspace.
databricks_cli_profile = "YOUR_DATABRICKS_CLI_PROFILE"
assert (
    databricks_cli_profile != "YOUR_DATABRICKS_CLI_PROFILE"
), "Set databricks_cli_profile to the Databricks CLI profile name you specified when configuring authentication to the workspace"
workspace_client = WorkspaceClient(profile=databricks_cli_profile)
workspace_hostname = workspace_client.config.host
mcp_server_url = f"{workspace_hostname}/api/2.0/mcp/functions/system/ai"

# This code uses the Unity Catalog functions MCP server to expose built-in
# AI tools under `system.ai`, like the `system.ai.python_exec` code interpreter tool
def test_connect_to_server():
    mcp_client = DatabricksMCPClient(server_url=mcp_server_url, workspace_client=workspace_client)
    tools = mcp_client.list_tools()

    print(
        f"Discovered tools {[t.name for t in tools]} "
        f"from MCP server {mcp_server_url}"
    )

    result = mcp_client.call_tool(
        "system__ai__python_exec", {"code": "print('Hello, world!')"}
    )
    print(
        f"Called system__ai__python_exec tool and got result "
        f"{result.content}"
    )


if __name__ == "__main__":
    test_connect_to_server()
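Running this file should print output roughly like the following; the exact tool list and result formatting depend on your workspace and library versions:

Discovered tools ['system__ai__python_exec'] from MCP server https://<workspace-hostname>/api/2.0/mcp/functions/system/ai
Called system__ai__python_exec tool and got result [TextContent(type='text', text='Hello, world!\n')]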

Create the agent

  1. Building on the code above, define a basic single-turn agent that uses tools. Save the agent code locally as a file named mcp_agent.py:

     import json
     import uuid
     import asyncio
     from typing import Any, Callable, List
     from pydantic import BaseModel
    
     import mlflow
     from mlflow.pyfunc import ResponsesAgent
     from mlflow.types.responses import ResponsesAgentRequest, ResponsesAgentResponse
    
     from databricks_mcp import DatabricksMCPClient
     from databricks.sdk import WorkspaceClient
    
     # 1) CONFIGURE YOUR ENDPOINTS/PROFILE
     LLM_ENDPOINT_NAME = "databricks-claude-sonnet-4-5"
     SYSTEM_PROMPT = "You are a helpful assistant."
     DATABRICKS_CLI_PROFILE = "YOUR_DATABRICKS_CLI_PROFILE"
     assert (
         DATABRICKS_CLI_PROFILE != "YOUR_DATABRICKS_CLI_PROFILE"
     ), "Set DATABRICKS_CLI_PROFILE to the Databricks CLI profile name you specified when configuring authentication to the workspace"
     workspace_client = WorkspaceClient(profile=DATABRICKS_CLI_PROFILE)
     host = workspace_client.config.host
     # Add more MCP server URLs here if desired, for example:
     # f"{host}/api/2.0/mcp/vector-search/prod/billing"
     # to include vector search indexes under the prod.billing schema, or
     # f"{host}/api/2.0/mcp/genie/<genie_space_id>"
     # to include a Genie space
     MANAGED_MCP_SERVER_URLS = [
         f"{host}/api/2.0/mcp/functions/system/ai",
     ]
     # Add Custom MCP Servers hosted on Databricks Apps
     CUSTOM_MCP_SERVER_URLS = []
    
    
    
     # 2) HELPER: convert between ResponsesAgent "message dict" and ChatCompletions format
     def _to_chat_messages(msg: dict[str, Any]) -> List[dict]:
         """
         Take a single ResponsesAgent-style dict and turn it into one or more
         ChatCompletions-compatible dict entries.
         """
         msg_type = msg.get("type")
         if msg_type == "function_call":
             return [
                 {
                     "role": "assistant",
                     "content": None,
                     "tool_calls": [
                         {
                             "id": msg["call_id"],
                             "type": "function",
                             "function": {
                                 "name": msg["name"],
                                 "arguments": msg["arguments"],
                             },
                         }
                     ],
                 }
             ]
         elif msg_type == "message" and isinstance(msg["content"], list):
             return [
                 {
                     "role": "assistant" if msg["role"] == "assistant" else msg["role"],
                     "content": content["text"],
                 }
                 for content in msg["content"]
             ]
         elif msg_type == "function_call_output":
             return [
                 {
                     "role": "tool",
                     "content": msg["output"],
                     "tool_call_id": msg["tool_call_id"],
                 }
             ]
         else:
             # fallback for plain {"role": ..., "content": "..."} or similar
             return [
                 {
                     k: v
                     for k, v in msg.items()
                     if k in ("role", "content", "name", "tool_calls", "tool_call_id")
                 }
             ]
    
    
     # 3) "MCP SESSION" + TOOL-INVOCATION LOGIC
     def _make_exec_fn(
         server_url: str, tool_name: str, ws: WorkspaceClient
     ) -> Callable[..., str]:
         def exec_fn(**kwargs):
             mcp_client = DatabricksMCPClient(server_url=server_url, workspace_client=ws)
             response = mcp_client.call_tool(tool_name, kwargs)
             return "".join([c.text for c in response.content])
    
         return exec_fn
    
    
     class ToolInfo(BaseModel):
         name: str
         spec: dict
         exec_fn: Callable
    
    
     def _fetch_tool_infos(ws: WorkspaceClient, server_url: str) -> List[ToolInfo]:
         print(f"Listing tools from MCP server {server_url}")
         infos: List[ToolInfo] = []
         mcp_client = DatabricksMCPClient(server_url=server_url, workspace_client=ws)
         mcp_tools = mcp_client.list_tools()
         for t in mcp_tools:
             schema = t.inputSchema.copy()
             if "properties" not in schema:
                 schema["properties"] = {}
             spec = {
                 "type": "function",
                 "function": {
                     "name": t.name,
                     "description": t.description,
                     "parameters": schema,
                 },
             }
             infos.append(
                 ToolInfo(
                     name=t.name, spec=spec, exec_fn=_make_exec_fn(server_url, t.name, ws)
                 )
             )
         return infos
    
    
     # 4) "SINGLE-TURN" AGENT CLASS
     class SingleTurnMCPAgent(ResponsesAgent):
         def _call_llm(self, history: List[dict], ws: WorkspaceClient, tool_infos):
             """
             Send current history → LLM, returning the raw response dict.
             """
             client = ws.serving_endpoints.get_open_ai_client()
             flat_msgs = []
             for msg in history:
                 flat_msgs.extend(_to_chat_messages(msg))
             return client.chat.completions.create(
                 model=LLM_ENDPOINT_NAME,
                 messages=flat_msgs,
                 tools=[ti.spec for ti in tool_infos],
             )
    
         def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
             ws = WorkspaceClient(profile=DATABRICKS_CLI_PROFILE)
    
             # 1) build initial history: system + user
             history: List[dict] = [{"role": "system", "content": SYSTEM_PROMPT}]
             for inp in request.input:
                 history.append(inp.model_dump())
    
             # 2) call LLM once
             tool_infos = [
                 tool_info
                 for mcp_server_url in (MANAGED_MCP_SERVER_URLS + CUSTOM_MCP_SERVER_URLS)
                 for tool_info in _fetch_tool_infos(ws, mcp_server_url)
             ]
             tools_dict = {tool_info.name: tool_info for tool_info in tool_infos}
             llm_resp = self._call_llm(history, ws, tool_infos)
             raw_choice = llm_resp.choices[0].message.to_dict()
             raw_choice["id"] = uuid.uuid4().hex
             history.append(raw_choice)
    
             tool_calls = raw_choice.get("tool_calls") or []
             if tool_calls:
                 # (we only support a single tool in this "single-turn" example)
                 fc = tool_calls[0]
                 name = fc["function"]["name"]
                 args = json.loads(fc["function"]["arguments"])
                 try:
                     tool_info = tools_dict[name]
                     result = tool_info.exec_fn(**args)
                 except Exception as e:
                     result = f"Error invoking {name}: {e}"
    
                 # 4) append the "tool" output
                 history.append(
                     {
                         "type": "function_call_output",
                         "role": "tool",
                         "id": uuid.uuid4().hex,
                         "tool_call_id": fc["id"],
                         "output": result,
                     }
                 )
    
                 # 5) call LLM a second time and treat that reply as final
                 followup = (
                     self._call_llm(history, ws, tool_infos=[]).choices[0].message.to_dict()
                 )
                 followup["id"] = uuid.uuid4().hex
    
                 assistant_text = followup.get("content", "")
                 return ResponsesAgentResponse(
                     output=[
                         {
                             "id": uuid.uuid4().hex,
                             "type": "message",
                             "role": "assistant",
                             "content": [{"type": "output_text", "text": assistant_text}],
                         }
                     ],
                     custom_outputs=request.custom_inputs,
                 )
    
             # 6) if no tool_calls at all, return the assistant's original reply
             assistant_text = raw_choice.get("content", "")
             return ResponsesAgentResponse(
                 output=[
                     {
                         "id": uuid.uuid4().hex,
                         "type": "message",
                         "role": "assistant",
                         "content": [{"type": "output_text", "text": assistant_text}],
                     }
                 ],
                 custom_outputs=request.custom_inputs,
             )
    
    
     mlflow.models.set_model(SingleTurnMCPAgent())
    
     if __name__ == "__main__":
         req = ResponsesAgentRequest(
             input=[{"role": "user", "content": "What's the 100th Fibonacci number?"}]
         )
         resp = SingleTurnMCPAgent().predict(req)
         for item in resp.output:
             print(item)
    
    

Deploy your agent

When you are ready to deploy an agent that connects to managed MCP servers, see Deploy an agent for generative AI applications.

Specify all the resources your agent needs when you log it. See Authentication for Databricks resources.

For example, if your agent uses the MCP server URLs listed below, you must specify all vector search indexes in both the prod.customer_support and prod.billing schemas, and all Unity Catalog functions in prod.billing (a sketch of the corresponding resource declarations follows the list):

  • https://<your-workspace-hostname>/api/2.0/mcp/vector-search/prod/customer_support
  • https://<your-workspace-hostname>/api/2.0/mcp/vector-search/prod/billing
  • https://<your-workspace-hostname>/api/2.0/mcp/functions/prod/billing
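Here is that sketch; the index and function names are hypothetical placeholders, since the actual names come from your workspace:

from mlflow.models.resources import DatabricksFunction, DatabricksVectorSearchIndex

resources = [
    # Every vector search index in prod.customer_support (hypothetical name)
    DatabricksVectorSearchIndex(index_name="prod.customer_support.tickets_index"),
    # Every vector search index in prod.billing (hypothetical name)
    DatabricksVectorSearchIndex(index_name="prod.billing.invoices_index"),
    # Every Unity Catalog function in prod.billing (hypothetical name)
    DatabricksFunction(function_name="prod.billing.lookup_account"),
]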

If your agent connects to MCP servers on Databricks to discover and run tools, log the resources those MCP servers require along with the agent. Databricks recommends installing the databricks-mcp PyPI package to simplify this process.

In particular, if you use managed MCP servers, you can call databricks_mcp.DatabricksMCPClient().get_databricks_resources(<server_url>) to retrieve the resources a managed MCP server needs. If your agent queries a custom MCP server hosted on Databricks Apps, you can configure authorization by explicitly including the server as a resource when logging the model.
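A minimal sketch of both paths, assuming workspace_client and host are defined as in the deployment example below, and with a hypothetical app name:

from databricks_mcp import DatabricksMCPClient
from mlflow.models.resources import DatabricksApp

resources = []

# Managed MCP server: ask the client which Databricks resources it needs
managed_url = f"{host}/api/2.0/mcp/functions/system/ai"
mcp_client = DatabricksMCPClient(server_url=managed_url, workspace_client=workspace_client)
resources.extend(mcp_client.get_databricks_resources())

# Custom MCP server hosted on Databricks Apps: include the app itself as a resource
resources.append(DatabricksApp(app_name="my-custom-mcp-server"))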

For example, to deploy the agent defined above, run the following code, which assumes you saved the agent code definition in mcp_agent.py:

import os
from databricks.sdk import WorkspaceClient
from databricks import agents
import mlflow
from mlflow.models.resources import DatabricksFunction, DatabricksServingEndpoint, DatabricksVectorSearchIndex
from mcp_agent import LLM_ENDPOINT_NAME
from databricks_mcp import DatabricksMCPClient

# TODO: Update this to your Databricks CLI profile name
databricks_cli_profile = "YOUR_DATABRICKS_CLI_PROFILE"
assert (
    databricks_cli_profile != "YOUR_DATABRICKS_CLI_PROFILE"
), "Set databricks_cli_profile to the Databricks CLI profile name you specified when configuring authentication to the workspace"
workspace_client = WorkspaceClient(profile=databricks_cli_profile)


# Configure MLflow and the Databricks SDK to use your Databricks CLI profile
current_user = workspace_client.current_user.me().user_name
mlflow.set_tracking_uri(f"databricks://{databricks_cli_profile}")
mlflow.set_registry_uri(f"databricks-uc://{databricks_cli_profile}")
mlflow.set_experiment(f"/Users/{current_user}/databricks_docs_example_mcp_agent")
os.environ["DATABRICKS_CONFIG_PROFILE"] = databricks_cli_profile

host = workspace_client.config.host  # workspace URL used to build MCP server URLs
MANAGED_MCP_SERVER_URLS = [
    f"{host}/api/2.0/mcp/functions/system/ai",
]
# Log the agent defined in mcp_agent.py
here = os.path.dirname(os.path.abspath(__file__))
agent_script = os.path.join(here, "mcp_agent.py")
resources = [
    DatabricksServingEndpoint(endpoint_name=LLM_ENDPOINT_NAME),
    DatabricksFunction(function_name="system.ai.python_exec"),
    # --- Uncomment and edit the following line to include custom MCP servers hosted on
    # Databricks Apps (also import DatabricksApp from mlflow.models.resources) ---
    # DatabricksApp(app_name="app-name")
]

for mcp_server_url in MANAGED_MCP_SERVER_URLS:
    mcp_client = DatabricksMCPClient(server_url=mcp_server_url, workspace_client=workspace_client)
    resources.extend(mcp_client.get_databricks_resources())

with mlflow.start_run():
    logged_model_info = mlflow.pyfunc.log_model(
        artifact_path="mcp_agent",
        python_model=agent_script,
        resources=resources,
    )

# TODO Specify your UC model name here
UC_MODEL_NAME = "main.default.databricks_docs_mcp_agent"
registered_model = mlflow.register_model(logged_model_info.model_uri, UC_MODEL_NAME)

agents.deploy(
    model_name=UC_MODEL_NAME,
    model_version=registered_model.version,
)

Next steps