Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Databricks empfiehlt, den COPY INTO Befehl für das inkrementelle Laden von Daten und massenweises Laden von Daten für Datenquellen zu verwenden, die Tausende von Dateien enthalten. Databricks empfiehlt, auto Loader für erweiterte Anwendungsfälle zu verwenden.
In diesem Lernprogramm verwenden Sie den COPY INTO Befehl, um Daten aus dem Cloudobjektspeicher in eine Tabelle in Ihrem Azure Databricks-Arbeitsbereich zu laden.
Anforderungen
- Zugriff auf eine Computeressource. Siehe Compute.
- Ein Speicherort, in den Sie Daten schreiben können; Diese Demo verwendet den DBFS-Stamm als Beispiel, aber Databricks empfiehlt einen externen Speicherort, der mit Unity Catalog konfiguriert ist. Siehe Verbinden mit Cloudobjektspeicher mithilfe des Unity-Katalogs.
Schritt 1. Konfigurieren Ihrer Umgebung und Erstellen eines Datengenerators
In diesem Lernprogramm wird die grundlegende Vertrautheit mit Azure Databricks und einer Standardarbeitsbereichskonfiguration vorausgesetzt. Wenn Sie den bereitgestellten Code nicht ausführen können, wenden Sie sich an den Arbeitsbereichsadministrator, um sicherzustellen, dass Sie Zugriff auf Computeressourcen und einen Speicherort haben, in den Sie Daten schreiben können.
Beachten Sie, dass der bereitgestellte Code einen source Parameter verwendet, um den Speicherort anzugeben, den Sie als COPY INTO Datenquelle konfigurieren. Wie geschrieben, verweist dieser Code auf einen Speicherort im DBFS-Stamm. Wenn Sie über Schreibberechtigungen für einen externen Objektspeicherort verfügen, ersetzen Sie den dbfs:/ Teil der Quellzeichenfolge durch den Pfad zum Objektspeicher. Da dieser Codeblock auch einen rekursiven Löschvorgang ausführt, um diese Demo zurückzusetzen, stellen Sie sicher, dass Sie dies nicht auf Produktionsdaten verweisen und das geschachtelte Verzeichnis beibehalten, /user/{username}/copy-into-demo um zu vermeiden, vorhandene Daten zu überschreiben oder zu löschen.
Erstellen Sie ein neues Notizbuch , und fügen Sie es an eine Computeressource an.
Kopieren Sie den folgenden Code, und führen Sie den folgenden Code aus, um den Speicherort und die Datenbank zurückzusetzen, die in diesem Lernprogramm verwendet wird:
%python # Set parameters for isolation in workspace and reset demo username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0] database = f"copyinto_{username}_db" source = f"dbfs:/user/{username}/copy-into-demo" spark.sql(f"SET c.username='{username}'") spark.sql(f"SET c.database={database}") spark.sql(f"SET c.source='{source}'") spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") spark.sql("CREATE DATABASE ${c.database}") spark.sql("USE ${c.database}") dbutils.fs.rm(source, True)Kopieren Sie den folgenden Code, und führen Sie den folgenden Code aus, um einige Tabellen und Funktionen zu konfigurieren, die zum zufälligen Generieren von Daten verwendet werden:
-- Configure random data generator CREATE TABLE user_ping_raw (user_id STRING, ping INTEGER, time TIMESTAMP) USING json LOCATION ${c.source}; CREATE TABLE user_ids (user_id STRING); INSERT INTO user_ids VALUES ("potato_luver"), ("beanbag_lyfe"), ("default_username"), ("the_king"), ("n00b"), ("frodo"), ("data_the_kid"), ("el_matador"), ("the_wiz"); CREATE FUNCTION get_ping() RETURNS INT RETURN int(rand() * 250); CREATE FUNCTION is_active() RETURNS BOOLEAN RETURN CASE WHEN rand() > .25 THEN true ELSE false END;
Schritt 2: Schreiben der Beispieldaten in Cloudspeicher
Das Schreiben in andere Datenformate als Delta Lake ist in Azure Databricks selten. Der hier bereitgestellte Code schreibt in JSON und simuliert ein externes System, das Ergebnisse von einem anderen System in den Objektspeicher speichern kann.
Kopieren Sie den folgenden Code, und führen Sie den folgenden Code aus, um einen Batch von unformatierten JSON-Daten zu schreiben:
-- Write a new batch of data to the data source INSERT INTO user_ping_raw SELECT *, get_ping() ping, current_timestamp() time FROM user_ids WHERE is_active()=true;
Schritt 3: Verwenden Sie COPY INTO zum idempotenten Laden von JSON-Daten.
Sie müssen eine Delta Lake-Zieltabelle erstellen, bevor Sie COPY INTO verwenden können. Sie müssen nichts anderes als einen Tabellennamen in Ihrer CREATE TABLE Anweisung angeben.
Kopieren Sie den folgenden Code und führen Sie ihn aus, um die Zieldelta-Tabelle zu erstellen und Daten aus Ihrer Quelle zu laden.
-- Create target table and load data CREATE TABLE IF NOT EXISTS user_ping_target; COPY INTO user_ping_target FROM ${c.source} FILEFORMAT = JSON FORMAT_OPTIONS ("mergeSchema" = "true") COPY_OPTIONS ("mergeSchema" = "true")
Da diese Aktion idempotent ist, können Sie sie mehrmals ausführen, daten werden jedoch nur einmal geladen.
Schritt 4: Anzeigen einer Vorschau des Inhalts Ihrer Tabelle
Sie können eine einfache SQL-Abfrage ausführen, um den Inhalt dieser Tabelle manuell zu überprüfen.
Kopieren Sie den folgenden Code, und führen Sie den folgenden Code aus, um eine Vorschau der Tabelle anzuzeigen:
-- Review updated table SELECT * FROM user_ping_target
Schritt 5: Laden weiterer Daten und Vorschauergebnisse
Sie können die Schritte 2 bis 4 mehrmals erneut ausführen, um neue Batches zufälliger roher JSON-Daten in Ihrer Quelle abzulegen, sie idempotent in Delta Lake COPY INTO zu laden und die Ergebnisse in der Vorschau anzuzeigen. Versuchen Sie, diese Schritte außerhalb der Reihenfolge oder mehrmals auszuführen, um mehrere Batches von Rohdaten zu simulieren, die mehrmals geschrieben oder ausgeführt COPY INTO werden, ohne dass neue Daten eingegangen sind.
Schritt 6: Tutorial bereinigen
Wenn Sie mit diesem Tutorial fertig sind, können Sie die zugehörigen Ressourcen aufräumen, wenn Sie sie nicht mehr behalten möchten.
Kopieren Sie den folgenden Code, und führen Sie den folgenden Code aus, um die Datenbank, Tabellen zu löschen und alle Daten zu entfernen:
%python
# Drop database and tables and remove data
spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
dbutils.fs.rm(source, True)
Weitere Ressourcen
- Der COPY INTO Referenzartikel