Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Z tego samouczka dowiesz się, jak skopiować tylko nowe lub zmienione dane z usługi Data Warehouse do usługi Lakehouse. Takie podejście jest nazywane ładowaniem przyrostowym i jest przydatne, gdy chcesz zachować dane up-to-date bez kopiowania wszystkiego za każdym razem.
Oto ogólny projekt rozwiązania:
Wybierz kolumnę limitu. Wybierz jedną kolumnę w tabeli źródłowej, która ułatwia śledzenie nowych lub zmienionych rekordów. Ta kolumna zwykle zawiera wartości, które zwiększają się, gdy wiersze są dodawane lub aktualizowane (na przykład znacznik czasu lub identyfikator). Użyjemy najwyższej wartości w tej kolumnie jako naszego "wskaźnika", aby wiedzieć, gdzie skończyliśmy.
Skonfiguruj tabelę do przechowywania ostatniej wartości limitu.
Zbuduj przepływ, który wykonuje następujące czynności:
Przepływ pracy obejmuje następujące działania:
- Dwie czynności wyszukiwania. Pierwszy pobiera ostatnią wartość limitu (gdzie zatrzymaliśmy się ostatni raz). Drugi pobiera nową wartość limitu (gdzie zatrzymamy się tym razem). Obie wartości są przekazywane do działania kopiowania.
- Działanie kopiowania, które znajduje wiersze, w których wartość kolumny znacznika znajduje się między starą a nową wartością znacznika. Następnie kopiuje te dane z magazynu danych do usługi Lakehouse jako nowy plik.
- Działanie procedury składowanej, które zapisuje nową wartość limitu, dzięki czemu następne uruchomienie potoku wie, gdzie należy rozpocząć.
Wymagania wstępne
- Magazyn danych. Magazyn danych będzie używany jako źródło danych. Jeśli go nie masz, zapoznaj się z tematem Tworzenie magazynu danych , aby uzyskać instrukcje.
-
Lakehouse. Użyjesz usługi Lakehouse jako docelowego magazynu danych. Jeśli go nie masz, zobacz Tworzenie usługi Lakehouse , aby uzyskać instrukcje.
- Utwórz folder o nazwie IncrementalCopy do przechowywania skopiowanych danych.
Przygotowywanie źródła
Zanim skonfigurujesz potok kopiowania przyrostowego, przygotuj tabele oraz procedurę składowaną potrzebną w Twoim magazynie danych.
1. Tworzenie tabeli źródła danych w magazynie danych
Uruchom następujące polecenie SQL w usłudze Data Warehouse, aby utworzyć tabelę o nazwie data_source_table jako tabelę źródłową. Użyjemy tego jako przykładowych danych do kopiowania przyrostowego.
create table data_source_table
(
PersonID int,
Name varchar(255),
LastModifytime DATETIME2(6)
);
INSERT INTO data_source_table
(PersonID, Name, LastModifytime)
VALUES
(1, 'aaaa','9/1/2017 12:56:00 AM'),
(2, 'bbbb','9/2/2017 5:23:00 AM'),
(3, 'cccc','9/3/2017 2:36:00 AM'),
(4, 'dddd','9/4/2017 3:21:00 AM'),
(5, 'eeee','9/5/2017 8:06:00 AM');
Dane w tabeli źródłowej wyglądają następująco:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
W tym samouczku użyjemy kolumny LastModifytime jako kolumny limitu.
2. Utwórz inną tabelę w magazynie danych, aby przechowywać ostatnią wartość watermark.
Uruchom następujące polecenie SQL w usłudze Data Warehouse, aby utworzyć tabelę o nazwie watermarktable do przechowywania ostatniej wartości znacznika wodnego:
create table watermarktable ( TableName varchar(255), WatermarkValue DATETIME2(6), );Ustaw domyślną wartość ostatniego znacznika za pomocą nazwy tabeli źródłowej. W tym samouczku nazwa tabeli jest data_source_table i ustawimy wartość domyślną na
1/1/2010 12:00:00 AM.INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')Sprawdź dane w tabeli znaków wodnych.
Select * from watermarktableWyjście:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
3. Utwórz procedurę składowaną w magazynie danych
Uruchom następujące polecenie, aby utworzyć procedurę składowaną w magazynie danych. Ta procedura składowana aktualizuje ostatnią wartość znacznika wodnego po każdym uruchomieniu potoku.
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Skonfiguruj potok do kopiowania przyrostowego
Krok 1: Utwórz potok
Przejdź do usługi Power BI.
Wybierz ikonę usługi Power BI w lewym dolnym rogu ekranu, a następnie wybierz pozycję Fabric.
Wybierz pozycję Mój obszar roboczy , aby otworzyć obszar roboczy usługi Fabric.
Wybierz pozycję + Nowy element, a następnie wybierz pozycję Potok, a następnie wprowadź nazwę potoku, aby utworzyć nowy potok.
Krok 2. Dodaj działanie wyszukiwania dla ostatniego znaku wodnego
W tym kroku utworzysz czynność wyszukiwania, aby uzyskać ostatnią wartość punktu odniesienia. Uzyskamy ustawioną wcześniej wartość 1/1/2010 12:00:00 AM domyślną.
Wybierz pozycję Działanie potoku i wybierz pozycję Wyszukaj z listy rozwijanej.
Na karcie Ogólne zmień nazwę tego działania na LookupOldWaterMarkActivity.
Na karcie Ustawienia skonfiguruj następujące ustawienia:
- Połączenie: w obszarze Magazyn wybierz pozycję Przeglądaj wszystko, a następnie z listy wybierz magazyn danych.
- Użyj zapytania: wybierz pozycję Tabela.
- Tabela: wybierz pozycję dbo.watermarktable.
- Pierwszy wiersz tylko: wybrany.
Krok 3. Dodawanie działania wyszukiwania dla nowego znaku wodnego
W tym kroku utworzysz działanie wyszukiwania, aby uzyskać nową wartość punktu kontrolnego. Użyjesz zapytania, aby pobrać nowy znacznik z tabeli danych źródłowych. Uzyskamy najwyższą wartość w kolumnie LastModifytime z data_source_table.
Na górnym pasku wybierz Wyszukaj w zakładce Działania, aby dodać drugie działanie wyszukiwania.
Na karcie Ogólne zmień nazwę tego działania na LookupNewWaterMarkActivity.
Na karcie Ustawienia skonfiguruj następujące ustawienia:
Połączenie: W obszarze Magazyn wybierz pozycję Przeglądaj wszystko, a następnie wybierz magazyn danych z listy lub wybierz magazyn danych z Połączenia elementów Fabric.
Użyj zapytania: wybierz Zapytanie.
Zapytanie: Wprowadź następujące zapytanie, aby wybrać maksymalny czas ostatniej modyfikacji jako nowy punkt odniesienia:
select MAX(LastModifytime) as NewWatermarkvalue from data_source_tablePierwszy wiersz tylko: wybrany.
Krok 4. Dodawanie działania kopiowania w celu kopiowania danych przyrostowych
W tym kroku dodasz działanie kopiowania, aby skopiować dane przyrostowe między ostatnim znakiem wodnym a nowym znakiem wodnym z usługi Data Warehouse do usługi Lakehouse.
Wybierz pozycję Działania na górnym pasku i wybierz pozycję Kopiuj dane -> Dodaj do kanwy , aby uzyskać działanie kopiowania.
Na karcie Ogólne zmień nazwę tego działania na IncrementalCopyActivity.
Połącz oba działania wyszukiwania z działaniem kopiowania, przeciągając zielony przycisk (Po pomyślnym zakończeniu) z działań wyszukiwania do działania kopiowania. Zwolnij przycisk myszy, gdy kolor obramowania działania kopiowania zmieni się na zielony.
Na karcie Źródło skonfiguruj następujące ustawienia:
Połączenie: W obszarze Magazyn wybierz pozycję Przeglądaj wszystko, a następnie wybierz magazyn danych z listy lub wybierz magazyn danych z Połączenia elementów Fabric.
Magazyn: wybierz magazyn.
Użyj zapytania: wybierz Zapytanie.
Zapytanie: Wprowadź następujące zapytanie, aby skopiować przyrostowe dane między poprzednim znacznikiem czasowym a nowym znacznikiem czasowym.
select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
Na karcie Miejsce docelowe skonfiguruj następujące ustawienia:
- Połączenie: w obszarze Lakehouse wybierz pozycję Przeglądaj wszystko, a następnie wybierz Lakehouse z listy lub Lakehouse z połączeń elementów Fabric.
- Lakehouse: Wybierz swoje Lakehouse.
- Folder główny: wybierz pozycję Pliki.
-
Ścieżka pliku: wybierz folder, w którym chcesz przechowywać skopiowane dane. Wybierz pozycję Przeglądaj , aby wybrać folder. W polu nazwa pliku otwórz pozycję Dodaj dynamiczną zawartość i wprowadź
@CONCAT('Incremental-', pipeline().RunId, '.txt')w otwartym oknie, aby utworzyć nazwy plików dla skopiowanego pliku danych w usłudze Lakehouse. - Format pliku: wybierz typ formatu danych.
Krok 5: Dodaj aktywność procedury składowanej
W tym kroku dodasz działanie procedury składowanej, aby zaktualizować ostatnią wartość limitu dla następnego uruchomienia potoku.
Wybierz pozycję Działania na górnym pasku i wybierz pozycję Procedura składowana, aby dodać działanie procedury składowanej.
Na karcie Ogólne zmień nazwę tego działania na StoredProceduretoWriteWatermarkActivity.
Połącz zielone (po powodzeniu) dane wyjściowe działania kopiowania z działaniem procedury składowanej.
Na karcie Ustawienia skonfiguruj następujące ustawienia:
Data Warehouse: wybierz magazyn danych.
Nazwa procedury składowanej: wybierz procedurę składowaną utworzoną w magazynie danych: [dbo].[ usp_write_watermark].
Rozwiń Parametry procedury składowanej. Aby ustawić wartości parametrów procedury składowanej, wybierz pozycję Importuj i wprowadź następujące wartości dla parametrów:
Nazwisko Typ Wartość CzasOstatniejModyfikacji Data i Czas @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue} NazwaTabeli String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}
Krok 6: Uruchom potok i monitoruj wynik
Na górnym pasku wybierz pozycję Uruchom na karcie Narzędzia główne . Następnie wybierz pozycję Zapisz i uruchom. Potok rozpoczyna działanie i można go monitorować na karcie Dane wyjściowe.
Przejdź do usługi Lakehouse i zobaczysz, że plik danych znajduje się w folderze, który został wybrany. Możesz wybrać plik, aby wyświetlić podgląd skopiowanych danych.
Dodawanie większej liczby danych w celu wyświetlenia wyników kopiowania przyrostowego
Po zakończeniu pierwszego uruchomienia potoku dodajmy więcej danych do tabeli źródłowej magazynu danych, aby sprawdzić, czy ten potok może skopiować dane przyrostowe.
Krok 1. Dodawanie większej ilości danych do źródła
Wstaw nowe dane do magazynu danych, uruchamiając następujące zapytanie:
INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
Zaktualizowane dane dla data_source_table są następujące:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000
Krok 2: Wyzwól kolejne uruchomienie potoku i monitoruj wynik
Wróć do strony potoku. Na górnym pasku ponownie wybierz pozycję Uruchom na karcie Strona główna. Potok zaczyna działać i można go monitorować w obszarze Dane wyjściowe.
Przejdź do swojego Lakehouse, a znajdziesz nowy skopiowany plik danych w folderze, który wybrałeś. Możesz wybrać plik, aby wyświetlić podgląd skopiowanych danych. Zobaczysz, że dane przyrostowe zostaną wyświetlone w tym pliku.
Powiązana zawartość
Następnie dowiedz się więcej na temat kopiowania z usługi Azure Blob Storage do usługi Lakehouse.