Databricks는 수천 개의 파일을 포함하는 데이터 원본에 대한 증분 및 대량 데이터 로드에 COPY INTO 명령을 사용하는 것이 좋습니다. Databricks는 고급 사용 사례에 자동 로더 사용하는 것이 좋습니다.
이 자습서에서는 이 명령을 사용하여 COPY INTO 클라우드 개체 스토리지의 데이터를 Azure Databricks 작업 영역의 테이블로 로드합니다.
요구 사항
- 컴퓨팅 리소스에 대한 액세스. Compute를 참조하세요.
- 데이터를 쓸 수 있는 위치입니다. 이 데모에서는 DBFS 루트를 예로 사용하지만 Databricks는 Unity 카탈로그로 구성된 외부 스토리지 위치를 권장합니다. Unity 카탈로그를 사용하여 클라우드 개체 스토리지에 연결을 참조하세요.
1단계. 환경 구성 및 데이터 생성기 만들기
이 자습서에서는 Azure Databricks 및 기본 작업 영역 구성에 대한 기본적인 지식이 있다고 가정합니다. 제공된 코드를 실행할 수 없는 경우 작업 영역 관리자에게 문의하여 컴퓨팅 리소스 및 데이터를 쓸 수 있는 위치에 액세스할 수 있는지 확인합니다.
참고로, 제공된 코드는 source 매개 변수를 사용하여 COPY INTO 데이터 원본으로 구성할 위치를 지정합니다. 작성된 대로 이 코드는 DBFS 루트의 위치를 가리킵니다. 외부 개체 스토리지 위치에 대한 쓰기 권한이 있는 경우 원본 문자열의 dbfs:/ 부분을 개체 스토리지의 경로로 바꿉니다. 또한 이 코드 블록은 이 데모를 다시 설정하기 위해 재귀 삭제를 수행하므로 프로덕션 데이터를 가리키지 않고 기존 데이터를 덮어쓰거나 삭제하지 않도록 중첩된 디렉터리를 유지해야 /user/{username}/copy-into-demo 합니다.
다음 코드를 복사하여 실행하여 이 자습서에 사용된 스토리지 위치 및 데이터베이스를 다시 설정합니다.
%python # Set parameters for isolation in workspace and reset demo username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0] database = f"copyinto_{username}_db" source = f"dbfs:/user/{username}/copy-into-demo" spark.sql(f"SET c.username='{username}'") spark.sql(f"SET c.database={database}") spark.sql(f"SET c.source='{source}'") spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") spark.sql("CREATE DATABASE ${c.database}") spark.sql("USE ${c.database}") dbutils.fs.rm(source, True)다음 코드를 복사하여 실행하여 데이터를 임의로 생성하는 데 사용할 일부 테이블 및 함수를 구성합니다.
-- Configure random data generator CREATE TABLE user_ping_raw (user_id STRING, ping INTEGER, time TIMESTAMP) USING json LOCATION ${c.source}; CREATE TABLE user_ids (user_id STRING); INSERT INTO user_ids VALUES ("potato_luver"), ("beanbag_lyfe"), ("default_username"), ("the_king"), ("n00b"), ("frodo"), ("data_the_kid"), ("el_matador"), ("the_wiz"); CREATE FUNCTION get_ping() RETURNS INT RETURN int(rand() * 250); CREATE FUNCTION is_active() RETURNS BOOLEAN RETURN CASE WHEN rand() > .25 THEN true ELSE false END;
2단계: 클라우드 스토리지에 샘플 데이터 쓰기
Azure Databricks에서는 Delta Lake 이외의 데이터 형식에 쓰는 경우는 드뭅니다. 여기에 제공된 코드는 JSON에 기록하여 다른 시스템의 결과를 개체 스토리지로 덤프할 수 있는 외부 시스템을 시뮬레이션합니다.
다음 코드를 복사하여 실행하여 원시 JSON 데이터의 일괄 처리를 작성합니다.
-- Write a new batch of data to the data source INSERT INTO user_ping_raw SELECT *, get_ping() ping, current_timestamp() time FROM user_ids WHERE is_active()=true;
3단계: COPY INTO를 사용하여 JSON 데이터를 항등적으로 로드하세요.
를 사용 COPY INTO하려면 먼저 대상 Delta Lake 테이블을 만들어야 합니다. 문에 테이블 이름 CREATE TABLE 외에는 아무것도 제공할 필요가 없습니다.
다음 코드를 복사하여 실행하여 대상 델타 테이블을 만들고 원본에서 데이터를 로드합니다.
-- Create target table and load data CREATE TABLE IF NOT EXISTS user_ping_target; COPY INTO user_ping_target FROM ${c.source} FILEFORMAT = JSON FORMAT_OPTIONS ("mergeSchema" = "true") COPY_OPTIONS ("mergeSchema" = "true")
이 작업은 동일한 결과를 내는 특성으로 인해 여러 번 실행해도 데이터는 한 번만 로드됩니다.
4단계: 테이블 내용 미리 보기
간단한 SQL 쿼리를 실행하여 이 테이블의 내용을 수동으로 검토할 수 있습니다.
다음 코드를 복사하여 실행하여 테이블을 미리 봅니다.
-- Review updated table SELECT * FROM user_ping_target
5단계: 더 많은 데이터 로드 및 결과 미리 보기
2~4단계를 여러 번 다시 실행하여 원본에 임의의 원시 JSON 데이터의 새로운 일괄 처리를 생성하고, COPY INTO을 사용하여 Delta Lake에 동일한 결과를 초래하도록 로드할 수 있으며, 결과를 미리 볼 수 있습니다. 이러한 단계를 순서대로 실행하거나 여러 번 실행하여 새 데이터가 도착하지 않고 여러 번 작성되거나 실행되는 원시 데이터의 여러 배치를 COPY INTO 시뮬레이션합니다.
6단계: 자습서 정리
이 자습서를 완료하면 더 이상 유지하지 않으려면 연결된 리소스를 정리할 수 있습니다.
다음 코드를 복사하여 실행하여 데이터베이스, 테이블을 삭제하고 모든 데이터를 제거합니다.
%python
# Drop database and tables and remove data
spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
dbutils.fs.rm(source, True)
추가 리소스
- COPY INTO 참조 문서