Compartir a través de


Tutorial: COPY INTO con Spark SQL

Databricks recomienda usar el COPY INTO comando para la carga incremental y masiva de datos para orígenes de datos que contienen miles de archivos. Databricks recomienda usar Auto Loader para casos de uso avanzados.

En este tutorial, usará el COPY INTO comando para cargar datos desde el almacenamiento de objetos en la nube en una tabla del área de trabajo de Azure Databricks.

Requisitos

Paso 1. Configuración del entorno y creación de un generador de datos

En este tutorial se da por supuesto la familiaridad básica con Azure Databricks y una configuración predeterminada del área de trabajo. Si no puede ejecutar el código proporcionado, póngase en contacto con el administrador del área de trabajo para asegurarse de que tiene acceso a los recursos de proceso y a una ubicación en la que puede escribir datos.

Tenga en cuenta que el código proporcionado usa un source parámetro para especificar la ubicación que configurará como origen COPY INTO de datos. Como se ha escrito, este código apunta a una ubicación en la raíz de DBFS. Si tiene permisos de escritura en una ubicación de almacenamiento de objetos externos, reemplace la dbfs:/ parte de la cadena de origen por la ruta de acceso al almacenamiento de objetos. Dado que este bloque de código también realiza una eliminación recursiva para restablecer esta demostración, asegúrese de que esto no señala a los datos de producción y que conserva el directorio anidado /user/{username}/copy-into-demo para evitar sobrescribir o eliminar los datos existentes.

  1. Cree un notebook y conéctelo a un recurso de cálculo.

  2. Copie y ejecute el código siguiente para restablecer la ubicación de almacenamiento y la base de datos usadas en este tutorial:

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. Copie y ejecute el código siguiente para configurar algunas tablas y funciones que se usarán para generar datos aleatoriamente:

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

Paso 2: Escribir los datos de ejemplo en el almacenamiento en la nube

Escribir en formatos de datos distintos de Delta Lake es poco frecuente en Azure Databricks. El código proporcionado aquí escribe en JSON, simulando un sistema externo que podría volcar los resultados de otro sistema en el almacenamiento de objetos.

  1. Copie y ejecute el código siguiente para escribir un lote de datos JSON sin procesar:

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

Paso 3: Use COPY INTO para cargar datos JSON idempotentemente

Debe crear antes una tabla Delta Lake de destino para poder usar COPY INTO. No es necesario proporcionar ningún otro dato que no sea el nombre de la tabla en su instrucción CREATE TABLE.

  1. Copie y ejecute el código siguiente para crear la tabla Delta de destino y cargar datos desde el origen:

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

Dado que esta acción es idempotente, puede ejecutarla varias veces, pero los datos solo se cargarán una vez.

Paso 4: Vista previa del contenido de la tabla

Puede ejecutar una consulta SQL sencilla para revisar manualmente el contenido de esta tabla.

  1. Copie y ejecute el código siguiente para obtener una vista previa de la tabla:

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

Paso 5: Cargar más datos y obtener una vista previa de los resultados

Puede volver a ejecutar los pasos del 2 al 4 muchas veces para colocar nuevos lotes de datos JSON aleatorios sin procesar en el origen, cargarlos de forma idempotente en Delta Lake con COPY INTO y obtener una vista previa de los resultados. Intente ejecutar estos pasos desordenados o varias veces para simular que se escriben varios lotes de datos sin procesar o ejecutar COPY INTO varias veces sin que hayan llegado nuevos datos.

Paso 6: Tutorial de limpieza

Cuando haya terminado con este tutorial, puede limpiar los recursos asociados si ya no desea mantenerlos.

Copie y ejecute el código siguiente para quitar la base de datos, las tablas y quitar todos los datos:

%python
# Drop database and tables and remove data

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
dbutils.fs.rm(source, True)

Recursos adicionales