将自定义指标与数据分析配合使用

本页介绍如何在数据分析中创建自定义指标。 除了自动计算的分析和偏移统计信息外,还可以创建自定义指标。 例如,你可能想要跟踪反映业务逻辑某些方面的加权平均值,或使用自定义模型质量分数。 还可以创建自定义偏移指标,用于跟踪对主表中值的更改(与基线或上一个时间范围相比)。

有关如何使用 MonitorMetric API 的更多详细信息,请参阅 API 参考

自定义指标的类型

数据分析包括以下类型的自定义指标:

  • 聚合指标,这些指标基于主表中的列计算。 聚合度量存储在概要度量表中。
  • 派生指标,这些指标是根据以前计算的聚合指标计算的,并且不直接使用主表中的数据。 派生指标存储在个人资料指标表中。
  • 偏移指标,用于比较以前从两个不同的时间窗口计算的聚合或派生指标,或主表和基线表之间。 偏移指标存储在偏移指标表中。

尽可能使用派生指标和偏移指标可以最大程度地减少对完整主表的重新计算。 仅聚合指标访问主表中的数据。 然后,可以直接从聚合指标值计算派生指标和偏移指标。

自定义指标参数

若要定义自定义指标,请为 SQL 列表达式创建 Jinja 模板 。 本节中的表描述了定义指标的参数以及 Jinja 模板中使用的参数。

参数 Description
type MonitorMetricType.CUSTOM_METRIC_TYPE_AGGREGATEMonitorMetricType.CUSTOM_METRIC_TYPE_DERIVEDMonitorMetricType.CUSTOM_METRIC_TYPE_DRIFT 之一。
name 指标表中自定义指标的列名。
input_columns 用于计算指标的输入表中的列名列表。 若要指示计算中使用了多个列,请使用 :table。 请参阅本文中的示例。
definition 用于指定如何计算指标的 SQL 表达式的 Jinja 模板。 请参阅 “创建定义”。
output_data_type 以 JSON 字符串格式输出的指标的 Spark 数据类型。

创建 definition

definition 参数必须是采用 Jinja 模板形式的单个字符串表达式。 它不能包含联接或子查询。

下表列出了可用于创建 SQL Jinja 模板的参数,用于指定如何计算指标。

参数 Description
{{input_column}} 用于计算自定义指标的列。
{{prediction_col}} 包含机器学习 (ML) 模型预测的列。 与InferenceLog分析一起使用。
{{label_col}} 包含 ML 模型真实值标签的列。 与InferenceLog分析一起使用。
{{current_df}} 对于与上一个时间窗口相比的偏移。 上一个时间窗口中的数据。
{{base_df}} 相对于基线表的偏移。 基线数据。

聚合指标示例

下面的示例计算列中值的平方的平均值,并应用于列 f1f2。 输出将保存为概要指标表中的新列,并显示在与列f1f2对应的分析行中。 适用的列名将替换为 Jinja 参数 {{input_column}}

from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T

MonitorMetric(
    type=MonitorMetricType.CUSTOM_METRIC_TYPE_AGGREGATE,
    name="squared_avg",
    input_columns=["f1", "f2"],
    definition="avg(`{{input_column}}`*`{{input_column}}`)",
    output_data_type=T.StructField("output", T.DoubleType()).json(),
)

以下代码定义一个自定义指标,用于计算列 f1f2列之间的差异的平均值。 在此示例中,参数 `[":table"]` 使用 `input_columns` 来表示在计算中使用了表的多个列。

from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T

MonitorMetric(
    type=MonitorMetricType.CUSTOM_METRIC_TYPE_AGGREGATE,
    name="avg_diff_f1_f2",
    input_columns=[":table"],
    definition="avg(f1 - f2)",
    output_data_type=T.StructField("output", T.DoubleType()).json(),
)

此示例计算加权模型质量分数。 对于critical列为True的观察,当该行的预测值与实际值不匹配时,将给予更重的惩罚。 由于它在原始列(prediction 以及 label)上定义,因此它被定义为聚合指标。 该 :table 列指示从多个列计算此指标。 Jinja 参数 {{prediction_col}}{{label_col}} 被替换为配置文件中预测标签列和真实值标签列的名称。

from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T

MonitorMetric(
    type=MonitorMetricType.CUSTOM_METRIC_TYPE_AGGREGATE,
    name="weighted_error",
    input_columns=[":table"],
    definition="""avg(CASE
      WHEN {{prediction_col}} = {{label_col}} THEN 0
      WHEN {{prediction_col}} != {{label_col}} AND critical=TRUE THEN 2
      ELSE 1 END)""",
    output_data_type=T.StructField("output", T.DoubleType()).json(),
)

派生指标示例

以下代码定义一个自定义指标,用于计算本部分前面定义的指标的 squared_avg 平方根。 由于这是派生指标,因此它不引用主表数据,而是根据 squared_avg 聚合指标进行定义。 输出将保存为配置文件指标表中的新列。

from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T

MonitorMetric(
    type=MonitorMetricType.CUSTOM_METRIC_TYPE_DERIVED,
    name="root_mean_square",
    input_columns=["f1", "f2"],
    definition="sqrt(squared_avg)",
    output_data_type=T.StructField("output", T.DoubleType()).json(),
)

偏移指标示例

以下代码定义一个偏移指标,用于跟踪本部分前面定义的指标的变化 weighted_error 。 参数{{current_df}}{{base_df}}允许指标引用weighted_error当前窗口和比较窗口中的值。 比较窗口可以是基线数据,也可以是上一时间窗口中的数据。 偏移指标保存在偏移指标表中。

from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T

MonitorMetric(
    type=MonitorMetricType.CUSTOM_METRIC_TYPE_DRIFT,
    name="error_rate_delta",
    input_columns=[":table"],
    definition="{{current_df}}.weighted_error - {{base_df}}.weighted_error",
    output_data_type=T.StructField("output", T.DoubleType()).json(),
)