Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Dans ce tutoriel, vous utilisez le connecteur Spark Azure Cosmos DB pour lire ou écrire des données à partir d’un compte Azure Cosmos DB for NoSQL. Ce tutoriel utilise Azure Databricks et un notebook Jupyter pour illustrer l’intégration à l’API pour NoSQL à partir de Spark. Ce tutoriel est axé sur Python et Scala, mais vous pouvez utiliser n’importe quel langage ou interface pris en charge par Spark.
Dans ce tutoriel, vous allez apprendre à :
- Connectez-vous à un compte API pour NoSQL à l’aide de Spark et d’un notebook Jupyter.
- Créer des ressources de base de données et de conteneur.
- Ingérez des données dans le conteneur.
- Interrogez les données du conteneur.
- Effectuez des opérations courantes sur les éléments du conteneur.
Prerequisites
- Un compte Azure Cosmos DB pour NoSQL existant.
- Si vous disposez d’un abonnement Azure, créez un compte.
- Un espace de travail Azure Databricks existant.
Se connecter à l’aide de Spark et Jupyter
Utilisez votre espace de travail Azure Databricks existant pour créer un cluster de calcul prêt à utiliser Apache Spark 3.4.x pour vous connecter à votre compte Azure Cosmos DB for NoSQL.
Ouvrez votre espace de travail Azure Databricks.
Dans l’interface de l’espace de travail, créez un cluster. Configurez le cluster avec ces paramètres, au minimum :
Version Valeur Version du runtime 13.3 LTS (Scala 2.12, Spark 3.4.1) Utilisez l’interface de l’espace de travail pour rechercher des packages Maven à partir de Maven Central avec un ID de groupe de
com.azure.cosmos.spark. Installez sur le cluster le package prévu spécifiquement pour Spark 3.4 dont l’ID d’artefact est précédé du préfixeazure-cosmos-spark_3-4.Pour finir, créez un notebook.
Conseil / Astuce
Par défaut, le notebook est attaché au cluster récemment créé.
Dans le notebook, définissez les paramètres de configuration OLTP (traitement transactionnel en ligne) pour le point de terminaison du compte NoSQL, le nom de la base de données et le nom du conteneur.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Créer une base de données et un conteneur
Utilisez l’API Catalogue pour gérer les ressources de compte telles que les bases de données et les conteneurs. Vous pouvez ensuite utiliser OLTP pour gérer les données au sein des ressources du conteneur.
Configurez l’API Catalogue afin de gérer les ressources d’API pour NoSQL en utilisant Spark.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))Créez une base de données sous le nom
cosmicworksà l’aide deCREATE DATABASE IF NOT EXISTS.# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")Créez un conteneur sous le nom
productsà l’aide deCREATE TABLE IF NOT EXISTS. Veillez à bien définir le chemin de la clé de partition sur/categoryet à activer le débit de mise à l’échelle automatique avec un débit maximal de1000unités de requête (RU) par seconde.# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))Créez un autre conteneur sous le nom
employeesen utilisant une configuration de clé de partition hiérarchique. Utilisez/organization,/departmentet/teamcomme jeu de chemins de clé de partition. Suivez cet ordre spécifique. Définissez également manuellement le débit en400RUs.# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))Exécutez les cellules de notebook pour vérifier que votre base de données et vos conteneurs sont créés dans votre compte API pour NoSQL.
Ingérer des données
Créer un exemple de jeu de données. Utilisez ensuite OLTP pour ingérer ces données dans le conteneur API pour NoSQL.
Créer un exemple de jeu de données.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )Utilisez
spark.createDataFrameet la configuration OLTP précédemment enregistrée pour ajouter des exemples de données au conteneur cible.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Rechercher des données
Chargez les données OLTP dans un DataFrame pour effectuer des requêtes courantes sur les données. Vous pouvez utiliser différentes syntaxes pour filtrer ou interroger les données.
Utilisez
spark.readpour charger les données OLTP dans un objet data-frame (trame de données). Utilisez la même configuration que celle que vous avez utilisée précédemment dans ce tutoriel. De même, affectez la valeurspark.cosmos.read.inferSchema.enabledàtruepour autoriser le connecteur Spark à déduire le schéma en échantillonnant les éléments existants.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()Affichez le schéma des données chargées dans la trame de données à l’aide de
printSchema.# Render schema df.printSchema()// Render schema df.printSchema()Affichez les lignes de données où la colonne
quantityest inférieure à20. Utilisez les fonctionswhereetshowpour effectuer cette requête.# Render filtered data df.where("quantity < 20") \ .show()// Render filtered data df.where("quantity < 20") .show()Affichez la première ligne de données où la colonne
clearanceesttrue. Utilisez la fonctionfilterpour effectuer cette requête.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)Affichez cinq lignes de données sans filtre ni troncation. Utilisez la fonction
showpour personnaliser l’apparence et le nombre de lignes affichées.# Render five rows of unfiltered and untruncated data df.show(5, False)// Render five rows of unfiltered and untruncated data df.show(5, false)Interrogez vos données à l’aide de cette chaîne de requête NoSQL brute :
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Effectuer des opérations courantes
Lorsque vous utilisez des données d’API pour NoSQL dans Spark, vous pouvez effectuer des mises à jour partielles ou utiliser des données sous forme de JSON brut.
Pour effectuer une mise à jour partielle d’un élément :
Copiez la variable de configuration existante
configet modifiez les propriétés dans la nouvelle copie. Plus précisément, configurez la stratégie d’écriture surItemPatch. Activez la prise en charge des opérations en bloc. Définissez les colonnes et les opérations mappées. Enfin, définissez le type d’opération par défaut surSet.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )Créez des variables pour la clé de partition d’élément et l’identificateur unique que vous envisagez de cibler dans le cadre de cette opération corrective.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"Créez un ensemble d’objets de correctif pour spécifier l’élément cible, et spécifiez les champs qui doivent être modifiés.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )Créez une trame de données en utilisant l’ensemble d’objets patch (correctif). Utilisez
writepour effectuer l’opération patch.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()Exécutez une requête pour examiner les résultats de l’opération corrective. L’élément doit maintenant être nommé
Yamba New Surfboardsans aucune autre modification.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
Pour utiliser des données JSON brutes :
Copiez la variable de configuration existante
configet modifiez les propriétés dans la nouvelle copie. Plus précisément, remplacez le conteneur cible paremployees. Configurez ensuite la colonne/le champcontactspour utiliser les données JSON brutes.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )Créez un ensemble d’employés à ingérer dans le conteneur.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )Créez un DataFrame et utilisez
writepour ingérer les données des employés.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()Affichez les données de la trame de données à l’aide de
show. Notez que la colonnecontactsproduit du JSON brut dans la sortie.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
Contenu connexe
- Apache Spark
- API Catalogue Azure Cosmos DB
- Informations de référence sur les paramètres de configuration
- Exemples de connecteurs Spark Azure Cosmos DB
- Migrer de Spark 2.4 vers Spark 3.*
- Versions déconseillées :
- Le connecteur Spark Azure Cosmos DB pour Spark 3.1 et 3.2 est déconseillé, car il n’existe plus de runtimes Spark 3.1 ou 3.2 pris en charge dans Azure Databricks, Azure Synapse ou Azure HDInsight.
- Guide de migration pour la mise à jour à partir de Spark 3.1
- Guide de migration pour la mise à jour à partir de Spark 3.2
- Compatibilité des versions :
- Notes de publication :
- Liens de téléchargement :