Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Neste tutorial, você usa o conector Azure Cosmos DB Spark para ler ou gravar dados de uma conta do Azure Cosmos DB para NoSQL. Este tutorial usa o Azure Databricks e um bloco de anotações Jupyter para ilustrar como integrar com a API para NoSQL do Spark. Este tutorial se concentra em Python e Scala, embora você possa usar qualquer linguagem ou interface suportada pelo Spark.
Neste tutorial, aprenderás como:
- Conecte-se a uma API para conta NoSQL usando o Spark e um bloco de anotações Jupyter.
- Crie recursos de banco de dados e contêiner.
- Ingerir dados para o recipiente.
- Consultar dados no contêiner.
- Execute operações comuns em itens no contêiner.
Pré-requisitos
- Uma conta existente do Azure Cosmos DB para NoSQL.
- Se você tiver uma assinatura existente do Azure, crie uma nova conta.
- Um espaço de trabalho existente do Azure Databricks.
Conecte-se usando o Spark e o Jupyter
Use seu espaço de trabalho existente do Azure Databricks para criar um cluster de computação pronto para usar o Apache Spark 3.4.x para se conectar à sua conta do Azure Cosmos DB para NoSQL.
Abra seu espaço de trabalho do Azure Databricks.
Na interface do espaço de trabalho, crie um novo cluster. Configure o cluster com estas configurações, no mínimo:
Versão Valor Versão em tempo de execução 13,3 LTS (Scala 2.12, Spark 3.4.1) Use a interface do espaço de trabalho para procurar pacotes Maven do Maven Central com uma ID de Grupo de
com.azure.cosmos.spark. Instale o pacote especificamente para o Spark 3.4 com um ID de Artefato prefixado para o cluster.Por fim, crie um novo bloco de anotações.
Sugestão
Por padrão, o bloco de anotações é anexado ao cluster criado recentemente.
No notebook, defina as configurações de OLTP (processamento de transações online) para o endpoint da conta NoSQL, o nome da base de dados e o nome do contêiner.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Criar uma base de dados e um contentor
Use a API de catálogo para gerenciar recursos de conta, como bancos de dados e contêineres. Em seguida, você pode usar OLTP para gerenciar dados dentro dos recursos de contêiner.
Configure a API de catálogo para gerenciar a API para recursos NoSQL usando o Spark.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))Crie um novo banco de dados nomeado
cosmicworksusandoCREATE DATABASE IF NOT EXISTSo .# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")Crie um novo contêiner nomeado
productsusandoCREATE TABLE IF NOT EXISTS. Certifique-se de definir o caminho da chave de partição para/categorye habilitar a taxa de transferência de dimensionamento automático com uma taxa de transferência máxima de1000unidades de solicitação (RUs) por segundo.# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))Crie outro contêiner nomeado
employeesusando uma configuração de chave de partição hierárquica. Use/organization,/department, e/teamcomo o conjunto de caminhos de chave de partição. Siga essa ordem específica. Além disso, defina a taxa de transferência para um valor manual de400RUs.# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))Execute as células do notebook para validar se o banco de dados e os containers são criados na sua conta de API para NoSQL.
Ingerir dados
Crie um conjunto de dados de exemplo. Em seguida, use OLTP para ingerir esses dados para a API para contêiner NoSQL.
Crie um conjunto de dados de exemplo.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )Utilize
spark.createDataFramee a configuração OLTP salva anteriormente para adicionar dados de exemplo ao contêiner de destino.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Consultar dados
Carregue dados OLTP em um quadro de dados para executar consultas comuns nos dados. Você pode usar várias sintaxes para filtrar ou consultar dados.
Use
spark.readpara carregar os dados OLTP em um objeto de quadro de dados. Use a mesma configuração usada anteriormente neste tutorial. Além disso, definaspark.cosmos.read.inferSchema.enabledcomotruepara permitir que o conector Spark infera o esquema por amostragem de itens existentes.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()Renderize o esquema dos dados carregados no quadro de dados usando
printSchema.# Render schema df.printSchema()// Render schema df.printSchema()Renderizar linhas de dados onde a coluna
quantityé menor que20. Utilize as funçõeswhereeshowpara executar esta consulta.# Render filtered data df.where("quantity < 20") \ .show()// Render filtered data df.where("quantity < 20") .show()Renderize a primeira linha de dados em que a coluna
clearancesejatrue. Use afilterfunção para executar essa consulta.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)Renderize cinco linhas de dados sem filtro ou truncamento. Use a
showfunção para personalizar a aparência e o número de linhas que são renderizadas.# Render five rows of unfiltered and untruncated data df.show(5, False)// Render five rows of unfiltered and untruncated data df.show(5, false)Consulte seus dados usando esta cadeia de caracteres de consulta NoSQL bruta:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Executar operações comuns
Ao trabalhar com API para dados NoSQL no Spark, você pode executar atualizações parciais ou trabalhar com dados como JSON bruto.
Para executar uma atualização parcial de um item:
Copie a variável de configuração existente
confige modifique as propriedades na nova cópia. Especificamente, configure a estratégia de gravação comoItemPatch. Em seguida, desative a função de suporte em massa. Defina as colunas e as operações mapeadas. Finalmente, defina o tipo de operação padrão comoSet.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )Crie variáveis para a chave de partição do item e o identificador exclusivo que você pretende direcionar como parte desta operação de patch.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"Crie um conjunto de objetos de patch para especificar o item de destino e especificar campos que devem ser modificados.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )Crie um quadro de dados usando o conjunto de objetos de patch. Use
writepara executar a operação de patch.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()Execute uma consulta para revisar os resultados da operação de patch. O item agora deve ser nomeado
Yamba New Surfboardsem outras alterações.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
Para trabalhar com dados JSON brutos:
Copie a variável de configuração existente
confige modifique as propriedades na nova cópia. Especificamente, altere o contêiner de destino paraemployees. Em seguida, configure acontactscoluna/campo para usar dados JSON brutos.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )Crie um conjunto de funcionários para integrar no contentor.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )Crie um quadro de dados e use
writepara ingerir os dados do funcionário.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()Renderizar os dados do quadro de dados usando
show. Observe que acontactscoluna é JSON bruto na saída.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
Conteúdo relacionado
- Apache Spark
- API de catálogo do Azure Cosmos DB
- Referência do parâmetro de configuração
- Exemplos de conector do Azure Cosmos DB Spark
- Migrar do Spark 2.4 para o Spark 3.*
- Versões preteridas:
- O Azure Cosmos DB Spark Connector para Spark 3.1 e 3.2 foi preterido, porque não há mais tempos de execução do Spark 3.1 ou 3.2 com suporte no Azure Databricks, Azure Synapse ou Azure HDInsight.
- Guia de migração para atualização do Spark 3.1
- Guia de migração para atualização do Spark 3.2
- Compatibilidade de versão:
- Notas de versão:
- Links para download: