在生产环境中监控 GenAI

重要

此功能在 Beta 版中。

使用 Azure Databricks 的 GenAI 生产监控,您可以在生产 GenAI 应用的跟踪上自动运行 MLflow 3 评分器,以持续监控质量。

可以安排评分系统自动评估生产流量的样本。 评分器的评估结果会自动作为反馈附加到被评估的记录上。

生产监视包括以下内容:

  • 使用内置或自定义评分器自动进行质量评估。
  • 可配置的采样率,以便控制覆盖率与计算成本之间的权衡。
  • 在开发和生产中使用相同的评分器来确保一致的评估。
  • 持续质量评估,后台运行监控。

注释

MLflow 3 生产监视与从 MLflow 2 记录的跟踪兼容。

有关旧版生产监视的信息,请参阅生产监视 API 参考(旧版)。

先决条件

在设置质量监视之前,请确保具备:

  1. MLflow 试验:记录跟踪的 MLflow 试验。 如果未指定试验,则使用活动试验。
  2. 检测的生产应用程序:GenAI 应用必须使用 MLflow 跟踪记录跟踪。 请参阅 生产跟踪指南
  3. 定义的记分器:测试了使用应用程序的跟踪格式的 记分器

小窍门

如果您在开发过程中将生产应用用作 predict_fn 中的 mlflow.genai.evaluate(),那么您的记分器可能已经兼容。

生产监视入门

本部分包含演示如何创建不同类型的记分器的示例代码。

有关记分器的详细信息,请参阅以下内容:

注释

在任何给定时间,最多 20 名评分者都可以与持续质量监视的实验相关联。

通过 UI 创建和安排 LLM 法官

通过 MLflow UI 可以轻松创建和测试 LLM 法官评分器。

已注册的评分员

  1. 导航到 MLflow 试验 UI 中的“评分者”选项卡
  2. 单击 “新建记分器”
  3. LLM 模板 下拉列表中选择内置的 LLM 评判器
  4. (可选)单击运行记分器在跟踪子集上执行操作。
  5. 单击“ 创建评分器”

使用内置的 LLM 评估器

MLflow 提供了多个现成可用的内置 LLM 评估器,你可以用于监控。

from mlflow.genai.scorers import Safety, ScorerSamplingConfig

# Register the scorer with a name and start monitoring
safety_judge = Safety().register(name="my_safety_judge")  # name must be unique to experiment
safety_judge = safety_judge.start(sampling_config=ScorerSamplingConfig(sample_rate=0.7))

默认情况下,每个法官都使用 Databricks 托管的 LLM 来执行 GenAI 质量评估。 可以使用记分器定义中的参数,将评判模型更改为使用 model 的 Databricks 模型服务端点。 必须以格式 databricks:/<databricks-serving-endpoint-name>指定模型。

safety_judge = Safety(model="databricks:/databricks-gpt-oss-20b").register(name="my_custom_safety_judge")

使用指南 LLM 法官

LLM 法官 可以使用通过/不通过的自然语言标准来评估输入和输出。

from mlflow.genai.scorers import Guidelines

# Create and register the guidelines scorer
english_judge = Guidelines(
  name="english",
  guidelines=["The response must be in English"]
).register(name="is_english")  # name must be unique to experiment

# Start monitoring with the specified sample rate
english_judge = english_judge.start(sampling_config=ScorerSamplingConfig(sample_rate=0.7))

与内置法官一样,可以更改判断模型以改用 Databricks 模型服务终结点。

english_judge = Guidelines(
  name="english",
  guidelines=["The response must be in English"],
  model="databricks:/databricks-gpt-oss-20b",
).register(name="custom_is_english")

在自定义提示中使用 LLM 法官

对于比准则评委更大的灵活性,可以将 LLM 法官与自定义提示一起使用 ,从而允许使用可自定义的选择类别(例如优秀/良好/差)和可选的数值评分进行多级质量评估。

from mlflow.genai.scorers import scorer, ScorerSamplingConfig


@scorer
def formality(inputs, outputs, trace):
    # Must be imported inline within the scorer function body
    from mlflow.genai.judges import custom_prompt_judge
    from mlflow.entities.assessment import DEFAULT_FEEDBACK_NAME

    formality_prompt = """
    You will look at the response and determine the formality of the response.

    <request>{{request}}</request>
    <response>{{response}}</response>

    You must choose one of the following categories.

    [[formal]]: The response is very formal.
    [[semi_formal]]: The response is somewhat formal. The response is somewhat formal if the response mentions friendship, etc.
    [[not_formal]]: The response is not formal.
    """

    my_prompt_judge = custom_prompt_judge(
        name="formality",
        prompt_template=formality_prompt,
        numeric_values={
            "formal": 1,
            "semi_formal": 0.5,
            "not_formal": 0,
        },
        model="databricks:/databricks-gpt-oss-20b",  # optional
    )

    result = my_prompt_judge(request=inputs, response=inputs)
    if hasattr(result, "name"):
        result.name = DEFAULT_FEEDBACK_NAME
    return result

# Register the custom judge and start monitoring
formality_judge = formality.register(name="my_formality_judge")  # name must be unique to experiment
formality_judge = formality_judge.start(sampling_config=ScorerSamplingConfig(sample_rate=0.1))

使用自定义记分器函数

为了获得最大的灵活性,包括放弃基于 LLM 的评分的选项,可以定义和使用 自定义记分器函数 进行监视。

定义自定义记分器时,请勿使用需要在函数签名中导入的类型提示。 如果记分器函数正文使用需要导入的包,请将这些包内联导入函数中,以便进行适当的序列化。

某些包默认可用,无需内联导入。 这些包包括databricks-agentsmlflow-skinnyopenai无服务器环境版本 2 中包含的所有包。

from mlflow.genai.scorers import scorer, ScorerSamplingConfig


# Custom metric: Check if response mentions Databricks
@scorer
def mentions_databricks(outputs):
    """Check if the response mentions Databricks"""
    return "databricks" in str(outputs.get("response", "")).lower()

# Custom metric: Response length check
@scorer(aggregations=["mean", "min", "max"])
def response_length(outputs):
    """Measure response length in characters"""
    return len(str(outputs.get("response", "")))

# Custom metric with multiple inputs
@scorer
def response_relevance_score(inputs, outputs):
    """Score relevance based on keyword matching"""
    query = str(inputs.get("query", "")).lower()
    response = str(outputs.get("response", "")).lower()

    # Simple keyword matching (replace with your logic)
    query_words = set(query.split())
    response_words = set(response.split())

    if not query_words:
        return 0.0

    overlap = len(query_words & response_words)
    return overlap / len(query_words)

# Register and start monitoring custom scorers
databricks_scorer = mentions_databricks.register(name="databricks_mentions")
databricks_scorer = databricks_scorer.start(sampling_config=ScorerSamplingConfig(sample_rate=0.5))

length_scorer = response_length.register(name="response_length")
length_scorer = length_scorer.start(sampling_config=ScorerSamplingConfig(sample_rate=1.0))

relevance_scorer = response_relevance_score.register(name="response_relevance_score")  # name must be unique to experiment
relevance_scorer = relevance_scorer.start(sampling_config=ScorerSamplingConfig(sample_rate=1.0))

多个记分器配置

若要进行全面的监视设置,可以单独注册和启动多个评分器。

from mlflow.genai.scorers import Safety, Guidelines, ScorerSamplingConfig, list_scorers

# # Register multiple scorers for comprehensive monitoring
safety_judge = Safety().register(name="safety") # name must be unique within an MLflow experiment
safety_judge = safety_judge.start(
    sampling_config=ScorerSamplingConfig(sample_rate=1.0), # Check all traces
)

guidelines_judge = Guidelines(
    name="english",
    guidelines=["Response must be in English"]
).register(name="english_check")
guidelines_judge = guidelines_judge.start(
    sampling_config=ScorerSamplingConfig(sample_rate=0.5), # Sample 50%
)

# List and manage all scorers
all_scorers = list_scorers()
for scorer in all_scorers:
    if scorer.sample_rate > 0:
        print(f"{scorer.name} is active")
    else:
        print(f"{scorer.name} is stopped")

管理计划的记分器

记分器生命周期以 MLflow 试验为中心。 记分器生命周期状态如下:

  1. 未注册:已定义 Scorer 函数,但服务器未知。
  2. 已注册:计分器已注册到当前活动的 MLflow 实验。 若要注册记分器,请使用 .register()
  3. 活动:评分器以采样率 > 0 运行。 若要启动记分器,请使用 .start()
  4. 已停止:记分器已注册但未运行(采样率 = 0)。 若要停止记分器,请使用 .stop()
  5. 已删除:记分器已从服务器中删除,不再与其 MLflow 试验相关联。 若要删除记分器,请使用 delete_scorer()

所有生命周期作业都是 不可变的。 这意味着每个操作都不会修改原始评分器。 而是返回一个新的记分器实例。

基本记分器生命周期

from mlflow.genai.scorers import Safety, scorer, ScorerSamplingConfig

# Built-in scorer lifecycle
safety_judge = Safety().register(name="safety_check")
safety_judge = safety_judge.start(
    sampling_config=ScorerSamplingConfig(sample_rate=1.0),
)
safety_judge = safety_judge.update(
    sampling_config=ScorerSamplingConfig(sample_rate=0.8),
)
safety_judge = safety_judge.stop()
delete_scorer(name="safety_check")

# Custom scorer lifecycle
@scorer
def response_length(outputs):
    return len(str(outputs.get("response", "")))

length_scorer = response_length.register(name="length_check")
length_scorer = length_scorer.start(
    sampling_config=ScorerSamplingConfig(sample_rate=0.5),
)

列出当前评分者

若要查看试验的所有已注册记分器,请执行以下作:

from mlflow.genai.scorers import list_scorers

# List all registered scorers
scorers = list_scorers()
for scorer in scorers:
    print(f"Name: {scorer._server_name}")
    print(f"Sample rate: {scorer.sample_rate}")
    print(f"Filter: {scorer.filter_string}")
    print("---")

更新记分器

修改现有记分器配置:

from mlflow.genai.scorers import get_scorer

# Get existing scorer and update its configuration (immutable operation)
safety_judge = get_scorer(name="safety_monitor")
updated_judge = safety_judge.update(sampling_config=ScorerSamplingConfig(sample_rate=0.8))  # Increased from 0.5

# Note: The original scorer remains unchanged; update() returns a new scorer instance
print(f"Original sample rate: {safety_judge.sample_rate}")  # Original rate
print(f"Updated sample rate: {updated_judge.sample_rate}")   # New rate

停止和删除记分器

完全停止监视或删除记分器:

from mlflow.genai.scorers import get_scorer, delete_scorer

# Get existing scorer
databricks_scorer = get_scorer(name="databricks_mentions")

# Stop monitoring (sets sample_rate to 0, keeps scorer registered)
stopped_scorer = databricks_scorer.stop()
print(f"Sample rate after stop: {stopped_scorer.sample_rate}")  # 0

# Remove scorer entirely from the server
delete_scorer(name=databricks_scorer.name)

# Or restart monitoring from a stopped scorer
restarted_scorer = stopped_scorer.start(sampling_config=ScorerSamplingConfig(sample_rate=0.5))

不可变更新

评分员(包括 LLM 法官)是不可变的对象:更新记分器不会修改原始记分器,而是返回评分员的更新副本。 这种不可变性有助于确保用于生产的记分器不会被意外修改。 下面的代码片段演示了不可变更新的工作原理。

# Demonstrate immutability
original_judge = Safety().register(name="safety")
original_judge = original_judge.start(
   sampling_config=ScorerSamplingConfig(sample_rate=0.3),
)

# Update returns new instance
updated_judge = original_judge.update(
    sampling_config=ScorerSamplingConfig(sample_rate=0.8),
)

# Original remains unchanged
print(f"Original: {original_judge.sample_rate}")  # 0.3
print(f"Updated: {updated_judge.sample_rate}")    # 0.8

评估历史跟踪(指标回填)

可以追溯性地将新的或更新的指标应用于历史跟踪。

使用当前采样率的基本指标回填

from databricks.agents.scorers import backfill_scorers

safety_judge = Safety()
safety_judge.register(name="safety_check")
safety_judge.start(sampling_config=ScorerSamplingConfig(sample_rate=0.5))

#custom scorer
@scorer(aggregations=["mean", "min", "max"])
def response_length(outputs):
    """Measure response length in characters"""
    return len(outputs)

response_length.register(name="response_length")
response_length.start(sampling_config=ScorerSamplingConfig(sample_rate=0.5))

# Use existing sample rates for specified scorers
job_id = backfill_scorers(
    scorers=["safety_check", "response_length"]
)

使用自定义采样率和时间范围的指标回填

from databricks.agents.scorers import backfill_scorers, BackfillScorerConfig
from datetime import datetime
from mlflow.genai.scorers import Safety, Correctness

safety_judge = Safety()
safety_judge.register(name="safety_check")
safety_judge.start(sampling_config=ScorerSamplingConfig(sample_rate=0.5))

#custom scorer
@scorer(aggregations=["mean", "min", "max"])
def response_length(outputs):
    """Measure response length in characters"""
    return len(outputs)

response_length.register(name="response_length")
response_length.start(sampling_config=ScorerSamplingConfig(sample_rate=0.5))

# Define custom sample rates for backfill
custom_scorers = [
    BackfillScorerConfig(scorer=safety_judge, sample_rate=0.8),
    BackfillScorerConfig(scorer=response_length, sample_rate=0.9)
]

job_id = backfill_scorers(
    experiment_id=YOUR_EXPERIMENT_ID,
    scorers=custom_scorers,
    start_time=datetime(2024, 6, 1),
    end_time=datetime(2024, 6, 30)
)

最近的数据回填

from datetime import datetime, timedelta

# Backfill last week's data with higher sample rates
one_week_ago = datetime.now() - timedelta(days=7)

job_id = backfill_scorers(
    scorers=[
        BackfillScorerConfig(scorer=safety_judge, sample_rate=0.8),
        BackfillScorerConfig(scorer=response_length, sample_rate=0.9)
    ],
    start_time=one_week_ago
)

查看结果

计划记分器后,允许 15-20 分钟进行初始处理。 然后:

  1. 导航到 MLflow 试验。
  2. 打开 “跟踪 ”选项卡,查看附加到跟踪的评估。
  3. 使用监视仪表板跟踪质量趋势。

最佳做法

记分器状态管理

  • 在进行操作之前,使用 sample_rate 属性检查记分器状态。
  • 使用不可变模式。 将结果.start().update().stop()分配给变量。
  • 了解记分器生命周期。 .stop() 保留注册, delete_scorer() 完全删除记分器。

指标回填

  • 从小开始。 从较小的时间范围开始,以估计作业持续时间和资源使用情况。
  • 使用适当的采样率。 考虑使用高采样率的成本和时间影响。

采样策略

  • 对于安全性和安保检查等关键评分标准,请使用 sample_rate=1.0

  • 对于昂贵的评分系统,例如复杂的 LLM 模型,应使用较低的采样率(0.05-0.2)。

  • 若要在开发期间进行迭代改进,请使用中等速率(0.3-0.5)。

  • 平衡覆盖范围与成本,如以下示例所示:

    # High-priority scorers: higher sampling
    safety_judge = Safety().register(name="safety")
    safety_judge = safety_judge.start(sampling_config=ScorerSamplingConfig(sample_rate=1.0))  # 100% coverage for critical safety
    
    # Expensive scorers: lower sampling
    complex_scorer = ComplexCustomScorer().register(name="complex_analysis")
    complex_scorer = complex_scorer.start(sampling_config=ScorerSamplingConfig(sample_rate=0.05))  # 5% for expensive operations
    

自定义记分器设计

使自定义评分器保持独立,如以下示例所示:

@scorer
def well_designed_scorer(inputs, outputs):
    # ✅ All imports inside the function
    import re
    import json

    # ✅ Handle missing data gracefully
    response = outputs.get("response", "")
    if not response:
        return 0.0

    # ✅ Return consistent types
    return float(len(response) > 100)

故障排除

记分器未运行

如果未执行评分程序,请检查以下内容:

  1. 检查试验:确保将跟踪记录到试验,而不是记录到单个运行。
  2. 采样率:采样率较低,查看结果可能需要一些时间。

序列化问题

创建自定义评分器时,在函数定义中包含导入。

# ❌ Avoid external dependencies
import external_library  # Outside function

@scorer
def bad_scorer(outputs):
    return external_library.process(outputs)

# ✅ Include imports in the function definition
@scorer
def good_scorer(outputs):
    import json  # Inside function
    return len(json.dumps(outputs))

# ❌ Avoid using type hints in scorer function signature that requires imports
from typing import List

@scorer
def scorer_with_bad_types(outputs: List[str]):
    return False

指标回填问题

“试验中未找到计划的记分器 'X' ”

  • 确保记分器名称与实验中已注册的记分器匹配
  • 使用 list_scorers 方法检查可用的记分器

存档跟踪

可以将跟踪及其关联的评估保存到 Unity 目录 Delta 表,以便进行长期存储和高级分析。 这对于生成自定义仪表板、对跟踪数据执行深入分析以及维护应用程序行为的持久记录非常有用。

注释

必须具有写入指定 Unity 目录 Delta 表所需的权限。 如果目标表尚不存在,将创建该表。

如果该表已存在,则跟踪将追加到该表。

启用存档跟踪

若要开始存档试验的跟踪,请使用函数 enable_databricks_trace_archival 。 必须指定目标 Delta 表的完整名称,包括目录和架构。 如果未提供 experiment_id存档跟踪,则会为当前活动试验启用存档跟踪。

from mlflow.tracing.archival import enable_databricks_trace_archival

# Archive traces from a specific experiment to a Unity Catalog Delta table
enable_databricks_trace_archival(
    delta_table_fullname="my_catalog.my_schema.archived_traces",
    experiment_id="YOUR_EXPERIMENT_ID",
)

禁用存档跟踪

可以使用函数 disable_databricks_trace_archival 随时停止存档试验的跟踪。

from mlflow.tracing.archival import disable_databricks_trace_archival

# Stop archiving traces for the specified experiment
disable_databricks_trace_archival(experiment_id="YOUR_EXPERIMENT_ID")

后续步骤

参考指南

浏览本指南中提到的概念和功能的详细文档。