教程: COPY INTO 使用 Spark SQL

Databricks 建议对包含数千个文件的数据源使用 COPY INTO 命令进行增量和批量数据加载。 Databricks 建议对高级用例使用 自动加载程序

在本教程中,使用 COPY INTO 命令将数据从云对象存储加载到 Azure Databricks 工作区中的表中。

要求

  • 访问计算资源。 请参阅 计算
  • 可将数据写入到的位置;此演示使用 DBFS 根目录作为示例,但 Databricks 建议使用 Unity 目录配置的外部存储位置。 请参阅 使用 Unity 目录连接到云对象存储

步骤 1. 配置环境并创建数据生成器

本教程假定基本熟悉 Azure Databricks 和默认工作区配置。 如果无法运行提供的代码,请联系工作区管理员,以确保你有权访问计算资源和可以写入数据的位置。

请注意,提供的代码使用参数 source 来指定要配置为 COPY INTO 数据源的位置。 如本文所述,此代码指向 DBFS 根目录中的位置。 如果对外部对象存储位置具有写入权限,请将源字符串的 dbfs:/ 部分替换为对象存储的路径。 由于此代码块还会执行递归删除来重置此演示,因此请确保不要将此项指向生产数据,并且 /user/{username}/copy-into-demo 保留嵌套目录以避免覆盖或删除现有数据。

  1. 创建新笔记本 并将其 附加到计算资源

  2. 复制并运行以下代码以重置本教程中使用的存储位置和数据库:

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. 复制并运行以下代码以配置将用于随机生成数据的某些表和函数:

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

步骤 2:将示例数据写入云存储

Azure Databricks 上很少写入 Delta Lake 以外的数据格式。 此处提供的代码将写入 JSON,模拟可能将另一个系统的结果转储到对象存储的外部系统。

  1. 复制并运行以下代码以编写一批原始 JSON 数据:

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

步骤 3:使用 COPY INTO 以幂等方式加载 JSON 数据

必须先创建目标 Delta Lake 表,然后才能使用 COPY INTO。 你在 CREATE TABLE 语句中只需提供表名,无需提供其他内容。

  1. 复制并运行以下代码以创建目标 Delta 表并从源加载数据:

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

由于此操作是幂等的,因此可以多次运行该操作,但数据只会加载一次。

步骤 4:预览表的内容

可以运行简单的 SQL 查询来手动查看此表的内容。

  1. 复制并执行以下代码以预览表:

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

步骤 5:加载更多数据和预览结果

可以多次重新运行步骤 2-4,以将新批随机原始 JSON 数据加载到源中,并用 COPY INTO 以幂等方式将其加载到 Delta Lake,并预览结果。 尝试无序地或多次运行这些步骤,以模拟多个批次的原始数据被写入或在没有新数据到来的情况下多次执行COPY INTO

步骤 6:整理教程

完成本教程后,如果不再需要保留关联资源,则可以清理这些资源。

复制并运行以下代码以删除数据库、表和删除所有数据:

%python
# Drop database and tables and remove data

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
dbutils.fs.rm(source, True)

其他资源