이 튜토리얼에서는 Azure Cosmos DB Spark 커넥터를 사용하여 Azure Cosmos DB for NoSQL 계정에서 데이터를 읽고 쓰는 방법을 설명합니다. 본 튜토리얼은 Azure Databricks 및 Jupyter Notebook을 활용하여 Spark에서 API for NoSQL을 통합하는 과정을 안내합니다. 이 튜토리얼에서는 Python과 Scala에 중점을 두지만, Spark에서 지원하는 다른 언어나 인터페이스도 사용할 수 있습니다.
이 튜토리얼에서는 다음을 배우게 됩니다:
- Spark와 Jupyter Notebook을 사용하여 NoSQL용 API 계정에 연결합니다.
- 데이터베이스 및 컨테이너 리소스를 만듭니다.
- 컨테이너에 데이터를 수집합니다.
- 컨테이너의 데이터를 쿼리합니다.
- 컨테이너에 있는 항목들에 대해 일반적인 작업을 수행합니다.
필수 조건
- 기존 NoSQL용 Azure Cosmos DB 계정.
- 기존 Azure 구독이 있는 경우 새 계정을 만듭니다.
- 기존 Azure Databricks 작업 영역
Spark 및 Jupyter를 사용하여 연결
기존 Azure Databricks 작업 영역에서 Apache Spark 3.4.x를 사용하여 Azure Cosmos DB for NoSQL 계정에 연결할 준비가 된 컴퓨팅 클러스터를 생성합니다.
Azure Databricks 작업 영역을 엽니다.
작업 영역 인터페이스에서 새 클러스터를 만듭니다. 최소한 다음 설정을 사용하여 클러스터를 구성합니다.
버전 가치 런타임 버전 13.3 LTS (Scala 2.12, Spark 3.4.1) 작업 영역 인터페이스를 사용하여 Maven Central에서 Maven 패키지를 그룹 ID가
com.azure.cosmos.spark인 것으로 검색합니다. 접두사 이 붙은azure-cosmos-spark_3-4를 사용하여 Spark 3.4용 패키지를 클러스터에 설치합니다.마지막으로 새 Notebook을 만듭니다.
팁 (조언)
기본적으로 Notebook은 최근 만들어진 클러스터에 연결됩니다.
Notebook에서 NoSQL 계정의 엔드포인트, 데이터베이스 이름, 컨테이너 이름을 포함한 OLTP(온라인 트랜잭션 처리) 구성 설정을 지정합니다.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
데이터베이스 및 컨테이너 만들기
카탈로그 API를 사용하면 데이터베이스 및 컨테이너와 같은 계정 리소스를 관리할 수 있습니다. 이어서 OLTP를 통해 컨테이너 리소스 안의 데이터를 관리할 수 있습니다.
Spark를 사용하여 API for NoSQL 리소스를 관리하도록 카탈로그 API를 구성합니다.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))이름이
cosmicworks인 새 데이터베이스를CREATE DATABASE IF NOT EXISTS를 이용하여 만듭니다.# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")이름이
products인 새 컨테이너를CREATE TABLE IF NOT EXISTS를 이용하여 만듭니다. 파티션 키 경로를/category로 설정하고 초당 최대1000요청 단위(RU) 처리량을 사용하여 자동 크기 조정 기능으로 처리량을 설정해야 합니다.# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))계층적 파티션 키 구성을 활용하여
employees라는 새로운 컨테이너를 만들 수 있습니다. 파티션 키 경로 집합으로/organization,/department및/team을 사용합니다. 특정 순서를 따릅니다. 또한 처리량을 수동 RU 수400로 설정합니다.# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))Notebook 셀을 실행하여 데이터베이스 및 컨테이너가 NoSQL용 API 계정 내에서 생성되었는지 확인합니다.
데이터 수집
샘플 데이터 세트를 만듭니다. 이어서 OLTP를 활용하여 해당 데이터를 NoSQL용 API 컨테이너에 수집합니다.
샘플 데이터 세트를 만듭니다.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )spark.createDataFrame및 저장된 OLTP 구성을 활용하여 대상 컨테이너에 샘플 데이터를 추가합니다.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
쿼리 데이터
OLTP 데이터를 데이터 프레임에 로드한 다음, 해당 데이터에 대한 일반적인 쿼리를 수행합니다. 여러 구문을 사용하여 데이터를 필터링하거나 쿼리할 수 있습니다.
OLTP 데이터를 데이터 프레임 객체에 로드하기 위해서는
spark.read을 사용합니다. 이 튜토리얼의 앞부분에서 사용했던 것과 동일한 구성을 사용합니다. 또한spark.cosmos.read.inferSchema.enabled를true로 설정하여 Spark 커넥터는 기존 항목을 샘플링하여 스키마를 유추할 수 있습니다.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()printSchema를 사용하여 데이터 프레임에 불러온 데이터의 스키마를 렌더링합니다.# Render schema df.printSchema()// Render schema df.printSchema()quantity열이20보다 작은 데이터 행을 렌더링합니다.where및show함수를 사용하여 이 쿼리를 수행합니다.# Render filtered data df.where("quantity < 20") \ .show()// Render filtered data df.where("quantity < 20") .show()clearance열이true인 첫 번째 데이터 행을 렌더링합니다.filter함수를 사용하여 이 쿼리를 수행합니다.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)필터링이나 잘림 없이 데이터 행 5개를 렌더링합니다.
show함수를 사용하여 렌더링되는 행의 모양과 수를 사용자 지정합니다.# Render five rows of unfiltered and untruncated data df.show(5, False)// Render five rows of unfiltered and untruncated data df.show(5, false)다음과 같은 Raw NoSQL 쿼리 문자열을 통해 데이터를 쿼리할 수 있습니다.
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
일반 작업 수행
Spark에서 NoSQL 데이터용 API를 사용할 때 부분 업데이트를 수행하거나 원시 JSON으로 데이터를 처리할 수 있습니다.
항목의 부분적인 업데이트를 수행하려면 다음 단계를 따르세요.
기존
config구성 변수를 복사한 다음, 새로운 복사본의 속성을 수정합니다. 특히 쓰기 전략을ItemPatch로 구성합니다. 그런 다음 대량 지원을 사용하지 않도록 설정합니다. 열에 매핑된 작업 설정을 진행합니다. 마지막으로 기본 작업 유형을Set로 설정합니다.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )이 패치 작업의 일환으로 대상 항목의 파티션 키와 고유 식별자에 대한 변수를 생성합니다.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"패치할 객체 세트를 생성하여 대상 항목을 지정하고 수정해야 할 필드를 지정합니다.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )패치 개체 컬렉션을 사용하여 데이터 프레임을 생성합니다. 패치 작업을 수행하려면
write를 사용합니다.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()쿼리를 실행해서 패치 작업의 결과를 검토합니다. 이제 항목의 이름을
Yamba New Surfboard로 지정하고, 다른 변경은 하지 않아야 합니다.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
원시 JSON 데이터를 처리하려면 다음 단계를 따르세요.
기존
config구성 변수를 복사한 다음, 새로운 복사본의 속성을 수정합니다. 특히 대상 컨테이너를employees로 변경합니다. 그런 다음contacts열 또는 필드를 구성하여 원시 JSON 데이터를 사용하도록 설정합니다.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )컨테이너에 수집할 직원 세트를 만듭니다.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )데이터 프레임 생성 후
write를 이용하여 직원 데이터를 수집합니다.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()show를 사용하여 데이터 프레임에서 데이터를 렌더링합니다. 출력에서contacts열이 원시 JSON인지 확인합니다.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
관련 콘텐츠
- Apache Spark
- Azure Cosmos DB 카탈로그 API
- 구성 매개 변수 참조
- Azure Cosmos DB Spark 커넥터 샘플
- Spark 2.4에서 Spark 3.*으로 마이그레이션
- 더 이상 사용되지 않는 버전:
- 더 이상 사용할 수 있는 Azure Databricks, Azure Synapse 또는 Azure HDInsight에서 지원되는 Spark 3.1 또는 3.2 런타임이 없으므로 Spark 3.1 및 3.2용 Azure Cosmos DB Spark 커넥터는 더 이상 사용되지 않습니다.
- Spark 3.1에서 업데이트하는 마이그레이션 가이드
- Spark 3.2에서 업데이트하는 마이그레이션 가이드
- 버전 호환성:
- 릴리스 정보:
- 다운로드 링크: