Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Important
O Link do Synapse para Cosmos DB não tem mais suporte para novos projetos. Não use esse recurso.
Use o Espelhamento do Azure Cosmos DB para Microsoft Fabric, que agora está em Disponibilidade Geral. O espelhamento fornece os mesmos benefícios de ETL zero e é totalmente integrado ao Microsoft Fabric. Saiba mais na Visão Geral do Espelhamento do Cosmos DB.
Neste artigo, você aprenderá a interagir com o Azure Cosmos DB usando o Synapse Apache Spark 3. Os clientes podem usar Scala, Python, SparkSQL e C#para análise, engenharia de dados, ciência de dados e cenários de exploração de dados no Link do Azure Synapse para Azure Cosmos DB.
Os seguintes recursos têm suporte ao interagir com o Azure Cosmos DB:
- O Synapse Apache Spark 3 permite analisar dados em seus contêineres do Azure Cosmos DB que são habilitados com o Link do Azure Synapse quase em tempo real sem afetar o desempenho das cargas de trabalho transacionais. As duas seguintes opções estão disponíveis para consultar o armazenamento de análise do Azure Cosmos DB do Spark:
- Carregar para o DataFrame do Spark
- Criar tabela do Spark
- O Synapse Apache Spark também permite ingerir dados no Azure Cosmos DB. É importante observar que os dados sempre são ingeridos em contêineres do Azure Cosmos DB por meio do armazenamento transacional. Quando o Link do Azure Synapse está habilitado, todas as novas inserções, atualizações e exclusões são sincronizadas automaticamente com o repositório analítico.
- O Apache Spark do Synapse também dá suporte ao streaming estruturado do Spark com o Azure Cosmos DB como uma origem e um coletor.
As seções a seguir orientam você pela sintaxe. Confira também o módulo do Learn sobre como Consultar o Azure Cosmos DB com o Apache Spark para Azure Synapse Analytics. Os gestos no workspace do Azure Synapse Analytics são projetados para proporcionar uma experiência fácil de usar para começar. Os gestos ficam visíveis quando você clica com o botão direito do mouse em um contêiner do Azure Cosmos DB na guia Dados do workspace do Azure Synapse. Com gestos, você pode gerar código rapidamente e ajustá-lo às suas necessidades. Os gestos também são perfeitos para descobrir dados com um único clique.
Important
Você deve estar ciente sobre algumas restrições no esquema analítico que poderão gerar um comportamento inesperado em operações de carregamento de dados. Por exemplo, somente as primeiras 1.000 propriedades do esquema transacional estão disponíveis no esquema analítico, as propriedades com espaços não estão disponíveis etc. Se você estiver tendo alguns resultados inesperados, verifique as restrições de esquema do repositório analítico para obter mais detalhes.
Consultar o armazenamento de análise do Azure Cosmos DB
Os clientes podem carregar dados de armazenamento analítico para DataFrames do Spark ou criar tabelas do Spark.
A diferença na experiência é relativa a se as alterações de dados subjacentes no contêiner de Azure Cosmos DB devem ser refletidas automaticamente na análise realizada no Spark. Quando os DataFrames do Spark são registrados ou uma tabela do Spark é criada, o Spark busca metadados de armazenamento analítico para uma execução eficiente de pushdown. É importante observar isso, já que o Spark segue uma política de avaliação lenta. Você precisa agir para criar o último instantâneo dos dados em consultas DataFrames do Spark ou SparkSQL.
No caso de carregamento para o DataFrame do Spark, os metadados buscados são armazenados em cache durante o tempo de vida da sessão do Spark, portanto, as ações subsequentes invocadas no DataFrame são avaliadas no instantâneo do armazenamento de análise no momento da criação do DataFrame.
Por outro lado, no caso da criação de uma tabela do Spark, os metadados do estado do repositório analítico não são armazenados em cache no Spark e recarregados em cada execução de consulta do SparkSQL em relação à tabela do Spark.
Para concluir, você pode escolher entre carregar um instantâneo no DataFrame do Spark ou consultar uma tabela do Spark para o instantâneo mais recente.
Note
Para consultar contas do Azure Cosmos DB for MongoDB, saiba mais sobre a representação de esquema de fidelidade completa no armazenamento de análise e os nomes de propriedade estendida a serem usados.
Note
Todas options diferenciam maiúsculas de minúsculas.
Authentication
Agora, os clientes do Spark 3.x podem se autenticar no repositório analítico do Azure Cosmos DB usando tokens de acesso de identidades confiáveis ou chaves de conta de banco de dados. Os tokens são mais seguros por serem de curta duração e atribuídos à permissão necessária usando o RBAC do Cosmos DB.
O conector agora dá suporte a dois tipos de autenticação, MasterKey e AccessToken, para a propriedade spark.cosmos.auth.type.
Autenticação de chave mestra
Use a chave para ler um dataframe usando o Spark:
val config = Map(
"spark.cosmos.accountEndpoint" -> "<endpoint>",
"spark.cosmos.accountKey" -> "<key>",
"spark.cosmos.database" -> "<db>",
"spark.cosmos.container" -> "<container>"
)
val df = spark.read.format("cosmos.olap").options(config).load()
df.show(10)
Autenticação de token de acesso
A nova autenticação sem chave apresenta suporte para tokens de acesso:
val config = Map(
"spark.cosmos.accountEndpoint" -> "<endpoint>",
"spark.cosmos.auth.type" -> "AccessToken",
"spark.cosmos.auth.accessToken" -> "<accessToken>",
"spark.cosmos.database" -> "<db>",
"spark.cosmos.container" -> "<container>"
)
val df = spark.read.format("cosmos.olap").options(config).load()
df.show(10)
Note
O conector do Spark do Link do Azure Synapse do Azure Cosmos DB não dá suporte à Identidade Gerenciada.
A autenticação de token de acesso requer atribuição de função
Para usar a abordagem de token de acesso, você precisa gerar tokens de acesso. Como os tokens de acesso estão associados às identidades do Azure, o RBAC (controle de acesso baseado em função) correto deve ser atribuído à identidade. A atribuição de função está no nível do plano de dados e você deve ter permissões mínimas do plano de controle para executar a atribuição de função.
As atribuições de função do IAM (Gerenciamento de Acesso de Identidade) do portal do Azure estão no nível do plano de controle e não afetam as atribuições de função no plano de dados. As atribuições de função do plano de dados só estão disponíveis por meio da CLI do Azure. A ação readAnalytics é necessária para ler dados do repositório analítico no Cosmos DB e não faz parte de nenhuma função predefinida. Como tal, devemos criar uma definição de função personalizada. Além da ação readAnalytics , adicione também as ações necessárias para o Leitor de Dados. Crie um arquivo JSON com o conteúdo a seguir e nomeie-o role_definition.json
{
"RoleName": "CosmosAnalyticsRole",
"Type": "CustomRole",
"AssignableScopes": ["/"],
"Permissions": [{
"DataActions": [
"Microsoft.DocumentDB/databaseAccounts/readAnalytics",
"Microsoft.DocumentDB/databaseAccounts/readMetadata",
"Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read",
"Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/executeQuery",
"Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed"
]
}]
}
A autenticação de Token de Acesso requer a CLI do Azure
- Faça logon na CLI do Azure:
az login - Defina a assinatura padrão, que tem sua conta do Cosmos DB:
az account set --subscription <name or id> - Crie a definição de função na conta desejada do Cosmos DB:
az cosmosdb sql role definition create --account-name <cosmos-account-name> --resource-group <resource-group-name> --body @role_definition.json - Copia a
definition idda função retornada:/subscriptions/<subscription-id>/resourceGroups/<resource-group-name>/providers/Microsoft.DocumentDB/databaseAccounts/< cosmos-account-name >/sqlRoleDefinitions/<a-random-generated-guid> - Obtenha a ID principal da identidade à qual você deseja atribuir a função. A identidade pode ser um registro de aplicativo do Azure, uma máquina virtual ou qualquer outro recurso do Azure com suporte. Atribua a função ao principal usando:
az cosmosdb sql role assignment create --account-name "<cosmos-account-name>" --resource-group "<resource-group>" --scope "/" --principal-id "<principal-id-of-identity>" --role-definition-id "<role-definition-id-from-previous-step>"
Note
Ao usar um registro de aplicativo Azure, use Object Id como a ID da entidade de serviço. Além disso, a ID principal e a conta do Cosmos DB devem estar no mesmo locatário.
Gerar token de acesso – Notebooks do Synapse
O método recomendado para Notebooks do Synapse é usar uma entidade de serviço com um certificado para gerar tokens de acesso. Clique aqui para obter mais informações.
The following code snippet has been validated to work in a Synapse notebook
val tenantId = "<azure-tenant-id>"
val clientId = "<client-id-of-service-principal>"
val kvLinkedService = "<azure-key-vault-linked-service>"
val certName = "<certificate-name>"
val token = mssparkutils.credentials.getSPTokenWithCertLS(
"https://<cosmos-account-name>.documents.azure.com/.default",
"https://login.microsoftonline.com/" + tenantId, clientId, kvLinkedService, certName)
Agora você pode usar o token de acesso gerado nesta etapa para ler dados do repositório analítico quando o tipo de autenticação estiver configurado para token de acesso.
Note
Ao usar um registro de aplicativo do Azure, use o aplicativo (ID do cliente).
Note
Atualmente, o Synapse não dá suporte à geração de tokens de acesso usando o pacote azure-identity em notebooks. Além disso, VHDs do Synapse não incluem o pacote azure-identity e suas dependências. Clique aqui para obter mais informações.
Carregar para o DataFrame do Spark
Neste exemplo, você cria um DataFrame do Spark que aponta para o repositório analítico do Azure Cosmos DB. Em seguida, você pode executar mais análise invocando ações do Spark no DataFrame. Esta operação não afeta o repositório transacional.
A sintaxe no Python seria a seguinte:
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
df = spark.read.format("cosmos.olap")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.load()
A sintaxe equivalente no Scala seria a seguinte:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
val df_olap = spark.read.format("cosmos.olap").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
load()
Criar tabela do Spark
Neste exemplo, você cria uma tabela spark que aponta o repositório analítico do Azure Cosmos DB. Em seguida, você pode executar outras análises invocando consultas do SparkSQL na tabela. Essa operação não afeta o armazenamento transacional nem incorre na movimentação de dados. Se você decidir excluir essa tabela do Spark, o contêiner do Azure Cosmos DB subjacente e o repositório analítico correspondente não serão afetados.
Esse cenário é conveniente para reutilizar tabelas do Spark por meio de ferramentas de terceiros e fornecer acessibilidade aos dados subjacentes para o tempo de execução.
A sintaxe para criar uma tabela do Spark é a seguinte:
%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options
create table call_center using cosmos.olap options (
spark.synapse.linkedService '<enter linked service name>',
spark.cosmos.container '<enter container name>'
)
Note
Se você tiver cenários em que o esquema do contêiner do Azure Cosmos DB subjacente mude ao longo do tempo e quiser que o esquema atualizado se reflita automaticamente nas consultas na tabela do Spark, poderá conseguir isso definindo a opção spark.cosmos.autoSchemaMerge como true nas opções da tabela do Spark.
Gravar o DataFrame do Spark no contêiner do Azure Cosmos DB
Neste exemplo, você escreve um DataFrame do Spark em um contêiner do Azure Cosmos DB. Essa operação afeta o desempenho de cargas de trabalho transacionais e consome unidades de solicitação provisionadas no contêiner do Azure Cosmos DB ou no banco de dados compartilhado.
A sintaxe no Python seria a seguinte:
# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
YOURDATAFRAME.write.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.mode('append')\
.save()
A sintaxe equivalente no Scala seria a seguinte:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
import org.apache.spark.sql.SaveMode
df.write.format("cosmos.oltp").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
mode(SaveMode.Append).
save()
Carregar DataFrame de streaming do contêiner
Nesse gesto, você usa a funcionalidade de Streaming do Spark para carregar dados de um contêiner em um dataframe. Os dados são armazenados na conta principal do data lake (e do sistema de arquivos) que você conectou ao workspace.
Note
Se você estiver procurando fazer referência a bibliotecas externas do Apache Spark no Synapse, saiba mais aqui. Por exemplo, se você pretende ingerir um DataFrame do Spark para um contêiner do Azure Cosmos DB for MongoDB, use o conector do Mongo DB para Spark aqui.
Carregar DataFrame de streaming do contêiner do Azure Cosmos DB
Neste exemplo, você usa o streaming estruturado do Spark para carregar dados de um contêiner do Azure Cosmos DB em um DataFrame de streaming do Spark usando a funcionalidade de feed de alterações no Azure Cosmos DB. Os dados do ponto de verificação usados pelo Spark serão armazenados na conta principal do data lake (e sistema de arquivos) que você conectou ao workspace.
A sintaxe no Python seria a seguinte:
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
dfStream = spark.readStream\
.format("cosmos.oltp.changeFeed")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.option("spark.cosmos.changeFeed.startFrom", "Beginning")\
.option("spark.cosmos.changeFeed.mode", "Incremental")\
.load()
A sintaxe equivalente no Scala seria a seguinte:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
val dfStream = spark.readStream.
format("cosmos.oltp.changeFeed").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
option("spark.cosmos.changeFeed.startFrom", "Beginning").
option("spark.cosmos.changeFeed.mode", "Incremental").
load()
Gravar DataFrame de streaming para o contêiner do Azure Cosmos DB
Neste exemplo, você escreve um DataFrame de streaming em um contêiner do Azure Cosmos DB. Essa operação afeta o desempenho de cargas de trabalho transacionais e consome unidades de solicitação provisionadas no contêiner do Azure Cosmos DB ou no banco de dados compartilhado. Se a pasta /localWriteCheckpointFolder não for criada (no exemplo abaixo), ela será criada automaticamente.
A sintaxe no Python seria a seguinte:
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
streamQuery = dfStream\
.writeStream\
.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.option("checkpointLocation", "/tmp/myRunId/")\
.outputMode("append")\
.start()
streamQuery.awaitTermination()
A sintaxe equivalente no Scala seria a seguinte:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
val query = dfStream.
writeStream.
format("cosmos.oltp").
outputMode("append").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
option("checkpointLocation", "/tmp/myRunId/").
start()
query.awaitTermination()
Próximas etapas
- Exemplos para começar a usar o Link do Azure Synapse no GitHub
- Saiba o que é compatível com o Link do Azure Synapse para Azure Cosmos DB
- Conectar-se ao Link do Azure Synapse para Azure Cosmos DB
- Confira o módulo Learn sobre como consultar o Azure Cosmos DB com o Apache Spark para o Azure Synapse Analytics.