本教程介绍如何仅将新的或更改的数据从数据仓库复制到 Lakehouse。 此方法称为增量加载,当您希望在不必每次复制所有内容的情况下保持您的数据 up-to日期时,这种方法非常有用。
下面是解决方案的高级设计:
选择水印列。 在源表中选择一个有助于跟踪新记录或更改的记录的列。 此列通常包含添加或更新行时增加的值(例如时间戳或 ID)。 我们将使用此列中的最高值作为“水印”来了解我们离开的位置。
设置表以存储最后一个水印值。
生成执行以下作的管道:
流水线包括这些活动:
- 两个查找活动。 第一个值获取最后一个水印值(上次停止的位置)。 第二个值获取新的水印值(我们这次将停止的位置)。 这两个值都传递给复制活动。
- 一个复制活动,用于查找水印列值介于旧水印和新水印之间的行。 然后将此数据从数据仓库复制到 Lakehouse 作为新文件。
- 一个存储过程的活动,负责保存新的水印值,以确保下次管道运行时能够知道从哪里开始。
先决条件
- 数据仓库。 你将使用数据仓库作为源数据存储。 如果没有数据仓库,请查看 “创建数据仓库” 以获取说明。
-
湖屋。 你将使用 Lakehouse 作为目标数据存储。 如果您没有一个,请参阅创建 Lakehouse以获取说明。
- 创建名为 IncrementalCopy 的文件夹来存储复制的数据。
准备您的源代码
在配置增量复制管道之前,让我们在数据仓库中设置所需的表和存储过程。
1. 在数据仓库中创建数据源表
在数据仓库中运行以下命令,以创建名为 data_source_table 的表作为源表。 我们将将其用作增量复制的示例数据。
create table data_source_table
(
PersonID int,
Name varchar(255),
LastModifytime DATETIME2(6)
);
INSERT INTO data_source_table
(PersonID, Name, LastModifytime)
VALUES
(1, 'aaaa','9/1/2017 12:56:00 AM'),
(2, 'bbbb','9/2/2017 5:23:00 AM'),
(3, 'cccc','9/3/2017 2:36:00 AM'),
(4, 'dddd','9/4/2017 3:21:00 AM'),
(5, 'eeee','9/5/2017 8:06:00 AM');
源表中的数据如下所示:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
在本教程中,我们将 LastModifytime 用作水印列。
2. 在数据仓库中准备另一个表来存储最后一个水印值
在数据仓库中运行以下 SQL 命令,以创建名为 watermarktable 的表来存储最后一个水印值:
create table watermarktable ( TableName varchar(255), WatermarkValue DATETIME2(6), );使用您的源表名称设置最新水印的默认值。 在本教程中,表名称 data_source_table,我们将默认值设置为
1/1/2010 12:00:00 AM。INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')检查 水印表中的数据。
Select * from watermarktable输出:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
3. 在数据仓库中创建存储过程
运行以下命令,以在数据仓库中创建存储过程。 此存储过程在每个管道运行后更新最后一个水印值。
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
配置用于增量复制的管道
步骤 1:创建管道
转到 Power BI。
选择屏幕左下角的 Power BI 图标,然后选择 Fabric。
选择 “我的工作区 ”以打开 Fabric 工作区。
选择 “+ 新建项”,然后选择 “管道”,然后输入管道名称以创建新管道。
步骤 2:为最后一个水印添加查找活动
在此步骤中,你将创建一个查找活动来获取最后一个水印值。 我们将获取前面设置的默认值 1/1/2010 12:00:00 AM 。
选择 管道活动,然后从下拉列表中选择 查找。
在“ 常规 ”选项卡下,将此活动重命名为 LookupOldWaterMarkActivity。
在 “设置” 选项卡下,配置以下内容:
- 连接:在 “仓库 ”下选择“ 全部浏览”,然后从列表中选择数据仓库。
- 使用查询:选择“表”。
- 表:选择“dbo.watermarktable”。
- 仅限首行:已选择。
步骤 3:为新水印添加查找活动
在此步骤中,你将创建一个查找活动来获取新的水印值。 你将使用查询从源数据表获取新的水印。 我们将从 data_source_table 获取 LastModifytime 列中的最高值。
在顶部栏中,选择“活动”选项卡下的“查找”以添加第二个查找活动。
在“ 常规 ”选项卡下,将此活动重命名为 LookupNewWaterMarkActivity。
在 “设置” 选项卡下,配置以下内容:
连接:在 “仓库 ”下选择“ 全部浏览”,然后从列表中选择数据仓库,或者从 Fabric 项连接中选择数据仓库。
使用查询:选择“查询”。
查询:输入以下查询以选择最大上次修改时间作为新水印:
select MAX(LastModifytime) as NewWatermarkvalue from data_source_table仅限首行:已选择。
步骤 4:添加复制活动以复制增量数据
在此步骤中,你将添加一个复制活动,将上一个水印和新水印之间的增量数据从数据仓库复制到湖库。
选择顶部栏上的“活动”,然后选择“复制数据”->“添加到画布”以获取复制活动。
在“ 常规 ”选项卡下,将此活动重命名为 IncrementalCopyActivity。
将两个查找活动通过拖动它们附加的绿色按钮(成功时)连接到复制活动。 在看到复制活动的边框颜色变为蓝色时,松开鼠标按钮。
在 “源 ”选项卡下,配置以下内容:
连接:在 “仓库 ”下选择“ 全部浏览”,然后从列表中选择数据仓库,或者从 Fabric 项连接中选择数据仓库。
仓库:选择仓库。
使用查询:选择“查询”。
查询:输入以下查询,在最后一个水印和新水印之间复制增量数据。
select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
在 “目标 ”选项卡下,配置以下内容:
- 连接:在 Lakehouse 下选择“全部浏览”,然后从列表中选择您的 Lakehouse,或者从 Fabric 项连接中选择您的 Lakehouse。
- 湖屋:选择你的湖屋。
- 根文件夹:选择“文件”。
-
文件路径:选择要在其中存储复制数据的文件夹。 选择“浏览”以选择文件夹。 对于文件名,打开“添加动态内容”,并在打开的窗口中输入
@CONCAT('Incremental-', pipeline().RunId, '.txt'),以便在湖屋中为复制的数据文件创建文件名。 - 文件格式:选择数据的格式类型。
步骤 5:添加存储过程活动
在此步骤中,你将添加一个存储过程活动,以更新下次管道运行的最后水印值。
选择顶部栏上的“活动”,然后选择“存储过程”以添加存储过程活动。
在“ 常规 ”选项卡下,将此活动重命名为 StoredProceduretoWriteWatermarkActivity。
将复制活动的绿色(在成功时)输出连接到存储过程活动。
在 “设置” 选项卡下,配置以下内容:
数据仓库:选择你的数据仓库。
存储过程名称:选择在数据仓库中创建的存储过程: [dbo].[usp_write_watermark]。
展开存储过程参数。 若要设置存储过程参数的值,请选择“ 导入”,然后输入参数的以下值:
名称 类型 值 LastModifiedtime DateTime @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue} TableName 字符串 @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}
步骤 6:运行管道并监视结果
在顶部栏中,选择“开始”选项卡下的“运行”。然后选择“保存并运行”。 管道开始运行,可以在“ 输出 ”选项卡下监视它。
转到 Lakehouse,你会发现数据文件位于所选文件夹下。 可以选择文件来预览复制的数据。
添加更多数据以查看增量复制结果
完成第一个管道运行后,让我们向数据仓库源表添加更多数据,以查看此管道是否可以复制增量数据。
步骤 1:向源添加更多数据
通过运行以下查询将新数据插入数据仓库:
INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
data_source_table 的更新数据为:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000
步骤 2:触发另一个管道运行并监视结果
返回到管道页。 在顶部栏上,再次选择“开始”选项卡下的“运行”。 管道开始运行,可以在 “输出”下监视它。
转到 Lakehouse,你会发现新复制的数据文件位于所选文件夹下。 可以选择文件来预览复制的数据。 你会看到增量数据出现在这个文件中。
相关内容
接下来,详细了解如何从 Azure Blob 存储复制到 Lakehouse。