引入和查询之间需要低延迟时,流式引入可用于加载数据。 请考虑在以下方案中使用流式引入:
- 需要小于一秒的延迟。
- 需要优化多个表的操作处理,其中进入每个表的数据流相对较小(每秒几条记录),但总体数据引入量较大(每秒成千上万条记录)。
如果每个表中的数据流较高(每小时超过 4 GB),请考虑使用 批处理引入。
若要详细了解不同的引入方法,请参阅 数据引入概述。
选择适当的流式引入类型
支持两种流式引入类型:
| 引入类型 |
说明 |
| 数据连接 |
事件中心、IoT 中心和事件网格数据连接可以使用流式引入,前提是它在群集级别启用。 根据目标表上配置的流式引入策略,决定使用流式引入。 有关管理数据连接的信息,请参阅事件中心、IoT 中心和事件网格。 |
|
自定义导入 |
自定义引入要求编写使用其中一个 Azure 数据资源管理器 客户端库的应用程序。 使用本主题中的信息配置自定义数据导入。 还可以发现 C# 流式引入示例应用程序 很有用。 |
使用下表可帮助你选择适合你的环境的引入类型:
| 标准 |
数据连接 |
自定义导入 |
| 引入启动与可用于查询的数据之间的数据延迟 |
更长的延迟 |
更短的延迟 |
| 开发成本 |
快速而简单的设置,无需开发开销 |
建立应用程序以引入数据、确保数据一致性和处理错误需要高昂的开发开销 |
注释
可以使用 Azure 门户或在 C# 中以编程方式管理在群集上 启用 和 禁用 流式引入的过程。 如果对 自定义应用程序使用 C#,则可能发现使用编程方法更方便。
先决条件
影响流式引入的主要参与者包括:
-
VM 和群集大小:随着 VM 和群集大小的增加,流式引入的性能和容量会扩大。 并发引入请求数限制为每个核心 6 个。 例如,对于 16 个核心 SKU(如 D14 和 L16),支持的最大负载为 96 个并发引入请求。 对于两个核心 SKU(例如 D11),支持的最大负载是 12 个并发引入请求。
-
数据大小限制:流式引入请求的数据大小限制为 4 MB。 这包括在引入期间为更新策略创建的任何数据。
-
架构更新:流式引入服务可能需要长达五分钟才能进行架构更新,例如创建和修改表和引入映射。 有关详细信息,请参阅 流式引入和架构更改。
-
SSD 容量:在群集上启用流式引入,即使数据未通过流式引入,也会使用群集计算机的本地 SSD 磁盘的一部分进行流式引入数据,并减少可用于热缓存的存储。
在群集上启用流式引入
在使用流式引入之前,必须在群集上启用该功能并定义 流式引入策略。 可以在 创建群集时启用该功能,或 将其添加到现有群集。
创建新群集时启用流式引入
可以在使用 Azure 门户或 C# 以编程方式创建新群集时启用流式引入。
若要在创建新的 Azure 数据资源管理器群集时启用流式引入,请运行以下代码:
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
string location = "<location>";
string skuName = "<skuName>";
string tier = "<tier>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var cluster = new Cluster(location, new AzureSku(skuName, tier), enableStreamingIngest:true);
await kustoManagementClient.Clusters.CreateOrUpdateAsync(resourceGroupName, clusterName, cluster);
}
}
}
在现有群集上启用流式引入
如果有现有群集,则可以使用 Azure 门户或以编程方式在 C# 中启用流式引入。
在 Azure 门户中,转到 Azure 数据资源管理器群集。
在 “设置”中,选择“ 配置”。
在 “配置 ”窗格中,选择 “打开 ”以启用 流式引入。
选择 保存。
创建新的 Azure 数据资源管理器群集时,可以启用流式引入。
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: true);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
创建目标表并定义策略
创建一个表来接收流式引入数据,并使用 Azure 门户或 C# 以编程方式定义其相关策略。
在 Azure 门户中,导航到群集。
选择 查询。
若要创建将通过流式引入接收数据的表,请将以下命令复制到 “查询”窗格中 ,然后选择“ 运行”。
.create table TestTable (TimeStamp: datetime, Name: string, Metric: int, Source:string)
将以下命令之一复制到 “查询”窗格中 ,然后选择“ 运行”。 这会定义对创建的表或包含表的数据库的 流式引入策略 。
小提示
在数据库级别定义的策略适用于数据库中的所有现有表和将来表。 在数据库级别启用策略时,无需为每个表启用该策略。
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tableName = "<tableName>";
var tableSchema = new TableSchema(
tableName,
new ColumnSchema[]
{
new("TimeStamp", "System.DateTime"),
new("Name", "System.String"),
new("Metric", "System.int"),
new("Source", "System.String"),
});
var tableCreateCommand = CslCommandGenerator.GenerateTableCreateCommand(tableSchema);
var tablePolicyAlterCommand = CslCommandGenerator.GenerateTableAlterStreamingIngestionPolicyCommand(tableName, isEnabled: true);
await client.ExecuteControlCommandAsync(tableCreateCommand);
await client.ExecuteControlCommandAsync(tablePolicyAlterCommand);
}
}
创建流式引入应用程序以将数据引入群集
使用首选语言创建应用程序以将数据引入群集。
using System.IO;
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Ingest; // Requires Package Microsoft.Azure.Kusto.Ingest
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateStreamingIngestClient(connectionStringBuilder);
// Ingest from a compressed file
var fileStream = File.Open("MyFile.gz", FileMode.Open);
// Initialize client properties
var ingestionProperties = new KustoIngestionProperties(databaseName: "<databaseName>", tableName: "<tableName>");
// Create source options
var sourceOptions = new StreamSourceOptions { CompressionType = DataSourceCompressionType.GZip, };
// Ingest from stream
await client.IngestFromStreamAsync(fileStream, ingestionProperties, sourceOptions);
}
}
from azure.kusto.data import KustoConnectionStringBuilder, DataFormat
from azure.kusto.ingest import IngestionProperties, KustoStreamingIngestClient
clusterPath = "https://<clusterName>.<region>.kusto.windows.net"
appId = "<appId>"
appKey = "<appKey>"
appTenant = "<appTenant>"
dbName = "<dbName>"
tableName = "<tableName>"
csb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
clusterPath,
appId,
appKey,
appTenant
)
client = KustoStreamingIngestClient(csb)
ingestionProperties = IngestionProperties(
database=dbName,
table=tableName,
data_format=DataFormat.CSV
)
# Ingest from file
# Automatically detects gz format
client.ingest_from_file("MyFile.gz", ingestion_properties=ingestionProperties)
// Load modules using ES6 import statements:
import { DataFormat, IngestionProperties, StreamingIngestClient } from "azure-kusto-ingest";
import { KustoConnectionStringBuilder } from "azure-kusto-data";
// For earlier version, load modules using require statements:
// const IngestionProperties = require("azure-kusto-ingest").IngestionProperties;
// const KustoConnectionStringBuilder = require("azure-kusto-data").KustoConnectionStringBuilder;
// const {DataFormat} = require("azure-kusto-ingest").IngestionPropertiesEnums;
// const StreamingIngestClient = require("azure-kusto-ingest").StreamingIngestClient;
const clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
const appId = "<appId>";
const appKey = "<appKey>";
const appTenant = "<appTenant>";
const dbName = "<dbName>";
const tableName = "<tableName>";
const mappingName = "<mappingName>"; // Required for JSON formatted files
const ingestionProperties = new IngestionProperties({
database: dbName, // Your database
table: tableName, // Your table
format: DataFormat.JSON,
ingestionMappingReference: mappingName
});
// Initialize client with engine endpoint
const client = new StreamingIngestClient(
KustoConnectionStringBuilder.withAadApplicationKeyAuthentication(
clusterPath,
appId,
appKey,
appTenant
),
ingestionProperties
);
// Automatically detects gz format
await client.ingestFromFile("MyFile.gz", ingestionProperties);
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go//azure/data-explorer/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
在集群上禁用流式引入
在 Azure 数据资源管理器群集上禁用流式引入之前,请从所有相关表和数据库中删除 流式引入策略 。 删除流式引入策略会触发 Azure 数据资源管理器群集内的数据重新排列。 流式引入数据从初始存储移动到列存储(也称为数据分段或分片)中的永久存储。 此过程可能需要几秒钟到几个小时,具体取决于初始存储中的数据量。
删除流式引入策略
可以使用 Azure 门户或 C# 以编程方式删除流式引入策略。
在 Azure 门户中,转到 Azure 数据资源管理器群集并选择 “查询”。
若要从表中删除流式引入策略,请将以下命令复制到 “查询”窗格中 ,然后选择“ 运行”。
.delete table TestTable policy streamingingestion
在 “设置”中,选择“ 配置”。
在 “配置 ”窗格中,选择“ 关闭 ”以禁用 流式引入。
选择 保存。
若要从表中删除流式引入策略,请运行以下代码:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand("<dbName>", "<tableName>");
await client.ExecuteControlCommandAsync(tablePolicyDropCommand);
}
}
若要在群集上禁用流式引入,请运行以下代码:
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: false);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
局限性
- 必须预先创建数据映射,以便在流式引入中使用。 单个流式引入请求不能容纳内联数据映射。
- 无法对流式引入数据设置范围标签。
-
更新策略。 更新策略只能引用源表中新引入的数据,而不能引用数据库中的任何其他数据或表。
- 如果在作为从属数据库领导群集上启用了流式引入,那么在以下群集上也必须启用流式引入,以便处理流式引入数据。 无论群集数据是否通过 Data Share 共享,情况都是一样的。
后续步骤