Partager via


Utiliser Cosmos DB dans un notebook Python dans Microsoft Fabric

Vous pouvez utiliser le Kit de développement logiciel (SDK) Python Cosmos DB dans un notebook Python dans Microsoft Fabric pour lire et interroger des données à partir de Cosmos DB dans Microsoft Fabric. Vous pouvez également créer et gérer des conteneurs Cosmos DB.

L’utilisation du connecteur Spark diffère de l’utilisation de Spark pour lire des données à partir de Cosmos DB dans les données mises en miroir Fabric stockées dans OneLake, car elle se connecte directement au point de terminaison Cosmos DB pour effectuer des opérations.

Conseil / Astuce

Téléchargez l’exemple complet à partir de Cosmos DB dans des exemples Microsoft Fabric sur GitHub.

Prerequisites

Note

Cet article utilise l’exemple Cosmos DB intégré créé avec un nom de conteneur sampleData.

Récupérer le point de terminaison Cosmos DB

Tout d’abord, obtenez le point de terminaison de la base de données Cosmos DB dans Fabric. Ce point de terminaison est requis pour se connecter à l’aide du connecteur Spark Cosmos DB.

  1. Ouvrez le portail Fabric (https://app.fabric.microsoft.com).

  2. Accédez à votre base de données Cosmos DB existante.

  3. Sélectionnez l’option Paramètres dans la barre de menus de la base de données.

    Capture d’écran de l’option de barre de menus « Paramètres » pour une base de données dans le portail Fabric.

  4. Dans la boîte de dialogue paramètres, accédez à la section Connexion . Ensuite, copiez la valeur du champ Point de terminaison pour la base de données NoSQL Cosmos DB. Vous utilisez cette valeur à l’étape ultérieure[s].

    Capture d’écran de la section « Connexion » de la boîte de dialogue « Paramètres » d’une base de données dans le portail Fabric.

Installer le package du Kit de développement logiciel (SDK) Python Cosmos DB

  • Installez le package azure-cosmos dans votre notebook. Il doit s’agir de la version 4.14.0 ou ultérieure.

    Cellule [1] :

    #Install packages
    %pip install azure-cosmos
    

Importer des bibliothèques et définir des valeurs de configuration

  • Importez les packages dans votre bloc-notes. Dans cet exemple et d’autres exemples, nous utilisons la bibliothèque asynchrone pour Cosmos DB. Ensuite, appliquez le point de terminaison Cosmos DB, le nom de la base de données et le nom du conteneur que vous avez enregistrés à l’étape précédente.

    Cellule [2] :

    #Imports and config values
    import logging
    
    from azure.cosmos.aio import CosmosClient
    from azure.cosmos.exceptions import CosmosHttpResponseError
    
    COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/'
    COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}'
    COSMOS_CONTAINER_NAME = '{your-container-name}'
    

Créer des informations d’identification de jeton personnalisées pour s’authentifier

  1. Créez un objet FabricTokenCredential() pour produire un objet d’informations d’identification valide pour le Kit de développement logiciel (SDK) Cosmos DB à partir de la chaîne de jeton générée par les utilitaires d’informations d’identification Fabric NotebookUtils requis pour authentifier un utilisateur.

    [REMARQUE !] Les notebooks Microsoft Fabric ne prennent pas en charge nativement les objets d'informations d'authentification Azure. Vous ne pouvez pas utiliser DefaultAzureCredential() pour vous authentifier auprès de Cosmos DB dans Microsoft Fabric.

    Cellule [3] :

    # Custom TokenCredential implementation for Fabric authentication in a notebook
    %pip install azure-core
    from azure.core.credentials import TokenCredential, AccessToken
    import base64
    import json
    import notebookutils
    from datetime import datetime, timezone
    
    class FabricTokenCredential(TokenCredential):
    
       def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None,
                      enable_cae: bool = False, **kwargs: Any) -> AccessToken:
          access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/")
          parts = access_token.split(".")
          if len(parts) < 2:
                raise ValueError("Invalid JWT format")
          payload_b64 = parts[1]
          # Fix padding
          padding = (-len(payload_b64)) % 4
          if padding:
                payload_b64 += "=" * padding
          payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8")
          payload = json.loads(payload_json)
          exp = payload.get("exp")
          if exp is None:
                raise ValueError("exp claim missing in token")
          return AccessToken(token=access_token, expires_on=exp) 
    
  2. Créez un objet client Cosmos DB asynchrone et une référence au conteneur Cosmos DB à utiliser dans un notebook.

    Cellule [4] :

    # Initialize Cosmos DB client with custom credential and container object
    COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential())
    DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME)
    CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)
    
  3. Créer une fonction asynchrone pour interroger le conteneur Cosmos DB

    Cellule [5] :

    #Define function to search for all products by category name
    async def search_products(categoryName: str) -> List[Dict[str, Any]]:
    
       try:
          # Use parameterized query
          query = """
             SELECT 
                *
             FROM c 
             WHERE 
                c.categoryName = @categoryName AND
                c.docType = @docType
          """
    
          # Set the parameter values
          parameters = [
                {"name": "@docType", "value": "product"},
                {"name": "@categoryName", "value": categoryName}
          ]
    
          # Async query: gather results into a list
          products = [p async for p in CONTAINER.query_items(
                query=query,
                parameters=parameters
          )]
    
          return products
    
       except CosmosHttpResponseError as e:
          logging.error(f"Cosmos DB query failed: {e}")
          raise
       except Exception as e:
          logging.error(f"Unexpected error in search_products: {e}")
          raise
    
  4. Appeler la fonction asynchrone nouvellement définie pour retourner les résultats de la requête

    Cellule [6] :

       # Search for products in a category
       products = await search_products(categoryName="Computers, Laptops")
    
       display(products) #For tabular output
       pprint(products) #Json friendly output