Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Vous pouvez utiliser le Kit de développement logiciel (SDK) Python Cosmos DB dans un notebook Python dans Microsoft Fabric pour lire et interroger des données à partir de Cosmos DB dans Microsoft Fabric. Vous pouvez également créer et gérer des conteneurs Cosmos DB.
L’utilisation du connecteur Spark diffère de l’utilisation de Spark pour lire des données à partir de Cosmos DB dans les données mises en miroir Fabric stockées dans OneLake, car elle se connecte directement au point de terminaison Cosmos DB pour effectuer des opérations.
Conseil / Astuce
Téléchargez l’exemple complet à partir de Cosmos DB dans des exemples Microsoft Fabric sur GitHub.
Prerequisites
Capacité de Fabric existante
- Si vous n’avez pas de capacité Fabric, démarrez un essai Fabric.
Base de données Cosmos DB existante dans Fabric
- Si vous n’en avez pas déjà, créez une base de données Cosmos DB dans Fabric.
Conteneur existant avec des données
- Si vous n’en avez pas encore, nous vous suggérons de charger l’exemple de conteneur de données.
Note
Cet article utilise l’exemple Cosmos DB intégré créé avec un nom de conteneur sampleData.
Récupérer le point de terminaison Cosmos DB
Tout d’abord, obtenez le point de terminaison de la base de données Cosmos DB dans Fabric. Ce point de terminaison est requis pour se connecter à l’aide du connecteur Spark Cosmos DB.
Ouvrez le portail Fabric (https://app.fabric.microsoft.com).
Accédez à votre base de données Cosmos DB existante.
Sélectionnez l’option Paramètres dans la barre de menus de la base de données.
Dans la boîte de dialogue paramètres, accédez à la section Connexion . Ensuite, copiez la valeur du champ Point de terminaison pour la base de données NoSQL Cosmos DB. Vous utilisez cette valeur à l’étape ultérieure[s].
Installer le package du Kit de développement logiciel (SDK) Python Cosmos DB
Installez le package azure-cosmos dans votre notebook. Il doit s’agir de la version 4.14.0 ou ultérieure.
Cellule [1] :
#Install packages %pip install azure-cosmos
Importer des bibliothèques et définir des valeurs de configuration
Importez les packages dans votre bloc-notes. Dans cet exemple et d’autres exemples, nous utilisons la bibliothèque asynchrone pour Cosmos DB. Ensuite, appliquez le point de terminaison Cosmos DB, le nom de la base de données et le nom du conteneur que vous avez enregistrés à l’étape précédente.
Cellule [2] :
#Imports and config values import logging from azure.cosmos.aio import CosmosClient from azure.cosmos.exceptions import CosmosHttpResponseError COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/' COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}' COSMOS_CONTAINER_NAME = '{your-container-name}'
Créer des informations d’identification de jeton personnalisées pour s’authentifier
Créez un objet FabricTokenCredential() pour produire un objet d’informations d’identification valide pour le Kit de développement logiciel (SDK) Cosmos DB à partir de la chaîne de jeton générée par les utilitaires d’informations d’identification Fabric NotebookUtils requis pour authentifier un utilisateur.
[REMARQUE !] Les notebooks Microsoft Fabric ne prennent pas en charge nativement les objets d'informations d'authentification Azure. Vous ne pouvez pas utiliser
DefaultAzureCredential()pour vous authentifier auprès de Cosmos DB dans Microsoft Fabric.Cellule [3] :
# Custom TokenCredential implementation for Fabric authentication in a notebook %pip install azure-core from azure.core.credentials import TokenCredential, AccessToken import base64 import json import notebookutils from datetime import datetime, timezone class FabricTokenCredential(TokenCredential): def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None, enable_cae: bool = False, **kwargs: Any) -> AccessToken: access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/") parts = access_token.split(".") if len(parts) < 2: raise ValueError("Invalid JWT format") payload_b64 = parts[1] # Fix padding padding = (-len(payload_b64)) % 4 if padding: payload_b64 += "=" * padding payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8") payload = json.loads(payload_json) exp = payload.get("exp") if exp is None: raise ValueError("exp claim missing in token") return AccessToken(token=access_token, expires_on=exp)Créez un objet client Cosmos DB asynchrone et une référence au conteneur Cosmos DB à utiliser dans un notebook.
Cellule [4] :
# Initialize Cosmos DB client with custom credential and container object COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential()) DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME) CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)Créer une fonction asynchrone pour interroger le conteneur Cosmos DB
Cellule [5] :
#Define function to search for all products by category name async def search_products(categoryName: str) -> List[Dict[str, Any]]: try: # Use parameterized query query = """ SELECT * FROM c WHERE c.categoryName = @categoryName AND c.docType = @docType """ # Set the parameter values parameters = [ {"name": "@docType", "value": "product"}, {"name": "@categoryName", "value": categoryName} ] # Async query: gather results into a list products = [p async for p in CONTAINER.query_items( query=query, parameters=parameters )] return products except CosmosHttpResponseError as e: logging.error(f"Cosmos DB query failed: {e}") raise except Exception as e: logging.error(f"Unexpected error in search_products: {e}") raiseAppeler la fonction asynchrone nouvellement définie pour retourner les résultats de la requête
Cellule [6] :
# Search for products in a category products = await search_products(categoryName="Computers, Laptops") display(products) #For tabular output pprint(products) #Json friendly output