Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
W tym samouczku użyjesz łącznika Spark dla usługi Azure Cosmos DB, aby odczytać lub zapisać dane z konta Azure Cosmos DB for NoSQL. W tym samouczku użyto usługi Azure Databricks i notesu Jupyter, aby zilustrować sposób integracji z interfejsem API for NoSQL z platformy Spark. Ten samouczek koncentruje się na językach Python i Scala, chociaż można używać dowolnego języka lub interfejsu obsługiwanego przez platformę Spark.
W tym poradniku nauczysz się, jak:
- Nawiąż połączenie z kontem API dla NoSQL, używając platformy Spark i notesu Jupyter.
- Utwórz zasoby bazy danych i kontenera.
- Ładowanie danych do kontenera.
- Wykonywanie zapytań o dane w kontenerze.
- Wykonywanie typowych operacji na elementach w kontenerze.
Wymagania wstępne
- Istniejące konto usługi Azure Cosmos DB for NoSQL.
- Jeśli masz istniejącą subskrypcję platformy Azure, utwórz nowe konto.
- Istniejący obszar roboczy usługi Azure Databricks.
Nawiązywanie połączenia przy użyciu platformy Spark i programu Jupyter
Użyj istniejącego obszaru roboczego usługi Azure Databricks, aby utworzyć klaster obliczeniowy gotowy do użycia platformy Apache Spark 3.4.x w celu nawiązania połączenia z kontem usługi Azure Cosmos DB for NoSQL.
Otwórz obszar roboczy usługi Azure Databricks.
W interfejsie obszaru roboczego utwórz nowy klaster. Skonfiguruj klaster przy użyciu tych ustawień co najmniej:
wersja Wartość Wersja środowiska uruchomieniowego 13.3 LTS (Scala 2.12, Spark 3.4.1) Użyj interfejsu obszaru roboczego do wyszukiwania pakietów Maven z Maven Central z identyfikatorem grupy
com.azure.cosmos.spark. Zainstaluj pakiet specjalnie dla platformy Spark 3.4, używając Artifact ID z prefiksemazure-cosmos-spark_3-4w klastrze.Na koniec utwórz nowy notatnik.
Wskazówka
Domyślnie notatnik jest dołączony do ostatnio utworzonego klastra.
W notesie ustaw ustawienia konfiguracji przetwarzania transakcji online (OLTP) dla punktu końcowego konta NoSQL, nazwy bazy danych i nazwy kontenera.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Tworzenie bazy danych i kontenera
Interfejs API katalogu umożliwia zarządzanie zasobami kont, takimi jak bazy danych i kontenery. Następnie można użyć olTP do zarządzania danymi w zasobach kontenera.
Skonfiguruj API katalogu do zarządzania API zasobów NoSQL przy użyciu Spark.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))Utwórz nową bazę danych o nazwie
cosmicworksprzy użyciu poleceniaCREATE DATABASE IF NOT EXISTS.# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")Utwórz nowy kontener o nazwie
productsprzy użyciu poleceniaCREATE TABLE IF NOT EXISTS. Upewnij się, że ustawiono ścieżkę klucza partycji na/categoryi włączono automatyczną skalowalność przepływności z maksymalną przepływnością1000jednostek zapytań (RU) na sekundę.# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))Utwórz inny kontener o nazwie
employeesprzy użyciu hierarchicznej konfiguracji klucza partycji. Użyj parametrów/organization,/departmenti/teamjako zestawu ścieżek klucza partycji. Postępuj zgodnie z tym konkretnym zamówieniem. Ponadto ustaw przepływność na ręcznie określoną liczbę400RU.# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))Uruchom komórki notesu, aby sprawdzić, czy baza danych i kontenery zostały utworzone w ramach konta interfejsu API dla noSQL.
Wczytywanie danych
Utwórz przykładowy zestaw danych. Następnie użyj OLTP, aby zaimportować te dane do API kontenera NoSQL.
Utwórz przykładowy zestaw danych.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )Użyj
spark.createDataFrameoraz wcześniej zapisanej konfiguracji OLTP, aby dodać przykładowe dane do kontenera docelowego.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Wykonywanie zapytań o dane
Ładowanie danych OLTP do ramki danych w celu wykonywania typowych zapytań dotyczących danych. Do filtrowania lub wykonywania zapytań dotyczących danych można użyć różnych składni.
Użyj
spark.readdo załadowania danych OLTP do obiektu ramki danych. Użyj tej samej konfiguracji, która była używana wcześniej w tym samouczku. Ustaw także wartośćspark.cosmos.read.inferSchema.enablednatrue, aby umożliwić łącznikowi Spark wnioskowanie schematu przez próbkowanie istniejących elementów.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()Renderowanie schematu danych załadowanych w ramce danych przy użyciu polecenia
printSchema.# Render schema df.printSchema()// Render schema df.printSchema()Renderuj wiersze danych, w których kolumna
quantityjest mniejsza niż20.whereUżyj funkcji ishow, aby wykonać to zapytanie.# Render filtered data df.where("quantity < 20") \ .show()// Render filtered data df.where("quantity < 20") .show()Renderuj pierwszy wiersz danych, w którym kolumna
clearancetotrue.filterUżyj funkcji , aby wykonać to zapytanie.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)Renderuj pięć wierszy danych bez filtru ani obcinania.
showUżyj funkcji , aby dostosować wygląd i liczbę renderowanych wierszy.# Render five rows of unfiltered and untruncated data df.show(5, False)// Render five rows of unfiltered and untruncated data df.show(5, false)Wykonaj zapytanie o dane przy użyciu tego nieprzetworzonego ciągu zapytania NoSQL:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Wykonywanie typowych operacji
Podczas pracy z API dla NoSQL w Spark można wykonywać częściowe aktualizacje lub pracować z danymi w postaci nieprzetworzonego JSON.
Aby wykonać częściową aktualizację elementu:
Skopiuj istniejącą
configzmienną konfiguracji i zmodyfikuj właściwości w nowej kopii. W szczególności skonfiguruj strategię zapisu naItemPatch. Następnie wyłącz wsparcie dla operacji zbiorczych. Ustaw kolumny i zamapowane operacje. Na koniec ustaw domyślny typ operacji naSet.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )Utwórz zmienne dla klucza partycji elementu i unikatowy identyfikator, który ma być przeznaczony jako część tej operacji poprawki.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"Utwórz zestaw obiektów poprawek, aby określić element docelowy i określić pola, które mają zostać zmodyfikowane.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )Utwórz ramkę danych przy użyciu zestawu obiektów poprawek. Użyj polecenia
write, aby wykonać operację stosowania poprawek.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()Uruchom zapytanie, aby przejrzeć wyniki operacji stosowania poprawki. Element powinien być teraz nazwany
Yamba New Surfboardbez innych zmian.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
Aby pracować z nieprzetworzonymi danymi JSON:
Skopiuj istniejącą
configzmienną konfiguracji i zmodyfikuj właściwości w nowej kopii. W szczególności zmień kontener docelowy naemployees. Następnie skonfiguruj kolumnęcontacts/pole, aby używać nieprzetworzonych danych JSON.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )Utwórz zestaw pracowników do wczytywania do kontenera.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )Utwórz ramkę danych i użyj
writedo wczytywania danych pracowników.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()Renderuj dane z ramki danych przy użyciu polecenia
show. Zwróć uwagę, że kolumnacontactsjest nieprzetworzonym kodem JSON w danych wyjściowych.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
Treści powiązane
- Apache Spark
- API katalogu Azure Cosmos DB
- Dokumentacja parametrów konfiguracji
- Przykłady łącznika Spark usługi Azure Cosmos DB
- Migrowanie z platformy Spark 2.4 do platformy Spark 3.*
- Przestarzałe wersje:
- Łącznik spark usługi Azure Cosmos DB dla platformy Spark 3.1 i 3.2 jest przestarzały, ponieważ nie ma już obsługiwanych środowisk uruchomieniowych platformy Spark 3.1 lub 3.2 w usłudze Azure Databricks, usłudze Azure Synapse lub usłudze Azure HDInsight.
- Przewodnik migracji do aktualizacji z platformy Spark 3.1
- Przewodnik migracji do aktualizacji z platformy Spark 3.2
- Zgodność wersji:
- Wersji:
- Linki pobierania: