Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Neste tutorial, você aprenderá a copiar apenas dados novos ou alterados do Data Warehouse para um Lakehouse. Essa abordagem é chamada de carregamento incremental e é útil quando você deseja manter seus dados up-to-date sem copiar tudo todas as vezes.
Aqui está o design de alto nível da solução:
Escolha uma coluna de marca d'água. Escolha uma coluna na tabela de origem que ajude a controlar registros novos ou alterados. Esta coluna geralmente contém valores que aumentam quando as linhas são adicionadas ou atualizadas (como um carimbo de data/hora ou ID). Usaremos o valor mais alto nesta coluna como nossa "marca d'água" para sabermos onde paramos.
Configure uma tabela para armazenar seu último valor de marca de referência.
Crie um pipeline que faça o seguinte:
O fluxo de trabalho inclui estas atividades:
- Duas atividades de pesquisa. O primeiro obtém o último valor de marca d'água (onde paramos na última vez). O segundo obtém o novo valor da marca d'água (onde vamos parar desta vez). Ambos os valores são passados para a atividade de cópia.
- Uma atividade de cópia que localiza linhas em que o valor da coluna de marca d'água está entre as marcas d'água antigas e novas. Em seguida, ele copia esses dados do Data Warehouse para o Lakehouse como um novo arquivo.
- Atividade de procedimento armazenado que salva o novo valor da marca d'água, permitindo que a próxima execução do pipeline saiba por onde começar.
Pré-requisitos
- Data Warehouse. Você usará o Data Warehouse como seu armazenamento de dados de origem. Se você não tiver um, confira Criar um Data Warehouse para obter instruções.
-
Lakehouse Você usará o Lakehouse como seu repositório de dados de destino. Se você não tiver um, consulte Criar um Lakehouse para obter instruções.
- Crie uma pasta chamada IncrementalCopy para armazenar seus dados copiados.
Prepare sua fonte
Vamos configurar as tabelas e o procedimento armazenado necessários no Data Warehouse antes de configurar o pipeline de cópia incremental.
1. Criar uma tabela de dados de origem no seu Data Warehouse
Execute o seguinte comando SQL em seu Data Warehouse para criar uma tabela chamada data_source_table como sua tabela de origem. Usaremos isso como dados de exemplo para a cópia incremental.
create table data_source_table
(
PersonID int,
Name varchar(255),
LastModifytime DATETIME2(6)
);
INSERT INTO data_source_table
(PersonID, Name, LastModifytime)
VALUES
(1, 'aaaa','9/1/2017 12:56:00 AM'),
(2, 'bbbb','9/2/2017 5:23:00 AM'),
(3, 'cccc','9/3/2017 2:36:00 AM'),
(4, 'dddd','9/4/2017 3:21:00 AM'),
(5, 'eeee','9/5/2017 8:06:00 AM');
Os dados na tabela de origem têm esta aparência:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
Neste tutorial, usaremos LastModifytime como a coluna de marca d'água.
2. Preparar outra tabela no seu Data Warehouse para armazenar o último valor de marca d'água
Execute o comando SQL a seguir no seu Data Warehouse para criar uma tabela chamada watermarktable para armazenar o último valor de marca d'água:
create table watermarktable ( TableName varchar(255), WatermarkValue DATETIME2(6), );Defina o valor padrão inicial da última marca d'água utilizando o nome da sua tabela de origem. Neste tutorial, o nome da tabela é data_source_table e definiremos o valor padrão como
1/1/2010 12:00:00 AM.INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')Verifique os dados em sua watermarktable.
Select * from watermarktableSaída:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
3. Criar um procedimento armazenado no seu Data Warehouse
Execute o comando a seguir para criar um procedimento armazenado no seu Data Warehouse. Esse procedimento armazenado atualiza o último valor da marca d'água após cada execução do pipeline.
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Configurar um pipeline para a cópia incremental
Etapa 1: Criar um pipeline
Vá para o Power BI.
Selecione o ícone do Power BI na parte inferior esquerda da tela e selecione Fabric.
Selecione Meu workspace para abrir o workspace do Fabric.
Selecione + Novo Item e, em seguida, selecione Pipeline e, em seguida, insira um nome de pipeline para criar um novo pipeline.
Etapa 2: Adicionar uma atividade de pesquisa para a última marca d'água
Nesta etapa, você criará uma atividade de consulta para obter o último valor de marca d'água. Obteremos o valor 1/1/2010 12:00:00 AM padrão que definimos anteriormente.
Selecione a atividade pipeline e selecione Pesquisa na lista suspensa.
Na guia Geral , renomeie essa atividade como LookupOldWaterMarkActivity.
Na guia Configurações , configure o seguinte:
- Conexão: Em Warehouse, selecione Procurar todos e selecione seu data warehouse na lista.
- Usar consulta: escolha Tabela.
- Tabela: escolha dbo.watermarktable.
- Somente a primeira linha: selecionada.
Etapa 3: Adicionar uma atividade de pesquisa para a marca d'água nova
Nesta etapa, você criará uma atividade de consulta para obter o novo valor de referência. Você usará uma consulta para obter a nova marca d'água da tabela de dados de origem. Obteremos o valor mais alto na coluna LastModifytime de data_source_table.
Na barra superior, selecione Pesquisa na guia Atividades para adicionar a segunda atividade de pesquisa.
Na guia Geral , renomeie essa atividade como LookupNewWaterMarkActivity.
Na guia Configurações , configure o seguinte:
Conexão: em Armazém de Dados, selecione Pesquisar todos e selecione seu data warehouse na lista ou selecione seu data warehouse nas conexões de itens do Fabric.
Usar consulta: escolha Consulta.
Consulta: insira a consulta a seguir para escolher o valor máximo da hora da última modificação como a nova marca d'água:
select MAX(LastModifytime) as NewWatermarkvalue from data_source_tableSomente a primeira linha: selecionada.
Etapa 4: Adicionar a atividade de cópia para copiar dados incrementais
Nesta etapa, você adicionará uma atividade de cópia para transferir os dados incrementais entre a última marca temporal e a nova marca temporal do seu Data Warehouse para o seu Lakehouse.
Selecione Atividades na barra superior e selecione Copiar dados ->Adicionar à tela para obter a atividade de cópia.
Na guia Geral , renomeie essa atividade como IncrementalCopyActivity.
Conecte ambas as atividades de pesquisa à atividade de cópia. Faça isso arrastando o botão verde (Em caso de sucesso) anexado às atividades de pesquisa para a atividade de cópia. Solte o botão do mouse quando a cor da borda da atividade de cópia ficar verde.
Na guia Origem , configure o seguinte:
Conexão: em Armazém de Dados, selecione Pesquisar todos e selecione seu data warehouse na lista ou selecione seu data warehouse nas conexões de itens do Fabric.
Warehouse: selecione seu armazém.
Usar consulta: escolha Consulta.
Consulta: Insira a consulta a seguir para copiar dados incrementais entre a marca d'água anterior e a nova marca d'água.
select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
Na guia Destino , configure o seguinte:
- Conexão: Em Lakehouse, selecione Navegar todos e escolha seu lakehouse na lista ou selecione seu lakehouse nas conexões de item do Fabric.
- Lakehouse: selecione seu Lakehouse.
- Pasta raiz: escolha Arquivos.
-
Caminho do arquivo: escolha a pasta na qual deseja armazenar os dados copiados. Selecione Procurar para selecionar sua pasta. No que se refere ao nome do arquivo, abra Adicionar conteúdo dinâmico e digite
@CONCAT('Incremental-', pipeline().RunId, '.txt')na janela que foi aberta para criar nomes de arquivos para o seu arquivo de dados copiados no Lakehouse. - Formato do arquivo: selecione o tipo de formato dos seus dados.
Etapa 5: Adicionar uma atividade de procedimento armazenado
Nesta etapa, você adicionará uma atividade de procedimento armazenado para atualizar o último valor de marcador para a próxima execução do pipeline.
Selecione Atividades na barra superior e selecione Procedimento armazenado para adicionar uma atividade de procedimento armazenado.
Na guia Geral , renomeie essa atividade como StoredProceduretoWriteWatermarkActivity.
Conecte o resultado verde (Concluída com sucesso) da atividade de cópia à atividade de procedimento armazenado.
Na guia Configurações , configure o seguinte:
Data Warehouse: selecione seu Data Warehouse.
Nome do procedimento armazenado: escolha o procedimento armazenado que você criou em seu Data Warehouse: [dbo].[ usp_write_watermark].
Expanda Parâmetros de procedimento armazenado. Para definir valores para os parâmetros de procedimento armazenado, selecione Importar e insira os seguintes valores para os parâmetros:
Nome Tipo Valor LastModifiedtime Datetime @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue} TableName String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}
Etapa 6: Execute o pipeline e monitore o resultado
Na barra superior, selecione Executar na guia Página Inicial . Em seguida, selecione Salvar e executar. O pipeline começa a ser executado e você pode monitorá-lo na guia Saída .
Vá para o Lakehouse e você descobrirá que o arquivo de dados está na pasta escolhida. Você pode selecionar o arquivo para visualizar os dados copiados.
Adicionar mais dados para ver os resultados da cópia incremental
Depois de concluir a primeira execução de pipeline, vamos adicionar mais dados à tabela de origem do Data Warehouse para ver se esse pipeline pode copiar seus dados incrementais.
Etapa 1: Adicionar mais dados à origem
Insira novos dados no seu Data Warehouse executando a seguinte consulta:
INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
Os dados atualizados para a data_source_table são:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000
Etapa 2: Acionar outra execução do pipeline e monitorar o resultado
Volte para a sua página do pipeline. Na barra superior, selecione Executar na guia Página Inicial novamente. O pipeline começa a ser executado e você pode monitorá-lo em Saída.
Vá para o Lakehouse e você descobrirá que o novo arquivo de dados copiado está na pasta escolhida. Você pode selecionar o arquivo para visualizar os dados copiados. Você verá seus dados incrementais exibidos neste arquivo.
Conteúdo relacionado
Em seguida, saiba mais sobre como copiar do Armazenamento de Blobs do Azure para o Lakehouse.