你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

使用 Azure 数据资源管理器节点库引入数据

Azure 数据资源管理器是一项快速且高度可缩放的数据探索服务,适用于日志和遥测数据。 Azure 数据资源管理器为 Node 提供两个客户端库: 引入库数据库。 通过这些库,可以将数据引入群集并查询代码中的数据。 在本文中,首先在测试群集中创建表和数据映射。 然后,将数据导入排队至群集并验证结果。

如果没有 Azure 订阅,请在开始之前创建 一个免费的 Azure 帐户

先决条件

  • Microsoft 帐户或 Microsoft Entra 用户标识。 无需 Azure 订阅。
  • Azure 数据资源管理器群集和数据库。 创建群集和数据库
  • 已在开发计算机上安装Node.js

安装数据和引入库

安装 azure-kusto-ingestazure-kusto-data

npm i azure-kusto-ingest@^3.3.2 azure-kusto-data@^3.3.2

请添加导入语句和常量

从库导入类


const { Client: KustoClient, KustoConnectionStringBuilder } =  require('azure-kusto-data');
const {
    IngestClient: KustoIngestClient,
    IngestionProperties,
    IngestionDescriptors,
    DataFormat,
    IngestionMappingKind,
} =  require("azure-kusto-ingest");

若要对应用程序进行身份验证,Azure 数据资源管理器使用 Microsoft Entra 租户 ID。 若要查找租户 ID,请按照 查找 Microsoft 365 租户 ID

在运行此代码之前设置authorityIdkustoUrikustoIngestUrikustoDatabase的值。

const cluster = "MyCluster";
const region = "westus";
const authorityId = "microsoft.com";
const kustoUri = `https://${cluster}.${region}.kusto.windows.net`;
const kustoIngestUri = `https://ingest-${cluster}.${region}.kusto.windows.net`;
const kustoDatabase  = "Weather";

现在构造连接字符串。 此示例使用设备身份验证访问群集。 检查控制台输出以完成身份验证。 还可以使用 Microsoft Entra 应用程序证书、应用程序密钥和用户和密码。

在后面的步骤中创建目标表和映射。

const kcsbIngest = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoIngestUri, authorityId);
const kcsbData = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoUri, authorityId);
const destTable = "StormEvents";
const destTableMapping = "StormEvents_CSV_Mapping";

设置源文件信息

导入更多类并为数据源文件设置常量。 此示例使用 Azure Blob 存储上托管的示例文件。 StormEvents 示例数据集包含来自国家环境信息中心的天气相关数据。

const container = "samplefiles";
const account = "kustosamples";
const sas = "";  // If relevant add SAS token
const filePath = "StormEvents.csv";
const blobPath = `https://${account}.blob.core.windows.net/${container}/${filePath}${sas}`;

在测试群集上创建表

创建与文件中数据的 StormEvents.csv 架构匹配的表。 运行此代码时,它将返回如下所示的消息: 若要登录,请使用 Web 浏览器打开页面 https://microsoft.com/devicelogin 并输入代码 XXXXXXXXX 进行身份验证。 按照步骤登录,然后返回运行下一个代码块。 建立连接的后续代码块将要求你重新登录。

const kustoClient = new KustoClient(kcsbData);
const createTableCommand = `.create table ${destTable} (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)`;

const createTableResults = await kustoClient.executeMgmt(kustoDatabase, createTableCommand);
console.log(createTableResults.primaryResults[0].toJSON().data);

定义引入映射

将传入 CSV 数据映射到创建表时使用的列名称和数据类型。

const createMappingCommand = `.create table ${destTable} ingestion csv mapping '${destTableMapping}' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'`;

const mappingCommandResults = await kustoClient.executeMgmt(kustoDatabase, createMappingCommand);
console.log(mappingCommandResults.primaryResults[0].toJSON().data);

对消息进行排队以引入

将消息排入队列,以便从 Blob 存储拉取数据,并将该数据引入 Azure 数据资源管理器。

const defaultProps  = new IngestionProperties({
    database: kustoDatabase,
    table: destTable,
    format: DataFormat.CSV,
    ingestionMappingReference: destTableMapping,
    ingestionMappingKind: IngestionMappingKind.CSV,
    additionalProperties: {ignoreFirstRecord: true},
});

const ingestClient = new KustoIngestClient(kcsbIngest, defaultProps);
// All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties

const blobDesc = new BlobDescriptor(blobPath, 10);
try {
	const ingestionResult = await ingestClient.ingestFromBlob(blobDesc, null);
} catch (err) {
	// Handle errors
}

验证表是否包含数据

验证数据是否已引入表中。 请等待5到10分钟,排队的引入过程将被安排并将数据加载到Azure数据资源管理器中。 然后运行以下代码以获取StormEvents表中的记录计数。

const query = `${destTable} | count`;

var tableResults = await kustoClient.execute(kustoDatabase, query);
console.log(tableResults.primaryResults[0].toJSON().data);

运行故障排除查询

登录https://dataexplorer.azure.com并连接到群集。 在数据库中运行以下命令,查看过去四小时内是否有任何引入失败。 在运行之前替换数据库名称。

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

运行以下命令以查看过去四小时内所有引入操作的状态。 在运行之前替换数据库名称。

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

清理资源

如果打算遵循我们的其他文章,请保留所创建的资源。 否则,请在数据库中运行以下命令以清理 StormEvents 表。

.drop table StormEvents