다음을 통해 공유


자습서: Spark SQL을 사용하여 COPY INTO

Databricks는 수천 개의 파일을 포함하는 데이터 원본에 대한 증분 및 대량 데이터 로드에 COPY INTO 명령을 사용하는 것이 좋습니다. Databricks는 고급 사용 사례에 자동 로더 사용하는 것이 좋습니다.

이 자습서에서는 이 명령을 사용하여 COPY INTO 클라우드 개체 스토리지의 데이터를 Azure Databricks 작업 영역의 테이블로 로드합니다.

요구 사항

1단계. 환경 구성 및 데이터 생성기 만들기

이 자습서에서는 Azure Databricks 및 기본 작업 영역 구성에 대한 기본적인 지식이 있다고 가정합니다. 제공된 코드를 실행할 수 없는 경우 작업 영역 관리자에게 문의하여 컴퓨팅 리소스 및 데이터를 쓸 수 있는 위치에 액세스할 수 있는지 확인합니다.

참고로, 제공된 코드는 source 매개 변수를 사용하여 COPY INTO 데이터 원본으로 구성할 위치를 지정합니다. 작성된 대로 이 코드는 DBFS 루트의 위치를 가리킵니다. 외부 개체 스토리지 위치에 대한 쓰기 권한이 있는 경우 원본 문자열의 dbfs:/ 부분을 개체 스토리지의 경로로 바꿉니다. 또한 이 코드 블록은 이 데모를 다시 설정하기 위해 재귀 삭제를 수행하므로 프로덕션 데이터를 가리키지 않고 기존 데이터를 덮어쓰거나 삭제하지 않도록 중첩된 디렉터리를 유지해야 /user/{username}/copy-into-demo 합니다.

  1. 새 Notebook을 만들고컴퓨팅 리소스에 연결합니다.

  2. 다음 코드를 복사하여 실행하여 이 자습서에 사용된 스토리지 위치 및 데이터베이스를 다시 설정합니다.

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. 다음 코드를 복사하여 실행하여 데이터를 임의로 생성하는 데 사용할 일부 테이블 및 함수를 구성합니다.

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

2단계: 클라우드 스토리지에 샘플 데이터 쓰기

Azure Databricks에서는 Delta Lake 이외의 데이터 형식에 쓰는 경우는 드뭅니다. 여기에 제공된 코드는 JSON에 기록하여 다른 시스템의 결과를 개체 스토리지로 덤프할 수 있는 외부 시스템을 시뮬레이션합니다.

  1. 다음 코드를 복사하여 실행하여 원시 JSON 데이터의 일괄 처리를 작성합니다.

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

3단계: COPY INTO를 사용하여 JSON 데이터를 항등적으로 로드하세요.

를 사용 COPY INTO하려면 먼저 대상 Delta Lake 테이블을 만들어야 합니다. 문에 테이블 이름 CREATE TABLE 외에는 아무것도 제공할 필요가 없습니다.

  1. 다음 코드를 복사하여 실행하여 대상 델타 테이블을 만들고 원본에서 데이터를 로드합니다.

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

이 작업은 동일한 결과를 내는 특성으로 인해 여러 번 실행해도 데이터는 한 번만 로드됩니다.

4단계: 테이블 내용 미리 보기

간단한 SQL 쿼리를 실행하여 이 테이블의 내용을 수동으로 검토할 수 있습니다.

  1. 다음 코드를 복사하여 실행하여 테이블을 미리 봅니다.

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

5단계: 더 많은 데이터 로드 및 결과 미리 보기

2~4단계를 여러 번 다시 실행하여 원본에 임의의 원시 JSON 데이터의 새로운 일괄 처리를 생성하고, COPY INTO을 사용하여 Delta Lake에 동일한 결과를 초래하도록 로드할 수 있으며, 결과를 미리 볼 수 있습니다. 이러한 단계를 순서대로 실행하거나 여러 번 실행하여 새 데이터가 도착하지 않고 여러 번 작성되거나 실행되는 원시 데이터의 여러 배치를 COPY INTO 시뮬레이션합니다.

6단계: 자습서 정리

이 자습서를 완료하면 더 이상 유지하지 않으려면 연결된 리소스를 정리할 수 있습니다.

다음 코드를 복사하여 실행하여 데이터베이스, 테이블을 삭제하고 모든 데이터를 제거합니다.

%python
# Drop database and tables and remove data

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
dbutils.fs.rm(source, True)

추가 리소스