Udostępnij przez


Interakcja z usługą Azure Cosmos DB przy użyciu platformy Apache Spark 2 w usłudze Azure Synapse Link

Ważne

Usługa Synapse Link dla usługi Cosmos DB nie jest już obsługiwana w przypadku nowych projektów. Nie używaj tej funkcji.

Użyj funkcji mirroringu Azure Cosmos DB dla Microsoft Fabric, która jest teraz ogólnie dostępna. Mirroring zapewnia takie same korzyści płynące z braku konieczności ETL i jest w pełni zintegrowane z Microsoft Fabric. Dowiedz się więcej na stronie Omówienie dublowania usługi Cosmos DB.

Uwaga

W przypadku usługi Azure Synapse Link dla usługi Azure Cosmos DB przy użyciu platformy Spark 3 zapoznaj się z tym artykułem Usługa Azure Synapse Link dla usługi Azure Cosmos DB na platformie Spark 3

W tym artykule dowiesz się, jak korzystać z usługi Azure Cosmos DB przy użyciu usługi Synapse Apache Spark 2. Dzięki pełnej obsłudze języków Scala, Python, SparkSQL i C#usługa Synapse Apache Spark jest centralna w scenariuszach analizy, inżynierii danych, nauki o danych i eksploracji danych w usłudze Azure Synapse Link dla usługi Azure Cosmos DB.

Podczas interakcji z usługą Azure Cosmos DB obsługiwane są następujące możliwości:

  • Usługa Synapse Apache Spark umożliwia analizowanie danych w kontenerach usługi Azure Cosmos DB, które są włączone za pomocą usługi Azure Synapse Link w czasie niemal rzeczywistym bez wpływu na wydajność obciążeń transakcyjnych. Następujące dwie opcje są dostępne do wykonywania zapytań względem analitycznego magazynu Azure Cosmos DB za pomocą Spark:
    • Ładowanie do ramki danych platformy Spark
    • Tworzenie tabeli platformy Spark
  • Usługa Synapse Apache Spark umożliwia również pozyskiwanie danych do usługi Azure Cosmos DB. Należy pamiętać, że dane są zawsze pozyskiwane do kontenerów usługi Azure Cosmos DB za pośrednictwem magazynu transakcyjnego. Po włączeniu usługi Azure Synapse Link wszystkie nowe wstawki, aktualizacje i usunięcia są automatycznie synchronizowane z magazynem analitycznym.
  • Usługa Synapse Apache Spark obsługuje również strukturalne przesyłanie strumieniowe Spark z Azure Cosmos DB jako źródłem i odbiornikiem.

W poniższych sekcjach przedstawiono składnię powyższych możliwości. Możesz również zapoznać się z modułem Learn dotyczącym wykonywania zapytań dotyczących usługi Azure Cosmos DB przy użyciu platformy Apache Spark dla usługi Azure Synapse Analytics. Gesty w obszarze roboczym usługi Azure Synapse Analytics zostały zaprojektowane w celu zapewnienia łatwego gotowego środowiska do rozpoczęcia pracy. Gesty są widoczne po kliknięciu prawym przyciskiem myszy kontenera usługi Azure Cosmos DB na karcie Dane obszaru roboczego usługi Synapse. Za pomocą gestów można szybko wygenerować kod i dostosować go do własnych potrzeb. Gesty są również idealne do odnajdywania danych jednym kliknięciem.

Ważne

Należy pamiętać o pewnych ograniczeniach w schemacie analitycznym, które mogą prowadzić do nieoczekiwanego zachowania operacji ładowania danych. Na przykład w schemacie analitycznym dostępne są tylko pierwsze 1000 właściwości ze schematu transakcyjnego, właściwości zawierające spacje nie są dostępne itp. Jeśli występują nieoczekiwane wyniki, sprawdź ograniczenia schematu repozytorium analitycznego, aby uzyskać więcej szczegółów.

Wykonywanie zapytań w magazynie analitycznym usługi Azure Cosmos DB

Przed zapoznaniem się z dwiema możliwymi opcjami wykonywania zapytań dotyczących magazynu analitycznego usługi Azure Cosmos DB, ładowania do ramki danych Spark i tworzenia tabeli Spark, warto zapoznać się z różnicami w doświadczeniu, aby wybrać opcję, która będzie odpowiednia dla Twoich potrzeb.

Różnica w środowisku polega na tym, czy zmiany danych bazowych w kontenerze usługi Azure Cosmos DB powinny zostać automatycznie odzwierciedlone w analizie wykonywanej na platformie Spark. Gdy ramka danych platformy Spark jest zarejestrowana lub tabela Spark utworzona z magazynem analitycznym kontenera, metadane dotyczące bieżącej migawki danych w magazynie analitycznym są pobierane do platformy Spark w celu wydajnego przetwarzania kolejnych analiz. Należy pamiętać, że ponieważ platforma Spark jest zgodna z leniwymi zasadami oceny, chyba że akcja jest wywoływana w ramce danych Platformy Spark lub w zapytaniu SparkSQL jest wykonywana względem tabeli Spark, rzeczywiste dane nie są pobierane z magazynu analitycznego kontenera bazowego.

W przypadku ładowania do ramki danych Spark pobrane metadane są buforowane przez okres istnienia sesji Spark. Tym samym kolejne akcje wywoływane dla ramki danych są oceniane względem migawki magazynu analitycznego z czasu utworzenia ramki danych.

Z drugiej strony w przypadku tworzenia tabeli Spark metadane stanu magazynu analitycznego nie są buforowane na platformie Spark i są ponownie ładowane na każdym wykonaniu zapytania SparkSQL względem tabeli Spark.

W związku z tym możesz wybrać między ładowaniem do ramki danych Spark a utworzeniem tabeli Spark, w zależności od tego, czy chcesz, aby analiza Spark była oceniana względem utrwalonej migawki magazynu analitycznego, czy najnowszej migawki magazynu analitycznego.

Jeśli zapytania analityczne często używały filtrów, możesz podzielić je na partycje na podstawie tych pól w celu uzyskania lepszej wydajności zapytań. Okresowo możesz uruchamiać zadanie partycjonowania z notesu Azure Synapse Spark, aby wyzwolić partycjonowanie w magazynie analitycznym. Magazyn podzielony na partycje wskazuje na główne konto magazynowe ADLS Gen2 połączone z obszarem roboczym usługi Azure Synapse. Aby dowiedzieć się więcej, zobacz wprowadzenie do partycjonowania niestandardowego i sposób konfigurowania partycjonowania niestandardowego artykuły.

Uwaga

Aby przesyłać zapytania dotyczące kont usługi Azure Cosmos DB dla baz danych MongoDB, dowiedz się więcej o reprezentacji schematu o pełnej zgodności w magazynie analitycznym i rozszerzonych nazwach właściwości, które powinny być używane.

Uwaga

Należy pamiętać, że w options poniższych poleceniach uwzględniana jest wielkość liter. Na przykład, należy użyć Gateway, ponieważ użycie gateway spowoduje błąd.

Ładowanie do ramki danych platformy Spark

W tym przykładzie utworzysz ramkę danych Spark, która wskazuje magazyn analityczny usługi Azure Cosmos DB. Następnie możesz wykonać inną analizę, wywołując akcje platformy Spark względem ramki danych. Ta operacja nie ma wpływu na magazyn transakcyjny.

Składnia w języku Python będzie następująca:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

df = spark.read.format("cosmos.olap")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .load()

Równoważna składnia w języku Scala byłaby następująca :

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    load()

Tworzenie tabeli platformy Spark

W tym przykładzie utworzysz tabelę Platformy Spark, która wskazuje magazyn analityczny usługi Azure Cosmos DB. Następnie możesz wykonać inną analizę, wywołując zapytania SparkSQL względem tabeli. Ta operacja nie wpływa ani na magazyn transakcyjny, ani nie powoduje żadnego przenoszenia danych. Jeśli zdecydujesz się usunąć tę tabelę Spark, podstawowy kontener usługi Azure Cosmos DB i odpowiedni magazyn analityczny nie będą miały wpływu.

Ten scenariusz jest wygodny do ponownego użycia tabel platformy Spark za pomocą narzędzi innych firm i zapewnia dostępność bazowych danych w czasie wykonywania.

Składnia do utworzenia tabeli Spark jest następująca:

%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options

create table call_center using cosmos.olap options (
    spark.synapse.linkedService '<enter linked service name>',
    spark.cosmos.container '<enter container name>'
)

Uwaga

Jeśli masz scenariusze, w których schemat bazowego kontenera usługi Azure Cosmos DB zmienia się w czasie; a jeśli chcesz, aby zaktualizowany schemat automatycznie odzwierciedlał zapytania względem tabeli Spark, możesz to osiągnąć, ustawiając spark.cosmos.autoSchemaMerge opcję w true opcjach tabeli Spark.

Zapisywanie DataFrame Spark do kontenera w usłudze Azure Cosmos DB

W tym przykładzie napiszesz ramkę danych platformy Spark do kontenera usługi Azure Cosmos DB. Ta operacja wpłynie na wydajność obciążeń transakcyjnych i będzie korzystać z jednostek żądań aprowizowanych w kontenerze usługi Azure Cosmos DB lub udostępnionej bazie danych.

Składnia w języku Python będzie następująca:

# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

YOURDATAFRAME.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()

Równoważna składnia w języku Scala byłaby następująca :

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

import org.apache.spark.sql.SaveMode

df.write.format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>"). 
    option("spark.cosmos.write.upsertEnabled", "true").
    mode(SaveMode.Overwrite).
    save()

Ładowanie ramki danych strumieniowych z kontenera

W tym gestzie użyjesz możliwości przesyłania strumieniowego platformy Spark, aby załadować dane z kontenera do ramki danych. Dane będą przechowywane na podstawowym koncie usługi Data Lake (i systemie plików) połączonym z obszarem roboczym.

Uwaga

Jeśli chcesz odwołać się do bibliotek zewnętrznych w usłudze Synapse Apache Spark, dowiedz się więcej tutaj. Jeśli na przykład chcesz załadować ramkę danych Spark do kontenera usługi Azure Cosmos DB dla MongoDB, możesz użyć łącznika MongoDB dla Spark.

Ładowanie danych strumieniowych z kontenera bazy danych Azure Cosmos DB

W tym przykładzie użyjesz funkcji przetwarzania strumieniowego ze strukturą Spark, aby załadować dane z kontenera Azure Cosmos DB do strumieniowego DataFrame Spark, korzystając z funkcji strumienia zmian w Azure Cosmos DB. Dane kontrolne używane przez platformę Spark będą przechowywane na podstawowym koncie platformy Data Lake (i systemie plików), które zostało połączone z obszarem roboczym.

Jeśli folder /localReadCheckpointFolder nie zostanie utworzony (w poniższym przykładzie), zostanie utworzony automatycznie. Ta operacja wpłynie na wydajność obciążeń transakcyjnych i zużyje jednostki żądań przydzielone w kontenerze usługi Azure Cosmos DB lub udostępnionej bazie danych.

Składnia w języku Python będzie następująca:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

dfStream = spark.readStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.changeFeed.readEnabled", "true")\
    .option("spark.cosmos.changeFeed.startFromTheBeginning", "true")\
    .option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder")\
    .option("spark.cosmos.changeFeed.queryName", "streamQuery")\
    .load()

Równoważna składnia w języku Scala byłaby następująca :

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val dfStream = spark.readStream.
    format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    option("spark.cosmos.changeFeed.readEnabled", "true").
    option("spark.cosmos.changeFeed.startFromTheBeginning", "true").
    option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder").
    option("spark.cosmos.changeFeed.queryName", "streamQuery").
    load()

Pisanie ramki danych przesyłanych strumieniowo do kontenera w usłudze Azure Cosmos DB

W tym przykładzie napiszesz strumieniową ramkę danych do kontenera w usłudze Azure Cosmos DB. Ta operacja wpłynie na wydajność obciążeń transakcyjnych i zużyje jednostki żądań przydzielone w kontenerze usługi Azure Cosmos DB lub udostępnionej bazie danych. Jeśli folder /localWriteCheckpointFolder nie zostanie utworzony (w poniższym przykładzie), zostanie utworzony automatycznie.

Składnia w języku Python będzie następująca:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

# If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway. 

def writeBatchToCosmos(batchDF, batchId):
  batchDF.persist()
  print("--> BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
  batchDF.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()
  print("<-- BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
  batchDF.unpersist()

streamQuery = dfStream\
        .writeStream\
        .foreachBatch(writeBatchToCosmos) \
        .option("checkpointLocation", "/localWriteCheckpointFolder")\
        .start()

streamQuery.awaitTermination()

Równoważna składnia w języku Scala byłaby następująca :

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

// If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway. 

val query = dfStream.
            writeStream.
            foreachBatch { (batchDF: DataFrame, batchId: Long) =>
              batchDF.persist()
              batchDF.write.format("cosmos.oltp").
                option("spark.synapse.linkedService", "<enter linked service name>").
                option("spark.cosmos.container", "<enter container name>"). 
                option("spark.cosmos.write.upsertEnabled", "true").
                mode(SaveMode.Overwrite).
                save()
              println(s"BatchId: $batchId, Document count: ${batchDF.count()}")
              batchDF.unpersist()
              ()
            }.        
            option("checkpointLocation", "/localWriteCheckpointFolder").
            start()

query.awaitTermination()

Następne kroki