다음을 통해 공유


튜토리얼: Spark를 이용해 Azure Cosmos DB for NoSQL로 연결

이 튜토리얼에서는 Azure Cosmos DB Spark 커넥터를 사용하여 Azure Cosmos DB for NoSQL 계정에서 데이터를 읽고 쓰는 방법을 설명합니다. 본 튜토리얼은 Azure Databricks 및 Jupyter Notebook을 활용하여 Spark에서 API for NoSQL을 통합하는 과정을 안내합니다. 이 튜토리얼에서는 Python과 Scala에 중점을 두지만, Spark에서 지원하는 다른 언어나 인터페이스도 사용할 수 있습니다.

이 튜토리얼에서는 다음을 배우게 됩니다:

  • Spark와 Jupyter Notebook을 사용하여 NoSQL용 API 계정에 연결합니다.
  • 데이터베이스 및 컨테이너 리소스를 만듭니다.
  • 컨테이너에 데이터를 수집합니다.
  • 컨테이너의 데이터를 쿼리합니다.
  • 컨테이너에 있는 항목들에 대해 일반적인 작업을 수행합니다.

필수 조건

  • 기존 NoSQL용 Azure Cosmos DB 계정.
  • 기존 Azure Databricks 작업 영역

Spark 및 Jupyter를 사용하여 연결

기존 Azure Databricks 작업 영역에서 Apache Spark 3.4.x를 사용하여 Azure Cosmos DB for NoSQL 계정에 연결할 준비가 된 컴퓨팅 클러스터를 생성합니다.

  1. Azure Databricks 작업 영역을 엽니다.

  2. 작업 영역 인터페이스에서 새 클러스터를 만듭니다. 최소한 다음 설정을 사용하여 클러스터를 구성합니다.

    버전 가치
    런타임 버전 13.3 LTS (Scala 2.12, Spark 3.4.1)
  3. 작업 영역 인터페이스를 사용하여 Maven Central에서 Maven 패키지를 그룹 IDcom.azure.cosmos.spark인 것으로 검색합니다. 접두사 이 붙은 azure-cosmos-spark_3-4를 사용하여 Spark 3.4용 패키지를 클러스터에 설치합니다.

  4. 마지막으로 새 Notebook을 만듭니다.

    팁 (조언)

    기본적으로 Notebook은 최근 만들어진 클러스터에 연결됩니다.

  5. Notebook에서 NoSQL 계정의 엔드포인트, 데이터베이스 이름, 컨테이너 이름을 포함한 OLTP(온라인 트랜잭션 처리) 구성 설정을 지정합니다.

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

데이터베이스 및 컨테이너 만들기

카탈로그 API를 사용하면 데이터베이스 및 컨테이너와 같은 계정 리소스를 관리할 수 있습니다. 이어서 OLTP를 통해 컨테이너 리소스 안의 데이터를 관리할 수 있습니다.

  1. Spark를 사용하여 API for NoSQL 리소스를 관리하도록 카탈로그 API를 구성합니다.

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. 이름이 cosmicworks 인 새 데이터베이스를 CREATE DATABASE IF NOT EXISTS를 이용하여 만듭니다.

    # Create a database by using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database by using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. 이름이 products 인 새 컨테이너를 CREATE TABLE IF NOT EXISTS를 이용하여 만듭니다. 파티션 키 경로를 /category로 설정하고 초당 최대 1000 요청 단위(RU) 처리량을 사용하여 자동 크기 조정 기능으로 처리량을 설정해야 합니다.

    # Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. 계층적 파티션 키 구성을 활용하여 employees 라는 새로운 컨테이너를 만들 수 있습니다. 파티션 키 경로 집합으로 /organization, /department/team을 사용합니다. 특정 순서를 따릅니다. 또한 처리량을 수동 RU 수 400 로 설정합니다.

    # Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. Notebook 셀을 실행하여 데이터베이스 및 컨테이너가 NoSQL용 API 계정 내에서 생성되었는지 확인합니다.

데이터 수집

샘플 데이터 세트를 만듭니다. 이어서 OLTP를 활용하여 해당 데이터를 NoSQL용 API 컨테이너에 수집합니다.

  1. 샘플 데이터 세트를 만듭니다.

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. spark.createDataFrame 및 저장된 OLTP 구성을 활용하여 대상 컨테이너에 샘플 데이터를 추가합니다.

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

쿼리 데이터

OLTP 데이터를 데이터 프레임에 로드한 다음, 해당 데이터에 대한 일반적인 쿼리를 수행합니다. 여러 구문을 사용하여 데이터를 필터링하거나 쿼리할 수 있습니다.

  1. OLTP 데이터를 데이터 프레임 객체에 로드하기 위해서는 spark.read 을 사용합니다. 이 튜토리얼의 앞부분에서 사용했던 것과 동일한 구성을 사용합니다. 또한 spark.cosmos.read.inferSchema.enabledtrue로 설정하여 Spark 커넥터는 기존 항목을 샘플링하여 스키마를 유추할 수 있습니다.

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. printSchema를 사용하여 데이터 프레임에 불러온 데이터의 스키마를 렌더링합니다.

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. quantity 열이 20보다 작은 데이터 행을 렌더링합니다. whereshow 함수를 사용하여 이 쿼리를 수행합니다.

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. clearance 열이 true인 첫 번째 데이터 행을 렌더링합니다. filter 함수를 사용하여 이 쿼리를 수행합니다.

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. 필터링이나 잘림 없이 데이터 행 5개를 렌더링합니다. show 함수를 사용하여 렌더링되는 행의 모양과 수를 사용자 지정합니다.

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. 다음과 같은 Raw NoSQL 쿼리 문자열을 통해 데이터를 쿼리할 수 있습니다. SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

일반 작업 수행

Spark에서 NoSQL 데이터용 API를 사용할 때 부분 업데이트를 수행하거나 원시 JSON으로 데이터를 처리할 수 있습니다.

  1. 항목의 부분적인 업데이트를 수행하려면 다음 단계를 따르세요.

    1. 기존 config 구성 변수를 복사한 다음, 새로운 복사본의 속성을 수정합니다. 특히 쓰기 전략을 ItemPatch로 구성합니다. 그런 다음 대량 지원을 사용하지 않도록 설정합니다. 열에 매핑된 작업 설정을 진행합니다. 마지막으로 기본 작업 유형을 Set로 설정합니다.

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. 이 패치 작업의 일환으로 대상 항목의 파티션 키와 고유 식별자에 대한 변수를 생성합니다.

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. 패치할 객체 세트를 생성하여 대상 항목을 지정하고 수정해야 할 필드를 지정합니다.

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. 패치 개체 컬렉션을 사용하여 데이터 프레임을 생성합니다. 패치 작업을 수행하려면 write를 사용합니다.

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. 쿼리를 실행해서 패치 작업의 결과를 검토합니다. 이제 항목의 이름을 Yamba New Surfboard 로 지정하고, 다른 변경은 하지 않아야 합니다.

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. 원시 JSON 데이터를 처리하려면 다음 단계를 따르세요.

    1. 기존 config 구성 변수를 복사한 다음, 새로운 복사본의 속성을 수정합니다. 특히 대상 컨테이너를 employees로 변경합니다. 그런 다음 contacts 열 또는 필드를 구성하여 원시 JSON 데이터를 사용하도록 설정합니다.

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. 컨테이너에 수집할 직원 세트를 만듭니다.

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. 데이터 프레임 생성 후 write 를 이용하여 직원 데이터를 수집합니다.

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. show를 사용하여 데이터 프레임에서 데이터를 렌더링합니다. 출력에서 contacts 열이 원시 JSON인지 확인합니다.

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

다음 단계