Freigeben über


Aufnehmen von Daten mithilfe der Azure Data Explorer Node-Bibliothek

Azure-Daten-Explorer ist ein schneller und hochgradig skalierbarer Dienst zur Untersuchung von Daten (Protokoll- und Telemetriedaten). Azure Data Explorer stellt zwei Clientbibliotheken für Node bereit: eine Aufnahmebibliothek und eine Datenbibliothek. Mit diesen Bibliotheken können Sie Daten in einen Cluster aufnehmen (laden) und Daten aus Ihrem Code abfragen. In diesem Artikel erstellen Sie zunächst eine Tabelle und Datenzuordnung in einem Testcluster. Anschließend wird die Aufnahme in die Warteschlange an den Cluster gestellt und die Ergebnisse überprüft.

Wenn Sie über kein Azure-Abonnement verfügen, können Sie ein kostenloses Azure-Konto erstellen, bevor Sie beginnen.

Voraussetzungen

  • Ein Microsoft-Konto oder eine Microsoft Entra-Benutzeridentität. Ein Azure-Abonnement ist nicht erforderlich.
  • Schnellstart: Erstellen eines Azure Data Explorer-Clusters und einer Datenbank. Erstellen eines Clusters und einer Datenbank
  • Node.js auf Ihrem Entwicklungscomputer installiert

Daten und Ingest-Bibliotheken installieren

Installieren azure-kusto-ingest und azure-kusto-data

npm i azure-kusto-ingest@^3.3.2 azure-kusto-data@^3.3.2

Hinzufügen von Importanweisungen und Konstanten

Importieren von Klassen aus den Bibliotheken


const { Client: KustoClient, KustoConnectionStringBuilder } =  require('azure-kusto-data');
const {
    IngestClient: KustoIngestClient,
    IngestionProperties,
    IngestionDescriptors,
    DataFormat,
    IngestionMappingKind,
} =  require("azure-kusto-ingest");

Um eine Anwendung zu authentifizieren, verwendet Azure Data Explorer Ihre Microsoft Entra-Mandanten-ID. Um Ihre Mandanten-ID zu finden, folgen Sie Suchen Ihrer Microsoft 365-Mandanten-ID.

Legen Sie die Werte für authorityId, kustoUriund kustoIngestUrikustoDatabase vor dem Ausführen dieses Codes fest.

const cluster = "MyCluster";
const region = "westus";
const authorityId = "microsoft.com";
const kustoUri = `https://${cluster}.${region}.kusto.windows.net`;
const kustoIngestUri = `https://ingest-${cluster}.${region}.kusto.windows.net`;
const kustoDatabase  = "Weather";

Erstellen Sie nun die Verbindungszeichenfolge. In diesem Beispiel wird die Geräteauthentifizierung zum Zugreifen auf den Cluster verwendet. Überprüfen Sie die Konsolenausgabe, um die Authentifizierung abzuschließen. Sie können auch ein Microsoft Entra-Anwendungszertifikat, einen Anwendungsschlüssel und ein Benutzer- und Kennwort verwenden.

Sie erstellen die Zieltabelle und die Zuordnung in einem späteren Schritt.

const kcsbIngest = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoIngestUri, authorityId);
const kcsbData = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoUri, authorityId);
const destTable = "StormEvents";
const destTableMapping = "StormEvents_CSV_Mapping";

Festlegen von Quelldateiinformationen

Importieren Sie weitere Klassen, und legen Sie Konstanten für die Datenquellendatei fest. In diesem Beispiel wird eine Beispieldatei verwendet, die in Azure Blob Storage gehostet wird. Das StormEvents-Beispiel-Dataset enthält Wetterdaten aus den Nationalen Zentren für Umweltinformationen.

const container = "samplefiles";
const account = "kustosamples";
const sas = "";  // If relevant add SAS token
const filePath = "StormEvents.csv";
const blobPath = `https://${account}.blob.core.windows.net/${container}/${filePath}${sas}`;

Erstellen einer Tabelle im Testcluster

Erstellen Sie eine Tabelle, die dem Schema der Daten in der StormEvents.csv Datei entspricht. Wenn dieser Code ausgeführt wird, wird eine Meldung wie die folgende zurückgegeben: Um sich anzumelden, verwenden Sie einen Webbrowser, um die Seite https://microsoft.com/devicelogin zu öffnen, und geben Sie den Code XXXXXXX ein, um sich zu authentifizieren. Führen Sie die Schritte zum Anmelden aus, und kehren Sie dann zum Ausführen des nächsten Codeblocks zurück. Bei nachfolgenden Codeblöcken, die eine Verbindung herstellen, müssen Sie sich erneut anmelden.

const kustoClient = new KustoClient(kcsbData);
const createTableCommand = `.create table ${destTable} (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)`;

const createTableResults = await kustoClient.executeMgmt(kustoDatabase, createTableCommand);
console.log(createTableResults.primaryResults[0].toJSON().data);

Definieren der Erfassungszuordnung

Ordnen Sie eingehende CSV-Daten den Spaltennamen und Datentypen zu, die beim Erstellen der Tabelle verwendet werden.

const createMappingCommand = `.create table ${destTable} ingestion csv mapping '${destTableMapping}' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'`;

const mappingCommandResults = await kustoClient.executeMgmt(kustoDatabase, createMappingCommand);
console.log(mappingCommandResults.primaryResults[0].toJSON().data);

Eine Nachricht in die Warteschlange stellen für die Aufnahme

Eine Nachricht in die Warteschlange stellen, um Daten aus dem Blob-Speicher abzurufen und diese in den Azure Data Explorer zu übernehmen.

const defaultProps  = new IngestionProperties({
    database: kustoDatabase,
    table: destTable,
    format: DataFormat.CSV,
    ingestionMappingReference: destTableMapping,
    ingestionMappingKind: IngestionMappingKind.CSV,
    additionalProperties: {ignoreFirstRecord: true},
});

const ingestClient = new KustoIngestClient(kcsbIngest, defaultProps);
// All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties

const blobDesc = new BlobDescriptor(blobPath, 10);
try {
	const ingestionResult = await ingestClient.ingestFromBlob(blobDesc, null);
} catch (err) {
	// Handle errors
}

Überprüfen, ob die Tabelle Daten enthält

Überprüfen Sie, ob die Daten in die Tabelle aufgenommen wurden. Warten Sie fünf bis zehn Minuten, bis die in die Warteschlange eingereiht wurde, um die Aufnahme zu planen und die Daten in Azure-Daten-Explorer zu laden. Führen Sie dann den folgenden Code aus, um die Anzahl der Datensätze in der StormEvents Tabelle abzurufen.

const query = `${destTable} | count`;

var tableResults = await kustoClient.execute(kustoDatabase, query);
console.log(tableResults.primaryResults[0].toJSON().data);

Ausführen von Problembehandlungsabfragen

Melden Sie sich bei https://dataexplorer.azure.com an und verbinden Sie sich mit Ihrem Cluster. Führen Sie den folgenden Befehl in Ihrer Datenbank aus, um festzustellen, ob in den letzten vier Stunden Fehler bei der Datenaufnahme aufgetreten sind. Ersetzen Sie den Datenbanknamen vor der Ausführung.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Führen Sie den folgenden Befehl aus, um den Status aller Ingestion-Vorgänge in den letzten vier Stunden anzuzeigen. Ersetzen Sie den Datenbanknamen vor der Ausführung.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Bereinigen von Ressourcen

Wenn Sie beabsichtigen, unseren anderen Artikeln zu folgen, behalten Sie die von Ihnen erstellten Ressourcen bei. Wenn nicht, führen Sie den folgenden Befehl in Ihrer Datenbank aus, um die StormEvents Tabelle zu bereinigen.

.drop table StormEvents