Non-conversational agents process structured input to produce specific outputs without maintaining conversation state. Each request is independent and self-contained, which makes these agents ideal for task-specific operations such as document classification, data extraction, batch analysis, and structured question answering.
Unlike conversational agents, which manage multi-turn dialogue, non-conversational agents focus on efficiently executing well-defined tasks. This simplified architecture enables higher throughput for independent requests.
You will learn how to:
- Create a non-conversational agent
- Implement comprehensive MLflow tracing and observability
- Deploy the agent to Model Serving with automatic trace collection enabled
- Set up production monitoring with MLflow 3 scorers
Requirements
Dependencies:
- MLflow 3.2.0 or above
- databricks-agents 1.2.0 or above
- databricks-sdk[openai] for LLM integration
- Python 3.10 or above
Workspace access:
- Access to Foundation Model APIs (default: Claude 3.7 Sonnet, configurable)
- Access to a catalog and schema for registering AI models
%pip install --upgrade mlflow[databricks]==3.6.0 pydantic databricks-sdk[openai] databricks-agents databricks-sdk
%restart_python
Example scenario
The agent in this example answers structured questions about the contents of financial documents, returning yes/no answers with reasoning. Users provide the document text and the questions directly in the input, so this simplified example needs no vector search infrastructure. It demonstrates how a non-conversational agent handles a well-defined task without conversational context.
You can extend this example for production use cases by integrating additional tools and capabilities, such as vector search for document retrieval, MCP (Model Context Protocol) tools for external integrations, or other Databricks agents such as Genie for structured data access.
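For illustration only, the rough sketch below shows how a Databricks Vector Search retrieval step might replace the user-supplied document text. It assumes the databricks-vectorsearch package (not installed by this notebook's pip cell), a placeholder vector search endpoint and index name, and a chunk_text column in that index; verify the client's response shape against your installed version before relying on it.
from databricks.vector_search.client import VectorSearchClient

# Connect to an existing Vector Search index (placeholder names)
vs_client = VectorSearchClient()
index = vs_client.get_index(
    endpoint_name="<YOUR_VECTOR_SEARCH_ENDPOINT>",
    index_name="main.default.financial_docs_index",
)

# Retrieve the chunks most relevant to a question and concatenate them
# into the document_text that the agent expects in its input.
hits = index.similarity_search(
    query_text="Do the documents contain a balance sheet?",
    columns=["chunk_text"],
    num_results=3,
)
document_text = "\n".join(row[0] for row in hits["result"]["data_array"])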
Set up a service principal
Non-conversational agents do not support automatic authentication passthrough for writing traces from Model Serving. Instead, you must implement a custom MLflow 3 tracing integration and handle authentication manually with a service principal.
# TODO: Configuration constants - Update these for your environment
CATALOG = "main"
SCHEMA = "default" # Replace with your schema name
SECRET_SCOPE = "<YOUR_SECRET_SCOPE>" # Replace with your secret scope name
DATABRICKS_HOST = (
"https://host.databricks.com" # Replace with your workspace URL
)
# TODO: If you have not yet stored your service principal's OAuth client id and client secret as Databricks secrets,
# uncomment the following code and replace the <client_id> and <client_secret> with your service principal's id and secret.
# from databricks.sdk import WorkspaceClient
# w = WorkspaceClient()
# w.secrets.put_secret(SECRET_SCOPE, "client_id", string_value ="<YOUR_SERVICE_PRINCIPAL_CLIENT_ID>")
# w.secrets.put_secret(SECRET_SCOPE, "client_secret", string_value ="<YOUR_SERVICE_PRINCIPAL_CLIENT_SECRET>")
Configure the MLflow experiment:
- Create the experiment if it does not exist.
- Grant the service principal CAN_EDIT permission on the experiment.
# MLflow experiment to capture traces
EXPERIMENT_NAME = "/Workspace/Shared/non-conversational"
# LLM Configuration
LLM_MODEL = "databricks-claude-3-7-sonnet" # Change this to use different models
# Model and endpoint names - do not need to be changed
MODEL_NAME = "document_analyser"
ENDPOINT_NAME = "document_analyser_agent"
REGISTERED_MODEL_NAME = f"{CATALOG}.{SCHEMA}.{MODEL_NAME}"
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.ml import ExperimentAccessControlRequest
from databricks.sdk.service.iam import PermissionLevel
import mlflow
# Set experiment and get the experiment object directly
experiment = mlflow.set_experiment(EXPERIMENT_NAME)
experiment_id = experiment.experiment_id
# Fetch the service principal client_id from secret scope
client_id = dbutils.secrets.get(scope=SECRET_SCOPE, key="client_id")
# Set permissions for the SPN which will later write the traces from the serving endpoint
w = WorkspaceClient()
# Set CAN_EDIT permissions for the service principal
w.experiments.set_permissions(
experiment_id=experiment_id,
access_control_list=[
ExperimentAccessControlRequest(
service_principal_name=client_id,
permission_level=PermissionLevel.CAN_EDIT
)
]
)
print(f"✓ CAN_EDIT permissions granted to SPN {client_id[:8]}... for experiment: {experiment_id}")
Input and output formats
Unlike conversational agents, which use a flexible chat message format, non-conversational agents require structured Pydantic models for input and output:
- Create an input schema with all of the fields required to perform the task.
- Include trace metadata (trace_id, span_id) in the output schema to enable feedback logging.
- Where appropriate, design the output to include detailed reasoning or a chain-of-thought explanation.
- Validate the schemas during development to catch errors before deployment (see the validation sketch after the format examples below).
Input format (AgentInput)
{
"document_text": "Document content to analyze...",
"questions": [
{ "text": "Do the documents contain a balance sheet?" },
{ "text": "Do the documents contain an income statement?" },
{ "text": "Do the documents contain a cash flow statement?" }
]
}
Output format (AgentOutput)
{
"results": [
{
"question_text": "Do the documents contain a balance sheet?",
"answer": "Yes",
"chain_of_thought": "Detailed reasoning for the answer...",
"span_id": "abc123def456"
}
],
"trace_id": "tr-xyz789abc123"
}
- Structured input: users provide both the document text and the questions in a single request
- Detailed reasoning: every answer includes a step-by-step chain of thought
- Traceability: the response includes trace_id and span_id for feedback collection
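Before moving on, it can help to sanity-check request payloads against the input schema. The following is a minimal, self-contained sketch: the Question and AgentInput models are standalone copies that mirror the definitions in model.py in the next section, included here only so the snippet runs on its own.
from pydantic import BaseModel, Field, ValidationError

class Question(BaseModel):
    text: str = Field(..., description="Yes/no question about document content")

class AgentInput(BaseModel):
    document_text: str = Field(..., description="The document text to analyze")
    questions: list[Question] = Field(..., description="List of yes/no questions")

# A well-formed request validates cleanly
AgentInput.model_validate({
    "document_text": "Total assets: $2,300,000. Total liabilities: $1,200,000.",
    "questions": [{"text": "Do the documents contain a balance sheet?"}],
})
print("✓ Input schema is valid")

# A malformed request (missing document_text) raises ValidationError
try:
    AgentInput.model_validate({"questions": [{"text": "Is there a balance sheet?"}]})
except ValidationError as e:
    print(f"Caught schema error before deployment: {e}")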
Build the non-conversational agent
Create the non-conversational agent with MLflow tracing. The agent uses the @mlflow.trace decorator to automatically capture LLM calls and the complete request flow for full observability.
Users provide the document text and the questions directly in the input.
%%writefile model.py
import json
import logging
from typing import Optional
import uuid
import os
import sys
from databricks.sdk import WorkspaceClient
import mlflow
from mlflow.pyfunc import PythonModel
from mlflow.tracing import set_destination
from mlflow.tracing.destination import Databricks
from mlflow.entities import SpanType
from pydantic import BaseModel, Field
class Question(BaseModel):
"""Represents a question in the input."""
text: str = Field(..., description="Yes/no question about document content")
class AgentInput(BaseModel):
"""Input model for the document analyser agent."""
document_text: str = Field(..., description="The document text to analyze")
questions: list[Question] = Field(..., description="List of yes/no questions")
class Answer(BaseModel):
"""Represents a structured response from the LLM."""
answer: str = Field(..., description="Yes or No answer")
chain_of_thought: str = Field(..., description="Step-by-step reasoning for the answer")
class AnalysisResult(BaseModel):
"""Represents an analysis result in the output."""
question_text: str = Field(..., description="Original question text")
answer: str = Field(..., description="Yes or No answer")
chain_of_thought: str = Field(..., description="Step-by-step reasoning for the answer")
span_id: str | None = Field(None, description="MLflow span ID for this specific answer (None during offline evaluation)")
class AgentOutput(BaseModel):
"""Output model for the document analyser agent."""
results: list[AnalysisResult] = Field(..., description="List of analysis results")
trace_id: str | None = Field(None, description="MLflow trace ID for user feedback collection (None during offline evaluation)")
class DocumentAnalyser(PythonModel):
"""Non-conversational agent for document analysis using MLflow model serving."""
def __init__(self) -> None:
"""Initialize the document analyser.
Sets up logging configuration, initializes model properties, and prepares
the model for serving.
"""
self._setup_logging()
self.model_name = "document_analyser_v1"
self.logger.debug(f"Initialized {self.model_name}")
def _setup_logging(self) -> None:
"""Set up logging configuration for Model Serving.
Configures a logger that uses stderr for better visibility in Model Serving
environments. Log level can be controlled via MODEL_LOG_LEVEL environment
variable (defaults to INFO).
"""
self.logger = logging.getLogger("ModelLogger")
# Set log level from environment variable or default to INFO
log_level = os.getenv("MODEL_LOG_LEVEL", "INFO").upper()
self.logger.setLevel(getattr(logging, log_level, logging.INFO))
if not self.logger.handlers:
handler = logging.StreamHandler()
handler.setLevel(getattr(logging, log_level, logging.INFO))
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def load_context(self, context) -> None:
"""Load model context and initialize clients.
This method is called once when the model is loaded in the serving environment.
It sets up MLflow tracing destination, initializes the Databricks workspace
client, and configures the OpenAI-compatible client for LLM inference.
Args:
context: MLflow model context containing artifacts and configuration
"""
self.logger.debug("Loading model context")
set_destination(Databricks(experiment_id=os.getenv("MONITORING_EXPERIMENT_ID")))
self.logger.debug("Instantiate workspace client")
self.w = WorkspaceClient()
# You can load any artifacts here if needed
# self.artifacts = context.artifacts
self.logger.debug("Instantiate openai client")
# Get an OpenAI-compatible client configured for Databricks serving endpoints
self.openai_client = self.w.serving_endpoints.get_open_ai_client()
@mlflow.trace(name="answer_question", span_type=SpanType.LLM)
def answer_question(self, question_text: str, document_text: str) -> tuple[object, str | None]:
"""Answer a question using LLM with structured response format.
Uses the OpenAI-compatible client to call a language model with a structured
JSON response format. The LLM analyzes the provided document text and returns
a yes/no answer with reasoning.
Args:
question_text (str): The yes/no question to answer about the document
document_text (str): The document text to analyze
Returns:
tuple: (openai.ChatCompletion, str|None) - LLM response and span_id
"""
# Create a chat completion request with structured response for questions
question_prompt = f"""
You are a document analysis expert. Answer the following yes/no question based on the provided document.
Question: "{question_text}"
Document:
{document_text}
Analyze the document and provide a structured response.
"""
# Create a separate sub-span for the actual OpenAI API call
llm_response = self._call_openai_completion(question_prompt)
# Get the current span ID for this specific answer
current_span = mlflow.get_current_active_span()
span_id = current_span.span_id if current_span is not None else None
return llm_response, span_id
@mlflow.trace(name="openai_completion", span_type=SpanType.LLM)
def _call_openai_completion(self, prompt: str):
"""Make the actual OpenAI API call with its own sub-span.
Args:
prompt (str): The formatted prompt to send to the LLM
Returns:
OpenAI ChatCompletion response
"""
return self.openai_client.chat.completions.create(
model=os.getenv("LLM_MODEL", "databricks-claude-3-7-sonnet"), # Configurable LLM model
messages=[
{
"role": "user",
"content": prompt
}
],
response_format={
"type": "json_schema",
"json_schema": {
"name": "question_response",
"schema": Answer.model_json_schema()
}
}
)
@mlflow.trace(name="document_analysis")
def predict(self, context, model_input: list[AgentInput]) -> list[AgentOutput]:
"""Process document analysis questions with yes/no answers.
Args:
context: MLflow model context
model_input: List of structured inputs containing document text and questions
Returns:
List of AgentOutput with yes/no answers and reasoning
"""
self.logger.debug(f"Processing {len(model_input)} classification request(s)")
# Get the current trace ID for user feedback collection
# Will be None during offline evaluation when no active span exists
current_span = mlflow.get_current_active_span()
trace_id = current_span.trace_id if current_span is not None else None
results = []
for input_data in model_input:
self.logger.debug(f"Number of questions: {len(input_data.questions)}")
self.logger.debug(f"Document length: {len(input_data.document_text)} characters")
analysis_results = []
for question in input_data.questions:
self.logger.debug(f"Processing question: {question.text}")
# Answer the question using LLM with structured response
llm_response, answer_span_id = self.answer_question(question.text, input_data.document_text)
# Parse structured JSON response
try:
response_data = json.loads(llm_response.choices[0].message.content)
answer_obj = Answer(**response_data)
except Exception as e:
self.logger.debug(f"Failed to parse structured response: {e}")
# Fallback to default response
answer_obj = Answer(
answer="No",
chain_of_thought="Unable to process the question due to parsing error."
)
analysis_results.append(AnalysisResult(
question_text=question.text,
answer=answer_obj.answer,
chain_of_thought=answer_obj.chain_of_thought,
span_id=answer_span_id
))
self.logger.debug(f"Generated {len(analysis_results)} analysis results")
results.append(AgentOutput(
results=analysis_results,
trace_id=trace_id
))
return results
mlflow.models.set_model(DocumentAnalyser())
Log and register the agent
Before you can deploy the agent to a serving endpoint, you must log it to an MLflow experiment and register it in Unity Catalog.
import os
import mlflow
import json
from mlflow.pyfunc import PythonModel
from pydantic import BaseModel, Field
from model import DocumentAnalyser, AgentInput, Question
# Create example input for signature inference
def create_example_input() -> AgentInput:
"""Create example input for the non-conversational agent."""
return AgentInput(
document_text="Total assets: $2,300,000. Total liabilities: $1,200,000. Shareholder's equity: $1,100,000. Net income for the period was $450,000. Revenues: $1,700,000. Expenses: $1,250,000. Net cash provided by operating activities: $80,000. Cash flows from investing activities: -$20,000",
questions=[
Question(text="Do the documents contain a balance sheet?"),
Question(text="Do the documents contain an income statement?"),
Question(text="Do the documents contain a cash flow statement?"),
],
)
input_example = create_example_input()
with mlflow.start_run(run_name="deploy_non_conversational_agent"):
active_run = mlflow.active_run()
current_experiment_id = active_run.info.experiment_id
# Set environment variables for the model using current notebook experiment
os.environ["MONITORING_EXPERIMENT_ID"] = current_experiment_id
print(
f"✓ Using current notebook experiment ID for tracing: {current_experiment_id}"
)
# Log the non-conversational agent with auto-inferred dependencies
model_info = mlflow.pyfunc.log_model(
name=MODEL_NAME,
python_model="model.py", # Path to the model code file
input_example=[create_example_input().model_dump()],
registered_model_name=REGISTERED_MODEL_NAME,
)
# Set logged model as current active model to associate it with the below evaluation results
mlflow.set_active_model(model_id=mlflow.last_logged_model().model_id)
print(f"✓ Model logged and registered: {REGISTERED_MODEL_NAME}")
print(f"✓ Model version: {model_info.registered_model_version}")
Evaluate the agent
Before deploying to production, evaluate the agent's performance using MLflow's GenAI evaluation framework and prebuilt scorers. Some scorers require a ground truth dataset.
import mlflow
import mlflow.genai.datasets
from requests import HTTPError
# Create an evaluation dataset in Unity Catalog
uc_schema = f"{CATALOG}.{SCHEMA}"
evaluation_dataset_table_name = "document_analyser_eval"
try:
# Try to create a new evaluation dataset
eval_dataset = mlflow.genai.datasets.create_dataset(
uc_table_name=f"{uc_schema}.{evaluation_dataset_table_name}",
)
print(f"✓ Created evaluation dataset: {uc_schema}.{evaluation_dataset_table_name}")
except HTTPError as e:
# Check if it's a TABLE_ALREADY_EXISTS error
if e.response.status_code == 400 and "TABLE_ALREADY_EXISTS" in str(e):
print(
f"Dataset {uc_schema}.{evaluation_dataset_table_name} already exists, loading existing dataset..."
)
eval_dataset = mlflow.genai.datasets.get_dataset(
uc_table_name=f"{uc_schema}.{evaluation_dataset_table_name}"
)
print(
f"✓ Loaded existing evaluation dataset: {uc_schema}.{evaluation_dataset_table_name}"
)
else:
# Different HTTP error, re-raise
raise
# Define comprehensive test cases with expected facts for ground truth comparison
sample_document = "Total assets: $2,300,000. Total liabilities: $1,200,000. Shareholder's equity: $1,100,000. Net income for the period was $450,000. Revenues: $1,700,000. Expenses: $1,250,000. Net cash provided by operating activities: $80,000. Cash flows from investing activities: -$20,000"
evaluation_examples = [
{
"inputs": {
"document_text": sample_document,
"questions": [{"text": "Do the documents contain a balance sheet?"}],
},
"expectations": {
"expected_facts": [
"answer is Yes",
"balance sheet information",
"total assets mentioned",
"total liabilities mentioned",
"shareholder's equity mentioned",
]
},
},
{
"inputs": {
"document_text": sample_document,
"questions": [{"text": "Do the documents contain an income statement?"}],
},
"expectations": {
"expected_facts": [
"answer is Yes",
"income statement information",
"net income mentioned",
"revenues mentioned",
"expenses mentioned",
]
},
},
{
"inputs": {
"document_text": sample_document,
"questions": [{"text": "Do the documents contain a cash flow statement?"}],
},
"expectations": {
"expected_facts": [
"answer is Yes",
"cash flow information",
"operating activities mentioned",
"investing activities mentioned",
"cash flows mentioned",
]
},
},
{
"inputs": {
"document_text": sample_document,
"questions": [
{
"text": "Do the documents contain information about employee benefits?"
}
],
},
"expectations": {
"expected_facts": [
"answer is No",
"no employee benefits information",
"financial statements focus",
"no HR-related content",
]
},
},
]
# Add the examples to the evaluation dataset
eval_dataset.merge_records(evaluation_examples)
print(f"✓ Added {len(evaluation_examples)} records to evaluation dataset")
# Preview the dataset
df = eval_dataset.to_df()
print(f"✓ Dataset preview - Total records: {len(df)}")
df.display()
import warnings
import mlflow
from mlflow.genai.scorers import (
RelevanceToQuery,
Correctness,
Guidelines,
)
# Suppress harmless threadpoolctl warnings that can appear in Databricks environments
warnings.filterwarnings("ignore", message=".*threadpoolctl.*")
warnings.filterwarnings("ignore", category=UserWarning, module="threadpoolctl")
# Load the logged model for evaluation
model_uri = f"models:/{REGISTERED_MODEL_NAME}/{model_info.registered_model_version}"
print(f"Loading model for evaluation: {model_uri}")
# Load the model as a predict function
loaded_model = mlflow.pyfunc.load_model(model_uri)
def my_app(document_text, questions):
"""Wrapper function for the model prediction."""
# The evaluation dataset's inputs field contains {"document_text": "...", "questions": [...]}
# but the predict_fn parameter names must match the keys in inputs
input_data = {"document_text": document_text, "questions": questions}
return loaded_model.predict([input_data])
# Define scorers for evaluation including ground truth comparison
correctness_scorer = Correctness() # Compares against expected_facts
relevance_scorer = RelevanceToQuery() # Evaluates relevance of response to question
response_schema_scorer = Guidelines(
name="response_schema",
guidelines="The response must be structured JSON with an 'answer' field containing 'Yes' or 'No' and a 'chain_of_thought' field with clear reasoning. There also needs to be a 'question_text' field that contains the question that was asked. All these fields are part of the 'results' array field.",
) # Validates structured output format
# This creates an evaluation run using the MLflow-managed dataset
results = mlflow.genai.evaluate(
data=eval_dataset, # Use the MLflow-managed dataset
predict_fn=my_app,
scorers=[
correctness_scorer,
relevance_scorer,
response_schema_scorer,
],
)
# Access the run ID
print(f"✓ Evaluation completed")
print(f"Evaluation run ID: {results.run_id}")
# Display evaluation results summary
if hasattr(results, "metrics") and results.metrics:
print("\n📊 Evaluation Results Summary:")
for metric_name, metric_value in results.metrics.items():
if isinstance(metric_value, (int, float)):
print(f" • {metric_name}: {metric_value:.3f}")
else:
print(f" • {metric_name}: {metric_value}")
else:
print("✓ Evaluation completed - view detailed results in the evaluation experiment")
# Display link to the evaluation dataset
print(f"\n📊 Evaluation Dataset: {uc_schema}.{evaluation_dataset_table_name}")
print(f"🔗 View dataset in Unity Catalog Data Explorer")
Deploy to Model Serving
Deploy the evaluated agent to a Model Serving endpoint and configure the environment variables required for MLflow 3 tracing. This ensures that all production requests are automatically traced and logged to the specified MLflow experiment.
import mlflow
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import (
ServedEntityInput,
ServingModelWorkloadType,
EndpointCoreConfigInput,
)
from model import DocumentAnalyser, AgentInput, Question
workspace = WorkspaceClient()
# Use the model version from the logged model
model_version = model_info.registered_model_version
print(f"Using model version: {model_version}")
new_entity = ServedEntityInput(
entity_name=REGISTERED_MODEL_NAME,
entity_version=model_version,
name=f"{MODEL_NAME}-{model_version}",
workload_size="Small",
workload_type=ServingModelWorkloadType.CPU,
scale_to_zero_enabled=True,
environment_vars={
"DATABRICKS_CLIENT_ID": f"{{{{secrets/{SECRET_SCOPE}/client_id}}}}",
"DATABRICKS_CLIENT_SECRET": f"{{{{secrets/{SECRET_SCOPE}/client_secret}}}}",
"DATABRICKS_HOST": DATABRICKS_HOST,
"MLFLOW_TRACKING_URI": "databricks",
"MONITORING_EXPERIMENT_ID": current_experiment_id,
"MODEL_LOG_LEVEL": "INFO",
"LLM_MODEL": LLM_MODEL,
},
)
# Check if endpoint exists and create or update accordingly
try:
# Try to get the existing endpoint
existing_endpoint = workspace.serving_endpoints.get(ENDPOINT_NAME)
print(
f"Endpoint {ENDPOINT_NAME} exists, updating with model version {model_version}"
)
# Update existing endpoint with new model version
workspace.serving_endpoints.update_config(
name=ENDPOINT_NAME, served_entities=[new_entity]
)
print("Endpoint update initiated, waiting for completion...")
# Wait for update to complete
workspace.serving_endpoints.wait_get_serving_endpoint_not_updating(ENDPOINT_NAME)
print("Endpoint updated successfully and is ready")
except Exception as e:
# Endpoint doesn't exist, create it
print(f"Endpoint {ENDPOINT_NAME} doesn't exist, creating new endpoint...")
workspace.serving_endpoints.create(
name=ENDPOINT_NAME,
config=EndpointCoreConfigInput(name=ENDPOINT_NAME, served_entities=[new_entity]),
)
print("Endpoint creation initiated, waiting for completion...")
# Wait for creation to complete
workspace.serving_endpoints.wait_get_serving_endpoint_not_updating(ENDPOINT_NAME)
print("Endpoint created successfully and is ready")
# Final status check
endpoint_status = workspace.serving_endpoints.get(ENDPOINT_NAME)
print(f"Final endpoint status: {endpoint_status.state}")
print(
f"Endpoint URL: https://{DATABRICKS_HOST.replace('https://', '')}/serving-endpoints/{ENDPOINT_NAME}/invocations"
)
Set up production monitoring with scorers
Configure automated quality assessment of production traffic with MLflow 3 scorers. Scorers automatically analyze the traces logged from production requests to provide continuous quality monitoring.
from mlflow.genai.scorers import (
RelevanceToQuery,
Guidelines,
ScorerSamplingConfig,
list_scorers,
get_scorer,
)
# Set the active experiment for scoring (use the current notebook's experiment)
print(f"Setting experiment to: {current_experiment_id}")
mlflow.set_experiment(experiment_id=current_experiment_id)
# Verify the experiment is set correctly
current_experiment = mlflow.get_experiment(current_experiment_id)
print(
f"Current experiment: {current_experiment.name} (ID: {current_experiment.experiment_id})"
)
# Setup scorers for production monitoring
print("Setting up production monitoring scorers...")
# Relevance scorer - always create new to avoid conflicts
relevance_scorer = RelevanceToQuery().register(name="financial_relevance_check")
relevance_scorer = relevance_scorer.start(
sampling_config=ScorerSamplingConfig(sample_rate=0.5)
)
print("✅ Created relevance scorer (50% sampling)")
# Guidelines scorer for response schema validation
response_schema_scorer = Guidelines(
name="response_schema",
guidelines="The response must be structured JSON with an 'answer' field containing 'Yes' or 'No' and a 'chain_of_thought' field with clear reasoning.",
).register(name="response_schema_check")
response_schema_scorer = response_schema_scorer.start(
sampling_config=ScorerSamplingConfig(sample_rate=0.4)
)
print("✅ Created response schema scorer (40% sampling)")
# List all active scorers
print(f"\nActive Scorers in Experiment {current_experiment_id}:")
scorers = list_scorers()
for scorer in scorers:
print(f"• {scorer.name}: {scorer.sample_rate*100}% sampling")
Test the deployed agent
Test the deployed agent with a sample input. Each request automatically generates an MLflow 3 trace that captures the complete request flow, and the production scorers evaluate these traces for quality monitoring.
from databricks.sdk import WorkspaceClient
# Test the non-conversational agent endpoint using Databricks SDK
workspace = WorkspaceClient()
# Example payload with structured input for the non-conversational agent
test_input = {
"inputs": [
{
"document_text": "Total assets: $2,300,000. Total liabilities: $1,200,000. Shareholder's equity: $1,100,000. Net income for the period was $450,000. Revenues: $1,700,000. Expenses: $1,250,000. Net cash provided by operating activities: $80,000. Cash flows from investing activities: -$20,000",
"questions": [
{"text": "Do the documents contain a balance sheet?"},
{"text": "Do the documents contain an income statement?"},
{"text": "Do the documents contain a cash flow statement?"},
],
}
]
}
# Query the serving endpoint using the workspace client
response = workspace.serving_endpoints.query(
name=ENDPOINT_NAME, inputs=test_input["inputs"]
)
print("Endpoint Response:")
print(response.as_dict())
# Generate MLflow experiment URL
experiment_url = f"{DATABRICKS_HOST}/ml/experiments/{current_experiment_id}"
print(f"\nMLflow Experiment URL: {experiment_url}")
Log user feedback
Even for non-conversational agents, collecting user feedback is essential for continuous improvement. A user-facing frontend application can let users accept or reject the individual answers the agent provides. That feedback can then be logged to MLflow using the trace_id and span_id included in the response.
Common feedback scenarios for non-conversational agents:
- Accuracy feedback: "Is the yes/no answer correct?"
- Relevance feedback: "Does the reasoning fit the question?"
- Quality feedback: "Is the supporting evidence sufficient?"
- Error reporting: "Did the agent misinterpret the document content?"
The following cell shows how to log feedback on an individual answer using the span_id returned in the response.
import mlflow
from mlflow.entities import AssessmentSource
# Get the response from the previous test (extract span_id from first result)
# In a real application, this would come from the API response
response_dict = response.as_dict()
first_prediction = response_dict["predictions"][0]
first_result = first_prediction["results"][0]
# Assert we have the required IDs for feedback logging
assert (
first_result.get("span_id") is not None
), "span_id is required for feedback logging"
assert (
first_prediction.get("trace_id") is not None
), "trace_id is required for feedback logging"
span_id = first_result["span_id"]
trace_id = first_prediction["trace_id"]
question_text = first_result["question_text"]
answer = first_result["answer"]
print(f"Logging feedback for question: '{question_text}'")
print(f"Agent answer: {answer}")
print(f"Span ID: {span_id}")
print(f"Trace ID: {trace_id}")
try:
# Example: User provides positive feedback on this specific answer
mlflow.log_feedback(
trace_id=trace_id,
span_id=span_id,
name="user_feedback",
value=True, # True for positive, False for negative
source=AssessmentSource(source_type="HUMAN"),
rationale="Answer was accurate and well-reasoned",
)
print("✅ Feedback logged successfully!")
except Exception as e:
print(f"Note: Could not log feedback in this environment: {e}")
Next steps
- Learn how to add tools to your agent to extend its capabilities
- Review the MLflow 3 tracing documentation for advanced observability features
- See the production monitoring documentation