Partilhar via


Ingerir dados usando a biblioteca Node.js do Azure Data Explorer

O Azure Data Explorer é um serviço de exploração de dados rápido e altamente escalável para dados de log e telemetria. O Azure Data Explorer fornece duas bibliotecas de cliente para Node: uma biblioteca de ingestão e uma biblioteca de dados. Essas bibliotecas permitem que você ingira (carregue) dados em um cluster e consulte dados do seu código. Neste artigo, você primeiro cria uma tabela e um mapeamento de dados em um cluster de teste. Em seguida, você enfileira a ingestão no cluster e valida os resultados.

Se não tiver uma subscrição do Azure, crie uma conta do Azure gratuita antes de começar.

Pré-requisitos

  • Uma conta Microsoft ou uma identidade de utilizador do Microsoft Entra. Uma assinatura do Azure não é necessária.
  • Um cluster e um banco de dados do Azure Data Explorer. Crie um cluster e um banco de dados.
  • Node.js instalado no computador de desenvolvimento

Instalar as bibliotecas de dados e de ingestão de dados

Instalar azure-kusto-ingest e azure-kusto-data

npm i azure-kusto-ingest@^3.3.2 azure-kusto-data@^3.3.2

Adicionar instruções de importação e constantes

Importar classes das bibliotecas


const { Client: KustoClient, KustoConnectionStringBuilder } =  require('azure-kusto-data');
const {
    IngestClient: KustoIngestClient,
    IngestionProperties,
    IngestionDescriptors,
    DataFormat,
    IngestionMappingKind,
} =  require("azure-kusto-ingest");

Para autenticar um aplicativo, o Azure Data Explorer usa sua ID de locatário do Microsoft Entra. Para encontrar a sua ID de inquilino, siga Localizar a sua ID de inquilino do Microsoft 365.

Defina os valores para authorityId, kustoUrikustoIngestUri e antes kustoDatabase de executar este código.

const cluster = "MyCluster";
const region = "westus";
const authorityId = "microsoft.com";
const kustoUri = `https://${cluster}.${region}.kusto.windows.net`;
const kustoIngestUri = `https://ingest-${cluster}.${region}.kusto.windows.net`;
const kustoDatabase  = "Weather";

Agora construa a cadeia de conexão. Este exemplo usa a autenticação de dispositivo para acessar o cluster. Verifique a saída do console para concluir a autenticação. Você também pode usar um certificado de aplicativo Microsoft Entra, chave de aplicativo e usuário e senha.

Você cria a tabela de destino e o mapeamento em uma etapa posterior.

const kcsbIngest = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoIngestUri, authorityId);
const kcsbData = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoUri, authorityId);
const destTable = "StormEvents";
const destTableMapping = "StormEvents_CSV_Mapping";

Definir informações do arquivo de origem

Importe mais classes e defina constantes para o arquivo de fonte de dados. Este exemplo usa um arquivo de exemplo hospedado no Armazenamento de Blob do Azure. O conjunto de dados de exemplo StormEvents contém dados relacionados a eventos climatéricos dos Centros Nacionais de Informação Ambiental.

const container = "samplefiles";
const account = "kustosamples";
const sas = "";  // If relevant add SAS token
const filePath = "StormEvents.csv";
const blobPath = `https://${account}.blob.core.windows.net/${container}/${filePath}${sas}`;

Criar uma tabela no cluster de teste

Crie uma tabela que corresponda ao esquema dos dados no StormEvents.csv arquivo. Quando esse código é executado, ele retorna uma mensagem como a seguinte: Para entrar, use um navegador da Web para abrir a página https://microsoft.com/devicelogin e digite o código XXXXXXXXX para autenticar. Siga os passos para iniciar sessão e, em seguida, regresse para executar o próximo bloco de código. Os blocos de código subsequentes que fazem uma conexão exigirão que você entre novamente.

const kustoClient = new KustoClient(kcsbData);
const createTableCommand = `.create table ${destTable} (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)`;

const createTableResults = await kustoClient.executeMgmt(kustoDatabase, createTableCommand);
console.log(createTableResults.primaryResults[0].toJSON().data);

Definir o mapeamento de ingestão

Mapeie os dados CSV de entrada para os nomes de colunas e tipos de dados usados ao criar a tabela.

const createMappingCommand = `.create table ${destTable} ingestion csv mapping '${destTableMapping}' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'`;

const mappingCommandResults = await kustoClient.executeMgmt(kustoDatabase, createMappingCommand);
console.log(mappingCommandResults.primaryResults[0].toJSON().data);

Enfileirar uma mensagem para ingestão

Enfileire uma mensagem para extrair dados do armazenamento de blob e ingerir esses dados no Azure Data Explorer.

const defaultProps  = new IngestionProperties({
    database: kustoDatabase,
    table: destTable,
    format: DataFormat.CSV,
    ingestionMappingReference: destTableMapping,
    ingestionMappingKind: IngestionMappingKind.CSV,
    additionalProperties: {ignoreFirstRecord: true},
});

const ingestClient = new KustoIngestClient(kcsbIngest, defaultProps);
// All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties

const blobDesc = new BlobDescriptor(blobPath, 10);
try {
	const ingestionResult = await ingestClient.ingestFromBlob(blobDesc, null);
} catch (err) {
	// Handle errors
}

Validar que a tabela contém dados

Confirme se os dados foram inseridos na tabela. Aguarde cinco a dez minutos para que o processo de ingestão em fila agende e carregue os dados no Azure Data Explorer. Em seguida, execute o código a seguir para obter a contagem de registros na StormEvents tabela.

const query = `${destTable} | count`;

var tableResults = await kustoClient.execute(kustoDatabase, query);
console.log(tableResults.primaryResults[0].toJSON().data);

Executar consultas de solução de problemas

Inicie sessão https://dataexplorer.azure.com e ligue-se ao cluster. Execute o seguinte comando em seu banco de dados para ver se houve alguma falha de ingestão nas últimas quatro horas. Substitua o nome do banco de dados antes de executar.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Execute o seguinte comando para visualizar o status de todas as operações de ingestão nas últimas quatro horas. Substitua o nome do banco de dados antes de executar.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Limpeza de recursos

Se você pretende seguir nossos outros artigos, mantenha os recursos que você criou. Caso contrário, execute o seguinte comando no banco de dados para limpar a StormEvents tabela.

.drop table StormEvents