Freigeben über


Lernprogramm: COPY INTO mit Spark SQL

Databricks empfiehlt, den COPY INTO Befehl für das inkrementelle Laden von Daten und massenweises Laden von Daten für Datenquellen zu verwenden, die Tausende von Dateien enthalten. Databricks empfiehlt, auto Loader für erweiterte Anwendungsfälle zu verwenden.

In diesem Lernprogramm verwenden Sie den COPY INTO Befehl, um Daten aus dem Cloudobjektspeicher in eine Tabelle in Ihrem Azure Databricks-Arbeitsbereich zu laden.

Anforderungen

Schritt 1. Konfigurieren Ihrer Umgebung und Erstellen eines Datengenerators

In diesem Lernprogramm wird die grundlegende Vertrautheit mit Azure Databricks und einer Standardarbeitsbereichskonfiguration vorausgesetzt. Wenn Sie den bereitgestellten Code nicht ausführen können, wenden Sie sich an den Arbeitsbereichsadministrator, um sicherzustellen, dass Sie Zugriff auf Computeressourcen und einen Speicherort haben, in den Sie Daten schreiben können.

Beachten Sie, dass der bereitgestellte Code einen source Parameter verwendet, um den Speicherort anzugeben, den Sie als COPY INTO Datenquelle konfigurieren. Wie geschrieben, verweist dieser Code auf einen Speicherort im DBFS-Stamm. Wenn Sie über Schreibberechtigungen für einen externen Objektspeicherort verfügen, ersetzen Sie den dbfs:/ Teil der Quellzeichenfolge durch den Pfad zum Objektspeicher. Da dieser Codeblock auch einen rekursiven Löschvorgang ausführt, um diese Demo zurückzusetzen, stellen Sie sicher, dass Sie dies nicht auf Produktionsdaten verweisen und das geschachtelte Verzeichnis beibehalten, /user/{username}/copy-into-demo um zu vermeiden, vorhandene Daten zu überschreiben oder zu löschen.

  1. Erstellen Sie ein neues Notizbuch , und fügen Sie es an eine Computeressource an.

  2. Kopieren Sie den folgenden Code, und führen Sie den folgenden Code aus, um den Speicherort und die Datenbank zurückzusetzen, die in diesem Lernprogramm verwendet wird:

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. Kopieren Sie den folgenden Code, und führen Sie den folgenden Code aus, um einige Tabellen und Funktionen zu konfigurieren, die zum zufälligen Generieren von Daten verwendet werden:

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

Schritt 2: Schreiben der Beispieldaten in Cloudspeicher

Das Schreiben in andere Datenformate als Delta Lake ist in Azure Databricks selten. Der hier bereitgestellte Code schreibt in JSON und simuliert ein externes System, das Ergebnisse von einem anderen System in den Objektspeicher speichern kann.

  1. Kopieren Sie den folgenden Code, und führen Sie den folgenden Code aus, um einen Batch von unformatierten JSON-Daten zu schreiben:

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

Schritt 3: Verwenden Sie COPY INTO zum idempotenten Laden von JSON-Daten.

Sie müssen eine Delta Lake-Zieltabelle erstellen, bevor Sie COPY INTO verwenden können. Sie müssen nichts anderes als einen Tabellennamen in Ihrer CREATE TABLE Anweisung angeben.

  1. Kopieren Sie den folgenden Code und führen Sie ihn aus, um die Zieldelta-Tabelle zu erstellen und Daten aus Ihrer Quelle zu laden.

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

Da diese Aktion idempotent ist, können Sie sie mehrmals ausführen, daten werden jedoch nur einmal geladen.

Schritt 4: Anzeigen einer Vorschau des Inhalts Ihrer Tabelle

Sie können eine einfache SQL-Abfrage ausführen, um den Inhalt dieser Tabelle manuell zu überprüfen.

  1. Kopieren Sie den folgenden Code, und führen Sie den folgenden Code aus, um eine Vorschau der Tabelle anzuzeigen:

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

Schritt 5: Laden weiterer Daten und Vorschauergebnisse

Sie können die Schritte 2 bis 4 mehrmals erneut ausführen, um neue Batches zufälliger roher JSON-Daten in Ihrer Quelle abzulegen, sie idempotent in Delta Lake COPY INTO zu laden und die Ergebnisse in der Vorschau anzuzeigen. Versuchen Sie, diese Schritte außerhalb der Reihenfolge oder mehrmals auszuführen, um mehrere Batches von Rohdaten zu simulieren, die mehrmals geschrieben oder ausgeführt COPY INTO werden, ohne dass neue Daten eingegangen sind.

Schritt 6: Tutorial bereinigen

Wenn Sie mit diesem Tutorial fertig sind, können Sie die zugehörigen Ressourcen aufräumen, wenn Sie sie nicht mehr behalten möchten.

Kopieren Sie den folgenden Code, und führen Sie den folgenden Code aus, um die Datenbank, Tabellen zu löschen und alle Daten zu entfernen:

%python
# Drop database and tables and remove data

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
dbutils.fs.rm(source, True)

Weitere Ressourcen