Partilhar via


Carregue dados incrementalmente do Banco de Dados SQL do Azure para o armazenamento de Blob do Azure usando o portal do Azure

APLICA-SE A: Azure Data Factory Azure Synapse Analytics

Gorjeta

Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange tudo, desde a movimentação de dados até ciência de dados, análises em tempo real, business intelligence e relatórios. Saiba como iniciar uma nova avaliação gratuitamente!

Neste tutorial, você cria um Azure Data Factory com um pipeline que carrega dados delta de uma tabela no Banco de Dados SQL do Azure para o armazenamento de Blob do Azure.

Vai executar os seguintes passos neste tutorial:

  • Preparar o arquivo de dados para armazenar o valor de limite de tamanho.
  • Criar uma fábrica de dados.
  • Criar serviços ligados.
  • Crie conjuntos de dados de origem, de sink e de marca d'água.
  • Criar um pipeline.
  • Executar o pipeline.
  • Monitorizar a execução do pipeline.
  • Rever resultados
  • Adicionar mais dados à origem.
  • Executar o pipeline novamente.
  • Monitorizar a segunda execução do pipeline
  • Rever os resultados da segunda execução

Descrição geral

Eis o diagrama de nível elevado da solução:

Carregar dados incrementalmente

Eis os passos importantes para criar esta solução:

  1. Selecione a coluna de marca d'água. Selecione uma coluna no arquivo de dados de origem, que pode ser utilizada para dividir os registos novos ou atualizados para cada execução. Normalmente, os dados nesta coluna selecionada (por exemplo, last_modify_time ou ID) continuam a aumentar quando as linhas são criadas ou atualizadas. O valor máximo nesta coluna é utilizado como limite de tamanho.

  2. Prepare um armazenamento de dados para armazenar o valor da marca d'água. Neste tutorial, vai armazenar o valor de marca d'água numa base de dados SQL.

  3. Crie um pipeline com o seguinte fluxo de trabalho:

    O pipeline nesta solução tem as seguintes atividades:

    • Crie duas atividades de Pesquisa. Utilize a primeira atividade Pesquisa para obter o último valor de limite de tamanho. Utilize a segunda para obter o valor de limite de tamanho novo. Estes valores de limite de tamanho são transmitidos para a atividade Copy.
    • Criar uma atividade Cópia que copia linhas do arquivo de dados de origem com o valor da coluna de limite de tamanho superior ao valor de limite de tamanho antigo e inferior ao valor novo. Em seguida, copia os dados delta do arquivo de dados de origem para um armazenamento de Blobs como um ficheiro novo.
    • Crie uma atividade StoredProcedure, que atualiza o valor de marca d'água do pipeline que vai ser executado da próxima vez.

Se você não tiver uma assinatura do Azure, crie uma conta gratuita antes de começar.

Pré-requisitos

  • Banco de Dados SQL do Azure. Vai utilizar a base de dados como o arquivo de dados de origem. Se você não tiver um banco de dados no Banco de Dados SQL do Azure, consulte Criar um banco de dados no Banco de Dados SQL do Azure para conhecer as etapas para criar um.
  • Armazenamento do Azure. Vai utilizar o armazenamento de blobs como arquivo de dados de sink. Se você não tiver uma conta de armazenamento, consulte Criar uma conta de armazenamento para conhecer as etapas para criar uma. Crie um contentor com o nome adftutorial.

Criar uma tabela de origem de dados na base de dados SQL

  1. Abra o SQL Server Management Studio. No Gerenciador de Servidores, clique com o botão direito do mouse no banco de dados e escolha Nova Consulta.

  2. Execute o seguinte comando SQL na base de dados SQL para criar uma tabela com o nome data_source_table e armazenar o valor de limite de tamaho:

    create table data_source_table
    (
        PersonID int,
        Name varchar(255),
        LastModifytime datetime
    );
    
    INSERT INTO data_source_table
        (PersonID, Name, LastModifytime)
    VALUES
        (1, 'aaaa','9/1/2017 12:56:00 AM'),
        (2, 'bbbb','9/2/2017 5:23:00 AM'),
        (3, 'cccc','9/3/2017 2:36:00 AM'),
        (4, 'dddd','9/4/2017 3:21:00 AM'),
        (5, 'eeee','9/5/2017 8:06:00 AM');
    

    Neste tutorial, vai utilizar LastModifytime como a coluna de limite de tamanho. Os dados no arquivo da origem de dados são apresentados na tabela seguinte:

    PersonID | Name | LastModifytime
    -------- | ---- | --------------
    1        | aaaa | 2017-09-01 00:56:00.000
    2        | bbbb | 2017-09-02 05:23:00.000
    3        | cccc | 2017-09-03 02:36:00.000
    4        | dddd | 2017-09-04 03:21:00.000
    5        | eeee | 2017-09-05 08:06:00.000
    

Criar outra tabela na base de dados SQL para armazenar o valor de limite superior de tamanho

  1. Execute o comando SQL seguinte na base de dados SQL para criar uma tabela com o nome watermarktable e armazenar o valor de limite de tamanho:

    create table watermarktable
    (
    
    TableName varchar(255),
    WatermarkValue datetime,
    );
    
  2. Defina o valor predefinido do limite superior de tamanho com o nome da tabela do arquivo de dados de origem. Neste tutorial, o nome da tabela é data_source_table.

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. Reveja os dados na tabela watermarktable.

    Select * from watermarktable
    

    Saída:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

Criar um procedimento armazenado na base de dados SQL

Execute o comando seguinte para criar um procedimento armazenado na base de dados SQL:

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Criar uma fábrica de dados

  1. Inicie o navegador da Web Microsoft Edge ou Google Chrome . Atualmente, a IU do Data Factory é suportada apenas nos browsers Microsoft Edge e Google Chrome.

  2. No menu superior, selecione Criar um recurso>Analytics>Data Factory :

    Seleção de Data Factory no painel

  3. Na página Novo data factory, introduza ADFIncCopyTutorialDF para o nome.

    O nome do Azure Data Factory deve ser globalmente exclusivo. Se vir um ponto de exclamação vermelho com o seguinte erro, altere o nome da fábrica de dados (por exemplo, oseunomeADFIncCopyTutorialDF) e tente criá-la novamente. Consulte o artigo Data Factory - Regras de nomenclatura para obter regras de nomenclatura para artefatos do Data Factory.

    O nome da fábrica de dados "ADFIncCopyTutorialDF" não está disponível

  4. Selecione sua assinatura do Azure na qual você deseja criar o data factory.

  5. Para o Grupo de Recursos, execute uma das seguintes etapas:

  6. Selecione V2 para a versão.

  7. Selecione a localização para a fábrica de dados. Só aparecem na lista pendente as localizações que são suportadas. Os armazenamentos de dados (Armazenamento do Azure, Banco de Dados SQL do Azure, Instância Gerenciada SQL do Azure e assim por diante) e os cálculos (HDInsight, etc.) usados pelo data factory podem estar em outras regiões.

  8. Clique em Criar.

  9. Após a conclusão da criação, você verá a página Data Factory , conforme mostrado na imagem.

    Página inicial do Azure Data Factory, com o tile Open Azure Data Factory Studio.

  10. Selecione Abrir no bloco Abrir o Estúdio do Azure Data Factory para iniciar a interface do usuário (UI) do Azure Data Factory em uma guia separada.

Criar um pipeline

Neste tutorial, vai criar um pipeline com duas atividades de Pesquisa, uma atividade de Cópia e uma atividade StoredProcedure encadeadas num pipeline.

  1. Na página inicial da interface do usuário do Data Factory, clique no bloco Orquestrar .

    Captura de tela que mostra a página inicial do data factory com o botão Orchestrate realçado.

  2. No painel Geral, em Propriedades, especifique IncrementalCopyPipeline para Name. Em seguida, feche o painel clicando no ícone Propriedades no canto superior direito.

  3. Vamos adicionar a primeira atividade Lookup para obter o valor de limite de tamanho antigo. Na caixa de ferramentas Atividades, desenvolva Geral e arraste e solte a atividade Pesquisa na interface de designer de pipeline. Altere o nome da atividade para LookupOldWaterMarkActivity.

    Primeira atividade de pesquisa - nome

  4. Alterne para a guia Configurações e clique em + Novo para Conjunto de Dados de Origem. Nesta etapa, você cria um conjunto de dados para representar dados na tabela de marca d'água. Esta tabela contém o limite de tamanho antigo que foi utilizado na operação de cópia anterior.

  5. Na janela Novo Conjunto de Dados , selecione Banco de Dados SQL do Azure e clique em Continuar. Você verá uma nova janela aberta para o conjunto de dados.

  6. Na janela Definir propriedades do conjunto de dados, insira WatermarkDataset para Name.

  7. Para Serviço Vinculado, selecione Novo e siga as seguintes etapas:

    1. Insira AzureSqlDatabaseLinkedService para Name.

    2. Selecione o servidor para Nome do servidor.

    3. Selecione o nome do banco de dados na lista suspensa.

    4. Introduza o seu Nome de utilizador & Palavra-passe.

    5. Para testar a conexão com o banco de dados SQL, clique em Testar conexão.

    6. Clique em Concluir.

    7. Confirme se AzureSqlDatabaseLinkedService está selecionado para Serviço vinculado.

      Nova janela de serviço vinculado

    8. Selecione Concluir.

  8. Na guia Conexão, selecione [dbo].[watermarktable] para Tabela. Se pretender pré-visualizar dados na tabela, clique em Pré-visualizar dados.

    Conjunto de dados de marca d'água - configurações de conexão

  9. Clique no separador do pipeline, na parte superior, ou clique no nome do pipeline na vista de árvore, do lado esquerdo, para mudar para o editor do pipeline. Na janela de propriedades da atividade Pesquisa, confirme se WatermarkDataset está selecionado para o campo Conjunto de Dados de Origem .

  10. Na caixa de ferramentas Atividades, expanda Geral, arraste e largue outra atividade Pesquisa na superfície do designer de pipeline e, na guia Geral da janela de propriedades, defina o nome como LookupNewWaterMarkActivity. Esta atividade Lookup obtém o valor de limite de tamanho antigo da tabela com a origem de dados que vai ser copiada para o destino.

  11. Na janela de propriedades da segunda atividade de Pesquisa , alterne para a guia Configurações e clique em Novo. Crie um conjunto de dados que aponte para a tabela de origem que contém o valor de limite de tamanho novo (valor máximo de LastModifyTime).

  12. Na janela Novo Conjunto de Dados , selecione Banco de Dados SQL do Azure e clique em Continuar.

  13. Na janela Definir propriedades , digite SourceDataset para Name. Selecione AzureSqlDatabaseLinkedService para serviço vinculado.

  14. Selecione [dbo].[data_source_table] para Tabela. Vai especificar uma consulta neste conjunto de dados mais adiante no tutorial. A consulta tem precedência sobre a tabela a que especificar neste passo.

  15. Selecione Concluir.

  16. Clique no separador do pipeline, na parte superior, ou clique no nome do pipeline na vista de árvore, do lado esquerdo, para mudar para o editor do pipeline. Na janela de propriedades da atividade Pesquisa, confirme se SourceDataset está selecionado para o campo Conjunto de Dados de Origem .

  17. Selecione Consulta para o campo Usar consulta e insira a seguinte consulta: você está selecionando apenas o valor máximo de LastModifytime no data_source_table. Por favor, certifique-se de que também verificou apenas a primeira linha.

    select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
    

    Segunda atividade de pesquisa - consulta

  18. Na caixa de ferramentas Atividades, expanda Mover & Transformar e arraste e solte a atividade Copiar da mesma caixa de ferramentas, e defina o nome como IncrementalCopyActivity.

  19. Conecte ambas as atividades de Pesquisa à atividade Copiar arrastando o botão verde anexado às atividades de Pesquisa para a atividade Copiar. Largue o botão do rato quando vir que a cor do limite da atividade Copy muda para azul.

    Atividades de Pesquisa de Conexão para a atividade de Cópia

  20. Selecione a atividade Copiar e confirme que vê as propriedades da atividade na janela Propriedades.

  21. Alterne para a guia Origem na janela Propriedades e execute as seguintes etapas:

    1. Selecione SourceDataset para o campo Source Dataset .

    2. Selecione Consulta para o campo Usar consulta .

    3. Insira a seguinte consulta SQL para o campo Consulta .

      select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
      

      Atividade de cópia - fonte

  22. Alterne para a guia Coletor e clique em + Novo para o campo Conjunto de dados do coletor .

  23. Neste tutorial, o arquivo de dados de sink é do tipo Armazenamento de Blobs do Azure. Portanto, selecione Armazenamento de Blob do Azure e clique em Continuar na janela Novo Conjunto de Dados .

  24. Na janela Selecionar formato , selecione o tipo de formato dos dados e clique em Continuar.

  25. Na janela Definir propriedades , digite SinkDataset para Name. Em Serviço Vinculado, selecione + Novo. Nesta etapa, você cria uma conexão (serviço vinculado) com seu armazenamento de Blob do Azure.

  26. Na janela Novo Serviço Vinculado (Armazenamento de Blob do Azure), execute as seguintes etapas:

    1. Insira AzureStorageLinkedService para Name.
    2. Selecione sua conta de Armazenamento do Azure para Nome da conta de armazenamento.
    3. Testar Conexão e clique em Concluir.
  27. Na janela Definir Propriedades , confirme se AzureStorageLinkedService está selecionado para Serviço vinculado. Em seguida, selecione Concluir.

  28. Vá para a guia Conexão de SinkDataset e execute as seguintes etapas:

    1. Para o campo Caminho do arquivo , insira adftutorial/incrementalcopy. adftutorial é o nome do contêiner blob e incrementalcopy é o nome da pasta. Este fragmento parte do princípio de que tem um contentor de blobs denominado adftutorial no armazenamento de blobs. Crie o contentor se ainda não existir ou defina-o como o nome de um contentor existente. O Azure Data Factory cria automaticamente a cópia incremental da pasta de saída, se ela não existir. Você também pode usar o botão Procurar para o caminho do ficheiro para navegar até uma pasta num contentor de blob.
    2. Para a parte Arquivo do campo Caminho do arquivo , selecione Adicionar conteúdo dinâmico [Alt+P] e insira @CONCAT('Incremental-', pipeline().RunId, '.txt')na janela aberta. Em seguida, selecione Concluir. O nome do ficheiro é gerado dinamicamente através da expressão. Cada execução de pipeline tem um ID exclusivo. A atividade Copy utiliza o ID de execução para gerar o nome do ficheiro.
  29. Alterne para o editor de pipeline clicando na guia pipeline na parte superior ou clicando no nome do pipeline na visualização em árvore à esquerda.

  30. Na caixa de ferramentas Atividades, expanda Geral e arraste a atividade Procedimento Armazenado da caixa de ferramentas Atividades para a superfície de design do pipeline. Conecte a saída verde (Êxito) da atividade Copiar à atividade Procedimento armazenado .

  31. Selecione Atividade de Procedimento Armazenado no designer de pipeline, altere o nome para StoredProceduretoWriteWatermarkActivity.

  32. Alterne para a guia Conta SQL e selecione AzureSqlDatabaseLinkedService para serviço vinculado.

  33. Alterne para a guia Procedimento Armazenado e siga as seguintes etapas:

    1. Em Nome do procedimento armazenado, selecione usp_write_watermark.

    2. Para especificar valores para os parâmetros do procedimento armazenado, clique em Importar parâmetro e insira os seguintes valores para os parâmetros:

      Nome Tipo valor
      ÚltimaHoraDeModificação Data e Hora @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}
      Nome da Tabela Cordão @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}

      Atividade de procedimento armazenado - configurações de procedimento armazenado

  34. Para validar as configurações do pipeline, clique em Validar na barra de ferramentas. Confirme que não há erros de validação. Para fechar a janela Relatório de Validação de Pipeline , clique em >>.

  35. Publique entidades (serviços vinculados, conjuntos de dados e pipelines) no serviço Azure Data Factory selecionando o botão Publicar Tudo . Aguarde até ver uma mensagem a indicar que a publicação foi bem-sucedida.

Acionar uma execução de pipeline

  1. Clique em Adicionar gatilho na barra de ferramentas e clique em Gatilho agora.

  2. Na janela Pipeline Run , selecione Finish.

Monitorizar a execução do pipeline.

  1. Mude para o separador Monitor do lado esquerdo. Você vê o status da execução do pipeline acionada por um gatilho manual. Você pode usar links na coluna NOME DO PIPELINE para exibir detalhes da execução e executar novamente o pipeline.

  2. Para ver as execuções de atividade associadas à execução do pipeline, selecione o link na coluna NOME DO PIPELINE . Para obter detalhes sobre a atividade executada, selecione o link Detalhes (ícone de óculos) na coluna NOME DA ATIVIDADE . Selecione Todas as Execuções de Pipelines no topo para voltar à visualização Execuções de Pipelines. Para atualizar a vista, selecione Atualizar.

Rever os resultados

  1. Conecte-se à sua Conta de Armazenamento do Azure usando ferramentas como o Gerenciador de Armazenamento do Azure. Verifique se um arquivo de saída foi criado na pasta incrementalcopy do contêiner adftutorial .

    Primeiro arquivo de saída

  2. Abra o arquivo de saída e observe que todos os dados são copiados do data_source_table para o arquivo de blob.

    1,aaaa,2017-09-01 00:56:00.0000000
    2,bbbb,2017-09-02 05:23:00.0000000
    3,cccc,2017-09-03 02:36:00.0000000
    4,dddd,2017-09-04 03:21:00.0000000
    5,eeee,2017-09-05 08:06:00.0000000
    
  3. Verifique o valor mais recente do watermarktable. Verá que o valor de limite de tamanho foi atualizado.

    Select * from watermarktable
    

    A saída é:

    | TableName | WatermarkValue |
    | --------- | -------------- |
    | data_source_table | 2017-09-05	8:06:00.000 |
    

Adicionar mais dados à origem

Insira novos dados em seu banco de dados (armazenamento da fonte de dados).

INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')

INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')

Os dados atualizados na sua base de dados são:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000

Acionar outra execução de pipeline

  1. Alterne para a guia Editar . Clique no pipeline na visualização em árvore se ele não estiver aberto no designer.

  2. Clique em Adicionar gatilho na barra de ferramentas e clique em Gatilho agora.

Monitorizar a segunda execução do pipeline

  1. Mude para o separador Monitor do lado esquerdo. Você vê o status da execução do pipeline acionada por um gatilho manual. Você pode usar links na coluna NOME DO PIPELINE para exibir detalhes da atividade e executar novamente o pipeline.

  2. Para ver as execuções de atividade associadas à execução do pipeline, selecione o link na coluna NOME DO PIPELINE . Para obter detalhes sobre a atividade executada, selecione o link Detalhes (ícone de óculos) na coluna NOME DA ATIVIDADE . Selecione Todas as Execuções de Pipelines no topo para voltar à visualização Execuções de Pipelines. Para atualizar a vista, selecione Atualizar.

Verificar a segunda saída

  1. No armazenamento de blobs, verá que outro ficheiro foi criado. Neste tutorial, o novo nome de ficheiro é Incremental-<GUID>.txt. Abra esse ficheiro e verá duas linhas de registos no mesmo.

    6,newdata,2017-09-06 02:23:00.0000000
    7,newdata,2017-09-07 09:01:00.0000000    
    
  2. Verifique o valor mais recente do watermarktable. Verá que o valor de marca d’água foi atualizado.

    Select * from watermarktable
    

    Saída de exemplo:

    | TableName | WatermarkValue |
    | --------- | -------------- |
    | data_source_table | 2017-09-07 09:01:00.000 |
    

Neste tutorial, executou os passos seguintes:

  • Preparar o arquivo de dados para armazenar o valor de limite de tamanho.
  • Criar uma fábrica de dados.
  • Criar serviços ligados.
  • Crie conjuntos de dados de origem, de sink e de marca d'água.
  • Criar um pipeline.
  • Executar o pipeline.
  • Monitorizar a execução do pipeline.
  • Rever resultados
  • Adicionar mais dados à origem.
  • Executar o pipeline novamente.
  • Monitorizar a segunda execução do pipeline
  • Rever os resultados da segunda execução

Neste tutorial, o pipeline copiou dados de uma única tabela no Banco de dados SQL para o armazenamento de Blob. Avance para o tutorial a seguir para aprender a copiar dados de várias tabelas em um banco de dados do SQL Server para o Banco de dados SQL.