Streamingopname is handig voor het laden van gegevens wanneer u een lage latentie nodig hebt tussen opname en query. Overweeg het gebruik van streamingingestie in de volgende scenario's:
- Latentie van minder dan een seconde is vereist.
- Om de operationele verwerking van veel tabellen te optimaliseren waarbij de gegevensstroom in elke tabel relatief klein is (een paar records per seconde), maar het totale gegevensopnamevolume hoog is (duizenden records per seconde).
Als de gegevensstroom in elke tabel hoog is (meer dan 4 GB per uur), kunt u overwegen batchopname te gebruiken.
Zie het overzicht van gegevensopname voor meer informatie over verschillende opnamemethoden.
Het juiste type streaminggegevensverwerking kiezen
Er worden twee typen streamingopname ondersteund:
| Opnametype |
Beschrijving |
| Gegevensverbinding |
Event Hub-, IoT Hub- en Event Grid-gegevensverbindingen kunnen streamingopname gebruiken, mits deze is ingeschakeld op clusterniveau. De beslissing om streamingopname te gebruiken, wordt uitgevoerd volgens het streamingopnamebeleid dat is geconfigureerd in de doeltabel. Zie Event Hub, IoT Hub en Event Grid voor informatie over het beheren van gegevensverbindingen. |
|
Aangepaste ingestie |
Voor aangepaste gegevensinname moet u een toepassing schrijven die gebruikmaakt van een van de Azure Data Explorer-clientbibliotheken. Gebruik de informatie in dit onderwerp om aangepaste invoer te configureren. U kunt ook de C#-voorbeeldtoepassing voor streamingopname nuttig vinden. |
Gebruik de volgende tabel, die kan helpen het opnametype te kiezen dat geschikt is voor uw omgeving.
| Criterium |
Gegevensverbinding |
Aangepaste gegevensinvoer |
| Gegevensvertraging tussen het begin van gegevensinname en de beschikbaarheid van de gegevens voor bevraging |
Langere vertraging |
Kortere vertraging |
| Overhead voor ontwikkeling |
Snelle en eenvoudige installatie, geen ontwikkelingsoverhead |
Grote ontwikkelingsmoeite om een applicatie te ontwikkelen die de gegevens verwerkt, het verwerken van fouten en het garanderen van gegevensconsistentie. |
Opmerking
U kunt het proces beheren voor het in- en uitschakelen van streamingopname op uw cluster met behulp van Azure Portal of programmatisch in C#. Als u C# gebruikt voor uw aangepaste toepassing, is het wellicht handiger om de programmatische benadering te gebruiken.
Vereiste voorwaarden
De belangrijkste bijdragers die van invloed kunnen zijn op streamingverwerking zijn:
-
VM- en clustergrootte: de prestaties en capaciteit van streamingopnamen worden geschaald met verhoogde VM- en clustergrootten. Het aantal gelijktijdige opnameaanvragen is beperkt tot zes per kern. Voor bijvoorbeeld 16 kern-SKU's, zoals D14 en L16, is de maximale ondersteunde belasting 96 gelijktijdige opnameaanvragen. Voor twee kern-SKU's, zoals D11, is de maximale ondersteunde belasting 12 gelijktijdige opnameaanvragen.
-
Gegevensgroottelimiet: de gegevensgroottelimiet voor een aanvraag voor streamingopname is 4 MB. Dit omvat alle gegevens die zijn gemaakt voor het updatebeleid tijdens opname.
-
Schema-updates: Schema-updates, zoals het maken en wijzigen van tabellen en aanpassingen in de invoermapping, kunnen tot vijf minuten duren voordat ze in de streaming-ingestieservice zijn verwerkt. Zie Streaming-opname en schemawijzigingen voor meer informatie.
-
SSD-capaciteit: het inschakelen van streamingopname op een cluster, zelfs wanneer gegevens niet worden opgenomen via streaming, maakt gebruik van een deel van de lokale SSD-schijf van de clustercomputers voor het streamen van opnamegegevens en vermindert de opslag die beschikbaar is voor hot cache.
Streamingopname inschakelen op uw cluster
Voordat u streamingopname kunt gebruiken, moet u de mogelijkheid op uw cluster inschakelen en een beleid voor streamingopname definiëren. U kunt de mogelijkheid inschakelen bij het maken van het cluster of deze toevoegen aan een bestaand cluster.
Waarschuwing
Bekijk de beperkingen voordat u streaming-ingestie inschakelt.
Streamingopname inschakelen tijdens het maken van een nieuw cluster
U kunt streamingopname inschakelen tijdens het maken van een nieuw cluster met behulp van Azure Portal of programmatisch in C#.
Voer de volgende code uit om streamingopname in te schakelen tijdens het maken van een nieuw Azure Data Explorer-cluster:
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
string location = "<location>";
string skuName = "<skuName>";
string tier = "<tier>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var cluster = new Cluster(location, new AzureSku(skuName, tier), enableStreamingIngest:true);
await kustoManagementClient.Clusters.CreateOrUpdateAsync(resourceGroupName, clusterName, cluster);
}
}
}
Streamingopname inschakelen op een bestaand cluster
Als u een bestaand cluster hebt, kunt u streamingopname inschakelen via Azure Portal of programmatisch in C#.
Ga in Azure Portal naar uw Azure Data Explorer-cluster.
Selecteer Configuraties in Instellingen.
Selecteer in het deelvenster Configuraties de optie Aan om streamingopname in te schakelen.
Selecteer Opslaan.
U kunt streamingopname inschakelen tijdens het maken van een nieuw Azure Data Explorer-cluster.
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: true);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
Een doeltabel maken en het beleid definiëren
Maak een tabel om de streamingopnamegegevens te ontvangen en definieer het bijbehorende beleid met behulp van Azure Portal of programmatisch in C#.
Navigeer in Azure Portal naar uw cluster.
Selecteer Query.
Als u de tabel wilt maken die de gegevens ontvangt via streamingopname, kopieert u de volgende opdracht naar het deelvenster Query en selecteert u Uitvoeren.
.create table TestTable (TimeStamp: datetime, Name: string, Metric: int, Source:string)
Kopieer een van de volgende opdrachten naar het deelvenster Query en selecteer Uitvoeren. Hiermee definieert u het streamingopnamebeleid voor de tabel die u hebt gemaakt of in de database die de tabel bevat.
Aanbeveling
Een beleid dat op databaseniveau is gedefinieerd, is van toepassing op alle bestaande en toekomstige tabellen in de database. Wanneer u het beleid op databaseniveau inschakelt, hoeft u het niet per tabel in te schakelen.
Als u het beleid wilt definiëren voor de tabel die u hebt gemaakt, gebruikt u:
.alter table TestTable policy streamingingestion enable
Als u het beleid wilt definiëren voor de database met de tabel die u hebt gemaakt, gebruikt u:
.alter database StreamingTestDb policy streamingingestion enable
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tableName = "<tableName>";
var tableSchema = new TableSchema(
tableName,
new ColumnSchema[]
{
new("TimeStamp", "System.DateTime"),
new("Name", "System.String"),
new("Metric", "System.int"),
new("Source", "System.String"),
});
var tableCreateCommand = CslCommandGenerator.GenerateTableCreateCommand(tableSchema);
var tablePolicyAlterCommand = CslCommandGenerator.GenerateTableAlterStreamingIngestionPolicyCommand(tableName, isEnabled: true);
await client.ExecuteControlCommandAsync(tableCreateCommand);
await client.ExecuteControlCommandAsync(tablePolicyAlterCommand);
}
}
Een streamingopnametoepassing maken om gegevens op te nemen in uw cluster
Maak uw toepassing voor het opnemen van gegevens in uw cluster met behulp van uw voorkeurstaal.
using System.IO;
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Ingest; // Requires Package Microsoft.Azure.Kusto.Ingest
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateStreamingIngestClient(connectionStringBuilder);
// Ingest from a compressed file
var fileStream = File.Open("MyFile.gz", FileMode.Open);
// Initialize client properties
var ingestionProperties = new KustoIngestionProperties(databaseName: "<databaseName>", tableName: "<tableName>");
// Create source options
var sourceOptions = new StreamSourceOptions { CompressionType = DataSourceCompressionType.GZip, };
// Ingest from stream
await client.IngestFromStreamAsync(fileStream, ingestionProperties, sourceOptions);
}
}
from azure.kusto.data import KustoConnectionStringBuilder, DataFormat
from azure.kusto.ingest import IngestionProperties, KustoStreamingIngestClient
clusterPath = "https://<clusterName>.<region>.kusto.windows.net"
appId = "<appId>"
appKey = "<appKey>"
appTenant = "<appTenant>"
dbName = "<dbName>"
tableName = "<tableName>"
csb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
clusterPath,
appId,
appKey,
appTenant
)
client = KustoStreamingIngestClient(csb)
ingestionProperties = IngestionProperties(
database=dbName,
table=tableName,
data_format=DataFormat.CSV
)
# Ingest from file
# Automatically detects gz format
client.ingest_from_file("MyFile.gz", ingestion_properties=ingestionProperties)
// Load modules using ES6 import statements:
import { DataFormat, IngestionProperties, StreamingIngestClient } from "azure-kusto-ingest";
import { KustoConnectionStringBuilder } from "azure-kusto-data";
// For earlier version, load modules using require statements:
// const IngestionProperties = require("azure-kusto-ingest").IngestionProperties;
// const KustoConnectionStringBuilder = require("azure-kusto-data").KustoConnectionStringBuilder;
// const {DataFormat} = require("azure-kusto-ingest").IngestionPropertiesEnums;
// const StreamingIngestClient = require("azure-kusto-ingest").StreamingIngestClient;
const clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
const appId = "<appId>";
const appKey = "<appKey>";
const appTenant = "<appTenant>";
const dbName = "<dbName>";
const tableName = "<tableName>";
const mappingName = "<mappingName>"; // Required for JSON formatted files
const ingestionProperties = new IngestionProperties({
database: dbName, // Your database
table: tableName, // Your table
format: DataFormat.JSON,
ingestionMappingReference: mappingName
});
// Initialize client with engine endpoint
const client = new StreamingIngestClient(
KustoConnectionStringBuilder.withAadApplicationKeyAuthentication(
clusterPath,
appId,
appKey,
appTenant
),
ingestionProperties
);
// Automatically detects gz format
await client.ingestFromFile("MyFile.gz", ingestionProperties);
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go//azure/data-explorer/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
Streamingopname op uw cluster uitschakelen
Waarschuwing
Het uitschakelen van streaming-ingestie kan enkele uren duren.
Voordat u streamingopname uitschakelt op uw Azure Data Explorer-cluster, verwijdert u het streamingopnamebeleid uit alle relevante tabellen en databases. Het verwijderen van het streamingopnamebeleid activeert gegevensherschikking in uw Azure Data Explorer-cluster. De streaming-gegevens worden verplaatst van de initiële opslag naar permanente opslag in de kolomopslag (uitsneden of shards). Dit proces kan enkele seconden tot enkele uren duren, afhankelijk van de hoeveelheid gegevens in de eerste opslag.
Het streamingopnamebeleid verwijderen
U kunt het streamingopnamebeleid verwijderen met behulp van Azure Portal of programmatisch in C#.
Ga in Azure Portal naar uw Azure Data Explorer-cluster en selecteer Query.
Als u het streamingopnamebeleid uit de tabel wilt verwijderen, kopieert u de volgende opdracht naar het deelvenster Query en selecteert u Uitvoeren.
.delete table TestTable policy streamingingestion
Selecteer Configuraties in Instellingen.
Selecteer in het deelvenster Configuraties de optie Uit om streamingopname uit te schakelen.
Selecteer Opslaan.
Voer de volgende code uit om de beleidsregels voor streaminginvoer uit de tabel te verwijderen.
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand("<dbName>", "<tableName>");
await client.ExecuteControlCommandAsync(tablePolicyDropCommand);
}
}
Voer de volgende code uit om streamingopname op uw cluster uit te schakelen:
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: false);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
Beperkingen
-
Gegevenstoewijzingen moeten vooraf worden gemaakt voor gebruik in streaming ingestie. Afzonderlijke verzoeken voor streaminggegevensopname ondersteunen geen inline-gegevensmappingen.
-
Uitbreidingstags kunnen niet worden ingesteld voor de streamingopnamegegevens.
-
Beleid bijwerken. Het updatebeleid kan alleen verwijzen naar de zojuist opgenomen gegevens in de brontabel en niet naar andere gegevens of tabellen in de database.
- Als streamingopname is ingeschakeld voor een cluster dat wordt gebruikt als leider voor volgdatabases, moet streamingopname zijn ingeschakeld op de volgende clusters om streamingopnamegegevens te volgen. Hetzelfde geldt of de clustergegevens worden gedeeld via Data Share.
Volgende stappen