声明式特征工程和管理管道

重要

此功能为 Beta 版,可在以下区域使用: us-east-1us-west-2

使用特征存储声明性 API,可以从数据源定义和计算时间窗口聚合特征。 本指南介绍以下工作流:

  • 功能开发 工作流
    • 用于 create_feature 定义可在模型训练和服务工作流中使用的 Unity Catalog 功能对象。
  • 模型训练 工作流
    • 使用 create_training_set 来计算机器学习中的时间点聚合特征。 这将返回一个训练集对象,该对象可以返回一个 Spark 数据帧,该帧的计算特征已扩充到观察数据集,用于训练模型。
    • 使用此训练集调用 log_model,以将此模型保存在 Unity Catalog 中,并在特征对象和模型对象之间创建关联。
    • score_batch 使用 Unity 目录世系使用功能定义代码执行时间点正确的特征聚合,这些聚合扩充到推理数据集进行模型评分。
  • 特征具体化和服务 工作流
    • 在定义了create_feature功能或使用get_feature检索到该功能之后,可以使用materialize_features将该功能或功能集物化到离线存储,以便高效重复使用,或者将功能物化到在线商店以进行在线服务。
    • 使用 create_training_set 和物化视图准备一个脱机批量训练数据集。

有关log_modelscore_batch的详细文档,请参阅使用功能训练模型

要求

  • 运行 Databricks Runtime 17.0 ML 或更高版本 的经典计算 群集。

  • 必须安装自定义 Python 包。 每次运行笔记本时,必须执行以下代码行:

    %pip install databricks-feature-engineering>=0.14.0
    dbutils.library.restartPython()
    

快速入门示例

from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import DeltaTableSource, Sum, Avg, ContinuousWindow, OfflineStoreConfig
from datetime import timedelta

CATALOG_NAME = "main"
SCHEMA_NAME = "feature_store"
TABLE_NAME = "transactions"

# 1. Create data source
source = DeltaTableSource(
    catalog_name=CATALOG_NAME,
    schema_name=SCHEMA_NAME,
    table_name=TABLE_NAME,
    entity_columns=["user_id"],
    timeseries_column="transaction_time"
)

# 2. Define features
fe = FeatureEngineeringClient()
features = [
    fe.create_feature(
        catalog_name=CATALOG_NAME,
        schema_name=SCHEMA_NAME,
        name="avg_transaction_30d",
        source=source,
        inputs=["amount"],
        function=Avg(),
        time_window=ContinuousWindow(window_duration=timedelta(days=30))
    ),
    fe.create_feature(
        catalog_name=CATALOG_NAME,
        schema_name=SCHEMA_NAME,
        source=source,
        inputs=["amount"],
        function=Sum(),
        time_window=ContinuousWindow(window_duration=timedelta(days=7))
        # name auto-generated: "amount_sum_continuous_7d"
    ),
]

# 3. Create training set using declarative features

`labeled_df` should have columns "user_id", "transaction_time", and "target". It can have other context features specific to the individual observations.
training_set = fe.create_training_set(
    df=labeled_df,
    features=features,
    label="target",
)
training_set.load_df().display()  # action: joins labeled_df with computed feature

# 4. Train model
with mlflow.start_run():
    training_df = training_set.load_df()

    # training code

    fe.log_model(
        model=model,
        artifact_path="recommendation_model",
        flavor=mlflow.sklearn,
        training_set=training_set,
        registered_model_name=f"{CATALOG_NAME}.{SCHEMA_NAME}.recommendation_model",
    )

# 5. (Optional) Materialize features for serving
fe.materialize_features(
    features=features,
    offline_config=OfflineStoreConfig(
        catalog_name=CATALOG_NAME,
        schema_name=SCHEMA_NAME,
        table_name_prefix="customer_features"
    ),
    pipeline_state="ACTIVE",
    cron_schedule="0 0 * * * ?"  # Hourly
)

注释

具体化功能后,可以使用 CPU 模型服务为模型提供服务。 有关在线服务的详细信息,请参阅 Materialize 和 Serve 声明性特征

数据源

DeltaTableSource

注释

允许的 timeseries_column数据类型:TimestampType、DateType。 其他整数数据类型可以正常工作,但会导致时间窗口聚合的精度丢失。

以下代码演示了使用 Unity 目录中的 main.analytics.user_events 表的示例:

from databricks.feature_engineering.entities import DeltaTableSource

source = DeltaTableSource(
    catalog_name="main",               # Catalog name
    schema_name="analytics",           # Schema name
    table_name="user_events",          # Table name
    entity_columns=["user_id"],        # Join keys, used to look up features for an entity
    timeseries_column="event_time"     # Timestamp for time windows
)

声明性特性 API

create_feature() API(应用程序接口)

FeatureEngineeringClient.create_feature() 提供全面的验证并确保适当的功能构造:

FeatureEngineeringClient.create_feature(
    source: DataSource,                   # Required: DeltaTableSource
    inputs: List[str],                    # Required: List of column names from the source
    function: Union[Function, str],       # Required: Aggregation function (Sum, Avg, Count, etc.)
    time_window: TimeWindow,              # Required: TimeWindow for aggregation
    catalog_name: str,                    # Required: The catalog name for the feature
    schema_name: str,                     # Required: The schema name for the feature
    name: Optional[str],                  # Optional: Feature name (auto-generated if omitted)
    description: Optional[str],           # Optional: Feature description
    filter_condition: Optional[str],      # Optional: SQL WHERE clause to filter source data
) -> Feature

参数:

  • source:特征计算中使用的数据源
  • inputs:源中要用作聚合输入的列名列表
  • function:聚合函数(函数实例或字符串名称)。 请参阅下面的受支持函数列表。
  • time_window:聚合的时间范围(TimeWindow 实例或包含“duration”和可选“offset”的字典)
  • catalog_name:该功能的目录名称
  • schema_name:功能的架构名称
  • name:可选功能名称(如果省略则自动生成)
  • description:功能的可选说明
  • filter_condition:可选 SQL WHERE 子句,用于在聚合之前筛选源数据。 示例: "status = 'completed'""transaction" = "Credit" AND "amount > 100"

返回: 已验证的功能实例

抛出: 如果任何验证失败,则引发 ValueError

自动生成的名称

当省略name时,名称遵循以下模式:{column}_{function}_{window} 例如:

  • price_avg_continuous_1h (1小时平均价格)
  • transaction_count_continuous_30d_1d (30 天内的事务计数,与事件时间戳相差 1 天)

支持的函数

注释

所有函数均在聚合时间范围内应用,如下面的 时间窗口 部分所述。

功能 速记 Description 示例用例
Sum() "sum" 数值总和 每用户每日应用使用情况(以分钟为单位)
Avg() "avg""mean" 值的平均值 平均交易金额
Count() "count" 记录数 每个用户的登录次数
Min() "min" 最小值 可穿戴设备记录的最低心率
Max() "max" 最大值 每个会话的最大购物车数量
StddevPop() "stddev_pop" 总体标准偏差 所有客户的每日交易金额可变性
StddevSamp() "stddev_samp" 样本标准偏差 广告市场点击率的可变性
VarPop() "var_pop" 总体方差 工厂中 IoT 设备的传感器读数分布
VarSamp() "var_samp" 样本方差 电影收视率在采样组的分布
ApproxCountDistinct(relativeSD=0.05) "approx_count_distinct"* 近似唯一计数 购买项目的去重计数
ApproxPercentile(percentile=0.95,accuracy=100) N/A 近似百分位数 p95 响应延迟
First() "first" 第一个值 第一个登录时间戳
Last() "last" 最后一个值 最新购买金额

*使用字符串速记时,具有参数的函数使用默认值。

以下示例显示了在同一数据源上定义的窗口聚合功能。

from databricks.feature_engineering.entities import Sum, Avg, Count, Max, ApproxCountDistinct

fe = FeatureEngineeringClient()
sum_feature = fe.create_feature(source=source, inputs=["amount"], function=Sum(), ...)
avg_feature = fe.create_feature(source=source, inputs=["amount"], function=Avg(), ...)
distinct_count = fe.create_feature(
    source=source,
    inputs=["product_id"],
    function=ApproxCountDistinct(relativeSD=0.01),
    ...
)

具有筛选条件的功能

声明性功能 API 还支持应用 SQL 筛选器,该筛选器在聚合中作为 WHERE 子句应用。 使用包含特征计算所需的超集数据的大型源表时,筛选器非常有用,并最大程度地减少在这些表之上创建单独的视图的需求。

from databricks.feature_engineering.entities import Sum, Count, ContinuousWindow
from datetime import timedelta

# Only aggregate high-value transactions
high_value_sales = fe.create_feature(
    catalog_name="main",
    schema_name="ecommerce",
    source=transactions,
    inputs=["amount"],
    function=Sum(),
    time_window=ContinuousWindow(window_duration=timedelta(days=30)),
    filter_condition="amount > 100"  # Only transactions over $100
)

# Multiple conditions using SQL syntax
completed_orders = fe.create_feature(
    catalog_name="main",
    schema_name="ecommerce",
    source=orders,
    inputs=["order_id"],
    function=Count(),
    time_window=ContinuousWindow(window_duration=timedelta(days=7)),
    filter_condition="status = 'completed' AND payment_method = 'credit_card'"
)

时间范围

特征工程声明式 API 支持三种不同的窗口类型来控制基于时间窗口的聚合回溯行为:连续、跳动和滑动。

  • 连续窗口从事件时间回溯。 显式定义了持续时间和偏移。
  • 翻转窗口是固定的、不重叠的时间窗口。 每个数据点只属于一个窗口。
  • 滑动窗口是重叠的、连续滚动的时间窗口,具有可配置的滑动间隔。

下图显示了它们的工作原理。

连续窗口、滚动窗口 和滑动回溯窗口。

连续窗口

用于流数据的连续窗口是最新的并提供实时聚合。 在流式处理管道中,仅当固定长度窗口的内容发生更改(例如事件进入或离开时)时,连续窗口才会发出新行。 在训练管道中使用连续窗口功能时,将使用紧邻特定事件的时间戳之前的固定长度窗口持续时间对源数据执行准确的时间点特征计算。 这有助于防止联机与脱机数据不一致或数据泄露。 在时间 T 时,以 [T - 持续时间, T) 区间的聚合事件为特征。

class ContinuousWindow(TimeWindow):
    window_duration: datetime.timedelta
    offset: Optional[datetime.timedelta] = None

下表列出了连续窗口的参数。 窗口开始和结束时间基于以下参数:

  • 开始时间: evaluation_time - window_duration + offset (含)
  • 结束时间: evaluation_time + offset (独占)
参数 限制条件
offset(可选) 必须≤ 0(将时间窗口从结束时间戳向后移动)。 用于 offset 考虑创建事件的时间与事件时间戳之间的任何系统延迟,以防止将来的事件泄漏到训练数据集中。 例如,如果事件创建的时间与这些事件最终插入到一个分配有时间戳的源表中的时间之间有一分钟的延迟,则偏移量将是timedelta(minutes=-1)
window_duration 必须为 > 0
from databricks.feature_engineering.entities import ContinuousWindow
from datetime import timedelta

# Look back 7 days from evaluation time
window = ContinuousWindow(window_duration=timedelta(days=7))

请使用以下代码定义一个带有偏移量的连续窗口。

# Look back 7 days, but end 1 day ago (exclude most recent day)
window = ContinuousWindow(
    window_duration=timedelta(days=7),
    offset=timedelta(days=-1)
)

连续窗口示例

  • window_duration=timedelta(days=7), offset=timedelta(days=0):这将创建一个以当前评估时间结束的 7 天回溯窗口。 对于第 7 天下午 2:00 的活动,这包括从第 0 天下午 2:00 到第 7 天下午 2:00 之前的所有活动。

  • window_duration=timedelta(hours=1), offset=timedelta(minutes=-30):这将在评估时间的30分钟前创建一个持续1小时的回溯窗口。 对于下午 3:00 的事件,这包括从下午 1:30 到(但不包括)下午 2:30 的所有事件。 这可用于考虑数据引入延迟。

翻转窗口

对于使用滚动窗口定义的特性,聚合是通过预先确定的固定长度窗口计算的,该窗口按滑动间隔推进,生成完全划分时间的非重叠窗口。 因此,源中的每个事件都只贡献一个窗口。 在时间 t 的功能会聚合从时间窗口结束于或早于(但不包括)t 的数据。 Windows 从 Unix 时代开始。

class TumblingWindow(TimeWindow):
    window_duration: datetime.timedelta

下表列出了翻转窗口的参数。

参数 限制条件
window_duration 必须为 > 0
from databricks.feature_engineering.entities import TumblingWindow
from datetime import timedelta

window = TumblingWindow(
    window_duration=timedelta(days=7)
)

滚动窗口示例

  • window_duration=timedelta(days=5):这会创建预先确定的固定长度窗口,每个窗口为 5 天。 示例:窗口 #1 跨越第 0 天到第 4 天、窗口 #2 跨越第 5 天到第 9 天、窗口 #3 跨越第 10 天到第 14 天等。 具体而言,Window #1 包括所有在第 0 天从00:00:00.00开始且早于第 5 天00:00:00.00时间戳的事件(不包括第 5 天的事件)。 每个事件只属于一个窗口。

滑动窗口

对于使用滑动窗口定义的功能,聚合是通过预先确定的固定长度窗口计算的,该窗口按幻灯片间隔前进,生成重叠窗口。 源中的每个事件都可以为多个窗口的功能聚合做出贡献。 特性在时间 t 将窗口结束时间点在 t(不包括 t 本身)之前的数据进行聚合。 Windows 从 Unix 时代开始。

class SlidingWindow(TimeWindow):
    window_duration: datetime.timedelta
    slide_duration: datetime.timedelta

下表列出了滑动窗口的参数。

参数 限制条件
window_duration 必须为 > 0
slide_duration 必须为 > 0 且 <window_duration
from databricks.feature_engineering.entities import SlidingWindow
from datetime import timedelta

window = SlidingWindow(
    window_duration=timedelta(days=7),
    slide_duration=timedelta(days=1)
)

滑动窗口示例

  • window_duration=timedelta(days=5), slide_duration=timedelta(days=1):这将创建重叠的 5 天窗口,每次向前推进 1 天。 示例:窗口 #1 跨越第 0 天到第 4 天,窗口 #2 跨越第 1 天到第 5 天,窗口 #3 跨越第 2 天到第 6 天,依依如此。 每个窗口包括从00:00:00.00 开始日到(但不包括)结束日00:00:00.00的事件。 由于窗口重叠,单个事件可以属于多个窗口(在此示例中,每个事件最多属于 5 个不同的窗口)。

API 方法

create_training_set()

将特征与用于 ML 训练的标记数据联接:

FeatureEngineeringClient.create_training_set(
    df: DataFrame,                                # DataFrame with training data
    features: Optional[List[Feature]],            # List of Feature objects
    label: Union[str, List[str], None],           # Label column name(s)
    exclude_columns: Optional[List[str]] = None,  # Optional: columns to exclude

    # API continues to support creating training set using materialized feature tables and functions
) -> TrainingSet

调用 TrainingSet.load_df 以获取原始训练数据,并将其与在特定时间点动态计算的特征联接。

df参数要求:

  • 必须包含功能数据源中的所有 entity_columns 内容
  • 必须包含来自功能数据源的 timeseries_column
  • 应包含标签列

时间点正确性: 特征只使用每行时间戳之前可用的源数据进行计算,以防止将来的数据泄漏到模型训练中。 计算利用 Spark 的开窗函数提高效率。

log_model()

在推理过程中,为了实现数据溯源和自动查找特征,记录一个带有特征元数据的模型。

FeatureEngineeringClient.log_model(
    model,                                    # Trained model object
    artifact_path: str,                       # Path to store model artifact
    flavor: ModuleType,                       # MLflow flavor module (e.g., mlflow.sklearn)
    training_set: TrainingSet,                # TrainingSet used for training
    registered_model_name: Optional[str],     # Optional: register model in Unity Catalog
)

flavor 参数指定要使用的 MLflow 模型风格 模块,例如 mlflow.sklearnmlflow.xgboost

使用TrainingSet记录的模型会自动跟踪训练中使用的特征谱系。 有关详细文档,请参阅 “使用功能训练模型”。

score_batch()

使用自动特征查找执行批量推理

FeatureEngineeringClient.score_batch(
    model_uri: str,                           # URI of logged model
    df: DataFrame,                            # DataFrame with entity keys and timestamps
) -> DataFrame

score_batch 使用与模型一起存储的特征元数据,自动计算时间点上正确的特征以进行推理,从而确保与训练的一致性。 有关详细文档,请参阅 “使用功能训练模型”。

最佳做法

功能命名

  • 对业务关键功能使用描述性名称。
  • 跨团队遵循一致的命名约定。
  • 让自动生成处理探索功能。

时间范围

  • 使用偏移量排除不稳定的最近数据。
  • 将窗口边界与业务周期(每日、每周)对齐。
  • 考虑数据新鲜度与特征稳定性权衡。

Performance

  • 按数据源对功能进行分组,以最大程度地减少数据扫描。
  • 为用例使用适当的窗口大小。

Testing

  • 使用已知数据方案测试时间范围边界。

常见模式

客户分析

fe = FeatureEngineeringClient()
features = [
    # Recency: Number of transactions in the last day
    fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions, inputs=["transaction_id"],
            function=Count(), time_window=ContinuousWindow(window_duration=timedelta(days=1))),

    # Frequency: transaction count over the last 90 days
    fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions, inputs=["transaction_id"],
            function=Count(), time_window=ContinuousWindow(window_duration=timedelta(days=90))),

    # Monetary: total spend in the last month
    fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions, inputs=["amount"],
            function=Sum(), time_window=ContinuousWindow(window_duration=timedelta(days=30)))
]

走向分析

# Compare recent vs. historical behavior
fe = FeatureEngineeringClient()
recent_avg = fe.create_feature(
    catalog_name="main", schema_name="ecommerce",
    source=transactions, inputs=["amount"], function=Avg(),
    time_window=ContinuousWindow(window_duration=timedelta(days=7))
)

historical_avg = fe.create_feature(
    catalog_name="main", schema_name="ecommerce",
    source=transactions, inputs=["amount"], function=Avg(),
    time_window=ContinuousWindow(window_duration=timedelta(days=7), offset=timedelta(days=-7))
)

季节性模式

# Same day of week, 4 weeks ago
fe = FeatureEngineeringClient()
weekly_pattern = fe.create_feature(
    catalog_name="main", schema_name="ecommerce",
    source=transactions, inputs=["amount"], function=Avg(),
    time_window=ContinuousWindow(window_duration=timedelta(days=1), offset=timedelta(weeks=-4))
)

局限性

  • 在 API 中使用 create_training_set 时,实体和时间序列列的名称必须与训练(已标记)数据集和源表之间匹配。
  • 训练数据集中用作 label 列的列名不应存在于用于定义 Features 的源表中。
  • API 支持的函数列表(UDAF)是有限的。