共用方式為


使用 Azure 資料總管節點程式庫擷取資料

Azure 資料總管是快速及可調整的資料探索服務,以取得記錄和遙測資料。 Azure 資料總管提供兩個 Node 用戶端程式庫: 擷取程式庫資料程式庫。 這些程式庫可讓您將資料擷取 (載入) 到叢集中,並從程式碼中查詢資料。 在本文中,您會先在測試叢集中建立資料表和資料對應。 然後,您將資料引入排入叢集並驗證結果。

如果您沒有 Azure 訂用帳戶,請在開始之前建立 免費的 Azure 帳戶

先決條件

  • Microsoft帳戶或Microsoft Entra 使用者身分識別。 不需要 Azure 訂用帳戶。
  • Azure 資料總管叢集和資料庫。 建立叢集和資料庫
  • Node.js 安裝在您的開發電腦上

安裝資料和匯入程式庫

安裝 azure-kusto-ingestazure-kusto-data

npm i azure-kusto-ingest@^3.3.2 azure-kusto-data@^3.3.2

新增匯入語句和常數

從程式庫匯入類別


const { Client: KustoClient, KustoConnectionStringBuilder } =  require('azure-kusto-data');
const {
    IngestClient: KustoIngestClient,
    IngestionProperties,
    IngestionDescriptors,
    DataFormat,
    IngestionMappingKind,
} =  require("azure-kusto-ingest");

若要驗證應用程式,Azure 數據總管會使用您的Microsoft Entra 租使用者標識碼。 若要尋找您的租用戶識別碼,請遵循 尋找您的 Microsoft 365 租用戶識別碼

在執行此程式碼之前,先設定 、 authorityIdkustoUrikustoIngestUrikustoDatabase值。

const cluster = "MyCluster";
const region = "westus";
const authorityId = "microsoft.com";
const kustoUri = `https://${cluster}.${region}.kusto.windows.net`;
const kustoIngestUri = `https://ingest-${cluster}.${region}.kusto.windows.net`;
const kustoDatabase  = "Weather";

現在建構 連接字串。 此範例會使用裝置驗證來存取叢集。 檢查主控台輸出以完成身份驗證。 您也可以使用 Microsoft Entra 應用程式憑證、應用程式金鑰,以及使用者和密碼。

您在稍後的步驟中建立目的地表格和映射。

const kcsbIngest = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoIngestUri, authorityId);
const kcsbData = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoUri, authorityId);
const destTable = "StormEvents";
const destTableMapping = "StormEvents_CSV_Mapping";

設定來源檔案資訊

匯入更多類別,並設定資料來源檔案的常數。 此範例會使用裝載在 Azure Blob 儲存體上的範例檔案。 StormEvents 範例資料集包含來自國家環境資訊中心的天氣相關資料。

const container = "samplefiles";
const account = "kustosamples";
const sas = "";  // If relevant add SAS token
const filePath = "StormEvents.csv";
const blobPath = `https://${account}.blob.core.windows.net/${container}/${filePath}${sas}`;

在測試叢集上建立數據表

建立符合檔案中 StormEvents.csv 資料結構描述的資料表。 當此程式碼執行時,它會傳回如下所示的訊息: 若要登入,請使用 Web 瀏覽器開啟頁面 https://microsoft.com/devicelogin 並輸入代碼 XXXXXXXXX 進行驗證。 請遵循步驟登入,然後返回執行下一個程式碼區塊。 建立連線的後續程式碼區塊將需要您重新登入。

const kustoClient = new KustoClient(kcsbData);
const createTableCommand = `.create table ${destTable} (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)`;

const createTableResults = await kustoClient.executeMgmt(kustoDatabase, createTableCommand);
console.log(createTableResults.primaryResults[0].toJSON().data);

定義擷取對應

將傳入的 CSV 資料對應至建立表格時所使用的欄名稱和資料類型。

const createMappingCommand = `.create table ${destTable} ingestion csv mapping '${destTableMapping}' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'`;

const mappingCommandResults = await kustoClient.executeMgmt(kustoDatabase, createMappingCommand);
console.log(mappingCommandResults.primaryResults[0].toJSON().data);

將訊息排入佇列以進行導入

將訊息排入佇列,以從 Blob 儲存體提取資料,並將該資料內嵌至 Azure 資料總管。

const defaultProps  = new IngestionProperties({
    database: kustoDatabase,
    table: destTable,
    format: DataFormat.CSV,
    ingestionMappingReference: destTableMapping,
    ingestionMappingKind: IngestionMappingKind.CSV,
    additionalProperties: {ignoreFirstRecord: true},
});

const ingestClient = new KustoIngestClient(kcsbIngest, defaultProps);
// All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties

const blobDesc = new BlobDescriptor(blobPath, 10);
try {
	const ingestionResult = await ingestClient.ingestFromBlob(blobDesc, null);
} catch (err) {
	// Handle errors
}

驗證資料表是否包含資料

驗證資料是否已擷取到資料表中。 等候大約五到十分鐘,讓佇列中的資料擷取完成排程,然後將資料載入 Azure 資料總管。 然後執行以下程式碼來取得表中的 StormEvents 記錄數。

const query = `${destTable} | count`;

var tableResults = await kustoClient.execute(kustoDatabase, query);
console.log(tableResults.primaryResults[0].toJSON().data);

執行疑難排解查詢

登入 https://dataexplorer.azure.com 並連線到您的叢集。 在資料庫中執行下列命令,以查看過去四小時內是否有任何擷取失敗。 在執行之前取代資料庫名稱。

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

執行下列命令,以檢視過去四小時內所有擷取作業的狀態。 在執行之前取代資料庫名稱。

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

清理資源

如果您打算關注我們的其他文章,請保留您創建的資源。 如果沒有,請在資料庫中執行下列命令來清除 StormEvents 資料表。

.drop table StormEvents