Udostępnij przez


Przyrostowe ładowanie danych z magazynu danych do Lakehouse

Z tego samouczka dowiesz się, jak skopiować tylko nowe lub zmienione dane z usługi Data Warehouse do usługi Lakehouse. Takie podejście jest nazywane ładowaniem przyrostowym i jest przydatne, gdy chcesz zachować dane up-to-date bez kopiowania wszystkiego za każdym razem.

Oto ogólny projekt rozwiązania:

Diagram przedstawiający przyrostowe ładowanie logiki danych.

  1. Wybierz kolumnę limitu. Wybierz jedną kolumnę w tabeli źródłowej, która ułatwia śledzenie nowych lub zmienionych rekordów. Ta kolumna zwykle zawiera wartości, które zwiększają się, gdy wiersze są dodawane lub aktualizowane (na przykład znacznik czasu lub identyfikator). Użyjemy najwyższej wartości w tej kolumnie jako naszego "wskaźnika", aby wiedzieć, gdzie skończyliśmy.

  2. Skonfiguruj tabelę do przechowywania ostatniej wartości limitu.

  3. Zbuduj przepływ, który wykonuje następujące czynności:

    Przepływ pracy obejmuje następujące działania:

    • Dwie czynności wyszukiwania. Pierwszy pobiera ostatnią wartość limitu (gdzie zatrzymaliśmy się ostatni raz). Drugi pobiera nową wartość limitu (gdzie zatrzymamy się tym razem). Obie wartości są przekazywane do działania kopiowania.
    • Działanie kopiowania, które znajduje wiersze, w których wartość kolumny znacznika znajduje się między starą a nową wartością znacznika. Następnie kopiuje te dane z magazynu danych do usługi Lakehouse jako nowy plik.
    • Działanie procedury składowanej, które zapisuje nową wartość limitu, dzięki czemu następne uruchomienie potoku wie, gdzie należy rozpocząć.

Wymagania wstępne

  • Magazyn danych. Magazyn danych będzie używany jako źródło danych. Jeśli go nie masz, zapoznaj się z tematem Tworzenie magazynu danych , aby uzyskać instrukcje.
  • Lakehouse. Użyjesz usługi Lakehouse jako docelowego magazynu danych. Jeśli go nie masz, zobacz Tworzenie usługi Lakehouse , aby uzyskać instrukcje.
    • Utwórz folder o nazwie IncrementalCopy do przechowywania skopiowanych danych.

Przygotowywanie źródła

Zanim skonfigurujesz potok kopiowania przyrostowego, przygotuj tabele oraz procedurę składowaną potrzebną w Twoim magazynie danych.

1. Tworzenie tabeli źródła danych w magazynie danych

Uruchom następujące polecenie SQL w usłudze Data Warehouse, aby utworzyć tabelę o nazwie data_source_table jako tabelę źródłową. Użyjemy tego jako przykładowych danych do kopiowania przyrostowego.

create table data_source_table
(
    PersonID int,
    Name varchar(255),
    LastModifytime DATETIME2(6)
);

INSERT INTO data_source_table
    (PersonID, Name, LastModifytime)
VALUES
    (1, 'aaaa','9/1/2017 12:56:00 AM'),
    (2, 'bbbb','9/2/2017 5:23:00 AM'),
    (3, 'cccc','9/3/2017 2:36:00 AM'),
    (4, 'dddd','9/4/2017 3:21:00 AM'),
    (5, 'eeee','9/5/2017 8:06:00 AM');

Dane w tabeli źródłowej wyglądają następująco:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1        | aaaa | 2017-09-01 00:56:00.000
2        | bbbb | 2017-09-02 05:23:00.000
3        | cccc | 2017-09-03 02:36:00.000
4        | dddd | 2017-09-04 03:21:00.000
5        | eeee | 2017-09-05 08:06:00.000

W tym samouczku użyjemy kolumny LastModifytime jako kolumny limitu.

2. Utwórz inną tabelę w magazynie danych, aby przechowywać ostatnią wartość watermark.

  1. Uruchom następujące polecenie SQL w usłudze Data Warehouse, aby utworzyć tabelę o nazwie watermarktable do przechowywania ostatniej wartości znacznika wodnego:

    create table watermarktable
    (
    TableName varchar(255),
    WatermarkValue DATETIME2(6),
    );
    
  2. Ustaw domyślną wartość ostatniego znacznika za pomocą nazwy tabeli źródłowej. W tym samouczku nazwa tabeli jest data_source_table i ustawimy wartość domyślną na 1/1/2010 12:00:00 AM.

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. Sprawdź dane w tabeli znaków wodnych.

    Select * from watermarktable
    

    Wyjście:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

3. Utwórz procedurę składowaną w magazynie danych

Uruchom następujące polecenie, aby utworzyć procedurę składowaną w magazynie danych. Ta procedura składowana aktualizuje ostatnią wartość znacznika wodnego po każdym uruchomieniu potoku.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Skonfiguruj potok do kopiowania przyrostowego

Krok 1: Utwórz potok

  1. Przejdź do usługi Power BI.

  2. Wybierz ikonę usługi Power BI w lewym dolnym rogu ekranu, a następnie wybierz pozycję Fabric.

  3. Wybierz pozycję Mój obszar roboczy , aby otworzyć obszar roboczy usługi Fabric.

  4. Wybierz pozycję + Nowy element, a następnie wybierz pozycję Potok, a następnie wprowadź nazwę potoku, aby utworzyć nowy potok.

    Zrzut ekranu przedstawiający przycisk nowego potoku w nowo utworzonym obszarze roboczym.

    Zrzut ekranu przedstawiający nazwę tworzenia nowego potoku.

Krok 2. Dodaj działanie wyszukiwania dla ostatniego znaku wodnego

W tym kroku utworzysz czynność wyszukiwania, aby uzyskać ostatnią wartość punktu odniesienia. Uzyskamy ustawioną wcześniej wartość 1/1/2010 12:00:00 AM domyślną.

  1. Wybierz pozycję Działanie potoku i wybierz pozycję Wyszukaj z listy rozwijanej.

  2. Na karcie Ogólne zmień nazwę tego działania na LookupOldWaterMarkActivity.

  3. Na karcie Ustawienia skonfiguruj następujące ustawienia:

    • Połączenie: w obszarze Magazyn wybierz pozycję Przeglądaj wszystko, a następnie z listy wybierz magazyn danych.
    • Użyj zapytania: wybierz pozycję Tabela.
    • Tabela: wybierz pozycję dbo.watermarktable.
    • Pierwszy wiersz tylko: wybrany.

    Zrzut ekranu przedstawiający wyszukiwanie starego znaku wodnego.

Krok 3. Dodawanie działania wyszukiwania dla nowego znaku wodnego

W tym kroku utworzysz działanie wyszukiwania, aby uzyskać nową wartość punktu kontrolnego. Użyjesz zapytania, aby pobrać nowy znacznik z tabeli danych źródłowych. Uzyskamy najwyższą wartość w kolumnie LastModifytime z data_source_table.

  1. Na górnym pasku wybierz Wyszukaj w zakładce Działania, aby dodać drugie działanie wyszukiwania.

  2. Na karcie Ogólne zmień nazwę tego działania na LookupNewWaterMarkActivity.

  3. Na karcie Ustawienia skonfiguruj następujące ustawienia:

    • Połączenie: W obszarze Magazyn wybierz pozycję Przeglądaj wszystko, a następnie wybierz magazyn danych z listy lub wybierz magazyn danych z Połączenia elementów Fabric.

    • Użyj zapytania: wybierz Zapytanie.

    • Zapytanie: Wprowadź następujące zapytanie, aby wybrać maksymalny czas ostatniej modyfikacji jako nowy punkt odniesienia:

      select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
      
    • Pierwszy wiersz tylko: wybrany.

    Zrzut ekranu przedstawiający podgląd nowego znaku wodnego.

Krok 4. Dodawanie działania kopiowania w celu kopiowania danych przyrostowych

W tym kroku dodasz działanie kopiowania, aby skopiować dane przyrostowe między ostatnim znakiem wodnym a nowym znakiem wodnym z usługi Data Warehouse do usługi Lakehouse.

  1. Wybierz pozycję Działania na górnym pasku i wybierz pozycję Kopiuj dane -> Dodaj do kanwy , aby uzyskać działanie kopiowania.

  2. Na karcie Ogólne zmień nazwę tego działania na IncrementalCopyActivity.

  3. Połącz oba działania wyszukiwania z działaniem kopiowania, przeciągając zielony przycisk (Po pomyślnym zakończeniu) z działań wyszukiwania do działania kopiowania. Zwolnij przycisk myszy, gdy kolor obramowania działania kopiowania zmieni się na zielony.

    Zrzut ekranu przedstawiający łączenie działań wyszukiwania i kopiowania.

  4. Na karcie Źródło skonfiguruj następujące ustawienia:

    • Połączenie: W obszarze Magazyn wybierz pozycję Przeglądaj wszystko, a następnie wybierz magazyn danych z listy lub wybierz magazyn danych z Połączenia elementów Fabric.

    • Magazyn: wybierz magazyn.

    • Użyj zapytania: wybierz Zapytanie.

    • Zapytanie: Wprowadź następujące zapytanie, aby skopiować przyrostowe dane między poprzednim znacznikiem czasowym a nowym znacznikiem czasowym.

      select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
      

    Zrzut ekranu przedstawiający konfigurację źródła kopiowania.

  5. Na karcie Miejsce docelowe skonfiguruj następujące ustawienia:

    • Połączenie: w obszarze Lakehouse wybierz pozycję Przeglądaj wszystko, a następnie wybierz Lakehouse z listy lub Lakehouse z połączeń elementów Fabric.
    • Lakehouse: Wybierz swoje Lakehouse.
    • Folder główny: wybierz pozycję Pliki.
    • Ścieżka pliku: wybierz folder, w którym chcesz przechowywać skopiowane dane. Wybierz pozycję Przeglądaj , aby wybrać folder. W polu nazwa pliku otwórz pozycję Dodaj dynamiczną zawartość i wprowadź @CONCAT('Incremental-', pipeline().RunId, '.txt') w otwartym oknie, aby utworzyć nazwy plików dla skopiowanego pliku danych w usłudze Lakehouse.
    • Format pliku: wybierz typ formatu danych.

    Zrzut ekranu przedstawiający konfigurację lokalizacji docelowej kopiowania.

Krok 5: Dodaj aktywność procedury składowanej

W tym kroku dodasz działanie procedury składowanej, aby zaktualizować ostatnią wartość limitu dla następnego uruchomienia potoku.

  1. Wybierz pozycję Działania na górnym pasku i wybierz pozycję Procedura składowana, aby dodać działanie procedury składowanej.

  2. Na karcie Ogólne zmień nazwę tego działania na StoredProceduretoWriteWatermarkActivity.

  3. Połącz zielone (po powodzeniu) dane wyjściowe działania kopiowania z działaniem procedury składowanej.

  4. Na karcie Ustawienia skonfiguruj następujące ustawienia:

    • Data Warehouse: wybierz magazyn danych.

    • Nazwa procedury składowanej: wybierz procedurę składowaną utworzoną w magazynie danych: [dbo].[ usp_write_watermark].

    • Rozwiń Parametry procedury składowanej. Aby ustawić wartości parametrów procedury składowanej, wybierz pozycję Importuj i wprowadź następujące wartości dla parametrów:

      Nazwisko Typ Wartość
      CzasOstatniejModyfikacji Data i Czas @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}
      NazwaTabeli String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}

    Zrzut ekranu przedstawiający konfigurację działania procedury składowanej.

Krok 6: Uruchom potok i monitoruj wynik

Na górnym pasku wybierz pozycję Uruchom na karcie Narzędzia główne . Następnie wybierz pozycję Zapisz i uruchom. Potok rozpoczyna działanie i można go monitorować na karcie Dane wyjściowe.

Zrzut ekranu przedstawiający wyniki działania potoku.

Przejdź do usługi Lakehouse i zobaczysz, że plik danych znajduje się w folderze, który został wybrany. Możesz wybrać plik, aby wyświetlić podgląd skopiowanych danych.

Zrzut ekranu przedstawiający dane usługi Lakehouse dla pierwszego uruchomienia potoku.

Zrzut ekranu przedstawiający podgląd danych Lakehouse dla pierwszego uruchomienia potoku.

Dodawanie większej liczby danych w celu wyświetlenia wyników kopiowania przyrostowego

Po zakończeniu pierwszego uruchomienia potoku dodajmy więcej danych do tabeli źródłowej magazynu danych, aby sprawdzić, czy ten potok może skopiować dane przyrostowe.

Krok 1. Dodawanie większej ilości danych do źródła

Wstaw nowe dane do magazynu danych, uruchamiając następujące zapytanie:

INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')

INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')

Zaktualizowane dane dla data_source_table są następujące:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000

Krok 2: Wyzwól kolejne uruchomienie potoku i monitoruj wynik

Wróć do strony potoku. Na górnym pasku ponownie wybierz pozycję Uruchom na karcie Strona główna. Potok zaczyna działać i można go monitorować w obszarze Dane wyjściowe.

Przejdź do swojego Lakehouse, a znajdziesz nowy skopiowany plik danych w folderze, który wybrałeś. Możesz wybrać plik, aby wyświetlić podgląd skopiowanych danych. Zobaczysz, że dane przyrostowe zostaną wyświetlone w tym pliku.

Zrzut ekranu przedstawiający dane Lakehouse dla drugiego uruchomienia potoku.

Zrzut ekranu przedstawiający podgląd danych lakehouse dla drugiego przebiegu potoku.

Następnie dowiedz się więcej na temat kopiowania z usługi Azure Blob Storage do usługi Lakehouse.