Compartilhar via


Tutorial: COPY INTO com o SPARK SQL

O Databricks recomenda que você use o comando COPY INTO para carregamento de dados incremental e em massa para fontes de dados que contêm milhares de arquivos. Databricks recomenda que você use Auto Loader para casos de uso avançados.

Neste tutorial, você usará o comando COPY INTO para carregar dados do armazenamento de objetos de nuvem em uma tabela em seu workspace do Azure Databricks.

Requisitos

Etapa 1. Configurar seu ambiente e criar um gerador de dados

Este tutorial pressupõe familiaridade básica com o Azure Databricks e uma configuração de workspace padrão. Se não for possível executar o código fornecido, entre em contato com o administrador do workspace para verificar se você tem acesso aos recursos de computação e a um local para o qual você pode gravar dados.

Observe que o código fornecido usa um source parâmetro para especificar o local que você configurará como sua COPY INTO fonte de dados. Conforme escrito, esse código aponta para um local na raiz do DBFS. Se você tiver permissões de gravação em um local de armazenamento de objeto externo, substitua a parte dbfs:/ da cadeia de caracteres de origem pelo caminho para o armazenamento de objetos. Como esse bloco de código também faz uma exclusão recursiva para redefinir essa demonstração, certifique-se de não apontar isso para os dados de produção e mantenha o diretório /user/{username}/copy-into-demo aninhado para evitar substituir ou excluir dados existentes.

  1. Crie um novo notebook e anexe-o a um recurso de computação.

  2. Copie e execute o seguinte código para redefinir o local de armazenamento e o banco de dados usados neste tutorial:

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. Copie e execute o seguinte código para configurar algumas tabelas e funções que serão usadas para gerar dados aleatoriamente:

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

Etapa 2: gravar os dados de exemplo no armazenamento em nuvem

Gravar em formatos de dados diferentes do Delta Lake é raro no Azure Databricks. O código fornecido aqui grava no JSON, simulando um sistema externo que pode despejar resultados de outro sistema no armazenamento de objetos.

  1. Copie e execute o seguinte código para gravar um lote de dados JSON brutos:

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

Etapa 3: usar COPY INTO para carregar dados JSON de modo idempotente

Você deve criar uma tabela Delta Lake de destino antes de poder usar COPY INTO. Você não precisa fornecer nada além de um nome de tabela na sua instrução CREATE TABLE.

  1. Copie e execute o seguinte código para criar sua tabela Delta de destino e carregar dados de sua origem:

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

Como essa ação é idempotente, você pode executá-la várias vezes, mas os dados só serão carregados uma vez.

Etapa 4: Visualizar o conteúdo da tabela

Você pode executar uma consulta SQL simples para examinar manualmente o conteúdo desta tabela.

  1. Copie e execute o seguinte código para visualizar sua tabela:

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

Etapa 5: Carregar mais dados e visualizar resultados

Você pode executar novamente as etapas de 2 a 4 várias vezes para obter novos lotes de dados JSON brutos aleatórios em sua origem, carregá-los de forma idempotente no Delta Lake com COPY INTO e visualizar os resultados. Tente executar essas etapas fora de ordem ou várias vezes para simular vários lotes de dados brutos sendo gravados ou executados COPY INTO várias vezes sem que novos dados tenham chegado.

Etapa 6: Tutorial de limpeza

Quando terminar este tutorial, você poderá limpar os recursos associados se não quiser mais mantê-los.

Copie e execute o seguinte código para remover o banco de dados, tabelas e remover todos os dados:

%python
# Drop database and tables and remove data

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
dbutils.fs.rm(source, True)

Recursos adicionais