Partager via


Ingérer des données à l’aide de la bibliothèque Python d’Azure Data Explorer

Dans cet article, vous ingérez des données à l’aide de la bibliothèque Azure Data Explorer pour Python. L’Explorateur de données Azure est un service d’exploration de données rapide et hautement évolutif pour les logs et les données de télémétrie. Azure Data Explorer fournit deux bibliothèques clientes pour Python : une bibliothèque d’ingestion et une bibliothèque de données. Ces bibliothèques vous permettent d’ingérer ou de charger des données dans un cluster et d’interroger des données à partir de votre code.

Tout d’abord, créez un mappage de table et de données dans un cluster. Ensuite, mettez en file d'attente l'ingestion sur le cluster et validez les résultats.

Conditions préalables

Installer les données et les bibliothèques d’ingestion

Installez azure-kusto-data et azure-kusto-ingest.

pip install azure-kusto-data
pip install azure-kusto-ingest

Ajouter des instructions d'importation et des constantes

Importez des classes à partir d’azure-kusto-data.

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table

Pour authentifier une application, Azure Data Explorer utilise votre ID de locataire Microsoft Entra. Pour rechercher votre ID de locataire, utilisez l’URL suivante, en remplaçant votre domaine pour VotreDomain.

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

Par exemple, si votre domaine est contoso.com, l’URL est : https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Cliquez sur cette URL pour afficher les résultats ; la première ligne est la suivante.

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

L’ID de locataire dans ce cas est aaaabbbb-0000-cccc-1111-dddd2222eeee. Définissez les valeurs des AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI et KUSTO_DATABASE avant d’exécuter ce code.

AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_DATABASE = "<DatabaseName>"

Construisez maintenant la chaîne de connexion. L’exemple suivant utilise l’authentification de l’appareil pour accéder au cluster. Vous pouvez également utiliser l’authentification d’identité gérée, le certificat d’application Microsoft Entra, la clé d’application Microsoft Entra, et l’utilisateur et le mot de passe Microsoft Entra.

Vous créez la table de destination et le mappage dans une étape ultérieure.

KCSB_INGEST = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_INGEST_URI)

KCSB_DATA = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_URI)

DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"

Définir les informations du fichier source

Importez des classes supplémentaires et définissez des constantes pour le fichier de source de données. Cet exemple utilise un exemple de fichier hébergé sur stockage Blob Azure. L’exemple de jeu de données StormEvents contient des données météorologiques provenant des Centres nationaux d’informations environnementales.

from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod

CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamples"
SAS_TOKEN = ""  # If relevant add SAS token
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321    # in bytes

BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
    CONTAINER + "/" + FILE_PATH + SAS_TOKEN

Créer une table sur votre cluster

Créez une table qui correspond au schéma des données dans le fichier StormEvents.csv. Lorsque ce code s’exécute, il retourne un message semblable au message suivant : Pour vous connecter, utilisez un navigateur web pour ouvrir la page https://microsoft.com/devicelogin et entrez le code F3W4VWZDM à authentifier. Suivez les étapes de connexion, puis revenez pour exécuter le bloc de code suivant. Les blocs de code suivants qui établissent une connexion nécessitent que vous vous reconnectiez.

KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Définir le mappage d’ingestion

Mappez les données CSV entrantes aux noms de colonnes et aux types de données utilisés lors de la création de la table. Cela mappe les champs de données sources aux colonnes de la table de destination

CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Mettre en file d’attente un message pour l’ingestion

Mettre un message en file d'attente pour récupérer des données depuis le stockage d'objets blob et ingérer ces données dans Azure Data Explorer.

INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST)

# All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV,
                                           ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
    BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)

print('Done queuing up ingestion with Azure Data Explorer')

Interroger les données qui ont été importées dans la table

Attendez cinq à 10 minutes afin que l’ingestion mise en file d’attente puisse planifier l’ingestion et que les données soient chargées dans Azure Data Explorer. Exécutez ensuite le code suivant pour obtenir le nombre d’enregistrements dans la table StormEvents.

QUERY = "StormEvents | count"

RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)

dataframe_from_result_table(RESPONSE.primary_results[0])

Exécuter des requêtes de résolution des problèmes

Connectez-vous à https://dataexplorer.azure.com et connectez-vous à votre cluster. Exécutez la commande suivante dans votre base de données pour voir s’il y a eu des échecs d’ingestion au cours des quatre dernières heures. Remplacez le nom de la base de données avant l’exécution.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Exécutez la commande suivante pour afficher l’état de toutes les opérations d’ingestion au cours des quatre dernières heures. Remplacez le nom de la base de données avant l’exécution.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Nettoyer les ressources

Si vous envisagez de suivre nos autres articles, conservez les ressources que vous avez créées. Si ce n’est pas le cas, exécutez la commande suivante dans votre base de données pour nettoyer la table StormEvents.

.drop table StormEvents

Étape suivante