Udostępnij przez


Wzorzec przyrostowego masowania danych za pomocą przepływu danych Gen2

Ten samouczek zajmuje 15 minut i opisuje, jak przyrostowo zgromadzić dane w usłudze Lakehouse przy użyciu usługi Dataflow Gen2.

Przyrostowe zbieranie danych w miejscu docelowym danych wymaga techniki ładowania tylko nowych lub zaktualizowanych danych do miejsca docelowego danych. Tę technikę można wykonać przy użyciu zapytania w celu filtrowania danych na podstawie miejsca docelowego danych. W tym samouczku pokazano, jak utworzyć przepływ danych w celu załadowania danych ze źródła OData do magazynu typu lakehouse oraz jak dodać zapytanie do przepływu danych w celu filtrowania danych na podstawie miejsca docelowego danych.

Główne kroki w tym samouczku są następujące:

  • Utwórz przepływ danych, aby załadować dane ze źródła OData do magazynu danych typu lakehouse.
  • Dodaj zapytanie do przepływu danych, aby filtrować dane na podstawie miejsca docelowego danych.
  • (Opcjonalnie) załaduj ponownie dane przy użyciu notatników i potoków.

Wymagania wstępne

Musisz mieć obszar roboczy z włączoną usługą Microsoft Fabric. Jeśli jeszcze go nie masz, zapoznaj się z artykułem Tworzenie obszaru roboczego. Ponadto w samouczku założono, że używasz widoku diagramu w przepływie danych Gen2. Aby sprawdzić, czy używasz widoku diagramu, na górnej wstążce menu przejdź do pozycji Widok i upewnij się, że wybrano widok diagramu.

Utwórz przepływ danych w celu załadowania danych ze źródła OData do magazynu typu lakehouse

W tej sekcji utworzysz przepływ danych w celu załadowania danych ze źródła OData do magazynu typu lakehouse.

  1. Utwórz nowy lakehouse w swoim środowisku pracy.

    Zrzut ekranu przedstawiający okno dialogowe tworzenia usługi Lakehouse.

  2. Utwórz nowy przepływ danych Gen2 w obszarze roboczym.

    Zrzut ekranu przedstawiający rozwijaną listę tworzenia przepływu danych.

  3. Dodaj nowe źródło do przepływu danych. Wybierz źródło OData i wprowadź następujący adres URL: https://services.OData.org/V4/Northwind/Northwind.svc

    Zrzut ekranu przedstawiający okno dialogowe Pobieranie danych.

    Zrzut ekranu przedstawiający łącznik OData.

    Zrzut ekranu przedstawiający ustawienia OData.

  4. Wybierz tabelę Orders (Zamówienia) i wybierz przycisk Next (Dalej).

    Zrzut ekranu przedstawiający okno dialogowe wyboru tabeli zamówień.

  5. Wybierz następujące kolumny, aby zachować:

    • OrderID
    • CustomerID
    • EmployeeID
    • OrderDate
    • RequiredDate
    • ShippedDate
    • ShipVia
    • Freight
    • ShipName
    • ShipAddress
    • ShipCity
    • ShipRegion
    • ShipPostalCode
    • ShipCountry

    Zrzut ekranu przedstawiający funkcję choose columns (Wybieranie kolumn).

    Zrzut ekranu przedstawiający tabelę wyboru kolejności kolumn.

  6. Zmień typ danych , OrderDateRequiredDatei ShippedDate na datetime.

    Zrzut ekranu przedstawiający funkcję change datatype.

  7. Skonfiguruj miejsce docelowe danych w usłudze Lakehouse przy użyciu następujących ustawień:

    • Miejsce docelowe danych: Lakehouse
    • Lakehouse: wybierz jezioro utworzone w kroku 1.
    • Nowa nazwa tabeli: Orders
    • Metoda aktualizacji: Replace

    Zrzut ekranu przedstawiający wstążkę miejsca docelowego danych typu lakehouse.

    Zrzut ekranu przedstawiający tabelę zamówień lakehouse miejsca docelowego danych.

    Zrzut ekranu pokazujący ustawienia docelowego miejsca przechowywania danych w magazynie lakehouse.

  8. wybierz pozycję Dalej i opublikuj przepływ danych.

    Zrzut ekranu przedstawiający okno dialogowe publikowania przepływu danych.

Utworzyłeś teraz przepływ danych, aby załadować dane ze źródła OData do lakehouse. Ten przepływ danych jest używany w następnej sekcji, aby dodać zapytanie do przepływu danych w celu filtrowania danych na podstawie miejsca docelowego danych. Następnie możesz użyć przepływu danych do ponownego załadowania danych przy użyciu notatników i potoków.

Dodawanie zapytania do przepływu danych w celu filtrowania danych na podstawie miejsca docelowego danych

Ta sekcja dodaje zapytanie do przepływu danych, aby przefiltrować dane zgodnie z informacjami znajdującymi się w docelowym jeziorze danych. Zapytanie pobiera maksymalną wartość OrderID w Lakehouse na początku odświeżania przepływu danych i korzysta z maksymalnego OrderId, aby uzyskać tylko te zamówienia, które mają wyższy OrderId od źródła, a następnie dołącza je do miejsca docelowego danych. Przyjęto założenie, że zamówienia są dodawane do źródła w kolejności rosnącejOrderID. Jeśli tak nie jest, możesz użyć innej kolumny do filtrowania danych. Na przykład możesz użyć kolumny OrderDate do filtrowania danych.

Uwaga

Filtry OData są stosowane w ramach Fabric po odebraniu danych ze źródła danych. Jednak w przypadku źródeł baz danych, takich jak SQL Server, filtr jest stosowany w zapytaniu wysyłanym do źródła danych zaplecza i do usługi zwracane są tylko przefiltrowane wiersze.

  1. Po odświeżeniu przepływu danych otwórz ponownie przepływ danych utworzony w poprzedniej sekcji.

    Zrzut ekranu przedstawiający otwarte okno dialogowe przepływu danych.

  2. Utwórz nowe zapytanie o nazwie IncrementalOrderID i pobierz dane z tabeli zamówień w systemie Lakehouse utworzonym w poprzedniej sekcji.

    Zrzut ekranu przedstawiający okno dialogowe Pobieranie danych.

    Zrzut ekranu przedstawiający łącznik lakehouse.

    Zrzut ekranu przedstawiający tabelę zamówień w lakehouse.

    Zrzut ekranu przedstawiający funkcję kwerendy zmiany nazwy.

    Zrzut ekranu przedstawiający zmienioną nazwę zapytania.

  3. Wyłącz przemieszczanie tego zapytania.

    Zrzut ekranu przedstawiający funkcję wyłączania inscenizacji.

  4. W podglądzie danych kliknij prawym przyciskiem myszy kolumnę OrderID i wybierz pozycję Przejdź do szczegółów.

    Zrzut ekranu przedstawiający funkcję drążenia.

  5. Na wstążce wybierz pozycję Narzędzia listy ->Statystyka ->Maksimum.

    Zrzut ekranu przedstawiający maksymalną funkcję orderid statystyki.

Masz teraz zapytanie, które zwraca maksymalny identyfikator OrderID w Lakehouse. To zapytanie służy do filtrowania danych ze źródła OData. W następnej sekcji dodano zapytanie do przepływu danych w celu filtrowania danych ze źródła OData na podstawie maksymalnego identyfikatora OrderID w magazynie typu lakehouse.

  1. Wróć do zapytania Orders (Zamówienia) i dodaj nowy krok w celu filtrowania danych. Użyj następujących ustawień:

    • Kolumna: OrderID
    • Operacja: Greater than
    • Wartość: parametr IncrementalOrderID

    Zrzut ekranu przedstawiający identyfikator orderid większy niż funkcja filter.

    Zrzut ekranu przedstawiający ustawienia filtru.

  2. Zezwalaj na łączenie danych ze źródła OData i lakehouse, potwierdzając następujące okno dialogowe:

    Zrzut ekranu przedstawiający okno dialogowe zezwalania na łączenie danych.

  3. Zaktualizuj miejsce docelowe danych, aby użyć następujących ustawień:

    • Metoda aktualizacji: Append

    Zrzut ekranu przedstawiający funkcję edytuj ustawienia danych wyjściowych.

    Zrzut ekranu przedstawiający istniejącą tabelę zamówień.

    Zrzut ekranu przedstawiający dołączanie ustawień miejsca docelowego usługi Lakehouse.

  4. Opublikuj przepływ danych.

    Zrzut ekranu przedstawiający okno dialogowe publikowania przepływu danych.

Przepływ danych zawiera teraz zapytanie, które filtruje dane ze źródła OData na podstawie maksymalnego identyfikatora OrderID w lakehouse. Oznacza to, że do lakehouse są ładowane tylko nowe lub zaktualizowane dane. W następnej sekcji, przepływ danych zostanie użyty do ponownego ładowania danych za pomocą notebooków i potoków.

(Opcjonalnie) ponownie załaduj dane za pomocą notatników i pipelines.

Opcjonalnie możesz ponownie załadować określone dane przy użyciu notesów i potoków. Za pomocą niestandardowego kodu w języku Python w notatniku usuwasz stare dane z lakehouse'u. Następnie utworzysz potok, w którym najpierw uruchomisz notes i sekwencyjnie uruchomisz przepływ danych, załadujesz ponownie dane ze źródła OData do usługi Lakehouse. Notesy obsługują wiele języków, ale w tym samouczku jest używany program PySpark. Pyspark to interfejs API języka Python dla platformy Spark i jest używany w tym samouczku do uruchamiania zapytań Spark SQL.

  1. Utwórz nowy notatnik w swoim środowisku pracy.

    Zrzut ekranu przedstawiający okno dialogowe nowego notesu.

  2. Dodaj następujący kod PySpark do notesu:

    ### Variables
    LakehouseName = "YOURLAKEHOUSE"
    TableName = "Orders"
    ColName = "OrderID"
    NumberOfOrdersToRemove = "10"
    
    
    ### Remove Old Orders
    Reload = spark.sql("SELECT Max({0})-{1} as ReLoadValue FROM {2}.{3}".format(ColName,NumberOfOrdersToRemove,LakehouseName,TableName)).collect()
    Reload = Reload[0].ReLoadValue
    spark.sql("Delete from {0}.{1} where {2} > {3}".format(LakehouseName, TableName, ColName, Reload))
    
  3. Uruchom notatnik, aby sprawdzić, czy dane zostały usunięte z lakehouse.

  4. Utwórz nowy potok w obszarze roboczym.

    Zrzut ekranu przedstawiający okno dialogowe nowej linii przetwarzania.

  5. Dodaj nowe zadanie notesu do potoku danych i wybierz notes utworzony w poprzednim kroku.

    Zrzut ekranu przedstawiający okno dialogowe dodawania notesu.

    Zrzut ekranu przedstawiający okno dialogowe wybierania notesu.

  6. Dodaj nową aktywność przepływu danych do potoku i wybierz przepływ danych utworzony w poprzednim kroku.

    Zrzut ekranu przedstawiający okno dialogowe dodawania działania przepływu danych.

    Zrzut ekranu przedstawiający okno dialogowe wybierania przepływu danych.

  7. Połącz aktywność notesu z aktywnością przepływu danych za pomocą wyzwalacza powodzenia.

    Zrzut ekranu przedstawiający okno dialogowe łączenia działań.

  8. Zapisz i uruchom potok.

    Zrzut ekranu przedstawiający okno dialogowe uruchamiania pipeline'u.

Masz teraz pipeline, który usuwa stare dane z lakehouse'u i ponownie ładuje dane ze źródła OData do lakehouse'u. Dzięki tej konfiguracji można regularnie przeładowywać dane ze źródła OData do Lakehouse.