Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
In diesem Tutorial verwenden Sie den Azure Cosmos DB Spark-Connector, um Daten aus einem Azure Cosmos DB for NoSQL-Konto zu lesen oder zu schreiben. In diesem Tutorial werden Azure Databricks und ein Jupyter Notebook verwendet, um zu veranschaulichen, wie sie die API für NoSQL mithilfe von Spark integrieren können. Dieses Tutorial konzentriert sich auf Python und Scala. Sie können allerdings jede beliebige Sprache oder Schnittstelle verwenden, die von Spark unterstützt wird.
In diesem Tutorial erfahren Sie, wie:
- Herstellen einer Verbindung mit einer API für ein NoSQL-Konto mithilfe von Spark und einem Jupyter Notebook
- Erstellen Sie Datenbank- und Containerressourcen.
- Erfassen von Daten im Container
- Abfragen von Daten im Container
- Ausführen allgemeiner Vorgänge für Elemente im Container
Voraussetzungen
- Ein vorhandenes Azure Cosmos DB-Konto für NoSQL.
- Falls Sie bereits über ein vorhandenes Azure-Abonnement verfügen, erstellen Sie ein neues Konto.
- Ein vorhandener Azure Databricks-Arbeitsbereich
Herstellen einer Verbindung mithilfe von Spark und Jupyter
Verwenden Sie Ihren vorhandenen Azure Databricks-Arbeitsbereich, um einen Computecluster zu erstellen, der Apache Spark 3.4.x verwenden kann, um eine Verbindung mit Ihrem Azure Cosmos DB for NoSQL-Konto herzustellen.
Öffnen Sie Ihren Azure Databricks-Arbeitsbereich.
Erstellen Sie auf der Oberfläche des Arbeitsbereichs einen neuen Cluster. Konfigurieren Sie den Cluster mit den folgenden (minimalen) Einstellungen:
Version Wert Laufzeitversion 13.3 LTS (Scala 2.12, Spark 3.4.1) Verwenden Sie die Benutzeroberfläche des Arbeitsbereichs, um nach Maven-Paketen von Maven Central mit einer Group ID von
com.azure.cosmos.sparkzu suchen. Installieren Sie das für Spark 3.4 spezifische Paket (der Artefakt-ID des Clusters hat das Präfixazure-cosmos-spark_3-4).Erstellen Sie zuletzt ein neues Notizbuch.
Tipp
Standardmäßig wird das Notebook an den zuletzt erstellten Cluster angefügt.
Legen Sie im Notebook Konfigurationseinstellungen für die Onlinetransaktionsverarbeitung (OLTP) für den NoSQL-Kontoendpunkt, den Datenbanknamen und den Containernamen fest.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Erstellen einer Datenbank und eines Containers
Verwenden Sie die Katalog-API, um Kontoressourcen wie Datenbanken und Container zu verwalten. Anschließend können Sie OLTP verwenden, um Daten innerhalb der Containerressourcen zu verwalten.
Konfigurieren Sie mithilfe von Spark die Katalog-API zum Verwalten der API für NoSQL-Ressourcen.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))Erstellen Sie mithilfe von
CREATE DATABASE IF NOT EXISTSeine neue Datenbank namenscosmicworks.# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")Erstellen Sie mithilfe von
CREATE TABLE IF NOT EXISTSeinen neuen Container namensproducts. Stellen Sie sicher, dass Sie den Partitionsschlüsselpfad auf/categoryfestlegen und automatischen Durchsatz mit einem Maximum von1000Anforderungseinheiten pro Sekunde (RU/s) aktivieren.# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))Erstellen Sie einen weiteren Container namens
employeesmithilfe einer hierarchischen Partitionsschlüsselkonfiguration. Verwenden Sie/organization,/departmentund/teamals Partitionsschlüsselpfade. Halten Sie diese Reihenfolge ein. Legen Sie außerdem den Durchsatz auf eine manuelle Anzahl von400RUs fest.# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))Führen Sie die Notebookzellen aus, um zu überprüfen, ob die Datenbank und die Container in Ihrer API für das NoSQL-Konto erstellt werden.
Erfassen von Daten
Erstellen Sie ein Beispieldataset. Verwenden Sie dann OLTP, um diese Daten in die API für NoSQL-Container zu erfassen.
Erstellen Sie ein Beispieldataset.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )Verwenden Sie
spark.createDataFrameund die zuvor gespeicherte OLTP-Konfiguration, um Beispieldaten zum Zielcontainer hinzuzufügen.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Abfragedaten
Laden Sie OLTP-Daten in einen Datenrahmen, um gängige Abfragen für die Daten auszuführen. Sie können verschiedene Syntaxen verwenden, um Daten zu filtern oder abzufragen.
Verwenden Sie
spark.read, um die OLTP-Daten in ein Dataframeobjekt zu laden. Verwenden Sie dieselbe Konfiguration, die Sie bereits zuvor in diesem Tutorial verwendet haben. Legen Sie außerdemspark.cosmos.read.inferSchema.enabledauftruefest, damit der Spark-Connector das Schema ableiten kann, indem er vorhandene Elemente sampelt.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()Rendern Sie mithilfe von
printSchemadas Schema der im Dataframe geladenen Daten.# Render schema df.printSchema()// Render schema df.printSchema()Rendern Sie Datenzeilen, in denen die Spalte
quantitykleiner als20ist. Verwenden Sie die Funktionenwhereundshow, um diese Abfrage auszuführen.# Render filtered data df.where("quantity < 20") \ .show()// Render filtered data df.where("quantity < 20") .show()Rendern Sie die erste Datenzeile, in der die Spalte
clearancegleichtrueist. Verwenden Sie die Funktionfilter, um diese Abfrage auszuführen.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)Rendern Sie fünf Datenzeilen ohne Filter oder Kürzung. Verwenden Sie die Funktion
show, um die Darstellung und Anzahl der gerenderten Zeilen anzupassen.# Render five rows of unfiltered and untruncated data df.show(5, False)// Render five rows of unfiltered and untruncated data df.show(5, false)Fragen Sie Ihrer Daten mithilfe dieser unformatierten NoSQL-Abfragezeichenfolge ab:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800.# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Ausführen gängiger Vorgänge
Wenn Sie in Spark mit der API für NoSQL-Daten arbeiten, können Sie Teilupdates ausführen oder mit Daten im unformatierten JSON-Format arbeiten.
So führen Sie ein Teilupdate eines Elements aus:
Kopieren Sie die vorhandene Konfigurationsvariable
config, und ändern Sie die Eigenschaften in der neuen Kopie. Konfigurieren Sie insbesondere die Schreibstrategie aufItemPatch. Deaktivieren Sie dann die Massenunterstützung. Legen Sie die Spalten und zugeordneten Vorgänge fest. Legen Sie schließlich den Standardvorgangstyp aufSetfest.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )Erstellen Sie Variablen für den Elementpartitionsschlüssel und den eindeutigen Bezeichner, auf den Sie im Rahmen dieses Patchvorgangs als Ziel verwenden möchten.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"Erstellen Sie eine Reihe von Patchobjekten, um das Zielelement anzugeben, und geben Sie Felder an, die geändert werden sollen.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )Erstellen Sie einen Dataframe mithilfe der Gruppe von Patchobjekten. Verwenden Sie
write, um den Patchvorgang auszuführen.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()Führen Sie eine Abfrage aus, um die Ergebnisse des Patchvorgangs zu überprüfen. Das Element sollte jetzt
Yamba New Surfboardbenannt werden, ohne dass weitere Änderungen auftreten.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
So arbeiten Sie mit unformatierten JSON-Daten:
Kopieren Sie die vorhandene Konfigurationsvariable
config, und ändern Sie die Eigenschaften in der neuen Kopie. Ändern Sie insbesondere den Zielcontainer inemployees. Konfigurieren Sie dann die Spalte/das Feldcontacts, um unformatierte JSON-Daten zu verwenden.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )Erstellen Sie eine Gruppe von Mitarbeitern, die im Container erfasst werden sollen.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )Erstellen Sie einen Datenrahmen, und verwenden Sie
write, um die Mitarbeiterdaten zu erfassen.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()Rendern Sie die Daten aus dem Datenframe mithilfe von
show. Sie erkennen, dass die Spaltecontactsin der Ausgabe aus JSON-Rohdaten besteht.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
Verwandte Inhalte
- Apache Spark
- Azure Cosmos DB-Katalog-API
- Konfigurationsparameterreferenz
- Azure Cosmos DB Spark-Connectorbeispiele
- Migration von Spark 2.4 zu Spark 3.*
- Veraltete Versionen:
- Der Azure Cosmos DB Spark Connector für Spark 3.1 und 3.2 ist veraltet, da keine unterstützten Spark 3.1- oder 3.2-Laufzeiten in Azure Databricks, Azure Synapse oder Azure HDInsight mehr verfügbar sind.
- Migrationshandbuch zum Aktualisieren von Spark 3.1
- Migrationshandbuch zum Aktualisieren von Spark 3.2
- Versionskompatibilität:
- Versionshinweise:
- Downloadlinks: