在本文中,您將瞭解如何使用事件方格數據連線,將記憶體帳戶中的 Blob 內嵌至 Azure 數據總管。 您將建立設定 Azure 事件方格 訂用帳戶的事件方格資料連線。 事件方格訂用帳戶會透過 Azure 事件中樞,將事件從記憶體帳戶路由傳送至 Azure 資料總管。
若要瞭解如何在 Azure 入口網站 或 ARM 範本中建立連線,請參閱建立事件方格數據連線。
如需從事件方格擷取至 Azure 數據總管的一般資訊,請參閱 連線至事件方格。
注意
若要使用事件方格聯機達到最佳效能,請透過 Blob 元數據設定 rawSizeBytes 擷取屬性。 如需詳細資訊,請參閱 擷取屬性。
如需以舊版 SDK 為基礎的程式代碼範例,請參閱 封存一文。
必要條件
建立事件方格資料連線
在本節中,您將建立事件方格與 Azure 數據總管數據表之間的連線。
安裝 Microsoft.Azure.Management.Kusto NuGet 套件。
建立要用於驗證的Microsoft Entra 應用程式主體 。 您將需要目錄(租使用者)識別碼、應用程式識別碼和客戶端密碼。
執行下列程式碼。
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; //Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; //Application ID
var clientSecret = "PlaceholderClientSecret"; //Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var credentials = new ClientSecretCredential(tenantId, clientId, clientSecret);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var cluster = (await resourceGroup.GetKustoClusterAsync(clusterName)).Value;
var database = (await cluster.GetKustoDatabaseAsync(databaseName)).Value;
var dataConnections = database.GetKustoDataConnections();
var eventGridConnectionName = "myeventgridconnect";
//The event hub and storage account that are created as part of the Prerequisites
var eventHubResourceId = new ResourceIdentifier("/subscriptions/<storageAccountSubscriptionId>/resourceGroups/<storageAccountResourceGroupName>/providers/Microsoft.Storage/storageAccounts/<storageAccountName>");
var storageAccountResourceId = new ResourceIdentifier("/subscriptions/<eventHubSubscriptionId>/resourceGroups/<eventHubResourceGroupName>/providers/Microsoft.EventHub/namespaces/<eventHubNamespaceName>/eventhubs/<eventHubName>");
var consumerGroup = "$Default";
var location = AzureLocation.CentralUS;
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = KustoEventGridDataFormat.Csv;
var blobStorageEventType = BlobStorageEventType.MicrosoftStorageBlobCreated;
var databaseRouting = KustoDatabaseRouting.Multi;
var eventGridConnectionData = new KustoEventGridDataConnection
{
StorageAccountResourceId = storageAccountResourceId, EventHubResourceId = eventHubResourceId,
ConsumerGroup = consumerGroup, TableName = tableName, Location = location, MappingRuleName = mappingRuleName,
DataFormat = dataFormat, BlobStorageEventType = blobStorageEventType, DatabaseRouting = databaseRouting
};
await dataConnections.CreateOrUpdateAsync(WaitUntil.Completed, eventGridConnectionName, eventGridConnectionData);
|
設定 |
建議的值 |
欄位描述 |
| tenantId |
xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx |
您的租用戶識別碼。 也稱為目錄標識碼。 |
| subscriptionId |
xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx |
您用來建立資源的訂用帳戶標識碼。 |
| clientId |
xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx |
應用程式可存取租用戶中資源的用戶端標識碼。 |
| clientSecret |
PlaceholderClientSecret |
應用程式可存取租用戶中資源的客戶端密碼。 |
| resourceGroupName |
testrg |
包含叢集的資源群組名稱。 |
| clusterName |
mykustocluster |
您叢集的名稱。 |
| databaseName |
mykustodatabase |
叢集中目標資料庫的名稱。 |
| eventGridConnectionName |
myeventgridconnect |
數據連線所需的名稱。 |
| tableName |
StormEvents |
目標資料庫中的目標數據表名稱。 |
| mappingRuleName |
StormEvents_CSV_Mapping |
與目標數據表相關的數據行對應名稱。 |
| dataFormat |
csv |
訊息的數據格式。 |
| eventHubResourceId |
資源識別碼 |
事件方格設定為傳送事件的事件中樞資源標識符。 |
| storageAccountResourceId |
資源識別碼 |
記憶體帳戶的資源標識碼,此帳戶會保存要擷取的數據。 |
| consumerGroup |
$Default |
事件中樞的取用者群組。 |
| 位置 |
美國中部 |
數據連線資源的位置。 |
| blobStorageEventType |
Microsoft.Storage.BlobCreated |
觸發擷取的事件類別。 支援的事件包括:Microsoft.Storage.BlobCreated 或 Microsoft.Storage.BlobRenamed。 只有ADLSv2記憶體才支援 Blob 重新命名。 |
| databaseRouting |
多重 或 單一 |
線上的資料庫路由。 如果您將值設定為Single,數據連線將會路由傳送至叢集中的單一資料庫,如databaseName設定中所指定。 如果您將值設定為 Multi,您可以使用資料庫擷取屬性覆寫預設目標資料庫。 如需詳細資訊,請參閱 事件路由。 |
安裝必要的程式庫。
pip install azure-common
pip install azure-mgmt-kusto
建立要用於驗證的Microsoft Entra 應用程式主體 。 您將需要目錄(租使用者)識別碼、應用程式識別碼和客戶端密碼。
執行下列程式碼。
from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import EventGridDataConnection
from azure.common.credentials import ServicePrincipalCredentials
#Directory (tenant) ID
tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Application ID
client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Client Secret
client_secret = "xxxxxxxxxxxxxx"
subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
credentials = ServicePrincipalCredentials(
client_id=client_id,
secret=client_secret,
tenant=tenant_id
)
kusto_management_client = KustoManagementClient(credentials, subscription_id)
resource_group_name = "testrg"
#The cluster and database that are created as part of the Prerequisites
cluster_name = "mykustocluster"
database_name = "mykustodatabase"
data_connection_name = "myeventhubconnect"
#The event hub and storage account that are created as part of the Prerequisites
event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx"
storage_account_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.Storage/storageAccounts/xxxxxx"
consumer_group = "$Default"
location = "Central US"
#The table and column mapping that are created as part of the Prerequisites
table_name = "StormEvents"
mapping_rule_name = "StormEvents_CSV_Mapping"
data_format = "csv"
database_routing = "Multi"
blob_storage_event_type = "Microsoft.Storage.BlobCreated"
#Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
poller = kusto_management_client.data_connections.begin_create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name,
parameters=EventGridDataConnection(storage_account_resource_id=storage_account_resource_id, event_hub_resource_id=event_hub_resource_id,
consumer_group=consumer_group, table_name=table_name, location=location, mapping_rule_name=mapping_rule_name, data_format=data_format, database_routing=database_routing,
blob_storage_event_type=blob_storage_event_type))
# The creation of the connection is async. Validation errors are only visible if you wait for the results.
poller.wait()
print(poller.result())
|
設定 |
建議的值 |
欄位描述 |
| tenant_id |
xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx |
您的租用戶識別碼。 也稱為目錄標識碼。 |
| subscription_id |
xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx |
您用來建立資源的訂用帳戶標識碼。 |
| client_id |
xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx |
應用程式可存取租用戶中資源的用戶端標識碼。 |
| client_secret |
xxxxxxxx |
應用程式可存取租用戶中資源的客戶端密碼。 |
| resource_group_name |
testrg |
包含叢集的資源群組名稱。 |
| cluster_name |
mykustocluster |
您叢集的名稱。 |
| database_name |
mykustodatabase |
叢集中目標資料庫的名稱。 |
| data_connection_name |
myeventhubconnect |
數據連線所需的名稱。 |
| table_name |
StormEvents |
目標資料庫中的目標數據表名稱。 |
| mapping_rule_name |
StormEvents_CSV_Mapping |
與目標數據表相關的數據行對應名稱。 |
| database_routing |
多重 或 單一 |
線上的資料庫路由。 如果您將值設定為Single,數據連線將會路由傳送至叢集中的單一資料庫,如databaseName設定中所指定。 如果您將值設定為 Multi,您可以使用資料庫擷取屬性覆寫預設目標資料庫。 如需詳細資訊,請參閱 事件路由。 |
| data_format |
csv |
訊息的數據格式。 |
| event_hub_resource_id |
資源識別碼 |
事件方格設定為傳送事件的事件中樞資源標識符。 |
| storage_account_resource_id |
資源識別碼 |
記憶體帳戶的資源標識碼,此帳戶會保存要擷取的數據。 |
| consumer_group |
$Default |
事件中樞的取用者群組。 |
| 位置 |
美國中部 |
數據連線資源的位置。 |
| blob_storage_event_type |
Microsoft.Storage.BlobCreated |
觸發擷取的事件類別。 支援的事件包括:Microsoft.Storage.BlobCreated 或 Microsoft.Storage.BlobRenamed。 只有ADLSv2記憶體才支援 Blob 重新命名。 |
使用事件方格數據連線
本節說明如何在 Blob 建立或 Blob 重新命名之後,從 Azure Blob 儲存體 或 Azure Data Lake Gen 2 觸發擷取至叢集。
根據用來上傳 Blob 的記憶體 SDK 類型,選取相關的索引標籤。
下列程式代碼範例會使用 Azure Blob 儲存體 SDK 將檔案上傳至 Azure Blob 儲存體。 上傳會觸發事件方格數據連線,以將數據內嵌至 Azure 數據總管。
var azureStorageAccountConnectionString=<storage_account_connection_string>;
var containerName = <container_name>;
var blobName = <blob_name>;
var localFileName = <file_to_upload>;
var uncompressedSizeInBytes = <uncompressed_size_in_bytes>;
var mapping = <mappingReference>;
// Create a new container in your storage account.
var azureStorageAccount = CloudStorageAccount.Parse(azureStorageAccountConnectionString);
var blobClient = azureStorageAccount.CreateCloudBlobClient();
var container = blobClient.GetContainerReference(containerName);
container.CreateIfNotExists();
// Set metadata and upload a file to the blob.
var blob = container.GetBlockBlobReference(blobName);
blob.Metadata.Add("rawSizeBytes", uncompressedSizeInBytes);
blob.Metadata.Add("kustoIngestionMappingReference", mapping);
blob.UploadFromFile(localFileName);
// Confirm success of the upload by listing the blobs in your container.
var blobs = container.ListBlobs();
下列程式代碼範例會 使用 Azure Data Lake SDK 將檔案上傳至 Data Lake Storage Gen2。 上傳會觸發事件方格數據連線,以將數據內嵌至 Azure 數據總管。
var accountName = <storage_account_name>;
var accountKey = <storage_account_key>;
var fileSystemName = <file_system_name>;
var fileName = <file_name>;
var localFileName = <file_to_upload>;
var uncompressedSizeInBytes = <uncompressed_size_in_bytes>;
var mapping = <mapping_reference>;
var sharedKeyCredential = new StorageSharedKeyCredential(accountName, accountKey);
var dfsUri = "https://" + accountName + ".dfs.core.windows.net";
var dataLakeServiceClient = new DataLakeServiceClient(new Uri(dfsUri), sharedKeyCredential);
// Create the filesystem.
var dataLakeFileSystemClient = dataLakeServiceClient.CreateFileSystem(fileSystemName).Value;
// Define file metadata and uploading options.
IDictionary<String, String> metadata = new Dictionary<string, string>();
metadata.Add("rawSizeBytes", uncompressedSizeInBytes);
metadata.Add("kustoIngestionMappingReference", mapping);
var uploadOptions = new DataLakeFileUploadOptions
{
Metadata = metadata,
Close = true // Note: The close option triggers the event being processed by the data connection.
};
// Write to the file.
var dataLakeFileClient = dataLakeFileSystemClient.GetFileClient(fileName);
dataLakeFileClient.Upload(localFileName, uploadOptions);
注意
- 使用 Azure Data Lake SDK 上傳檔案時,初始檔案建立事件的大小為 0,在數據擷取期間,Azure 數據總管會忽略此大小。 若要確保適當的擷取,請將
Close 參數設定為 true。 此參數會導致 upload 方法觸發 FlushAndClose 事件,指出已進行最終更新並關閉檔案數據流。
- 若要在將事件擷取至 Azure 數據總管時減少來自 Event Grid 和後續處理的流量,建議您 篩選data.api 金鑰以只包含 FlushAndClose 事件,藉此移除大小為 0 的檔案建立事件。 如需排清的詳細資訊,請參閱 Azure Data Lake flush 方法。
重新命名 Blob
在ADLSv2中,可以重新命名目錄。 不過,請務必注意,重新命名目錄不會觸發 Blob 重新命名事件,或起始目錄中所包含的 Blob 擷取。 如果您想要在重新命名目錄之後確保 Blob 的擷取,您應該直接重新命名目錄中的個別 Blob。
下列程式代碼範例示範如何在ADLSv2儲存器帳戶中重新命名 Blob。
var accountName = <storage_account_name>;
var accountKey = <storage_account_key>;
var fileSystemName = <file_system_name>;
var sourceFilePath = <source_file_path>;
var destinationFilePath = <destination_file_path>;
var sharedKeyCredential = new StorageSharedKeyCredential(accountName, accountKey);
var dfsUri = "https://" + accountName + ".dfs.core.windows.net";
var dataLakeServiceClient = new DataLakeServiceClient(new Uri(dfsUri), sharedKeyCredential);
// Get a client to the the filesystem
var dataLakeFileSystemClient = dataLakeServiceClient.GetFileSystemClient(fileSystemName);
// Rename a file in the file system
var dataLakeFileClient = dataLakeFileSystemClient.GetFileClient(sourceFilePath);
dataLakeFileClient.Rename(destinationFilePath);
注意
如果您在建立數據連線或手動建立事件方格資源時定義篩選來追蹤特定主題,這些篩選會套用至目的地檔案路徑。
注意
在作業之後 CopyBlob 觸發擷取,不支援已啟用階層命名空間功能的記憶體帳戶。
重要
我們強烈建議您不要從自定義程式代碼產生記憶體事件,並將其傳送至事件中樞。 如果您選擇這樣做,請確定產生的事件嚴格遵守適當的記憶體事件架構和 JSON 格式規格。
拿掉事件方格數據連線
若要移除事件方格連線,請執行下列命令:
kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);
若要移除事件方格連線,請執行下列命令:
kusto_management_client.data_connections.delete(resource_group_name=resource_group_name, cluster_name=kusto_cluster_name, database_name=kusto_database_name, data_connection_name=kusto_data_connection_name)
相關內容