Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Neste tutorial, você aprenderá a copiar apenas dados novos ou alterados do seu Data Warehouse para um Lakehouse. Essa abordagem é chamada de carregamento incremental e é útil quando você deseja manter seus dados atualizados sem copiar tudo up-tocada vez.
Aqui está o design de alto nível da solução:
Escolha uma coluna de marca d'água. Escolha uma coluna na tabela de origem que ajude a controlar registros novos ou alterados. Essa coluna geralmente contém valores que aumentam quando linhas são adicionadas ou atualizadas (como um carimbo de data/hora ou ID). Usaremos o valor mais alto nesta coluna como nossa "marca d'água" para identificar o ponto onde ficámos.
Configure uma tabela para armazenar o último valor da marca d'água.
Crie um pipeline que faça o seguinte:
O pipeline inclui estas atividades:
- Duas atividades de consulta. O primeiro obtém o último valor de marca d'água (onde paramos da última vez). O segundo recebe o novo valor de marca d'água (onde pararemos desta vez). Ambos os valores são passados para a atividade de cópia.
- Uma atividade de cópia que localiza linhas em que o valor da coluna de marca d'água está entre as marcas d'água antigas e novas. Em seguida, ele copia esses dados do seu Data Warehouse para o Lakehouse como um novo arquivo.
- Uma atividade de procedimento armazenado que guarda o novo valor da marca d'água, para que a próxima operação do pipeline saiba por onde começar.
Pré-requisitos
- Armazém de Dados. Você usará o Data Warehouse como seu armazenamento de dados de origem. Se você não tiver um, confira Criar um Data Warehouse para obter instruções.
-
Casa do lago. Você usará o Lakehouse como seu armazenamento de dados de destino. Se você não tiver um, consulte Criar um Lakehouse para obter instruções.
- Crie uma pasta chamada IncrementalCopy para armazenar os dados copiados.
Prepare a sua fonte
Vamos configurar as tabelas e o procedimento armazenado de que você precisa em seu Data Warehouse antes de configurar o pipeline de cópia incremental.
1. Crie uma tabela de fonte de dados no seu Data Warehouse
Execute o seguinte comando SQL no Data Warehouse para criar uma tabela chamada data_source_table como tabela de origem. Usaremos isso como dados de exemplo para a cópia incremental.
create table data_source_table
(
PersonID int,
Name varchar(255),
LastModifytime DATETIME2(6)
);
INSERT INTO data_source_table
(PersonID, Name, LastModifytime)
VALUES
(1, 'aaaa','9/1/2017 12:56:00 AM'),
(2, 'bbbb','9/2/2017 5:23:00 AM'),
(3, 'cccc','9/3/2017 2:36:00 AM'),
(4, 'dddd','9/4/2017 3:21:00 AM'),
(5, 'eeee','9/5/2017 8:06:00 AM');
Os dados na tabela de origem têm esta aparência:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
Neste tutorial, usaremos LastModifytime como a coluna de marca d'água.
2. Crie outra tabela no seu Data Warehouse para armazenar o último valor do watermark
Execute o seguinte comando SQL no seu Data Warehouse para criar uma tabela chamada watermarktable para armazenar o último valor da marca d'água:
create table watermarktable ( TableName varchar(255), WatermarkValue DATETIME2(6), );Defina o valor padrão da última marca de água usando o nome da tabela de origem. Neste tutorial, o nome da tabela é data_source_table e definiremos o valor padrão como
1/1/2010 12:00:00 AM.INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')Verifique os dados na sua tabela de marca d'água.
Select * from watermarktableSaída:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
3. Crie um procedimento armazenado no seu Data Warehouse
Execute o seguinte comando para criar um procedimento armazenado no seu Data Warehouse. Este procedimento armazenado atualiza o último valor de marca d'água após cada execução de pipeline.
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Configurar um pipeline para cópia incremental
Etapa 1: Criar um pipeline
Vá para Power BI.
Selecione o ícone do Power BI no canto inferior esquerdo da tela e selecione Malha.
Selecione Meu espaço de trabalho para abrir o seu espaço de trabalho no Fabric.
Selecione + Novo Item, selecione Pipeline e insira um nome de pipeline para criar um novo pipeline.
Etapa 2: adicionar uma atividade de pesquisa para a última marca d'água
Nesta etapa, irá criar uma atividade de pesquisa para obter o último valor da marca d'água. Obteremos o valor 1/1/2010 12:00:00 AM padrão que definimos anteriormente.
Selecione Atividade do pipeline e selecione Pesquisa na lista suspensa.
Na guia Geral, renomeie esta atividade para LookupOldWaterMarkActivity.
Na guia Configurações , configure o seguinte:
- Conexão: em Armazém, selecione Procurar tudo e selecione o seu armazém de dados da lista.
- Usar consulta: Escolher Tabela.
- Tabela: Escolha dbo.watermarktable.
- Primeira linha apenas: Seleccionado.
Etapa 3: Adicionar uma atividade de consulta para a nova marca d'água
Nesta etapa, vais criar uma atividade de consulta para obter o novo valor da referência. Você usará uma consulta para obter a nova marca d'água da tabela de dados de origem. Obteremos o valor mais alto na coluna LastModifytime de data_source_table.
Na barra superior, selecione Pesquisa na guia Atividades para adicionar a segunda atividade de pesquisa.
Na guia Geral, renomeie esta atividade para LookupNewWaterMarkActivity.
Na guia Configurações , configure o seguinte:
Conexão: Em Depósito, selecione Procurar tudo e selecione o seu armazém de dados na lista ou selecione o seu armazém de dados em ligações de itens do Fabric.
Usar consulta: escolha Consulta.
Consulta: insira a seguinte consulta para escolher o tempo máximo da última modificação como a nova marca d'água:
select MAX(LastModifytime) as NewWatermarkvalue from data_source_tablePrimeira linha apenas: Seleccionado.
Etapa 4: Adicionar a atividade de cópia para copiar dados incrementais
Nesta etapa, você adicionará uma atividade de cópia para copiar os dados incrementais entre a última marca d'água e a nova marca d'água do seu Data Warehouse para o Lakehouse.
Selecione Atividades na barra superior e selecione Copiar dados -> Adicionar à tela para obter a atividade de cópia.
Na guia Geral , renomeie essa atividade para IncrementalCopyActivity.
Conecte ambas as atividades de pesquisa à atividade de cópia arrastando o botão verde (Em êxito) anexado às atividades de pesquisa para a atividade de cópia. Solte o botão do mouse quando vir que a cor da borda da atividade de cópia muda para verde.
Na guia Origem , configure o seguinte:
Conexão: Em Depósito, selecione Procurar tudo e selecione o seu armazém de dados na lista ou selecione o seu armazém de dados em ligações de itens do Fabric.
Armazém: Selecione o seu armazém.
Usar consulta: escolha Consulta.
Consulta: Insira a seguinte consulta para copiar dados incrementais entre a Última Marca Temporal e a Nova Marca Temporal.
select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
Na guia Destino , configure o seguinte:
- Conexão: Em Lakehouse, selecione Procurar tudo, e selecione a sua lakehouse na lista ou selecione a sua lakehouse em Conexões de itens de fábrica.
- Lakehouse: Selecione sua Lakehouse.
- Pasta raiz: Escolha Ficheiros.
-
Caminho do arquivo: escolha a pasta onde deseja armazenar os dados copiados. Selecione Procurar para selecionar sua pasta. Para o nome do arquivo, abra Adicionar conteúdo dinâmico e digite
@CONCAT('Incremental-', pipeline().RunId, '.txt')na janela aberta para criar nomes de arquivo para o arquivo de dados copiado no Lakehouse. - Formato de arquivo: Selecione o tipo de formato dos seus dados.
Etapa 5: Adicionar uma atividade de procedimento armazenado
Nesta etapa, irá adicionar uma atividade de stored procedure para atualizar o último valor de watermark para a próxima execução de pipeline.
Selecione Atividades na barra superior e selecione Procedimento armazenado para adicionar uma atividade de procedimento armazenado.
Na guia Geral , renomeie essa atividade para StoredProceduretoWriteWatermarkActivity.
Conecte a saída verde (Com êxito) da atividade de cópia à atividade de procedimento armazenado.
Na guia Configurações , configure o seguinte:
Data Warehouse: Selecione o seu Data Warehouse.
Nome do procedimento armazenado: escolha o procedimento armazenado que você criou no seu Data Warehouse: [dbo].[ usp_write_watermark].
Expanda os parâmetros do procedimento guardado. Para definir valores para os parâmetros do procedimento armazenado, selecione Importar e insira os seguintes valores para os parâmetros:
Nome Tipo valor ÚltimaHoraDeModificação DateTime @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue} NomeDaTabela String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}
Etapa 6: Executar o pipeline e monitorizar o resultado
Na barra superior, selecione Executar na guia Página Inicial . Em seguida, selecione Salvar e executar. O pipeline começa a ser executado e você pode monitorá-lo na guia Saída .
Vá para o seu Lakehouse e irá encontrar o arquivo de dados na pasta que escolheu. Você pode selecionar o arquivo para visualizar os dados copiados.
Adicione mais dados para ver os resultados das cópias incrementais
Depois de concluir a primeira execução do pipeline, vamos adicionar mais dados à tabela de origem do Data Warehouse para ver se esse pipeline pode copiar seus dados incrementais.
Etapa 1: adicionar mais dados à fonte
Insira novos dados no seu Data Warehouse executando a seguinte consulta:
INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
Os dados atualizados para data_source_table são:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000
Etapa 2: Acionar outra execução de pipeline e monitorar o resultado
Regresse à sua página de pipeline. Na barra de ferramentas superior, novamente selecione Executar no separador Página Inicial. O pipeline começa a ser executado e você pode monitorá-lo em Saída.
Vá para o seu Lakehouse, e você vai encontrar o novo arquivo de dados copiado está sob a pasta que você escolheu. Você pode selecionar o arquivo para visualizar os dados copiados. Você verá que seus dados incrementais aparecem neste arquivo.
Conteúdos relacionados
Em seguida, saiba mais sobre como copiar do Armazenamento de Blobs do Azure para o Lakehouse.