Partager via


Tutoriel : COPY INTO avec Spark SQL

Databricks recommande d’utiliser la COPY INTO commande pour le chargement incrémentiel et en bloc de données pour les sources de données qui contiennent des milliers de fichiers. Databricks recommande d’utiliser le chargeur automatique pour les cas d’usage avancés.

Dans ce tutoriel, vous utilisez la COPY INTO commande pour charger des données à partir du stockage d’objets cloud dans une table de votre espace de travail Azure Databricks.

Spécifications

Étape 1. Configurer votre environnement et créer un générateur de données

Ce tutoriel suppose une connaissance de base d’Azure Databricks et d’une configuration d’espace de travail par défaut. Si vous ne parvenez pas à exécuter le code fourni, contactez l’administrateur de votre espace de travail pour vous assurer que vous avez accès aux ressources de calcul et à un emplacement dans lequel vous pouvez écrire des données.

Notez que le code fourni utilise un source paramètre pour spécifier l’emplacement que vous allez configurer comme COPY INTO source de données. Comme écrit, ce code pointe vers un emplacement sur la racine DBFS. Si vous disposez d’autorisations d’écriture sur un emplacement de stockage d’objets externes, remplacez la dbfs:/ partie de la chaîne source par le chemin d’accès à votre stockage d’objets. Étant donné que ce bloc de code effectue également une suppression récursive pour réinitialiser cette démonstration, assurez-vous que vous ne pointez pas cela sur les données de production et que vous conservez le /user/{username}/copy-into-demo répertoire imbriqué pour éviter de remplacer ou de supprimer des données existantes.

  1. Créez un bloc-notes et attachez-le à une ressource de calcul.

  2. Copiez et exécutez le code suivant pour réinitialiser l’emplacement de stockage et la base de données utilisés dans ce tutoriel :

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. Copiez et exécutez le code suivant pour configurer certaines tables et fonctions qui seront utilisées pour générer des données de manière aléatoire :

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

Étape 2 : Écrire les exemples de données dans le stockage cloud

L’écriture dans des formats de données autres que Delta Lake est rare sur Azure Databricks. Le code fourni ici écrit au format JSON, dans une simulation de système externe qui peut transférer les résultats d'un autre système vers un stockage d'objets.

  1. Copiez et exécutez le code suivant pour écrire un lot de données JSON brutes :

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

Étape 3 : Utiliser COPY INTO pour charger des données JSON idempotentes

Vous devez créer une table Delta Lake cible avant de pouvoir utiliser COPY INTO. Vous n’avez pas besoin de fournir autre chose qu’un nom de table dans votre CREATE TABLE instruction.

  1. Copiez et exécutez le code suivant pour créer votre table Delta cible et charger des données à partir de votre source :

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

Étant donné que cette action est idempotente, vous pouvez l’exécuter plusieurs fois, mais les données ne seront chargées qu’une seule fois.

Étape 4 : Afficher un aperçu du contenu de votre table

Vous pouvez exécuter une requête SQL simple pour passer en revue manuellement le contenu de cette table.

  1. Copiez et exécutez le code suivant pour afficher un aperçu de votre table :

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

Étape 5 : Charger plus de données et afficher un aperçu des résultats

Vous pouvez réexécuter les étapes 2 à 4 plusieurs fois pour atterrir de nouveaux lots de données JSON brutes aléatoires dans votre source, les charger de manière idempotente dans Delta Lake avec COPY INTOet afficher un aperçu des résultats. Essayez d’exécuter ces étapes hors d’ordre ou plusieurs fois pour simuler plusieurs lots de données brutes en cours d’écriture ou d’exécution COPY INTO plusieurs fois sans que de nouvelles données ne soient arrivées.

Étape 6 : Tutoriel de nettoyage

Lorsque vous avez terminé ce didacticiel, vous pouvez nettoyer les ressources associées si vous ne souhaitez plus les conserver.

Copiez et exécutez le code suivant pour supprimer la base de données, les tables et supprimer toutes les données :

%python
# Drop database and tables and remove data

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
dbutils.fs.rm(source, True)

Ressources supplémentaires