Partilhar via


Carregue dados incrementalmente do Data Warehouse para o Lakehouse

Neste tutorial, você aprenderá a copiar apenas dados novos ou alterados do seu Data Warehouse para um Lakehouse. Essa abordagem é chamada de carregamento incremental e é útil quando você deseja manter seus dados atualizados sem copiar tudo up-tocada vez.

Aqui está o design de alto nível da solução:

Diagrama mostrando a lógica de dados de carga incremental.

  1. Escolha uma coluna de marca d'água. Escolha uma coluna na tabela de origem que ajude a controlar registros novos ou alterados. Essa coluna geralmente contém valores que aumentam quando linhas são adicionadas ou atualizadas (como um carimbo de data/hora ou ID). Usaremos o valor mais alto nesta coluna como nossa "marca d'água" para identificar o ponto onde ficámos.

  2. Configure uma tabela para armazenar o último valor da marca d'água.

  3. Crie um pipeline que faça o seguinte:

    O pipeline inclui estas atividades:

    • Duas atividades de consulta. O primeiro obtém o último valor de marca d'água (onde paramos da última vez). O segundo recebe o novo valor de marca d'água (onde pararemos desta vez). Ambos os valores são passados para a atividade de cópia.
    • Uma atividade de cópia que localiza linhas em que o valor da coluna de marca d'água está entre as marcas d'água antigas e novas. Em seguida, ele copia esses dados do seu Data Warehouse para o Lakehouse como um novo arquivo.
    • Uma atividade de procedimento armazenado que guarda o novo valor da marca d'água, para que a próxima operação do pipeline saiba por onde começar.

Pré-requisitos

  • Armazém de Dados. Você usará o Data Warehouse como seu armazenamento de dados de origem. Se você não tiver um, confira Criar um Data Warehouse para obter instruções.
  • Casa do lago. Você usará o Lakehouse como seu armazenamento de dados de destino. Se você não tiver um, consulte Criar um Lakehouse para obter instruções.
    • Crie uma pasta chamada IncrementalCopy para armazenar os dados copiados.

Prepare a sua fonte

Vamos configurar as tabelas e o procedimento armazenado de que você precisa em seu Data Warehouse antes de configurar o pipeline de cópia incremental.

1. Crie uma tabela de fonte de dados no seu Data Warehouse

Execute o seguinte comando SQL no Data Warehouse para criar uma tabela chamada data_source_table como tabela de origem. Usaremos isso como dados de exemplo para a cópia incremental.

create table data_source_table
(
    PersonID int,
    Name varchar(255),
    LastModifytime DATETIME2(6)
);

INSERT INTO data_source_table
    (PersonID, Name, LastModifytime)
VALUES
    (1, 'aaaa','9/1/2017 12:56:00 AM'),
    (2, 'bbbb','9/2/2017 5:23:00 AM'),
    (3, 'cccc','9/3/2017 2:36:00 AM'),
    (4, 'dddd','9/4/2017 3:21:00 AM'),
    (5, 'eeee','9/5/2017 8:06:00 AM');

Os dados na tabela de origem têm esta aparência:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1        | aaaa | 2017-09-01 00:56:00.000
2        | bbbb | 2017-09-02 05:23:00.000
3        | cccc | 2017-09-03 02:36:00.000
4        | dddd | 2017-09-04 03:21:00.000
5        | eeee | 2017-09-05 08:06:00.000

Neste tutorial, usaremos LastModifytime como a coluna de marca d'água.

2. Crie outra tabela no seu Data Warehouse para armazenar o último valor do watermark

  1. Execute o seguinte comando SQL no seu Data Warehouse para criar uma tabela chamada watermarktable para armazenar o último valor da marca d'água:

    create table watermarktable
    (
    TableName varchar(255),
    WatermarkValue DATETIME2(6),
    );
    
  2. Defina o valor padrão da última marca de água usando o nome da tabela de origem. Neste tutorial, o nome da tabela é data_source_table e definiremos o valor padrão como 1/1/2010 12:00:00 AM.

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. Verifique os dados na sua tabela de marca d'água.

    Select * from watermarktable
    

    Saída:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

3. Crie um procedimento armazenado no seu Data Warehouse

Execute o seguinte comando para criar um procedimento armazenado no seu Data Warehouse. Este procedimento armazenado atualiza o último valor de marca d'água após cada execução de pipeline.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Configurar um pipeline para cópia incremental

Etapa 1: Criar um pipeline

  1. Vá para Power BI.

  2. Selecione o ícone do Power BI no canto inferior esquerdo da tela e selecione Malha.

  3. Selecione Meu espaço de trabalho para abrir o seu espaço de trabalho no Fabric.

  4. Selecione + Novo Item, selecione Pipeline e insira um nome de pipeline para criar um novo pipeline.

    Captura de tela mostrando o novo botão de pipeline no espaço de trabalho recém-criado.

    Captura de tela mostrando o nome da criação de um novo pipeline.

Etapa 2: adicionar uma atividade de pesquisa para a última marca d'água

Nesta etapa, irá criar uma atividade de pesquisa para obter o último valor da marca d'água. Obteremos o valor 1/1/2010 12:00:00 AM padrão que definimos anteriormente.

  1. Selecione Atividade do pipeline e selecione Pesquisa na lista suspensa.

  2. Na guia Geral, renomeie esta atividade para LookupOldWaterMarkActivity.

  3. Na guia Configurações , configure o seguinte:

    • Conexão: em Armazém, selecione Procurar tudo e selecione o seu armazém de dados da lista.
    • Usar consulta: Escolher Tabela.
    • Tabela: Escolha dbo.watermarktable.
    • Primeira linha apenas: Seleccionado.

    Captura de ecrã mostrando a marca d'água antiga da consulta.

Etapa 3: Adicionar uma atividade de consulta para a nova marca d'água

Nesta etapa, vais criar uma atividade de consulta para obter o novo valor da referência. Você usará uma consulta para obter a nova marca d'água da tabela de dados de origem. Obteremos o valor mais alto na coluna LastModifytime de data_source_table.

  1. Na barra superior, selecione Pesquisa na guia Atividades para adicionar a segunda atividade de pesquisa.

  2. Na guia Geral, renomeie esta atividade para LookupNewWaterMarkActivity.

  3. Na guia Configurações , configure o seguinte:

    • Conexão: Em Depósito, selecione Procurar tudo e selecione o seu armazém de dados na lista ou selecione o seu armazém de dados em ligações de itens do Fabric.

    • Usar consulta: escolha Consulta.

    • Consulta: insira a seguinte consulta para escolher o tempo máximo da última modificação como a nova marca d'água:

      select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
      
    • Primeira linha apenas: Seleccionado.

    Captura de ecrã mostrando a nova marca d'água de consulta.

Etapa 4: Adicionar a atividade de cópia para copiar dados incrementais

Nesta etapa, você adicionará uma atividade de cópia para copiar os dados incrementais entre a última marca d'água e a nova marca d'água do seu Data Warehouse para o Lakehouse.

  1. Selecione Atividades na barra superior e selecione Copiar dados -> Adicionar à tela para obter a atividade de cópia.

  2. Na guia Geral , renomeie essa atividade para IncrementalCopyActivity.

  3. Conecte ambas as atividades de pesquisa à atividade de cópia arrastando o botão verde (Em êxito) anexado às atividades de pesquisa para a atividade de cópia. Solte o botão do mouse quando vir que a cor da borda da atividade de cópia muda para verde.

    Captura de ecrã a mostrar atividades de pesquisa e de cópia conectadas.

  4. Na guia Origem , configure o seguinte:

    • Conexão: Em Depósito, selecione Procurar tudo e selecione o seu armazém de dados na lista ou selecione o seu armazém de dados em ligações de itens do Fabric.

    • Armazém: Selecione o seu armazém.

    • Usar consulta: escolha Consulta.

    • Consulta: Insira a seguinte consulta para copiar dados incrementais entre a Última Marca Temporal e a Nova Marca Temporal.

      select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
      

    Captura de tela mostrando a configuração da fonte de cópia.

  5. Na guia Destino , configure o seguinte:

    • Conexão: Em Lakehouse, selecione Procurar tudo, e selecione a sua lakehouse na lista ou selecione a sua lakehouse em Conexões de itens de fábrica.
    • Lakehouse: Selecione sua Lakehouse.
    • Pasta raiz: Escolha Ficheiros.
    • Caminho do arquivo: escolha a pasta onde deseja armazenar os dados copiados. Selecione Procurar para selecionar sua pasta. Para o nome do arquivo, abra Adicionar conteúdo dinâmico e digite @CONCAT('Incremental-', pipeline().RunId, '.txt') na janela aberta para criar nomes de arquivo para o arquivo de dados copiado no Lakehouse.
    • Formato de arquivo: Selecione o tipo de formato dos seus dados.

    Captura de tela mostrando a configuração de destino da cópia.

Etapa 5: Adicionar uma atividade de procedimento armazenado

Nesta etapa, irá adicionar uma atividade de stored procedure para atualizar o último valor de watermark para a próxima execução de pipeline.

  1. Selecione Atividades na barra superior e selecione Procedimento armazenado para adicionar uma atividade de procedimento armazenado.

  2. Na guia Geral , renomeie essa atividade para StoredProceduretoWriteWatermarkActivity.

  3. Conecte a saída verde (Com êxito) da atividade de cópia à atividade de procedimento armazenado.

  4. Na guia Configurações , configure o seguinte:

    • Data Warehouse: Selecione o seu Data Warehouse.

    • Nome do procedimento armazenado: escolha o procedimento armazenado que você criou no seu Data Warehouse: [dbo].[ usp_write_watermark].

    • Expanda os parâmetros do procedimento guardado. Para definir valores para os parâmetros do procedimento armazenado, selecione Importar e insira os seguintes valores para os parâmetros:

      Nome Tipo valor
      ÚltimaHoraDeModificação DateTime @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}
      NomeDaTabela String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}

    Captura de tela mostrando a configuração da atividade do procedimento armazenado.

Etapa 6: Executar o pipeline e monitorizar o resultado

Na barra superior, selecione Executar na guia Página Inicial . Em seguida, selecione Salvar e executar. O pipeline começa a ser executado e você pode monitorá-lo na guia Saída .

Captura de tela mostrando os resultados da execução do pipeline.

Vá para o seu Lakehouse e irá encontrar o arquivo de dados na pasta que escolheu. Você pode selecionar o arquivo para visualizar os dados copiados.

Captura de tela mostrando os dados do lakehouse para a primeira execução do pipeline.

Captura de ecrã mostrando a visualização de dados do lakehouse para a primeira execução do pipeline.

Adicione mais dados para ver os resultados das cópias incrementais

Depois de concluir a primeira execução do pipeline, vamos adicionar mais dados à tabela de origem do Data Warehouse para ver se esse pipeline pode copiar seus dados incrementais.

Etapa 1: adicionar mais dados à fonte

Insira novos dados no seu Data Warehouse executando a seguinte consulta:

INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')

INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')

Os dados atualizados para data_source_table são:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000

Etapa 2: Acionar outra execução de pipeline e monitorar o resultado

Regresse à sua página de pipeline. Na barra de ferramentas superior, novamente selecione Executar no separador Página Inicial. O pipeline começa a ser executado e você pode monitorá-lo em Saída.

Vá para o seu Lakehouse, e você vai encontrar o novo arquivo de dados copiado está sob a pasta que você escolheu. Você pode selecionar o arquivo para visualizar os dados copiados. Você verá que seus dados incrementais aparecem neste arquivo.

Captura de ecrã mostrando os dados do lakehouse para a segunda execução do pipeline.

Captura de ecrã mostrando a visualização de dados do lakehouse para a segunda execução do pipeline.

Em seguida, saiba mais sobre como copiar do Armazenamento de Blobs do Azure para o Lakehouse.