Abfragen von Cosmos DB-Daten mit Spark
Nachdem Sie Ihrer Azure Cosmos DB-Datenbank mit aktiviertem Analysespeicher einen verknüpften Dienst hinzugefügt haben, können Sie damit die Daten mithilfe eines Spark-Pools in Ihrem Azure Synapse Analytics-Arbeitsbereich abfragen.
Laden analytischer Azure Cosmos DB-Daten in einen DataFrame
Für eine erste Erkundung oder schnelle Analyse von aus einem mit Azure Cosmos DB verknüpften Dienst stammenden Daten ist es oft am einfachsten, Daten aus einem Container in einen DataFrame zu laden. Dazu verwenden Sie eine von Spark unterstützte Sprache wie PySpark (eine Spark-spezifische Implementierung von Python) oder Scala (eine Java-basierte Sprache, die häufig in Spark zum Einsatz kommt).
Beispielsweise könnte der folgende PySpark-Code verwendet werden, um einen Dataframe namens df aus den Daten im my-container Container zu laden, der über den verknüpften Dienst my_linked_service verbunden ist, und die ersten 10 Datenzeilen anzuzeigen.
df = spark.read
.format("cosmos.olap")\
.option("spark.synapse.linkedService", "my_linked_service")\
.option("spark.cosmos.container", "my-container")\
.load()
display(df.limit(10))
Nehmen wir an, der Container "my-container " wird verwendet, um Elemente wie im folgenden Beispiel zu speichern:
{
"productID": 123,
"productName": "Widget",
"id": "7248f072-11c3-42b1-a368-...",
"_rid": "mjMaAL...==",
"_self": "dbs/mjM...==/colls/mjMaAL...=/docs/mjMaAL...==/",
"_etag": "\"54004b09-0000-2300-...\"",
"_attachments": "attachments/",
"_ts": 1655414791
}
Die Ausgabe des PySpark-Codes sieht ähnlich aus wie in der folgenden Tabelle:
| _rid | _ts | Produkt-ID | productName | id | _etag |
|---|---|---|---|---|---|
| mjMaAL...== | 1655414791 | 123 | Widget | 7248f072-11c3-42b1-a368-... | 54004b09-0000-2300-... |
| mjMaAL...== | 1655414829 | 124 | Wotsit | dc33131c-65c7-421a-a0f7-... | 5400ca09-0000-2300-... |
| mjMaAL...== | 1655414835 | 125 | Thingumy | ce22351d-78c7-428a-a1h5-... | 5400ca09-0000-2300-... |
| ... | ... | ... | ... | ... | ... |
Die Daten werden aus dem Analysespeicher und nicht aus dem Betriebsspeicher in den Container geladen. So wird sichergestellt, dass der Betriebsspeicher nicht durch Abfragen überlastet wird. Die Felder im analytischen Datenspeicher enthalten die anwendungsdefinierte Felder (in diesem Fall "productID " und "productName") und automatisch erstellte Metadatenfelder.
Nach Laden des DataFrames können Sie mit dessen nativen Methoden die Daten erkunden. Mit dem folgenden Code wird beispielsweise ein neuer Datenrahmen erstellt, der nur die Spalten "productID " und "productName " enthält, sortiert nach " productName":
products_df = df.select("productID", "productName").orderBy("productName")
display(products_df.limit(10))
Die Ausgabe dieses Codes sieht ähnlich aus wie diese Tabelle:
| Produkt-ID | productName |
|---|---|
| 125 | Thingumy |
| 123 | Widget |
| 124 | Wotsit |
| ... | ... |
Schreiben eines DataFrames in einen Cosmos DB-Container
In den meisten HTAP-Szenarien sollten Sie mithilfe des verknüpften Diensts Daten aus dem Analysespeicher in Spark einlesen. Sie können den Inhalt eines Datenframes jedoch wie im folgenden Beispiel gezeigt in den Container schreiben:
mydf.write.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "my_linked_service")\
.option("spark.cosmos.container", "my-container")\
.mode('append')\
.save()
Hinweis
Das Schreiben eines Datenframes in einen Container aktualisiert den Betriebsspeicher und kann auswirkungen auf die Leistung haben. Die Änderungen werden dann mit dem Analysespeicher synchronisiert.
Abfragen analytischer Azure Cosmos DB-Daten mit Spark SQL
Spark SQL ist eine Spark-API, die die SQL-Sprachsyntax und Semantik relationaler Datenbanken in einem Spark-Pool bereitstellt. Sie können mit Spark SQL Metadaten für Tabellen definieren, die mit SQL abgefragt werden können.
Der folgende Code erstellt beispielsweise eine Tabelle mit dem Namen "Produkte ", die auf dem hypothetischen Container basiert, der in den vorherigen Beispielen verwendet wird:
%%sql
-- Create a logical database in the Spark metastore
CREATE DATABASE mydb;
USE mydb;
-- Create a table from the Cosmos DB container
CREATE TABLE products using cosmos.olap options (
spark.synapse.linkedService 'my_linked_service',
spark.cosmos.container 'my-container'
);
-- Query the table
SELECT productID, productName
FROM products;
Tipp
Das %%sql Schlüsselwort am Anfang des Codes ist ein magisches Wort, das den Spark-Pool anweist, den Code als SQL und nicht in der Standardsprache auszuführen (normalerweise auf PySpark festgelegt).
Bei diesem Ansatz können Sie eine logische Datenbank in Ihrem Spark-Pool erstellen, mit der Sie dann die analytischen Daten in Azure Cosmos DB abfragen können. Auf diese Weise können Sie Workloads zur Datenanalyse und Berichterstellung unterstützen, ohne den Betriebsspeicher in Ihrem Azure Cosmos DB-Konto zu beeinträchtigen.