Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Wichtig
Synapse Link for Cosmos DB wird für neue Projekte nicht mehr unterstützt. Verwenden Sie dieses Feature nicht.
Verwenden Sie Azure Cosmos DB Mirroring für Microsoft Fabric, das jetzt allgemein verfügbar ist. Das Mirroring bietet die gleichen Null-ETL-Vorteile und ist vollständig in Microsoft Fabric integriert. Weitere Informationen finden Sie unter Cosmos DB Mirroring Overview.
Hinweis
Informationen zu Azure Synapse Link für Azure Cosmos DB unter Verwendung von Spark 3 finden Sie im Artikel Azure Synapse Link für Azure Cosmos DB in Spark 3.
In diesem Artikel erfahren Sie, wie Sie unter Verwendung von Synapse Apache Spark 2 mit Azure Cosmos DB interagieren. Synapse Apache Spark bietet vollständige Unterstützung für Scala, Python, SparkSQL und C# und ist essentiell für Analyse-, Datentechnik-, Data-Science-, und Datenerkundungsszenarios in Azure Synapse Link für Azure Cosmos DB.
Die folgenden Funktionen werden für die Interaktion mit Azure Cosmos DB unterstützt:
- Mit Synapse Apache Spark können Sie Daten in Ihren Azure Cosmos DB-Containern mit Azure Synapse Link-Aktivierung in Quasi-Echtzeit analysieren, ohne dass sich dies auf die Leistung Ihrer Transaktionsarbeitsauslastungen auswirkt. Die folgenden beiden Optionen stehen zur Verfügung, um den Analysespeicher von Azure Cosmos DB in Spark abzufragen:
- Laden in Datenrahmen in Spark
- Spark-Tabelle erstellen
- Synapse Apache Spark ermöglicht es Ihnen außerdem, Daten in Azure Cosmos DB zu erfassen. Es ist wichtig zu beachten, dass Daten immer über den Transaktionsspeicher in Azure Cosmos DB-Container aufgenommen werden. Wenn Azure Synapse Link aktiviert ist, werden alle neuen Einfügungen, Updates und Löschvorgänge automatisch mit dem Analysespeicher synchronisiert.
- Synapse Apache Spark unterstützt auch Spark strukturiertes Streaming mit Azure Cosmos DB als Quelle und Spüle.
Die folgenden Abschnitte führen Sie durch die Syntax der oben genannten Funktionen. Sie können auch das Lernmodul zum Abfragen von Azure Cosmos DB mit Apache Spark für Azure Synapse Analytics auschecken. Gesten in Azure Synapse Analytics-Arbeitsbereichen dienen einem einfachen Start der Verwendung. Gesten werden angezeigt, wenn Sie auf einen Azure Cosmos DB-Container auf der Registerkarte Daten im Synapse-Arbeitsbereich rechtsklicken. Mit Gesten können Sie schnell Code generieren und an Ihre Anforderungen anpassen. Gesten eignen sich auch ideal zum Ermitteln von Daten mit nur einem Mausklick.
Wichtig
Sie müssen einige Einschränkungen im analytischen Schema beachten, die zu unerwartetem Verhalten bei Datenladevorgängen führen können. Als Beispiel sind nur die ersten 1.000 Eigenschaften aus dem Transaktionsschema im analytischen Schema verfügbar, Eigenschaften mit Leerzeichen sind nicht verfügbar usw. Wenn unerwartete Ergebnisse auftreten, überprüfen Sie die Einschränkungen des Analytischen Speicherschemas , um weitere Details zu erhalten.
Abfragen von Azure Cosmos DB-Analysespeicher
Bevor Sie Informationen zu den zwei verfügbaren Optionen erhalten, mit denen der Analysespeicher in Azure Cosmos DB abgefragt werden kann, mit denen Ladevorgänge für Datenrahmen in Spark ausgeführt und Spark-Tabellen erstellt werden können, sollten Sie sich die Unterschiede genauer ansehen, damit Sie die Option auswählen können, die Ihre Anforderungen am besten erfüllt.
Ein Unterschied liegt darin, ob die zugrunde liegenden Datenänderungen im Azure Cosmos DB-Container automatisch in der in Spark durchgeführten Analyse widergespiegelt werden sollten oder nicht. Wenn für den Analysespeicher eines Containers entweder ein Datenrahmen in Spark registriert oder eine Tabelle in Spark erstellt wird, werden die Metadaten zur aktuellen Momentaufnahme der Daten im Analysespeicher in Spark abgerufen, um eine effiziente Weitergabe für anschließende Analysen zu ermöglichen. Es ist wichtig zu beachten, dass Spark nach einer faulen Auswertungsrichtlinie folgt, es sei denn, eine Aktion wird auf dem Spark DataFrame aufgerufen, oder eine SparkSQL-Abfrage wird für die Spark-Tabelle ausgeführt, tatsächliche Daten werden nicht aus dem analytischen Speicher des zugrunde liegenden Containers abgerufen.
Im Fall des Ladens für einen Datenrahmen in Spark werden die abgerufenen Metadaten für die Lebensdauer der Spark-Sitzung zwischengespeichert, und folglich werden Aktionen, die anschließend für den Datenrahmen aufgerufen werden, für die Momentaufnahme des Analysespeichers zum Zeitpunkt der Erstellung des Datenrahmens bewertet.
Beim Erstellen einer Spark-Tabelle werden die Metadaten des Analysespeicherzustands hingegen nicht in Spark zwischengespeichert und bei jeder SparkSQL-Abfrageausführung für die Spark-Tabelle neu geladen.
Folglich können Sie entscheiden, ob für den Datenrahmen in Spark geladen werden soll oder ob eine Spark-Tabelle erstellt werden soll. Dies basiert darauf, ob Sie Ihre Spark-Analyse jeweils für eine festgelegte Momentaufnahme des Analysespeichers oder für eine aktuelle Momentaufnahme des Analysespeichers bewerten möchten.
Wenn Ihre analytischen Abfragen häufig verwendete Filter enthalten, können Sie basierend auf diesen Feldern partitionieren, um die Abfrageleistung zu verbessern. Sie können in regelmäßigen Abständen einen Partitionierungsauftrag über ein Azure Synapse Spark-Notebook ausführen, um die Partitionierung für einen Analysespeicher auszulösen. Der partitionierte Speicher verweist auf das primäre ADLS Gen2-Speicherkonto, das mit Ihrem Azure Synapse-Arbeitsbereich verknüpft ist. Weitere Informationen finden Sie in den Artikeln Benutzerdefinierte Partitionierung in Azure Synapse Link für Azure Cosmos DB (Vorschauversion) und Konfigurieren der benutzerdefinierten Partitionierung zum Partitionieren von Analysespeicherdaten (Vorschau).
Hinweis
Wenn Sie Azure Cosmos DB for MongoDB-Konten abfragen möchten, finden Sie hier weitere Informationen zur Schemadarstellung mit vollständiger Genauigkeit im Analysespeicher und den dabei zu verwendenden Namen für erweiterte Eigenschaften.
Hinweis
Beachten Sie, dass für alle options-Elemente in den nachstehenden Befehlen die Groß-/Kleinschreibung beachtet werden muss. Beispielsweise müssen Sie Gateway verwenden, für gateway wird ein Fehler zurückgegeben.
Laden in Datenrahmen in Spark
In diesem Beispiel erstellen Sie einen Datenrahmen in Spark, der auf den Analysespeicher in Azure Cosmos DB verweist. Anschließend können Sie eine andere Analyse durchführen, indem Sie Spark-Aktionen für den DataFrame aufrufen. Dieser Vorgang wirkt sich nicht auf den Transaktionsspeicher aus.
Die Syntax in Python sieht folgendermaßen aus:
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
df = spark.read.format("cosmos.olap")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.load()
Die entsprechende Syntax in Scala entspräche dem folgenden Code:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
val df_olap = spark.read.format("cosmos.olap").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
load()
Spark-Tabelle erstellen
In diesem Beispiel erstellen Sie eine Tabelle in Spark, die auf den Analysespeicher in Azure Cosmos DB verweist. Anschließend können Sie eine andere Analyse durchführen, indem Sie SparkSQL-Abfragen für die Tabelle aufrufen. Dieser Vorgang wirkt sich weder auf den Transaktionsspeicher aus, noch führt er zu Datenverschiebungen jeglicher Art. Wenn Sie diese Spark-Tabelle löschen möchten, sind der zugrunde liegende Azure Cosmos DB-Container und der entsprechende Analysespeicher nicht betroffen.
Dieses Szenario ist praktisch, wenn Sie Spark-Tabellen über Drittanbietertools wiederverwenden und den Zugriff auf die zugrunde liegenden Daten für die Laufzeit ermöglichen möchten.
Die Syntax für das Erstellen einer Spark-Tabelle ist folgende:
%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options
create table call_center using cosmos.olap options (
spark.synapse.linkedService '<enter linked service name>',
spark.cosmos.container '<enter container name>'
)
Hinweis
Wenn Sie über Szenarien verfügen, in denen sich das Schema des zugrunde liegenden Azure Cosmos DB-Containers im Laufe der Zeit ändert, und wenn das aktualisierte Schema automatisch in Abfragen für die Spark-Tabelle widergespiegelt werden soll, können Sie dies festlegen, indem die spark.cosmos.autoSchemaMerge-Option in den Optionen für Spark-Tabellen auf true festgelegt wird.
Schreiben eines Spark-Datenrahmens in Azure Cosmos DB-Container
In diesem Beispiel schreiben Sie einen Spark-Datenrahmen in einen Azure Cosmos DB-Container. Dieser Vorgang wirkt sich auf die Leistung transaktionaler Arbeitsauslastungen aus und benötigt Anforderungseinheiten, die im Azure Cosmos DB-Container oder in der freigegebenen Datenbank bereitgestellt werden.
Die Syntax in Python sieht folgendermaßen aus:
# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
YOURDATAFRAME.write.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.option("spark.cosmos.write.upsertEnabled", "true")\
.mode('append')\
.save()
Die entsprechende Syntax in Scala entspräche dem folgenden Code:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
import org.apache.spark.sql.SaveMode
df.write.format("cosmos.oltp").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
option("spark.cosmos.write.upsertEnabled", "true").
mode(SaveMode.Overwrite).
save()
Streamingdatenrahmen aus Container laden
Bei dieser Geste verwenden Sie die Spark-Streamingfunktion, um Daten aus einem Container in einen Datenrahmen zu laden. Die Daten werden im primären Data Lake-Konto (und Dateisystem) gespeichert, das Sie mit dem Arbeitsbereich verbunden haben.
Hinweis
Wenn Sie auf externe Bibliotheken in Synapse Apache Spark verweisen möchten, erfahren Sie hier mehr. Wenn Sie beispielsweise einen Spark DataFrame in einen Container von Azure Cosmos DB für MongoDB aufnehmen möchten, können Sie den MongoDB-Connector für Spark verwenden.
Laden von Streamingdatenrahmen aus Azure Cosmos DB-Containern
In diesem Beispiel verwenden Sie die Funktion für strukturiertes Streaming in Spark, um Daten aus einem Azure Cosmos DB-Container in einen Streamingdatenrahmen in Spark zu laden. Dafür nutzen Sie die Änderungsfeedfunktion in Azure Cosmos DB. Die von Spark verwendeten Prüfpunktdaten werden im primären Data Lake-Konto (und Dateisystem) gespeichert, das Sie mit dem Arbeitsbereich verbunden haben.
Wurde der Ordner /localReadCheckpointFolder nicht erstellt (siehe Beispiel unten), wird er automatisch erstellt. Dieser Vorgang wirkt sich auf die Leistung transaktionaler Arbeitsauslastungen aus und benötigt Anforderungseinheiten, die im Azure Cosmos DB-Container oder in der freigegebenen Datenbank bereitgestellt werden.
Die Syntax in Python sieht folgendermaßen aus:
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
dfStream = spark.readStream\
.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.option("spark.cosmos.changeFeed.readEnabled", "true")\
.option("spark.cosmos.changeFeed.startFromTheBeginning", "true")\
.option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder")\
.option("spark.cosmos.changeFeed.queryName", "streamQuery")\
.load()
Die entsprechende Syntax in Scala entspräche dem folgenden Code:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
val dfStream = spark.readStream.
format("cosmos.oltp").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
option("spark.cosmos.changeFeed.readEnabled", "true").
option("spark.cosmos.changeFeed.startFromTheBeginning", "true").
option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder").
option("spark.cosmos.changeFeed.queryName", "streamQuery").
load()
Schreiben von Streamingdatenrahmen in Azure Cosmos DB-Container
In diesem Beispiel schreiben Sie einen Streamingdatenrahmen in einen Azure Cosmos DB-Container. Dieser Vorgang wirkt sich auf die Leistung transaktionaler Arbeitsauslastungen aus und benötigt Anforderungseinheiten, die im Azure Cosmos DB-Container oder in der freigegebenen Datenbank bereitgestellt werden. Wurde der Ordner /localWriteCheckpointFolder nicht erstellt (siehe Beispiel unten), wird er automatisch erstellt.
Die Syntax in Python sieht folgendermaßen aus:
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
# If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway.
def writeBatchToCosmos(batchDF, batchId):
batchDF.persist()
print("--> BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
batchDF.write.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.option("spark.cosmos.write.upsertEnabled", "true")\
.mode('append')\
.save()
print("<-- BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
batchDF.unpersist()
streamQuery = dfStream\
.writeStream\
.foreachBatch(writeBatchToCosmos) \
.option("checkpointLocation", "/localWriteCheckpointFolder")\
.start()
streamQuery.awaitTermination()
Die entsprechende Syntax in Scala entspräche dem folgenden Code:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
// If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway.
val query = dfStream.
writeStream.
foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF.write.format("cosmos.oltp").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
option("spark.cosmos.write.upsertEnabled", "true").
mode(SaveMode.Overwrite).
save()
println(s"BatchId: $batchId, Document count: ${batchDF.count()}")
batchDF.unpersist()
()
}.
option("checkpointLocation", "/localWriteCheckpointFolder").
start()
query.awaitTermination()
Nächste Schritte
- Beispiele für die ersten Schritte mit Azure Synapse Link auf GitHub
- Von Azure Synapse Link unterstützte Features für Azure Cosmos DB
- Herstellen einer Verbindung mit Azure Synapse Link für Azure Cosmos DB
- Schauen Sie sich das Lernmodul zum Abfragen von Azure Cosmos DB mit Apache Spark für Azure Synapse Analytics an.