Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
In diesem Lernprogramm erfahren Sie, wie Sie nur neue oder geänderte Daten aus Ihrem Data Warehouse in ein Lakehouse kopieren. Dieser Ansatz wird als inkrementelles Laden bezeichnet, und es ist hilfreich, wenn Sie Ihre Daten up-to-datum beibehalten möchten, ohne jedes Mal alles zu kopieren.
Dies ist der allgemeine Entwurf der Lösung:
Wählen Sie eine Wasserzeichenspalte aus. Wählen Sie eine Spalte in der Quelltabelle aus, die beim Nachverfolgen neuer oder geänderter Datensätze hilft. Diese Spalte enthält in der Regel Werte, die sich erhöhen, wenn Zeilen hinzugefügt oder aktualisiert werden (z. B. ein Zeitstempel oder eine ID). Wir verwenden den höchsten Wert in dieser Spalte als "Markierung", um festzustellen, wo wir aufgehört haben.
Richten Sie eine Tabelle ein, um Den letzten Wasserzeichenwert zu speichern.
Erstellen Sie eine Pipeline, die folgendes ausführt:
Die Pipeline umfasst die folgenden Aktivitäten:
- Zwei Nachschlageaktivitäten. Die erste Instanz ruft den letzten Wasserzeichenwert ab, dort wo wir das letzte Mal aufgehört haben. Die zweite Funktion erhält den neuen Wasserzeichenwert (wo wir dieses Mal anhalten). Beide Werte werden an die Kopieraktivität übergeben.
- Eine Kopieraktivität, die Zeilen findet, in denen der Wert der Wasserzeichenspalte zwischen den alten und neuen Wasserzeichen liegt. Anschließend werden diese Daten aus Ihrem Data Warehouse als neue Datei in Ihr Lakehouse kopiert.
- Eine gespeicherte Prozeduraktivität, die den neuen Wasserzeichenwert speichert, damit die nächste Pipelineausführung weiß, wo sie beginnen soll.
Voraussetzungen
- Data Warehouse: Sie verwenden das Data Warehouse als Datenquelle. Wenn Sie keins haben, lesen Sie die Anweisungen zum Erstellen eines Data Warehouses .
-
Lakehouse: Sie verwenden das Lakehouse als Zieldatenspeicher. Wenn Sie keins haben, finden Sie anweisungen unter Create a Lakehouse .
- Erstellen Sie einen Ordner namens "IncrementalCopy ", um ihre kopierten Daten zu speichern.
Bereiten Sie Ihre Quelle vor
Lassen Sie uns die Tabellen und gespeicherten Prozeduren einrichten, die Sie in Ihrem Data Warehouse benötigen, bevor Sie die inkrementelle Kopierpipeline konfigurieren.
1. Erstellen einer Datenquellentabelle in Ihrem Data Warehouse
Führen Sie den folgenden SQL-Befehl in Ihrem Data Warehouse aus, um eine Tabelle mit dem Namen data_source_table als Quelltabelle zu erstellen. Wir verwenden dies als Beispieldaten für die inkrementelle Kopie.
create table data_source_table
(
PersonID int,
Name varchar(255),
LastModifytime DATETIME2(6)
);
INSERT INTO data_source_table
(PersonID, Name, LastModifytime)
VALUES
(1, 'aaaa','9/1/2017 12:56:00 AM'),
(2, 'bbbb','9/2/2017 5:23:00 AM'),
(3, 'cccc','9/3/2017 2:36:00 AM'),
(4, 'dddd','9/4/2017 3:21:00 AM'),
(5, 'eeee','9/5/2017 8:06:00 AM');
Die Daten in der Quelltabelle sehen wie folgt aus:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
In diesem Lernprogramm verwenden wir LastModifytime als Wasserzeichenspalte.
2. Erstellen einer anderen Tabelle in Ihrem Data Warehouse zum Speichern des letzten Grenzwerts
Führen Sie den folgenden SQL-Befehl in Ihrem Data Warehouse aus, um eine Tabelle namens watermarktable zu erstellen und den letzten Grenzwert zu speichern:
create table watermarktable ( TableName varchar(255), WatermarkValue DATETIME2(6), );Legen Sie den Standardwert des letzten Wasserzeichens mit dem Namen der Quelltabelle fest. In diesem Lernprogramm ist der Tabellenname data_source_table, und wir legen den Standardwert auf
1/1/2010 12:00:00 AM.INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')Überprüfen Sie die Daten in Ihrer watermarktable.
Select * from watermarktableAusgabe:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
3. Erstellen einer gespeicherten Prozedur in Ihrem Data Warehouse
Führen Sie den folgenden Befehl zum Erstellen einer gespeicherten Prozedur in Ihrem Data Warehouse aus. Diese gespeicherte Prozedur aktualisiert den Wert des letzten Wasserzeichens nach jeder Pipeline-Ausführung.
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Konfigurieren einer Pipeline für inkrementelles Kopieren
Schritt 1: Erstellen einer Pipeline
Wechseln Sie zu Power BI.
Wählen Sie unten links auf dem Bildschirm das Power BI-Symbol aus, und wählen Sie dann "Fabric" aus.
Wählen Sie "Mein Arbeitsbereich" aus, um Ihren Fabric-Arbeitsbereich zu öffnen.
Wählen Sie +Neues Element aus, wählen Sie dann "Pipeline" aus, und geben Sie dann einen Pipelinenamen ein, um eine neue Pipeline zu erstellen.
Schritt 2: Hinzufügen einer Lookupaktivität für den letzten Grenzwert
In diesem Schritt erstellen Sie eine Nachschlageaktivität, um den letzten Wasserzeichenwert abzurufen. Wir erhalten den Standardwert 1/1/2010 12:00:00 AM , den wir zuvor festgelegt haben.
Wählen Sie "Pipelineaktivität" und dann " Nachschlagen " aus der Dropdownliste aus.
Unter der Registerkarte Allgemein benennen Sie diese Aktivität in LookupOldWaterMarkActivity um.
Konfigurieren Sie auf der Registerkarte "Einstellungen " Folgendes:
- Verbindung: Wählen Sie unter "Warehouse" "Alle durchsuchen" aus, und wählen Sie ihr Data Warehouse aus der Liste aus.
- Abfrage verwenden: Wählen Sie Tabelle aus.
- Tabelle: Wählen Sie dbo.watermarktable aus.
- Nur erste Zeile: Ausgewählt.
Schritt 3: Hinzufügen einer Lookupaktivität für den neuen Grenzwert
In diesem Schritt erstellen Sie eine Nachschlageaktivität, um den neuen Wasserzeichenwert abzurufen. Sie verwenden eine Abfrage, um das neue Wasserzeichen aus der Quelldatentabelle abzurufen. Wir erhalten den höchsten Wert in der Spalte "LastModifytime " aus data_source_table.
Wählen Sie auf der oberen Leiste unter der Registerkarte „Aktivitäten“„Lookup“ aus, um die zweite Nachschlageaktivität hinzuzufügen.
Benennen Sie auf der Registerkarte Allgemein diese Aktivität in LookupNewWaterMarkActivity um.
Konfigurieren Sie auf der Registerkarte "Einstellungen " Folgendes:
Verbindung: Unter Warehouse wählen Sie Alle durchsuchen aus, und wählen Sie Ihr Data Warehouse aus der Liste aus, oder wählen Sie Ihr Data Warehouse aus Fabric-Elementverbindungen aus.
Abfrage verwenden: Wählen Sie Abfrage aus.
Abfrage: Geben Sie die folgende Abfrage ein, um den maximalen Zeitpunkt der letzten Änderung als neuen Grenzwert auszuwählen:
select MAX(LastModifytime) as NewWatermarkvalue from data_source_tableNur erste Zeile: Ausgewählt.
Schritt 4: Hinzufügen der Kopieraktivität zum Kopieren inkrementeller Daten
In diesem Schritt fügen Sie eine Kopieraktivität hinzu, um die inkrementellen Daten zwischen dem letzten Wasserzeichen und dem neuen Wasserzeichen aus Ihrem Data Warehouse in Ihr Lakehouse zu kopieren.
Wählen Sie in der oberen Leiste Aktivitäten aus, und wählen Sie Daten kopieren –>In Canvas hinzufügen aus, um die Kopieraktivität abzurufen.
Benennen Sie diese Aktivität auf der Registerkarte " Allgemein " in "IncrementalCopyActivity" um.
Verbinden Sie beide Nachschlageaktivitäten mit der Kopieraktivität, indem Sie die grüne Schaltfläche (bei Erfolg) von den Nachschlageaktivitäten zur Kopieraktivität ziehen. Lassen Sie die Maustaste los, wenn Sie sehen, dass sich die Rahmenfarbe der Kopieraktivität in grün ändert.
Konfigurieren Sie auf der Registerkarte " Quelle " Folgendes:
Verbindung: Unter Warehouse wählen Sie Alle durchsuchen aus, und wählen Sie Ihr Data Warehouse aus der Liste aus, oder wählen Sie Ihr Data Warehouse aus Fabric-Elementverbindungen aus.
Lager: Wählen Sie Ihr Lager aus.
Abfrage verwenden: Wählen Sie Abfrage aus.
Abfrage: Geben Sie die folgende Abfrage ein, um inkrementelle Daten zwischen dem letzten Wasserzeichen und dem neuen Wasserzeichen zu kopieren.
select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
Konfigurieren Sie auf der Registerkarte " Ziel " Folgendes:
- Verbindung: Wählen Sie unter Lakehouse " Alle durchsuchen" aus, und wählen Sie Ihr Lakehouse aus der Liste aus, oder wählen Sie Ihr Lakehouse aus Fabric-Elementverbindungen aus.
- Lakehouse: Wählen Sie Ihr Lakehouse aus.
- Stammordner: Wählen Sie Dateien aus.
-
Dateipfad: Wählen Sie den Ordner aus, in dem Die kopierten Daten gespeichert werden sollen. Wählen Sie Durchsuchen aus, um Ihren Ordner auszuwählen. Öffnen Sie für den Dateinamen Dynamischen Inhalt hinzufügen, und geben Sie
@CONCAT('Incremental-', pipeline().RunId, '.txt')im geöffneten Fenster ein, um Dateinamen für Ihre kopierte Datendatei im Lakehouse zu erstellen. - Dateiformat: Wählen Sie den Formattyp Ihrer Daten aus.
Schritt 5: Hinzufügen einer Aktivität einer gespeicherten Prozedur
In diesem Schritt fügen Sie eine Aktivität für eine gespeicherte Prozedur hinzu, um den letzten Wasserzeichenwert für die nächste Pipeline-Ausführung zu aktualisieren.
Wählen Sie auf der oberen Leiste Aktivitäten und dann Gespeicherte Prozedur aus, um eine Aktivität der gespeicherten Prozeduren hinzuzufügen.
Benennen Sie auf der Registerkarte Allgemein diese Aktivität in StoredProceduretoWriteWatermarkActivity um.
Verbinden Sie die grüne Ausgabe (bei Erfolg) der Kopieraktivität mit der Aktivität der gespeicherten Prozedur.
Konfigurieren Sie auf der Registerkarte "Einstellungen " Folgendes:
Data Warehouse: Wählen Sie Ihr Data Warehouse aus.
Name der gespeicherten Prozedur: Wählen Sie die gespeicherte Prozedur aus, die Sie in Ihrem Data Warehouse erstellt haben: [dbo].[ usp_write_watermark].
Erweitern Sie Parameter gespeicherter Prozeduren. Um Werte für die Parameter der gespeicherten Prozedur festzulegen, wählen Sie "Importieren" aus, und geben Sie die folgenden Werte für die Parameter ein:
Name type Wert LastModifiedtime Datetime @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue} TableName String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}
Schritt 6: Ausführen der Pipeline und Überwachen des Ergebnisses
Wählen Sie auf der oberen Leiste unter der Registerkarte "Start" die Option "Ausführen" aus. Wählen Sie dann "Speichern" und "Ausführen". Die Pipeline wird gestartet, und Sie können sie auf der Registerkarte " Ausgabe " überwachen.
Wechseln Sie zu Ihrem Lakehouse, und Sie finden die Datendatei unter dem ordner, den Sie ausgewählt haben. Sie können die Datei auswählen, um eine Vorschau der kopierten Daten anzuzeigen.
Fügen Sie weitere Daten hinzu, um die Ergebnisse des inkrementellen Kopierens anzuzeigen.
Nachdem Sie die erste Pipelineausführung abgeschlossen haben, fügen wir ihrer Data Warehouse-Quelltabelle weitere Daten hinzu, um festzustellen, ob diese Pipeline Ihre inkrementellen Daten kopieren kann.
Schritt 1: Hinzufügen weiterer Daten zur Quelle
Fügen Sie neue Daten in Ihr Data Warehouse ein, indem Sie die folgende Abfrage ausführen:
INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
Die aktualisierten Daten für data_source_table sind die folgenden:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000
Schritt 2: Eine weitere Pipeline-Ausführung anstoßen und das Ergebnis überwachen
Wechseln Sie zurück zur Pipelineseite. Wählen Sie auf der oberen Leiste erneut unter der Registerkarte "Start" die Option "Ausführen" aus. Die Pipeline wird gestartet, und Sie können sie unter "Ausgabe" überwachen.
Wechseln Sie zu Ihrem Lakehouse, und Sie finden die neue kopierte Datendatei unter dem von Ihnen ausgewählten Ordner. Sie können die Datei auswählen, um eine Vorschau der kopierten Daten anzuzeigen. Ihre inkrementellen Daten werden in dieser Datei angezeigt.
Zugehöriger Inhalt
Als nächstes erfahren Sie mehr über das Kopieren von Azure Blob Storage in das Lakehouse.