Partager via


Ingérer des données à l’aide de la bibliothèque de nœuds Azure Data Explorer

L’Explorateur de données Azure est un service d’exploration de données rapide et hautement évolutive pour les données des journaux et les données de télémétrie. Azure Data Explorer fournit deux bibliothèques clientes pour Node : une bibliothèque d’ingestion et une bibliothèque de données. Ces bibliothèques vous permettent d’ingérer (charger) des données dans un cluster et d’interroger des données à partir de votre code. Dans cet article, vous créez d’abord une table et un mappage de données dans un cluster de test. Ensuite, vous mettez en file d’attente l’ingestion dans le cluster et validez les résultats.

Si vous n’avez pas d’abonnement Azure, créez un compte Azure gratuit avant de commencer.

Prerequisites

  • Un compte Microsoft ou une identité utilisateur Microsoft Entra. Un abonnement Azure n’est pas requis.
  • Un cluster et une base de données Azure Data Explorer. Créez un cluster et une base de données.
  • Node.js installé sur votre ordinateur de développement

Installer les données et les bibliothèques d’ingestion

Installer azure-kusto-ingest et azure-kusto-data

npm i azure-kusto-ingest@^3.3.2 azure-kusto-data@^3.3.2

Ajouter les constantes et les instructions d’importation

Importer des classes à partir des bibliothèques


const { Client: KustoClient, KustoConnectionStringBuilder } =  require('azure-kusto-data');
const {
    IngestClient: KustoIngestClient,
    IngestionProperties,
    IngestionDescriptors,
    DataFormat,
    IngestionMappingKind,
} =  require("azure-kusto-ingest");

Pour authentifier une application, Azure Data Explorer utilise votre ID de locataire Microsoft Entra. Pour trouver votre ID de locataire, suivez le lien Find your Microsoft 365 tenant ID.

Définissez les valeurs pour authorityId, kustoUrikustoIngestUriet kustoDatabase avant d’exécuter ce code.

const cluster = "MyCluster";
const region = "westus";
const authorityId = "microsoft.com";
const kustoUri = `https://${cluster}.${region}.kusto.windows.net`;
const kustoIngestUri = `https://ingest-${cluster}.${region}.kusto.windows.net`;
const kustoDatabase  = "Weather";

Maintenant, créez la chaîne de connexion. Cet exemple utilise l’authentification de l’appareil pour accéder au cluster. Vérifiez la sortie de la console pour terminer l’authentification. Vous pouvez également utiliser un certificat d’application Microsoft Entra, une clé d’application et un utilisateur et un mot de passe.

Vous créez la table de destination et le mappage dans une étape ultérieure.

const kcsbIngest = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoIngestUri, authorityId);
const kcsbData = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoUri, authorityId);
const destTable = "StormEvents";
const destTableMapping = "StormEvents_CSV_Mapping";

Définir les informations du fichier source

Importez d’autres classes et définissez des constantes pour le fichier de source de données. Cet exemple utilise un exemple de fichier hébergé sur stockage Blob Azure. L’exemple de jeu de données StormEvents contient des données météorologiques provenant des Centres nationaux d’informations environnementales.

const container = "samplefiles";
const account = "kustosamples";
const sas = "";  // If relevant add SAS token
const filePath = "StormEvents.csv";
const blobPath = `https://${account}.blob.core.windows.net/${container}/${filePath}${sas}`;

Créer une table sur votre cluster de test

Créez une table qui correspond au schéma des données dans le StormEvents.csv fichier. Lorsque ce code s’exécute, il retourne un message comme suit : Pour vous connecter, utilisez un navigateur web pour ouvrir la page https://microsoft.com/devicelogin et entrer le code XXXXXXXXX pour l’authentification. Suivez les étapes de connexion, puis revenez pour exécuter le bloc de code suivant. Les blocs de code suivants qui effectuent une connexion vous obligeront à vous reconnecter.

const kustoClient = new KustoClient(kcsbData);
const createTableCommand = `.create table ${destTable} (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)`;

const createTableResults = await kustoClient.executeMgmt(kustoDatabase, createTableCommand);
console.log(createTableResults.primaryResults[0].toJSON().data);

Définir le mappage d’ingestion

Mappez les données CSV entrantes aux noms de colonnes et aux types de données utilisés lors de la création de la table.

const createMappingCommand = `.create table ${destTable} ingestion csv mapping '${destTableMapping}' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'`;

const mappingCommandResults = await kustoClient.executeMgmt(kustoDatabase, createMappingCommand);
console.log(mappingCommandResults.primaryResults[0].toJSON().data);

Mettre en file d’attente un message pour l’intégration

Mettre en file d’attente un message pour extraire des données du stockage d’objets blob et ingérer ces données dans Azure Data Explorer.

const defaultProps  = new IngestionProperties({
    database: kustoDatabase,
    table: destTable,
    format: DataFormat.CSV,
    ingestionMappingReference: destTableMapping,
    ingestionMappingKind: IngestionMappingKind.CSV,
    additionalProperties: {ignoreFirstRecord: true},
});

const ingestClient = new KustoIngestClient(kcsbIngest, defaultProps);
// All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties

const blobDesc = new BlobDescriptor(blobPath, 10);
try {
	const ingestionResult = await ingestClient.ingestFromBlob(blobDesc, null);
} catch (err) {
	// Handle errors
}

Valider que la table contient des données

Vérifiez que les données ont été intégrées dans la table. Attendez cinq à dix minutes pour que l’ingestion mise en file d’attente soit planifiée, puis que les données soient chargées dans Azure Data Explorer. Exécutez ensuite le code suivant pour obtenir le nombre d’enregistrements dans la StormEvents table.

const query = `${destTable} | count`;

var tableResults = await kustoClient.execute(kustoDatabase, query);
console.log(tableResults.primaryResults[0].toJSON().data);

Exécuter des requêtes de résolution des problèmes

Connectez-vous à https://dataexplorer.azure.com et accédez à votre cluster. Exécutez la commande suivante dans votre base de données pour voir s’il y a eu des échecs d’ingestion au cours des quatre dernières heures. Remplacez le nom de la base de données avant l’exécution.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Exécutez la commande suivante pour afficher l’état de toutes les opérations d’ingestion au cours des quatre dernières heures. Remplacez le nom de la base de données avant l’exécution.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Nettoyer les ressources

Si vous envisagez de suivre nos autres articles, conservez les ressources que vous avez créées. Si ce n’est pas le cas, exécutez la commande suivante dans votre base de données pour nettoyer la StormEvents table.

.drop table StormEvents