Partilhar via


Trabalhar com o Cosmos DB num caderno Python no Microsoft Fabric

Pode usar o Cosmos DB Python SDK num caderno Python no Microsoft Fabric para ler, escrever e consultar dados do Cosmos DB no Microsoft Fabric. Também podes criar e gerir contentores de bases de dados Cosmos.

Usar o conector Spark é diferente de usar o Spark para ler dados do Cosmos DB em dados espelhados em Fabric armazenados no OneLake, pois liga-se diretamente ao endpoint do Cosmos DB para realizar operações.

Sugestão

Descarregue o exemplo completo do Cosmos DB em Microsoft Fabric Samples no GitHub.

Pré-requisitos

Observação

Este artigo utiliza a amostra incorporada do Cosmos DB criada com o nome de contentor SampleData.

Recuperar o endpoint da base de dados Cosmos

Primeiro, obtenha o endpoint para a base de dados Cosmos DB no sistema Fabric. Este endpoint é necessário para se ligar usando o Cosmos DB Spark Connector.

  1. Abra o portal do Fabric (https://app.fabric.microsoft.com).

  2. Navegue até o banco de dados existente do Cosmos DB.

  3. Selecione a opção Definições na barra de menu da base de dados.

    Captura de ecrã da opção de barra de menu 'Definições' para uma base de dados no portal do Fabric.

  4. No diálogo de definições, navegue até à secção Ligação . Depois, copie o valor do campo Endpoint para a base de dados NoSQL do Cosmos DB . Utiliza-se este valor nas etapas seguintes.

    Captura de ecrã da secção 'Connection' do diálogo 'Definições' de uma base de dados no portal do Fabric.

Instalar o pacote Python SDK do Cosmos DB

  • Instala o pacote azure-cosmos no teu caderno. Esta deve ser a versão 4.14.0 ou posterior.

    Célula [1]:

    #Install packages
    %pip install azure-cosmos
    

Importar bibliotecas e definir valores de configuração

  • Importa os pacotes para o teu caderno. Neste e noutros exemplos usamos a biblioteca assíncrona para a Cosmos DB. Depois aplica o endpoint do Cosmos DB, o nome da base de dados e o nome do contentor que guardaste numa etapa anterior.

    Célula [2]:

    #Imports and config values
    import logging
    
    from azure.cosmos.aio import CosmosClient
    from azure.cosmos.exceptions import CosmosHttpResponseError
    
    COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/'
    COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}'
    COSMOS_CONTAINER_NAME = '{your-container-name}'
    

Crie uma credencial de token personalizada para autenticar

  1. Crie um objeto FabricTokenCredential() para produzir um objeto de credencial válido para o Cosmos DB SDK a partir da cadeia de tokens gerada pelas utilidades de credenciais Fabric NotebookUtils que é necessária para autenticar um utilizador.

    [NOTA!] Os cadernos Microsoft Fabric não suportam objetos Azure Credential nativamente. Não podes usar DefaultAzureCredential() para autenticar no Cosmos DB no Microsoft Fabric.

    Célula [3]:

    # Custom TokenCredential implementation for Fabric authentication in a notebook
    %pip install azure-core
    from azure.core.credentials import TokenCredential, AccessToken
    import base64
    import json
    import notebookutils
    from datetime import datetime, timezone
    
    class FabricTokenCredential(TokenCredential):
    
       def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None,
                      enable_cae: bool = False, **kwargs: Any) -> AccessToken:
          access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/")
          parts = access_token.split(".")
          if len(parts) < 2:
                raise ValueError("Invalid JWT format")
          payload_b64 = parts[1]
          # Fix padding
          padding = (-len(payload_b64)) % 4
          if padding:
                payload_b64 += "=" * padding
          payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8")
          payload = json.loads(payload_json)
          exp = payload.get("exp")
          if exp is None:
                raise ValueError("exp claim missing in token")
          return AccessToken(token=access_token, expires_on=exp) 
    
  2. Crie um objeto cliente Cosmos DB assíncrono e uma referência ao contentor Cosmos DB para usar num caderno.

    Célula [4]:

    # Initialize Cosmos DB client with custom credential and container object
    COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential())
    DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME)
    CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)
    
  3. Crie uma função assíncrona para consultar o contentor da base de dados Cosmos

    Célula [5]:

    #Define function to search for all products by category name
    async def search_products(categoryName: str) -> List[Dict[str, Any]]:
    
       try:
          # Use parameterized query
          query = """
             SELECT 
                *
             FROM c 
             WHERE 
                c.categoryName = @categoryName AND
                c.docType = @docType
          """
    
          # Set the parameter values
          parameters = [
                {"name": "@docType", "value": "product"},
                {"name": "@categoryName", "value": categoryName}
          ]
    
          # Async query: gather results into a list
          products = [p async for p in CONTAINER.query_items(
                query=query,
                parameters=parameters
          )]
    
          return products
    
       except CosmosHttpResponseError as e:
          logging.error(f"Cosmos DB query failed: {e}")
          raise
       except Exception as e:
          logging.error(f"Unexpected error in search_products: {e}")
          raise
    
  4. Chame a função assíncrona recém-definida para devolver os resultados da consulta

    Célula [6]:

       # Search for products in a category
       products = await search_products(categoryName="Computers, Laptops")
    
       display(products) #For tabular output
       pprint(products) #Json friendly output