Abfragen von Cosmos DB-Daten mit Spark

Abgeschlossen

Nachdem Sie Ihrer Azure Cosmos DB-Datenbank mit aktiviertem Analysespeicher einen verknüpften Dienst hinzugefügt haben, können Sie damit die Daten mithilfe eines Spark-Pools in Ihrem Azure Synapse Analytics-Arbeitsbereich abfragen.

Laden analytischer Azure Cosmos DB-Daten in einen DataFrame

Für eine erste Erkundung oder schnelle Analyse von aus einem mit Azure Cosmos DB verknüpften Dienst stammenden Daten ist es oft am einfachsten, Daten aus einem Container in einen DataFrame zu laden. Dazu verwenden Sie eine von Spark unterstützte Sprache wie PySpark (eine Spark-spezifische Implementierung von Python) oder Scala (eine Java-basierte Sprache, die häufig in Spark zum Einsatz kommt).

Beispielsweise könnte der folgende PySpark-Code verwendet werden, um einen Dataframe namens df aus den Daten im my-container Container zu laden, der über den verknüpften Dienst my_linked_service verbunden ist, und die ersten 10 Datenzeilen anzuzeigen.

 df = spark.read
     .format("cosmos.olap")\
     .option("spark.synapse.linkedService", "my_linked_service")\
     .option("spark.cosmos.container", "my-container")\
     .load()

display(df.limit(10))

Nehmen wir an, der Container "my-container " wird verwendet, um Elemente wie im folgenden Beispiel zu speichern:

{
    "productID": 123,
    "productName": "Widget",
    "id": "7248f072-11c3-42b1-a368-...",
    "_rid": "mjMaAL...==",
    "_self": "dbs/mjM...==/colls/mjMaAL...=/docs/mjMaAL...==/",
    "_etag": "\"54004b09-0000-2300-...\"",
    "_attachments": "attachments/",
    "_ts": 1655414791
}

Die Ausgabe des PySpark-Codes sieht ähnlich aus wie in der folgenden Tabelle:

_rid _ts Produkt-ID productName id _etag
mjMaAL...== 1655414791 123 Widget 7248f072-11c3-42b1-a368-... 54004b09-0000-2300-...
mjMaAL...== 1655414829 124 Wotsit dc33131c-65c7-421a-a0f7-... 5400ca09-0000-2300-...
mjMaAL...== 1655414835 125 Thingumy ce22351d-78c7-428a-a1h5-... 5400ca09-0000-2300-...
... ... ... ... ... ...

Die Daten werden aus dem Analysespeicher und nicht aus dem Betriebsspeicher in den Container geladen. So wird sichergestellt, dass der Betriebsspeicher nicht durch Abfragen überlastet wird. Die Felder im analytischen Datenspeicher enthalten die anwendungsdefinierte Felder (in diesem Fall "productID " und "productName") und automatisch erstellte Metadatenfelder.

Nach Laden des DataFrames können Sie mit dessen nativen Methoden die Daten erkunden. Mit dem folgenden Code wird beispielsweise ein neuer Datenrahmen erstellt, der nur die Spalten "productID " und "productName " enthält, sortiert nach " productName":

products_df = df.select("productID", "productName").orderBy("productName")

display(products_df.limit(10))

Die Ausgabe dieses Codes sieht ähnlich aus wie diese Tabelle:

Produkt-ID productName
125 Thingumy
123 Widget
124 Wotsit
... ...

Schreiben eines DataFrames in einen Cosmos DB-Container

In den meisten HTAP-Szenarien sollten Sie mithilfe des verknüpften Diensts Daten aus dem Analysespeicher in Spark einlesen. Sie können den Inhalt eines Datenframes jedoch wie im folgenden Beispiel gezeigt in den Container schreiben:

mydf.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "my_linked_service")\
    .option("spark.cosmos.container", "my-container")\
    .mode('append')\
    .save()

Hinweis

Das Schreiben eines Datenframes in einen Container aktualisiert den Betriebsspeicher und kann auswirkungen auf die Leistung haben. Die Änderungen werden dann mit dem Analysespeicher synchronisiert.

Abfragen analytischer Azure Cosmos DB-Daten mit Spark SQL

Spark SQL ist eine Spark-API, die die SQL-Sprachsyntax und Semantik relationaler Datenbanken in einem Spark-Pool bereitstellt. Sie können mit Spark SQL Metadaten für Tabellen definieren, die mit SQL abgefragt werden können.

Der folgende Code erstellt beispielsweise eine Tabelle mit dem Namen "Produkte ", die auf dem hypothetischen Container basiert, der in den vorherigen Beispielen verwendet wird:

%%sql

-- Create a logical database in the Spark metastore
CREATE DATABASE mydb;

USE mydb;

-- Create a table from the Cosmos DB container
CREATE TABLE products using cosmos.olap options (
    spark.synapse.linkedService 'my_linked_service',
    spark.cosmos.container 'my-container'
);

-- Query the table
SELECT productID, productName
FROM products;

Tipp

Das %%sql Schlüsselwort am Anfang des Codes ist ein magisches Wort, das den Spark-Pool anweist, den Code als SQL und nicht in der Standardsprache auszuführen (normalerweise auf PySpark festgelegt).

Bei diesem Ansatz können Sie eine logische Datenbank in Ihrem Spark-Pool erstellen, mit der Sie dann die analytischen Daten in Azure Cosmos DB abfragen können. Auf diese Weise können Sie Workloads zur Datenanalyse und Berichterstellung unterstützen, ohne den Betriebsspeicher in Ihrem Azure Cosmos DB-Konto zu beeinträchtigen.