在本文中,您會使用 Azure 數據總管 Python 連結庫內嵌數據。 Azure 資料總管是一項快速且可高度調整的資料探索服務,可用於處理記錄和遙測資料。 Azure 數據探索者提供兩個適用於 Python 的用戶端函式庫:資料導入函式庫 和 資料函式庫。 這些連結庫可讓您擷取或載入叢集中的數據,並從程式代碼查詢數據。
首先,在叢集中建立數據表和數據對應。 接著,您會將資料擷取排入佇列以進行叢集處理,並驗證結果。
先決條件
- Microsoft帳戶或Microsoft Entra 使用者身分識別。 不需要 Azure 訂用帳戶。
- Azure Data Explorer 叢集和資料庫。 建立叢集和資料庫。
- Python 3.4+。
安裝資料和導入函式庫
安裝 azure-kusto-data 和 azure-kusto-ingest。
pip install azure-kusto-data
pip install azure-kusto-ingest
新增匯入語句和常數
從 azure-kusto-data 匯入類別。
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table
若要驗證應用程式,Azure 數據總管會使用您的Microsoft Entra 租使用者標識碼。 若要尋找您的租使用者標識碼,請使用下列URL,以取代 YourDomain的網域。
https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/
例如,如果您的網域 contoso.com,則URL為: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/。 按兩下此網址以查看結果;第一行如下所示。
"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"
在此情況下,租使用者識別碼為 aaaabbbb-0000-cccc-1111-dddd2222eeee。 在執行此程式代碼之前,請先設定AAD_TENANT_ID、KUSTO_URI、KUSTO_INGEST_URI和KUSTO_DATABASE的值。
AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_DATABASE = "<DatabaseName>"
現在建構連接字串。 下列範例會使用裝置驗證來存取叢集。 您也可以使用受控識別驗證、Microsoft Entra 應用程式憑證、Microsoft Entra 應用程式金鑰,以及Microsoft Entra 使用者和密碼。
您會在稍後的步驟中建立目的資料表和映射。
KCSB_INGEST = KustoConnectionStringBuilder.with_interactive_login(
KUSTO_INGEST_URI)
KCSB_DATA = KustoConnectionStringBuilder.with_interactive_login(
KUSTO_URI)
DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"
設定來源檔案資訊
匯入其他類別,並設定數據源檔案的常數。 此範例會使用裝載在 Azure Blob 記憶體上的範例檔案。 StormEvents 範例數據集包含來自國家環境資訊中心的天氣相關數據。
from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod
CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamples"
SAS_TOKEN = "" # If relevant add SAS token
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321 # in bytes
BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
CONTAINER + "/" + FILE_PATH + SAS_TOKEN
在叢集上建立數據表
建立符合 StormEvents.csv 檔案中數據架構的數據表。 當此程式代碼執行時,它會傳回類似下列訊息的訊息: 若要登入,請使用網頁瀏覽器開啟頁面 https://microsoft.com/devicelogin ,並輸入程式代碼F3W4VWZDM進行驗證。 請遵循步驟登入,然後返回執行下一個程式代碼區塊。 建立連線的後續程式代碼區塊會要求您再次登入。
KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"
RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
定義擷取對應
將傳入 CSV 數據對應至建立資料表時所使用的數據行名稱和數據類型。 這會將源數據欄位對應至目的地數據表數據行
CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
將訊息排入佇列以便處理
將訊息排入佇列,以從 Blob 記憶體提取數據,並將該數據內嵌至 Azure 數據總管。
INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST)
# All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV,
ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
print('Done queuing up ingestion with Azure Data Explorer')
查詢匯入至數據表的數據
等待 5 到 10 分鐘,以便排定佇列中的資料擷取,並將資料載入 Azure 數據總管。 然後執行下列程序代碼,以取得 StormEvents 數據表中的記錄計數。
QUERY = "StormEvents | count"
RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)
dataframe_from_result_table(RESPONSE.primary_results[0])
執行疑難排解查詢
登入 https://dataexplorer.azure.com 然後連線到您的叢集。 在資料庫中執行下列命令,以查看過去四小時內是否有任何擷取失敗。 在執行之前,請先取代資料庫名稱。
.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"
執行下列命令,以檢視過去四小時內所有擷取作業的狀態。 在執行之前,請先取代資料庫名稱。
.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId
清理資源
如果您打算遵循我們的其他文章,請保留您所建立的資源。 如果沒有,請在資料庫中執行下列命令,以清除 StormEvents 數據表。
.drop table StormEvents