以增量方式将数据从数据仓库加载到湖屋

本教程介绍如何仅将新的或更改的数据从数据仓库复制到 Lakehouse。 此方法称为增量加载,当您希望在不必每次复制所有内容的情况下保持您的数据 up-to日期时,这种方法非常有用。

下面是解决方案的高级设计:

示意图显示了以增量方式加载数据的逻辑。

  1. 选择水印列。 在源表中选择一个有助于跟踪新记录或更改的记录的列。 此列通常包含添加或更新行时增加的值(例如时间戳或 ID)。 我们将使用此列中的最高值作为“水印”来了解我们离开的位置。

  2. 设置表以存储最后一个水印值

  3. 生成执行以下作的管道

    流水线包括这些活动:

    • 两个查找活动。 第一个值获取最后一个水印值(上次停止的位置)。 第二个值获取新的水印值(我们这次将停止的位置)。 这两个值都传递给复制活动。
    • 一个复制活动,用于查找水印列值介于旧水印和新水印之间的行。 然后将此数据从数据仓库复制到 Lakehouse 作为新文件。
    • 一个存储过程的活动,负责保存新的水印值,以确保下次管道运行时能够知道从哪里开始。

先决条件

  • 数据仓库。 你将使用数据仓库作为源数据存储。 如果没有数据仓库,请查看 “创建数据仓库” 以获取说明。
  • 湖屋。 你将使用 Lakehouse 作为目标数据存储。 如果您没有一个,请参阅创建 Lakehouse以获取说明。
    • 创建名为 IncrementalCopy 的文件夹来存储复制的数据。

准备您的源代码

在配置增量复制管道之前,让我们在数据仓库中设置所需的表和存储过程。

1. 在数据仓库中创建数据源表

在数据仓库中运行以下命令,以创建名为 data_source_table 的表作为源表。 我们将将其用作增量复制的示例数据。

create table data_source_table
(
    PersonID int,
    Name varchar(255),
    LastModifytime DATETIME2(6)
);

INSERT INTO data_source_table
    (PersonID, Name, LastModifytime)
VALUES
    (1, 'aaaa','9/1/2017 12:56:00 AM'),
    (2, 'bbbb','9/2/2017 5:23:00 AM'),
    (3, 'cccc','9/3/2017 2:36:00 AM'),
    (4, 'dddd','9/4/2017 3:21:00 AM'),
    (5, 'eeee','9/5/2017 8:06:00 AM');

源表中的数据如下所示:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1        | aaaa | 2017-09-01 00:56:00.000
2        | bbbb | 2017-09-02 05:23:00.000
3        | cccc | 2017-09-03 02:36:00.000
4        | dddd | 2017-09-04 03:21:00.000
5        | eeee | 2017-09-05 08:06:00.000

在本教程中,我们将 LastModifytime 用作水印列。

2. 在数据仓库中准备另一个表来存储最后一个水印值

  1. 在数据仓库中运行以下 SQL 命令,以创建名为 watermarktable 的表来存储最后一个水印值:

    create table watermarktable
    (
    TableName varchar(255),
    WatermarkValue DATETIME2(6),
    );
    
  2. 使用您的源表名称设置最新水印的默认值。 在本教程中,表名称 data_source_table,我们将默认值设置为 1/1/2010 12:00:00 AM

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. 检查 水印表中的数据。

    Select * from watermarktable
    

    输出:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

3. 在数据仓库中创建存储过程

运行以下命令,以在数据仓库中创建存储过程。 此存储过程在每个管道运行后更新最后一个水印值。

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

配置用于增量复制的管道

步骤 1:创建管道

  1. 转到 Power BI

  2. 选择屏幕左下角的 Power BI 图标,然后选择 Fabric

  3. 选择 “我的工作区 ”以打开 Fabric 工作区。

  4. 选择 “+ 新建项”,然后选择 “管道”,然后输入管道名称以创建新管道。

    显示新创建的工作区中的“新建管道”按钮的屏幕截图。

    显示创建新管道的名称的屏幕截图。

步骤 2:为最后一个水印添加查找活动

在此步骤中,你将创建一个查找活动来获取最后一个水印值。 我们将获取前面设置的默认值 1/1/2010 12:00:00 AM

  1. 选择 管道活动,然后从下拉列表中选择 查找

  2. 在“ 常规 ”选项卡下,将此活动重命名为 LookupOldWaterMarkActivity

  3. “设置” 选项卡下,配置以下内容:

    • 连接:在 “仓库 ”下选择“ 全部浏览”,然后从列表中选择数据仓库。
    • 使用查询:选择“”。
    • :选择“dbo.watermarktable”。
    • 仅限首行:已选择。

    屏幕截图显示查找旧水印。

步骤 3:为新水印添加查找活动

在此步骤中,你将创建一个查找活动来获取新的水印值。 你将使用查询从源数据表获取新的水印。 我们将从 data_source_table 获取 LastModifytime 列中的最高值。

  1. 在顶部栏中,选择“活动”选项卡下的“查找”以添加第二个查找活动。

  2. 在“ 常规 ”选项卡下,将此活动重命名为 LookupNewWaterMarkActivity

  3. “设置” 选项卡下,配置以下内容:

    • 连接:在 “仓库 ”下选择“ 全部浏览”,然后从列表中选择数据仓库,或者从 Fabric 项连接中选择数据仓库。

    • 使用查询:选择“查询”。

    • 查询:输入以下查询以选择最大上次修改时间作为新水印:

      select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
      
    • 仅限首行:已选择。

    屏幕截图显示查找新水印。

步骤 4:添加复制活动以复制增量数据

在此步骤中,你将添加一个复制活动,将上一个水印和新水印之间的增量数据从数据仓库复制到湖库。

  1. 选择顶部栏上的“活动”,然后选择“复制数据”->“添加到画布”以获取复制活动。

  2. 在“ 常规 ”选项卡下,将此活动重命名为 IncrementalCopyActivity

  3. 将两个查找活动通过拖动它们附加的绿色按钮(成功时)连接到复制活动。 在看到复制活动的边框颜色变为蓝色时,松开鼠标按钮。

    屏幕截图显示了连接查找和复制活动。

  4. “源 ”选项卡下,配置以下内容:

    • 连接:在 “仓库 ”下选择“ 全部浏览”,然后从列表中选择数据仓库,或者从 Fabric 项连接中选择数据仓库。

    • 仓库:选择仓库。

    • 使用查询:选择“查询”。

    • 查询:输入以下查询,在最后一个水印和新水印之间复制增量数据。

      select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
      

    显示复制源配置的屏幕截图。

  5. “目标 ”选项卡下,配置以下内容:

    • 连接:在 Lakehouse 下选择“全部浏览”,然后从列表中选择您的 Lakehouse,或者从 Fabric 项连接中选择您的 Lakehouse。
    • 湖屋:选择你的湖屋。
    • 根文件夹:选择“文件”。
    • 文件路径:选择要在其中存储复制数据的文件夹。 选择“浏览”以选择文件夹。 对于文件名,打开“添加动态内容”,并在打开的窗口中输入 @CONCAT('Incremental-', pipeline().RunId, '.txt'),以便在湖屋中为复制的数据文件创建文件名。
    • 文件格式:选择数据的格式类型。

    显示复制目标配置的屏幕截图。

步骤 5:添加存储过程活动

在此步骤中,你将添加一个存储过程活动,以更新下次管道运行的最后水印值。

  1. 选择顶部栏上的“活动”,然后选择“存储过程”以添加存储过程活动。

  2. 在“ 常规 ”选项卡下,将此活动重命名为 StoredProceduretoWriteWatermarkActivity

  3. 将复制活动的绿色(在成功时)输出连接到存储过程活动。

  4. “设置” 选项卡下,配置以下内容:

    • 数据仓库:选择你的数据仓库。

    • 存储过程名称:选择在数据仓库中创建的存储过程: [dbo].[usp_write_watermark]

    • 展开存储过程参数。 若要设置存储过程参数的值,请选择“ 导入”,然后输入参数的以下值:

      名称 类型
      LastModifiedtime DateTime @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}
      TableName 字符串 @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}

    显示存储过程活动配置的屏幕截图。

步骤 6:运行管道并监视结果

在顶部栏中,选择“开始”选项卡下的“运行”。然后选择“保存并运行”。 管道开始运行,可以在“ 输出 ”选项卡下监视它。

显示管道运行结果的屏幕截图。

转到 Lakehouse,你会发现数据文件位于所选文件夹下。 可以选择文件来预览复制的数据。

显示第一次管道运行的湖屋数据的屏幕截图。

显示第一次管道运行的湖屋数据预览的屏幕截图。

添加更多数据以查看增量复制结果

完成第一个管道运行后,让我们向数据仓库源表添加更多数据,以查看此管道是否可以复制增量数据。

步骤 1:向源添加更多数据

通过运行以下查询将新数据插入数据仓库:

INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')

INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')

data_source_table 的更新数据为:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000

步骤 2:触发另一个管道运行并监视结果

返回到管道页。 在顶部栏上,再次选择“开始”选项卡下的“运行”。 管道开始运行,可以在 “输出”下监视它。

转到 Lakehouse,你会发现新复制的数据文件位于所选文件夹下。 可以选择文件来预览复制的数据。 你会看到增量数据出现在这个文件中。

显示第二次管道运行的湖屋数据的屏幕截图。

显示第二次管道运行的湖屋数据预览的屏幕截图。

接下来,详细了解如何从 Azure Blob 存储复制到 Lakehouse。