AUTO CDC API:使用管道简化变更数据捕获

Lakeflow Spark 声明性管道(SDP)通过 AUTO CDCAUTO CDC FROM SNAPSHOT API 简化了更改数据捕获(CDC)。

注释

AUTO CDC API 替换 APPLY CHANGES API,并具有相同的语法。 APPLY CHANGES API 仍然可用,但 Databricks 建议在其位置使用 AUTO CDC API。

使用的接口取决于更改数据源:

  • AUTO CDC 来处理更改数据馈送(CDF)中的更改。
  • 使用 AUTO CDC FROM SNAPSHOT (仅适用于 Python)处理数据库快照中的更改。

以前,该 MERGE INTO 语句通常用于处理 Azure Databricks 上的 CDC 记录。 但是, MERGE INTO 由于序列外记录或需要复杂的逻辑重新排序记录,可能会生成不正确的结果。

AUTO CDC API 在管道 SQL 和 Python 接口中得到支持。 Python 接口支持该 AUTO CDC FROM SNAPSHOT API。 AUTO CDC Apache Spark 声明性管道不支持这些 API。

AUTO CDCAUTO CDC FROM SNAPSHOT都支持使用 SCD 类型 1 和类型 2 更新表。

  • 使用 SCD 类型 1 直接更新记录。 历史不会保留更新后的记录。
  • 使用 SCD 类型 2 保留记录的历史,可以选择在所有更新中保留,或者仅在指定列集的更新中保留。

有关语法和其他引用,请参阅管道的 AUTO CDC(SQL)管道的 AUTO CDC(Python)管道的 AUTO CDC FROM SNAPSHOT(Python)。

注释

本文介绍如何根据源数据的变化更新数据管道中的表。 若要了解如何记录和查询 Delta 表的行级更改信息,请参阅 在 Azure Databricks 上使用 Delta Lake 更改数据馈送

要求

若要使用 CDC API,您的管道必须配置为使用 无服务器 SDP 或 SDPProAdvanced版本

CDC 如何通过 AUTO CDC API 实现?

通过自动处理序列外记录,AUTO CDC API 可确保正确处理 CDC 记录,并不需要开发复杂的逻辑来处理序列外记录。 必须在源数据中指定要对记录进行排序的列,API 将其解释为源数据正确排序的单调增加表示形式。 管道会自动处理无序到达的数据。 对于 SCD 类型 2 更改,管道会将相应的排序值传播到目标表 __START_AT__END_AT 列。 每个排序值中每个键应有一个不同的更新,不支持 NULL 排序值。

若要使用 AUTO CDC 执行 CDC 处理,请先创建流式表,然后在 SQL 中使用 AUTO CDC ... INTO 语句或在 Python 中使用 create_auto_cdc_flow() 函数来指定更改源、键和顺序以设置变更馈送。 若要创建目标流式处理表,请使用 CREATE OR REFRESH STREAMING TABLE SQL 中的语句或 create_streaming_table() Python 中的函数。 请参阅 SCD 类型 1 和类型 2 处理 示例。

有关语法详细信息,请参阅管道 SQL 引用Python 参考

CDC 如何通过 AUTO CDC FROM SNAPSHOT API 实现?

AUTO CDC FROM SNAPSHOT 是一个声明性 API,它通过比较一系列有序快照来有效地确定源数据的变化,然后运行 CDC 处理快照中记录所需的处理。 AUTO CDC FROM SNAPSHOT 仅 Python 管道接口支持。

AUTO CDC FROM SNAPSHOT 支持从多种源类型导入快照。

  • 使用定期快照导入从现有表或视图中获取的快照。 AUTO CDC FROM SNAPSHOT 具有一个简单的简化界面,用于支持定期从现有数据库对象引入快照。 每次管道更新时,新快照被引入,并将引入的时间用作快照版本号。 在连续模式下运行管道时,将在每个管道更新中引入多个快照,该周期由包含处理的流的AUTO CDC FROM SNAPSHOT设置决定。
  • 使用历史快照引入来处理包含数据库快照的文件,例如从 Oracle 或 MySQL 数据库或数据仓库生成的快照。

若要使用任意源类型 AUTO CDC FROM SNAPSHOT进行 CDC 处理,请先创建一个流表,然后在 Python 中使用函数 create_auto_cdc_from_snapshot_flow() 来指定快照、键和实现处理所需的其他参数。 请参阅 定期快照引入历史快照引入 示例。

传递给 API 的快照必须按版本按升序排列。 如果 SDP 检测到快照的顺序不正确,则会抛出异常。

有关语法详细信息,请参阅管道 Python 参考

使用多个列进行排序

可以按多个列进行排序(例如时间戳和 ID 来解决并列),可以使用 STRUCT 将它们结合起来:首先按 STRUCT 的第一个字段排序,如果有并列情况,再考虑第二个字段,依此类推。

SQL 中的示例:

SEQUENCE BY STRUCT(timestamp_col, id_col)

Python 中的示例:

sequence_by = struct("timestamp_col", "id_col")

局限性

用于排序的列必须是可排序数据类型。

示例:使用 CDF 源数据处理 SCD 类型 1 和 SCD 类型 2

以下部分提供了 SCD 类型 1 和类型 2 查询的示例,这些查询基于更改数据馈送中的源事件更新目标表:

  1. 创建新的用户记录。
  2. 删除用户记录。
  3. 更新用户记录。 在 SCD 类型 1 示例中,最后一个 UPDATE 操作延迟到达,并且从目标表中删除,演示了如何处理无序事件。

以下示例假定熟悉配置和更新管道。 请参阅 教程:使用变更数据捕获生成 ETL 管道

若要运行这些示例,必须首先创建示例数据集。 请参阅 “生成测试数据”。

下面是这些示例的输入记录:

userId 姓名 城市 操作 序列号
124 劳尔 瓦哈卡州 INSERT 1
123 伊莎贝尔 蒙特雷 INSERT 1
125 梅赛德斯 提 华纳 INSERT 2
126 百合 坎昆 INSERT 2
123 null null DELETE 6
125 梅赛德斯 瓜达拉哈拉 UPDATE 6
125 梅赛德斯 Mexicali UPDATE 5
123 伊莎贝尔 奇瓦瓦州 UPDATE 5

如果取消注释示例数据中的最后一行,它将插入以下记录,指定应截断记录的位置:

userId 姓名 城市 操作 序列号
null null null 截断 3

注释

以下所有示例都包含指定DELETETRUNCATE操作的选项,但每个操作都是可选的。

处理 SCD 第1类更新

以下示例演示如何处理 SCD 类型 1 更新:

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
  return spark.readStream.table("cdc_data.users")

dp.create_streaming_table("target")

dp.create_auto_cdc_flow(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW flowname AS AUTO CDC INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

运行 SCD 类型 1 示例后,目标表包含以下记录:

userId 姓名 城市
124 劳尔 瓦哈卡州
125 梅赛德斯 瓜达拉哈拉
126 百合 坎昆

在运行带有附加TRUNCATE记录的 SCD 类型 1 示例之后,由于在124进行的126操作,记录TRUNCATEsequenceNum=3被截断,目标表包含以下记录:

userId 姓名 城市
125 梅赛德斯 瓜达拉哈拉

处理 SCD 类型 2 更新

以下示例演示如何处理 SCD 类型 2 更新:

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
  return spark.readStream.table("cdc_data.users")

dp.create_streaming_table("target")

dp.create_auto_cdc_flow(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW target_flow
AS AUTO CDC INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

运行 SCD 类型 2 示例后,目标表包含以下记录:

userId 姓名 城市 __START_AT __END_AT
123 伊莎贝尔 蒙特雷 1 5
123 伊莎贝尔 奇瓦瓦州 5 6
124 劳尔 瓦哈卡州 1 null
125 梅赛德斯 提 华纳 2 5
125 梅赛德斯 Mexicali 5 6
125 梅赛德斯 瓜达拉哈拉 6 null
126 百合 坎昆 2 null

SCD 类型 2 查询还可以指定要跟踪的目标表中历史记录的输出列的子集。 对其他列的更改将就地更新,而不是生成新的历史记录。 以下示例演示如何从跟踪中排除 city 列:

以下示例演示如何将跟踪历史记录与 SCD 类型 2 配合使用:

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
  return spark.readStream.table("cdc_data.users")

dp.create_streaming_table("target")

dp.create_auto_cdc_flow(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW target_flow
AS AUTO CDC INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

在没有其他 TRUNCATE 记录的情况下运行此示例后,目标表包含以下记录:

userId 姓名 城市 __START_AT __END_AT
123 伊莎贝尔 奇瓦瓦州 1 6
124 劳尔 瓦哈卡州 1 null
125 梅赛德斯 瓜达拉哈拉 2 null
126 百合 坎昆 2 null

生成测试数据

下面提供了以下代码来生成示例数据集,以便在本教程中提供的示例查询中使用。 假设你有适当的凭据来创建新架构并创建新表,则可以使用笔记本或 Databricks SQL 运行这些语句。 以下代码 不应 作为管道定义的一部分运行:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

示例:定期快照处理

下面的示例演示了 SCD 类型 2 处理,该处理获取存储在mycatalog.myschema.mytable位置的表的快照。 处理结果将写入名为 target 的表。

mycatalog.myschema.mytable 在时间戳 2024-01-01 00:00:00 的记录

Key 价值
1 a1
2 a2

mycatalog.myschema.mytable 时间戳 2024-01-01 12:00:00 的记录

Key 价值
2 b2
3 a3
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dp.create_streaming_table("target")

dp.create_auto_cdc_from_snapshot_flow(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

处理快照后,目标表包含以下记录:

Key 价值 __START_AT __END_AT
1 a1 2024年01月01日 00:00:00 2024-01-01 12:00:00
2 a2 2024年01月01日 00:00:00 2024-01-01 12:00:00
2 b2 2024-01-01 12:00:00 null
3 a3 2024-01-01 12:00:00 null

示例:历史快照处理

以下示例演示 SCD 类型 2 的处理,该处理根据存储在云存储系统中的两个快照的源事件来更新目标表:

快照位置 timestamp,存储在 /<PATH>/filename1.csv

Key TrackingColumn 非跟踪列
1 a1 b1
2 a2 b2
4 a4 b4

快照位置 timestamp + 5,存储在 /<PATH>/filename2.csv

Key TrackingColumn 非跟踪列
2 a2_new b2
3 a3 b3
4 a4 b4_new

以下代码示例演示如何使用以下快照处理 SCD 类型 2 更新:

from pyspark import pipelines as dp

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dp.create_streaming_live_table("target")

dp.create_auto_cdc_from_snapshot_flow(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

处理快照后,目标表包含以下记录:

Key TrackingColumn 非跟踪列 __START_AT __END_AT
1 a1 b1 1 2
2 a2 b2 1 2
2 a2_new b2 2 null
3 a3 b3 2 null
4 a4 b4_new 1 null

在目标流式处理表中添加、更改或删除数据

如果管道将表发布到 Unity 目录,则可以使用 数据作语言 (DML)语句(包括插入、更新、删除和合并语句)修改由 AUTO CDC ... INTO 语句创建的目标流式处理表。

注释

  • 不支持用于修改流式处理表的表架构的 DML 语句。 确保 DML 语句不会尝试修改表架构。
  • 只能在使用 Databricks Runtime 13.3 LTS 及更高版本的 Unity Catalog 共享群集或 SQL 仓库中运行用于更新流式表的 DML 语句。
  • 由于流式传输要求仅追加数据源,因此如果你的处理需要从包含更改的源流式传输表进行流式传输(例如,通过 DML 语句),请在读取源流式传输表时设置 skipChangeCommits 标志。 设置 skipChangeCommits 后,将会忽略删除或修改源表上记录的事务。 如果你的处理不需要某个流式处理表,则可使用具体化视图(没有“仅追加”限制)作为目标表。

由于 Lakeflow Spark 声明性管道使用指定的SEQUENCE BY列,并将适当的排序值传播到目标表的__START_AT__END_AT列(适用于 SCD 类型 2),因此必须确保 DML 语句在这些列中使用有效值以维护记录的正确排序。 请参阅 如何使用 AUTO CDC API 实现 CDC?

有关对流式处理表使用 DML 语句的详细信息,请参阅 在流式处理表中添加、更改或删除数据

以下示例插入一个活动记录,其起始序列为 5:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

从 AUTO CDC 目标表读取变更数据馈送

在 Databricks Runtime 15.2 及以上版本中,你可以从作为 AUTO CDCAUTO CDC FROM SNAPSHOT 查询目标的流式处理表中读取更改数据馈送,方式与从其他 Delta 表读取更改数据馈送相同。 从目标流式处理表读取更改数据馈送需要满足以下条件:

  • 目标流式处理表必须发布到 Unity Catalog。 请参阅 将 Unity Catalog 与管道结合使用
  • 为了从目标流式处理表中读取变更数据流,您必须使用 Databricks Runtime 15.2 或更高版本。 若要读取其他管道中的更改数据馈送,必须将管道配置为使用 Databricks Runtime 15.2 或更高版本。

从在 Lakeflow Spark 声明性管道中创建的目标流式处理表读取更改数据馈送的方式与从其他 Delta 表读取更改数据馈送的方式相同。 若要详细了解如何使用 Delta 更改数据馈送功能,包括 Python 和 SQL 中的示例,请参阅 在 Azure Databricks 上使用 Delta Lake 更改数据馈送

注释

更改数据馈送记录包括标识更改事件的类型的 元数据 。 在表中更新记录时,关联更改记录的元数据通常包括 _change_type 值,以及设置为 update_preimageupdate_postimage 的事件。

但是,如果对目标流表进行更新并包含更改主键值,则这些 _change_type 值会有所不同。 更改包括对主键的更新时,元数据 _change_type 字段将设置为 insertdelete 事件。 在对具有 UPDATEMERGE 语句的某个键字段进行手动更新时,或对于 SCD 类型 2 表,当 __start_at 字段更改以反映较早的起始序列值时,主键可能会发生更改。

查询 AUTO CDC 用于确定主键值,而在确定这些主键值的方法上,SCD 类型 1 和 SCD 类型 2 的处理是不同的。

  • 对于 SCD 类型 1 处理和管道 Python 接口,主键是函数中keys参数的值create_auto_cdc_flow()。 对于 SQL 接口,主键是由KEYS语句中的AUTO CDC ... INTO子句定义的列。
  • 对于 SCD 类型 2,主键是 keys 参数或 KEYS 子句加上 coalesce(__START_AT, __END_AT) 操作的返回值,其中 __START_AT__END_AT 是目标流式处理表中的相应列。

获取有关管道中 CDC 查询处理的记录的数据

注释

以下指标仅由 AUTO CDC 查询捕获,而不是由 AUTO CDC FROM SNAPSHOT 查询捕获。

以下指标由 AUTO CDC 查询捕获:

  • num_upserted_rows:更新期间插入数据集的输出行数。
  • num_deleted_rows:更新期间从数据集中删除的现有输出行数。

对于 num_output_rows 查询,不会捕获 AUTO CDC 非 CDC 流的指标(输出)。

哪些数据对象用于管道中的 CDC 处理?

注释

  • 这些数据结构仅适用于 AUTO CDC 处理,不适用于 AUTO CDC FROM SNAPSHOT 处理。
  • 仅当目标表发布到 Hive 元存储时,这些数据结构才适用。 如果管道发布到 Unity Catalog,则用户无法访问内部基础表。

在 Hive 元存储中声明目标表时,会创建两个数据结构:

  • 一个使用分配给目标表的名称的视图。
  • 管道用于管理 CDC 处理的内部后盾表。 此表通过追加 __apply_changes_storage_ 到目标表名称进行命名。

例如,如果声明了名为 dp_cdc_target的目标表,则会看到名为 dp_cdc_target 的视图和元存储中命名 __apply_changes_storage_dp_cdc_target 的表。 创建视图使得 Lakeflow Spark 声明性管道能够过滤掉处理无序数据所需的额外信息(例如,墓碑和版本)。 若要查看已处理的数据,请查询目标视图。 由于表的 __apply_changes_storage_ 架构可能会更改为支持将来的功能或增强功能,因此不应查询表以供生产使用。 如果手动向表添加数据,则假定记录位于其他更改之前,因为缺少版本列。

其他资源