更改数据馈送允许 Azure Databricks 跟踪 Delta 表版本之间的行级别更改。 对 Delta 表启用此功能后,运行时会记录写入该表的所有数据的“更改事件”。 这包括行数据以及指示已插入、已删除还是已更新指定行的元数据。
可以使用更改数据馈送为常见数据用例提供支持,包括:
- ETL 管道:以增量方式处理自上次管道运行以来更改的行。
- 审核线索:跟踪符合性和治理要求的数据修改。
- 数据复制:同步对下游表、缓存或外部系统的更改。
重要说明
更改数据馈送与表历史记录协同工作,以提供更改信息。 由于克隆 Delta 表会创建单独的历史记录,因此克隆表上的更改数据馈送与原始表的历史记录不匹配。
启用更改数据馈送
必须在要从中读取的表上显式启用更改数据馈送。 使用以下方法之一。
新建表
在CREATE TABLE命令中设置表属性delta.enableChangeDataFeed = true。
CREATE TABLE student (id INT, name STRING, age INT)
TBLPROPERTIES (delta.enableChangeDataFeed = true)
现有表
在ALTER TABLE命令中设置表属性delta.enableChangeDataFeed = true。
ALTER TABLE myDeltaTable
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
会话中的所有新表
设置 Spark 配置,为会话中创建的所有新表启用更改数据馈送。
SET spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
重要说明
仅记录启用更改数据馈送后所做的更改。 不会捕获之前对表所做的更改。
更改数据馈送架构
当你读取表的更改数据馈送时,将使用最新表版本的架构。 Azure Databricks 完全支持大多数架构更改和演变作,但启用了列映射的表具有限制。 请参阅 更改具有列映射的表的数据馈送限制。
除了 Delta 表架构中的数据列之外,更改数据馈送还包含用于标识更改事件类型的元数据列:
| 列名称 | 类型 | 值 |
|---|---|---|
_change_type |
字符串 |
insert、、 update_preimageupdate_postimage、 delete(1) |
_commit_version |
长整型 | 包含更改的 Delta 日志或表版本。 |
_commit_timestamp |
时间戳 | 创建提交时关联的时间戳。 |
(1)preimage 是更新前的值,postimage 是更新后的值。
如果架构包含与这些元数据列具有相同名称的列,则不能对表启用更改数据馈送。 在启用更改数据馈送之前,重命名表中的列以解决此冲突。
以增量方式处理更改数据
Databricks 建议结合使用更改数据馈送和结构化流式处理,以增量处理 Delta 表中的更改。 您必须使用 Azure Databricks 的结构化流式处理来自动跟踪表的更改数据馈送的版本变更。 有关 SCD 类型 1 或类型 2 表的 CDC 处理,请参阅 AUTO CDC API:使用管道简化更改数据捕获。
当针对表配置流来读取更改数据馈送时,将 readChangeFeed 选项设置为 true,如以下语法示例所示:
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("myDeltaTable")
)
Scala(编程语言)
spark.readStream
.option("readChangeFeed", "true")
.table("myDeltaTable")
默认行为
当流首次启动时,它将表的最新快照作为 INSERT 记录返回,然后返回未来的更改作为变更数据。 更改数据作为 Delta Lake 事务的一部分提交,并在向表提交新数据的同时变为可用。
其他选项
可以选择指定起始版本(请参阅 指定起始版本)或使用批处理执行(请参阅 批量查询中的读取更改)。 Azure Databricks 在读取更改数据时,还支持速率限制(maxFilesPerTrigger,maxBytesPerTrigger)和 excludeRegex。
对于除启动快照以外的版本,速率限制以原子方式应用于整个提交,即整个提交要么被纳入当前批处理中,要么被延迟到下一批。
指定起始版本
若要从特定点读取更改,请使用时间戳或版本号指定起始版本。 批量读取需要起始版本。 可以选择指定结束版本来限制范围。 若要了解有关 Delta Lake 表历史记录的详细信息,请参阅什么是 Delta Lake 时间旅行?
配置涉及更改数据馈送的结构化流式处理工作负荷时,请了解如何指定起始版本会影响处理:
- 新的数据处理管道通常会受益于默认设置,该设置在流首次启动时将表中所有现有记录记录为
INSERT操作。 - 如果目标表已经包含所有记录,且有到特定时间点的适当更改,请执行起始版本,以避免将源表状态作为
INSERT事件进行处理。
下面的示例展示了用于从检查点损坏导致的流式处理失败中恢复的语法。 在本例中,假设条件如下:
- 在创建表时,源表上启用了更改数据馈送。
- 目标下游表已处理直到版本 75(含)的所有更改。
- 源表的版本历史记录可用于版本 70 及更高版本。
Python
(spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
)
Scala(编程语言)
spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
在此示例中,还必须指定新的检查点位置。
重要说明
如果指定起始版本,那么当表历史记录中不再有起始版本时,流无法从新的检查点启动。 Delta Lake 会自动清理历史版本,这意味着最终会删除所有指定的起始版本。
请参阅 重播表历史记录。
在批处理查询中读取更改
可以使用批量查询语法来读取从特定版本开始的所有更改,或者读取指定版本范围中的更改。
- 请将版本指定为整数,将时间戳指定为格式为
yyyy-MM-dd[ HH:mm:ss[.SSS]]的字符串。 - 开始和结束版本是包容性的。
- 若要从起始版本读取到最新版本,请仅指定起始版本。
- 启用更改数据馈送之前指定版本会引发错误。
以下语法示例演示如何对批量读取使用起始和结束版本选项:
SQL
-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)
-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)
-- database/schema names inside the string for table name,
-- with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')
Python
# version as ints or longs
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
# timestamps as formatted timestamp
spark.read \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
# providing only the startingVersion/timestamp
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
Scala(编程语言)
// version as ints or longs
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
// timestamps as formatted timestamp
spark.read
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
// providing only the startingVersion/timestamp
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
处理范围外版本
默认情况下,指定超过上次提交的版本或时间戳将引发错误 timestampGreaterThanLatestCommit。 在 Databricks Runtime 11.3 LTS 及更高版本中,可以启用对范围外版本的容忍度:
SET spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
启用此设置:
- 在上次提交之后启动版本/时间戳:返回空结果。
- 上次提交之后的结束版本/时间戳:从头到上一次提交返回所有更改。
记录数据更改
Delta Lake 有效记录数据更改,并可能使用其他 Delta Lake 功能优化存储表示形式。
存储注意事项
- 存储成本:启用更改数据馈送可能会导致存储成本小幅增加,因为更改可能会记录在单独的文件中。
- 没有更改文件的操作:某些操作(仅插入、完整分区删除)不会生成更改数据文件:Azure Databricks 直接从事务日志计算更改数据流。
-
保留:更改数据文件遵循表的保留策略。 该
VACUUM命令将删除它们,并从事务日志更改后遵循检查点保留。
不要尝试通过直接查询更改数据文件来重新构造更改数据馈送。 始终使用 Delta Lake API。
回放表历史
更改数据馈送不打算用作表的所有更改的永久记录。 它仅记录启用后发生的更改,你可以启动新的流式读取来捕获当前版本和所有后续更改。
更改数据馈送中的记录是暂时性的,只能用于指定的保留时段。 Delta Lake 事务日志会定期删除表版本及其相应的更改数据馈送版本。 删除某个版本后,无法再读取该版本的更改数据馈送。
存档永久历史记录的更改数据
如果用例需要保留对表的所有更改的永久历史记录,请使用增量逻辑将记录从更改数据馈送写入新表。 以下示例演示如何将 trigger.AvailableNow 可用数据作为批处理工作负荷进行处理,以便进行审核或完全可重播:
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("target_table")
)
Scala(编程语言)
spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("target_table")
更改具有列映射的表的数据馈送限制
在 Delta 表上启用列映射后,可以在不重写数据文件的情况下删除或重命名列。 但是,数据变更馈送在遇到非追加架构更改时,如重命名或删除列、更改数据类型或进行可为空性的调整,会存在一些限制:
- 批处理语义:无法读取发生非累加架构更改的事务或范围的更改数据馈送。
- DBR 12.2 LTS 及更低版本:已启用列映射且经历了非附加性架构更改的表不支持对更改数据馈送进行流式读取。 请参阅使用列映射和架构更改进行流式处理。
- DBR 11.3 LTS 及更低版本:无法读取启用了列映射且已启用列重命名或删除的表的更改数据馈送。
在 Databricks Runtime 12.2 LTS 及更高版本中,可以对启用了列映射且经历过非添加性架构更改的表的更改数据馈送执行批量读取。 读取操作使用查询中指定的最终版本的架构,而不是最新的表版本。 如果版本范围跨越非累加架构更改,查询仍然失败。