Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Databricks recomienda usar el COPY INTO comando para la carga incremental y masiva de datos para orígenes de datos que contienen miles de archivos. Databricks recomienda usar Auto Loader para casos de uso avanzados.
En este tutorial, usará el COPY INTO comando para cargar datos desde el almacenamiento de objetos en la nube en una tabla del área de trabajo de Azure Databricks.
Requisitos
- Acceso a un recurso informático. Consulte Computación.
- Una ubicación en la que puede escribir datos; esta demostración usa la raíz de DBFS como ejemplo, pero Databricks recomienda una ubicación de almacenamiento externa configurada con El catálogo de Unity. Consulte Conexión al almacenamiento de objetos en la nube mediante el catálogo de Unity.
Paso 1. Configuración del entorno y creación de un generador de datos
En este tutorial se da por supuesto la familiaridad básica con Azure Databricks y una configuración predeterminada del área de trabajo. Si no puede ejecutar el código proporcionado, póngase en contacto con el administrador del área de trabajo para asegurarse de que tiene acceso a los recursos de proceso y a una ubicación en la que puede escribir datos.
Tenga en cuenta que el código proporcionado usa un source parámetro para especificar la ubicación que configurará como origen COPY INTO de datos. Como se ha escrito, este código apunta a una ubicación en la raíz de DBFS. Si tiene permisos de escritura en una ubicación de almacenamiento de objetos externos, reemplace la dbfs:/ parte de la cadena de origen por la ruta de acceso al almacenamiento de objetos. Dado que este bloque de código también realiza una eliminación recursiva para restablecer esta demostración, asegúrese de que esto no señala a los datos de producción y que conserva el directorio anidado /user/{username}/copy-into-demo para evitar sobrescribir o eliminar los datos existentes.
Copie y ejecute el código siguiente para restablecer la ubicación de almacenamiento y la base de datos usadas en este tutorial:
%python # Set parameters for isolation in workspace and reset demo username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0] database = f"copyinto_{username}_db" source = f"dbfs:/user/{username}/copy-into-demo" spark.sql(f"SET c.username='{username}'") spark.sql(f"SET c.database={database}") spark.sql(f"SET c.source='{source}'") spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") spark.sql("CREATE DATABASE ${c.database}") spark.sql("USE ${c.database}") dbutils.fs.rm(source, True)Copie y ejecute el código siguiente para configurar algunas tablas y funciones que se usarán para generar datos aleatoriamente:
-- Configure random data generator CREATE TABLE user_ping_raw (user_id STRING, ping INTEGER, time TIMESTAMP) USING json LOCATION ${c.source}; CREATE TABLE user_ids (user_id STRING); INSERT INTO user_ids VALUES ("potato_luver"), ("beanbag_lyfe"), ("default_username"), ("the_king"), ("n00b"), ("frodo"), ("data_the_kid"), ("el_matador"), ("the_wiz"); CREATE FUNCTION get_ping() RETURNS INT RETURN int(rand() * 250); CREATE FUNCTION is_active() RETURNS BOOLEAN RETURN CASE WHEN rand() > .25 THEN true ELSE false END;
Paso 2: Escribir los datos de ejemplo en el almacenamiento en la nube
Escribir en formatos de datos distintos de Delta Lake es poco frecuente en Azure Databricks. El código proporcionado aquí escribe en JSON, simulando un sistema externo que podría volcar los resultados de otro sistema en el almacenamiento de objetos.
Copie y ejecute el código siguiente para escribir un lote de datos JSON sin procesar:
-- Write a new batch of data to the data source INSERT INTO user_ping_raw SELECT *, get_ping() ping, current_timestamp() time FROM user_ids WHERE is_active()=true;
Paso 3: Use COPY INTO para cargar datos JSON idempotentemente
Debe crear antes una tabla Delta Lake de destino para poder usar COPY INTO. No es necesario proporcionar ningún otro dato que no sea el nombre de la tabla en su instrucción CREATE TABLE.
Copie y ejecute el código siguiente para crear la tabla Delta de destino y cargar datos desde el origen:
-- Create target table and load data CREATE TABLE IF NOT EXISTS user_ping_target; COPY INTO user_ping_target FROM ${c.source} FILEFORMAT = JSON FORMAT_OPTIONS ("mergeSchema" = "true") COPY_OPTIONS ("mergeSchema" = "true")
Dado que esta acción es idempotente, puede ejecutarla varias veces, pero los datos solo se cargarán una vez.
Paso 4: Vista previa del contenido de la tabla
Puede ejecutar una consulta SQL sencilla para revisar manualmente el contenido de esta tabla.
Copie y ejecute el código siguiente para obtener una vista previa de la tabla:
-- Review updated table SELECT * FROM user_ping_target
Paso 5: Cargar más datos y obtener una vista previa de los resultados
Puede volver a ejecutar los pasos del 2 al 4 muchas veces para colocar nuevos lotes de datos JSON aleatorios sin procesar en el origen, cargarlos de forma idempotente en Delta Lake con COPY INTO y obtener una vista previa de los resultados. Intente ejecutar estos pasos desordenados o varias veces para simular que se escriben varios lotes de datos sin procesar o ejecutar COPY INTO varias veces sin que hayan llegado nuevos datos.
Paso 6: Tutorial de limpieza
Cuando haya terminado con este tutorial, puede limpiar los recursos asociados si ya no desea mantenerlos.
Copie y ejecute el código siguiente para quitar la base de datos, las tablas y quitar todos los datos:
%python
# Drop database and tables and remove data
spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
dbutils.fs.rm(source, True)
Recursos adicionales
- Artículo de referencia COPY INTO