Compartilhar via


Carregar dados do Data Warehouse para o Lakehouse incrementalmente

Neste tutorial, você aprenderá a copiar apenas dados novos ou alterados do Data Warehouse para um Lakehouse. Essa abordagem é chamada de carregamento incremental e é útil quando você deseja manter seus dados up-to-date sem copiar tudo todas as vezes.

Aqui está o design de alto nível da solução:

Diagrama mostrando a lógica para carregar dados incrementalmente.

  1. Escolha uma coluna de marca d'água. Escolha uma coluna na tabela de origem que ajude a controlar registros novos ou alterados. Esta coluna geralmente contém valores que aumentam quando as linhas são adicionadas ou atualizadas (como um carimbo de data/hora ou ID). Usaremos o valor mais alto nesta coluna como nossa "marca d'água" para sabermos onde paramos.

  2. Configure uma tabela para armazenar seu último valor de marca de referência.

  3. Crie um pipeline que faça o seguinte:

    O fluxo de trabalho inclui estas atividades:

    • Duas atividades de pesquisa. O primeiro obtém o último valor de marca d'água (onde paramos na última vez). O segundo obtém o novo valor da marca d'água (onde vamos parar desta vez). Ambos os valores são passados para a atividade de cópia.
    • Uma atividade de cópia que localiza linhas em que o valor da coluna de marca d'água está entre as marcas d'água antigas e novas. Em seguida, ele copia esses dados do Data Warehouse para o Lakehouse como um novo arquivo.
    • Atividade de procedimento armazenado que salva o novo valor da marca d'água, permitindo que a próxima execução do pipeline saiba por onde começar.

Pré-requisitos

  • Data Warehouse. Você usará o Data Warehouse como seu armazenamento de dados de origem. Se você não tiver um, confira Criar um Data Warehouse para obter instruções.
  • Lakehouse Você usará o Lakehouse como seu repositório de dados de destino. Se você não tiver um, consulte Criar um Lakehouse para obter instruções.
    • Crie uma pasta chamada IncrementalCopy para armazenar seus dados copiados.

Prepare sua fonte

Vamos configurar as tabelas e o procedimento armazenado necessários no Data Warehouse antes de configurar o pipeline de cópia incremental.

1. Criar uma tabela de dados de origem no seu Data Warehouse

Execute o seguinte comando SQL em seu Data Warehouse para criar uma tabela chamada data_source_table como sua tabela de origem. Usaremos isso como dados de exemplo para a cópia incremental.

create table data_source_table
(
    PersonID int,
    Name varchar(255),
    LastModifytime DATETIME2(6)
);

INSERT INTO data_source_table
    (PersonID, Name, LastModifytime)
VALUES
    (1, 'aaaa','9/1/2017 12:56:00 AM'),
    (2, 'bbbb','9/2/2017 5:23:00 AM'),
    (3, 'cccc','9/3/2017 2:36:00 AM'),
    (4, 'dddd','9/4/2017 3:21:00 AM'),
    (5, 'eeee','9/5/2017 8:06:00 AM');

Os dados na tabela de origem têm esta aparência:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1        | aaaa | 2017-09-01 00:56:00.000
2        | bbbb | 2017-09-02 05:23:00.000
3        | cccc | 2017-09-03 02:36:00.000
4        | dddd | 2017-09-04 03:21:00.000
5        | eeee | 2017-09-05 08:06:00.000

Neste tutorial, usaremos LastModifytime como a coluna de marca d'água.

2. Preparar outra tabela no seu Data Warehouse para armazenar o último valor de marca d'água

  1. Execute o comando SQL a seguir no seu Data Warehouse para criar uma tabela chamada watermarktable para armazenar o último valor de marca d'água:

    create table watermarktable
    (
    TableName varchar(255),
    WatermarkValue DATETIME2(6),
    );
    
  2. Defina o valor padrão inicial da última marca d'água utilizando o nome da sua tabela de origem. Neste tutorial, o nome da tabela é data_source_table e definiremos o valor padrão como 1/1/2010 12:00:00 AM.

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. Verifique os dados em sua watermarktable.

    Select * from watermarktable
    

    Saída:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

3. Criar um procedimento armazenado no seu Data Warehouse

Execute o comando a seguir para criar um procedimento armazenado no seu Data Warehouse. Esse procedimento armazenado atualiza o último valor da marca d'água após cada execução do pipeline.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Configurar um pipeline para a cópia incremental

Etapa 1: Criar um pipeline

  1. Vá para o Power BI.

  2. Selecione o ícone do Power BI na parte inferior esquerda da tela e selecione Fabric.

  3. Selecione Meu workspace para abrir o workspace do Fabric.

  4. Selecione + Novo Item e, em seguida, selecione Pipeline e, em seguida, insira um nome de pipeline para criar um novo pipeline.

    Captura de tela mostrando o novo botão de pipeline no espaço de trabalho recém-criado.

    Captura de tela mostrando o nome da criação de um novo pipeline.

Etapa 2: Adicionar uma atividade de pesquisa para a última marca d'água

Nesta etapa, você criará uma atividade de consulta para obter o último valor de marca d'água. Obteremos o valor 1/1/2010 12:00:00 AM padrão que definimos anteriormente.

  1. Selecione a atividade pipeline e selecione Pesquisa na lista suspensa.

  2. Na guia Geral , renomeie essa atividade como LookupOldWaterMarkActivity.

  3. Na guia Configurações , configure o seguinte:

    • Conexão: Em Warehouse, selecione Procurar todos e selecione seu data warehouse na lista.
    • Usar consulta: escolha Tabela.
    • Tabela: escolha dbo.watermarktable.
    • Somente a primeira linha: selecionada.

    Captura de tela mostrando uma pesquisa da marca d'água antiga.

Etapa 3: Adicionar uma atividade de pesquisa para a marca d'água nova

Nesta etapa, você criará uma atividade de consulta para obter o novo valor de referência. Você usará uma consulta para obter a nova marca d'água da tabela de dados de origem. Obteremos o valor mais alto na coluna LastModifytime de data_source_table.

  1. Na barra superior, selecione Pesquisa na guia Atividades para adicionar a segunda atividade de pesquisa.

  2. Na guia Geral , renomeie essa atividade como LookupNewWaterMarkActivity.

  3. Na guia Configurações , configure o seguinte:

    • Conexão: em Armazém de Dados, selecione Pesquisar todos e selecione seu data warehouse na lista ou selecione seu data warehouse nas conexões de itens do Fabric.

    • Usar consulta: escolha Consulta.

    • Consulta: insira a consulta a seguir para escolher o valor máximo da hora da última modificação como a nova marca d'água:

      select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
      
    • Somente a primeira linha: selecionada.

    Captura de tela mostrando uma pesquisa da marca d'água nova.

Etapa 4: Adicionar a atividade de cópia para copiar dados incrementais

Nesta etapa, você adicionará uma atividade de cópia para transferir os dados incrementais entre a última marca temporal e a nova marca temporal do seu Data Warehouse para o seu Lakehouse.

  1. Selecione Atividades na barra superior e selecione Copiar dados ->Adicionar à tela para obter a atividade de cópia.

  2. Na guia Geral , renomeie essa atividade como IncrementalCopyActivity.

  3. Conecte ambas as atividades de pesquisa à atividade de cópia. Faça isso arrastando o botão verde (Em caso de sucesso) anexado às atividades de pesquisa para a atividade de cópia. Solte o botão do mouse quando a cor da borda da atividade de cópia ficar verde.

    Captura de tela mostrando as atividades de pesquisa e cópia sendo conectadas.

  4. Na guia Origem , configure o seguinte:

    • Conexão: em Armazém de Dados, selecione Pesquisar todos e selecione seu data warehouse na lista ou selecione seu data warehouse nas conexões de itens do Fabric.

    • Warehouse: selecione seu armazém.

    • Usar consulta: escolha Consulta.

    • Consulta: Insira a consulta a seguir para copiar dados incrementais entre a marca d'água anterior e a nova marca d'água.

      select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
      

    Captura de tela mostrando a configuração de copiar origem.

  5. Na guia Destino , configure o seguinte:

    • Conexão: Em Lakehouse, selecione Navegar todos e escolha seu lakehouse na lista ou selecione seu lakehouse nas conexões de item do Fabric.
    • Lakehouse: selecione seu Lakehouse.
    • Pasta raiz: escolha Arquivos.
    • Caminho do arquivo: escolha a pasta na qual deseja armazenar os dados copiados. Selecione Procurar para selecionar sua pasta. No que se refere ao nome do arquivo, abra Adicionar conteúdo dinâmico e digite @CONCAT('Incremental-', pipeline().RunId, '.txt') na janela que foi aberta para criar nomes de arquivos para o seu arquivo de dados copiados no Lakehouse.
    • Formato do arquivo: selecione o tipo de formato dos seus dados.

    Captura de tela mostrando a configuração de destino da cópia.

Etapa 5: Adicionar uma atividade de procedimento armazenado

Nesta etapa, você adicionará uma atividade de procedimento armazenado para atualizar o último valor de marcador para a próxima execução do pipeline.

  1. Selecione Atividades na barra superior e selecione Procedimento armazenado para adicionar uma atividade de procedimento armazenado.

  2. Na guia Geral , renomeie essa atividade como StoredProceduretoWriteWatermarkActivity.

  3. Conecte o resultado verde (Concluída com sucesso) da atividade de cópia à atividade de procedimento armazenado.

  4. Na guia Configurações , configure o seguinte:

    • Data Warehouse: selecione seu Data Warehouse.

    • Nome do procedimento armazenado: escolha o procedimento armazenado que você criou em seu Data Warehouse: [dbo].[ usp_write_watermark].

    • Expanda Parâmetros de procedimento armazenado. Para definir valores para os parâmetros de procedimento armazenado, selecione Importar e insira os seguintes valores para os parâmetros:

      Nome Tipo Valor
      LastModifiedtime Datetime @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}
      TableName String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}

    Captura de tela mostrando a configuração da atividade de procedimento armazenado.

Etapa 6: Execute o pipeline e monitore o resultado

Na barra superior, selecione Executar na guia Página Inicial . Em seguida, selecione Salvar e executar. O pipeline começa a ser executado e você pode monitorá-lo na guia Saída .

Captura de tela mostrando os resultados da execução do pipeline.

Vá para o Lakehouse e você descobrirá que o arquivo de dados está na pasta escolhida. Você pode selecionar o arquivo para visualizar os dados copiados.

Captura de tela mostrando os dados do Lakehouse para a primeira execução do pipeline.

Captura de tela mostrando a visualização dos dados do Lakehouse para a primeira execução do pipeline.

Adicionar mais dados para ver os resultados da cópia incremental

Depois de concluir a primeira execução de pipeline, vamos adicionar mais dados à tabela de origem do Data Warehouse para ver se esse pipeline pode copiar seus dados incrementais.

Etapa 1: Adicionar mais dados à origem

Insira novos dados no seu Data Warehouse executando a seguinte consulta:

INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')

INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')

Os dados atualizados para a data_source_table são:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000

Etapa 2: Acionar outra execução do pipeline e monitorar o resultado

Volte para a sua página do pipeline. Na barra superior, selecione Executar na guia Página Inicial novamente. O pipeline começa a ser executado e você pode monitorá-lo em Saída.

Vá para o Lakehouse e você descobrirá que o novo arquivo de dados copiado está na pasta escolhida. Você pode selecionar o arquivo para visualizar os dados copiados. Você verá seus dados incrementais exibidos neste arquivo.

Captura de tela mostrando os dados do Lakehouse para a segunda execução do pipeline.

Captura de tela mostrando a visualização dos dados do Lakehouse para a segunda execução do pipeline.

Em seguida, saiba mais sobre como copiar do Armazenamento de Blobs do Azure para o Lakehouse.