Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Neste tutorial, você configura um pipeline de ETL reverso para mover dados enriquecidos de tabelas Delta no Azure Databricks para o Azure Cosmos DB para NoSQL. Em seguida, use o conector OLTP (Online Transaction Processing) Spark para Azure Cosmos DB para NoSQL para sincronizar dados.
Pré-requisitos para configuração de pipeline ETL reverso
- Uma conta existente do Azure Cosmos DB.
- Se você tiver uma assinatura do Azure, crie uma nova conta.
- Um espaço de trabalho existente do Azure Databricks.
- Se você tiver uma assinatura do Azure, crie um novo espaço de trabalho.
- Versão mais recente da CLI do Azure.
- Se preferir, você também pode usar o Azure Cloud Shell.
Configurar o controle de acesso baseado em função com o Microsoft Entra
As identidades gerenciadas do Azure garantem autenticação segura e sem senha no Azure Cosmos DB para NoSQL sem gerenciar credenciais manualmente. Nesta etapa de pré-requisito, configure a identidade gerenciada atribuída pelo usuário que o Azure Databricks cria automaticamente com acesso de leitura a metadados e acesso de gravação a dados para sua conta do Azure Cosmos DB para NoSQL. Esta etapa configura as funções de controle e controle de acesso baseado em função do plano de dados para a identidade gerenciada.
Entre no portal do Azure (https://portal.azure.com).
Navegue até o recurso existente do Azure Databricks.
No painel Essentials , localize e navegue até o grupo de recursos gerenciados associado ao espaço de trabalho.
No grupo de recursos gerenciados, selecione a identidade gerenciada atribuída ao usuário que foi criada automaticamente com o espaço de trabalho.
Registre o valor dos campos ID do Cliente e ID do Objeto (principal) no painel Essentials . Use esse valor posteriormente para atribuir funções de controle e plano de dados.
Sugestão
Como alternativa, você pode obter a ID principal da identidade gerenciada usando a CLI do Azure. Supondo que o nome da identidade gerenciada seja
dbmanagedidentity, use oaz resource showcomando para obter a ID principal.az resource show \ --resource-group "<name-of-managed-resource-group>" \ --name "dbmanagedidentity" \ --resource-type "Microsoft.ManagedIdentity/userAssignedIdentities" \ --query "{clientId: properties.clientId, principalId: properties.principalId}"Navegue até a conta de destino do Azure Cosmos DB para NoSQL.
Na página da conta, selecione Controle de acesso (IAM).
No painel Controle de acesso , selecione as opções Adicionar e, em seguida, Adicionar atribuição de função para iniciar o processo de atribuição de uma função de plano de controle à identidade gerenciada atribuída pelo usuário.
Selecione a função Leitor de Conta do Cosmos DB na lista de funções para atribuição.
Na seção para atribuir acesso a um usuário, grupo ou entidade de serviço, interaja com a opção selecionar membros .
Na caixa de diálogo membros, insira a ID principal a ser filtrada para a identidade gerenciada atribuída pelo usuário associada ao Azure Databricks. Selecione essa identidade.
Por fim, selecione Revisar + Atribuir para criar a atribuição de função do plano de controle.
Use o
az cosmosdb sql role assignment createcomando para atribuir a função doCosmos DB Built-in Data Contributorplano de dados e o/escopo à identidade gerenciada atribuída pelo usuário associada ao Azure Databricks.az cosmosdb sql role assignment create \ --resource-group "<name-of-resource-group>" \ --account-name "<name-of-cosmos-nosql-account>" \ --principal-id "<managed-identity-principal-id>" \ --role-definition-name "Cosmos DB Built-in Data Contributor" \ --scope "/"Use
az account showpara obter seus identificadores de assinatura e locatário. Esses valores são necessários em uma etapa posterior com o conector Spark usando a autenticação do Microsoft Entra.az account show --query '{subscriptionId: id, tenantId: tenantId}'
Criar um bloco de anotações Databricks
Navegue até o recurso existente do Azure Databricks e abra a interface do usuário do espaço de trabalho.
Se você ainda não tiver um cluster, crie um novo cluster.
Importante
Verifique se o cluster tem o Runtime versão 15.4 superior, que tem suporte de longo prazo para Spark 3.5.0 e Scala 2.12. As etapas restantes neste guia assumem essas versões das ferramentas.
Navegue até Bibliotecas>Install New> e Maven para instalar um pacote Maven.
Procure o conector Spark para Azure Cosmos DB para NoSQL usando o filtro
com.azure.cosmos.sparke selecionando o pacote com uma ID de Artefato deazure-cosmos-spark_3-5_2-12.Crie um novo bloco de anotações navegando até Espaço de trabalho>[Pasta]>Novo>bloco de anotações.
Anexe o bloco de notas ao cluster.
Configurar o conector Spark no Azure Databricks
Configure o conector Spark para se conectar ao contêiner da sua conta usando a autenticação do Microsoft Entra. Além disso, configure o conector para usar apenas um limite limitado de taxa de transferência para operações do Spark. Para configurar o conector spark, defina um dicionário de configuração com credenciais para se conectar à sua conta. Essas credenciais incluem:
| Valor | |
|---|---|
spark.cosmos.accountEndpoint |
O ponto de extremidade da conta NoSQL |
spark.cosmos.database |
O nome da base de dados de destino |
spark.cosmos.container |
O nome do contêiner de destino |
spark.cosmos.auth.type |
ManagedIdentity |
spark.cosmos.auth.aad.clientId |
A ID do Cliente da identidade gerenciada atribuída pelo usuário |
spark.cosmos.account.subscriptionId |
O ID da subscrição |
spark.cosmos.account.tenantId |
A ID do locatário associado do Microsoft Entra |
spark.cosmos.account.resourceGroupName |
O nome do grupo de recursos |
spark.cosmos.throughputControl.enabled |
true |
spark.cosmos.throughputControl.name |
TargetContainerThroughputControl |
spark.cosmos.throughputControl.targetThroughputThreshold |
0.30 |
spark.cosmos.throughputControl.globalControl.useDedicatedContainer |
«falso |
cosmos_config = {
# General settings
"spark.cosmos.accountEndpoint": "<endpoint>",
"spark.cosmos.database": "products",
"spark.cosmos.container": "recommendations",
# Entra authentication settings
"spark.cosmos.auth.type": "ManagedIdentity",
"spark.cosmos.account.subscriptionId": "<subscriptionId>",
"spark.cosmos.account.tenantId": "<tenantId>",
"spark.cosmos.account.resourceGroupName": "<resourceGroupName>",
# Throughput control settings
"spark.cosmos.throughputControl.enabled": "true",
"spark.cosmos.throughputControl.name": "TargetContainerThroughputControl",
"spark.cosmos.throughputControl.targetThroughputThreshold": "0.30",
"spark.cosmos.throughputControl.globalControl.useDedicatedContainer": "false",
}
val cosmosconfig = Map(
// General settings
"spark.cosmos.accountEndpoint" -> "<endpoint>",
"spark.cosmos.database" -> "products",
"spark.cosmos.container" -> "recommendations",
// Entra authentication settings
"spark.cosmos.auth.type" -> "ManagedIdentity",
"spark.cosmos.account.subscriptionId" -> "<subscriptionId>",
"spark.cosmos.account.tenantId" -> "<tenantId>",
"spark.cosmos.account.resourceGroupName" -> "<resourceGroupName>",
// Throughput control settings
"spark.cosmos.throughputControl.enabled" -> "true",
"spark.cosmos.throughputControl.name" -> "TargetContainerThroughputControl",
"spark.cosmos.throughputControl.targetThroughputThreshold" -> "0.30",
"spark.cosmos.throughputControl.globalControl.useDedicatedContainer" -> "false"
)
Observação
Neste exemplo, o banco de dados de destino é nomeado products e o contêiner de destino é nomeado recommendations.
A configuração de taxa de transferência, conforme especificado nesta etapa, garante que apenas 30% das unidades de solicitação (RUs) alocadas ao contêiner de destino estejam disponíveis para operações do Spark.
Ingerir dados de recomendações de produtos de amostra para uma tabela Delta
Crie um DataFrame de exemplo com informações de recomendações de produtos para usuários e escreva-o em uma tabela Delta chamada recommendations_delta. Esta etapa simula dados curados e transformados em seu data lake que você pretende sincronizar com o Azure Cosmos DB para NoSQL. Gravar no formato Delta garante que você possa habilitar posteriormente a captura de dados de alteração (CDC) para sincronização incremental.
from pyspark.sql import SparkSession
# Create sample data and convert it to a DataFrame
df = spark.createDataFrame([
("yara-lima", "Full-Finger Gloves", "clothing-gloves", 80),
("elza-pereira", "Long-Sleeve Logo Jersey", "clothing-jerseys", 90)
], ["id", "productname", "category", "recommendationscore"])
# Write the DataFrame to a Delta table
df.write.mode("append").format("delta").saveAsTable("recommendations_delta")
// Create sample data as a sequence and convert it to a DataFrame
val df = Seq(
("yara-lima", "Full-Finger Gloves", "clothing-gloves", 12.95),
("elza-pereira", "Long-Sleeve Logo Jersey", "clothing-jerseys", 19.99)
).toDF("id", "productname", "category", "recommendationscore")
// Write the DataFrame to a table
df.write.mode("append").format("delta").saveAsTable("recommendations_delta")
Carregar dados iniciais em lote no Azure Cosmos DB para NoSQL
Em seguida, leia a recommendations_delta tabela Delta num Spark DataFrame e execute uma escrita em lote inicial para o Azure Cosmos DB NoSQL usando o formato cosmos.oltp. Use o modo de acréscimo para adicionar os dados sem substituir o conteúdo existente no banco de dados e no contêiner de destino. Esta etapa garante que todos os dados históricos estejam disponíveis na conta antes do início do CDC.
# Read the Delta table into a DataFrame
df_delta = spark.read.format("delta").table("recommendations_delta")
# Write the DataFrame to the container using the Cosmos OLTP format
df_delta.write.format("cosmos.oltp").mode("append").options(**cosmos_config).save()
// Read the Delta table into a DataFrame
val df_delta = spark.read.format("delta").table("recommendations_delta")
// Write the DataFrame to the container using the Cosmos OLTP format
df_delta.write.format("cosmos.oltp").mode("append").options(cosmosconfig).save()
Habilitar a sincronização de streaming com o feed de dados de alteração
Habilite o recommendations_delta recurso Change Data Feed (CDF) do Delta Lake na tabela alterando as propriedades da tabela. O CDF permite que o Delta Lake rastreie todas as inserções, atualizações e exclusões futuras no nível da linha. Habilitar essa propriedade é essencial para executar sincronizações incrementais com o Azure Cosmos DB para NoSQL, pois expõe alterações sem a necessidade de comparar instantâneos.
Após a carga de dados históricos, as alterações na tabela Delta podem ser capturadas usando o Delta Change Data Feed (CDF). Você pode implementar CDC em modo de lote ou em fluxo contínuo.
# Enable Change Data Feed (CDF)
spark.sql("""
ALTER TABLE recommendations_delta SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
# Read the Change Data Capture (CDC) data from the Delta table
cdc_batch_df = spark.read.format("delta").option("readChangeData", "true").option("startingVersion", "1").table("recommendations_delta")
# Write the captured changes to Azure Cosmos DB for NoSQL in append mode
cdc_batch_df.write.format("cosmos.oltp").mode("append").options(**cosmos_config).save()
// Enable Change Data Feed (CDF)
spark.sql("""
ALTER TABLE recommendations_delta SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
// Read the Change Data Capture (CDC) data from the Delta table
val cdc_batch_df = spark.read.format("delta").option("readChangeData", "true").option("startingVersion", "1").table("recommendations_delta")
// Write the captured changes to Azure Cosmos DB for NoSQL in append mode
cdc_batch_df.write.format("cosmos.oltp").mode("append").options(cosmos_config).save()
Verificar dados usando consultas NoSQL
Depois de escrever no Azure Cosmos DB para NoSQL, verifique os dados consultando-os novamente no Spark usando a mesma configuração de conta. Depois; inspecione os dados ingeridos, execute validações ou junte-se a outros conjuntos de dados no Delta Lake para análises ou relatórios. O Azure Cosmos DB para NoSQL oferece suporte a leituras rápidas e indexadas para desempenho de consulta em tempo real.
# Load DataFrame
df_cosmos = spark.read.format("cosmos.oltp").options(**cosmos_config).load()
# Run query
df_cosmos.select("id", "productname", "category", "recommendationscore").show()
// Load DataFrame
val dfCosmos = spark.read.format("cosmos.oltp").options(cosmosConfig).load()
// Run query
dfCosmos.select("id", "productname", "category", "recommendationscore").show()