Przechwytywanie danych strumieniowych jest przydatne do ładowania danych, gdy potrzebujesz niskiego opóźnienia pomiędzy przyjmowaniem danych a ich zapytaniem. Rozważ użycie przesyłania danych strumieniowych w następujących scenariuszach:
- Wymagane jest opóźnienie mniejsze niż sekundę.
- Aby zoptymalizować przetwarzanie operacyjne wielu tabel, w których strumień danych do każdej tabeli jest stosunkowo mały (kilka rekordów na sekundę), ale ogólny wolumin pozyskiwania danych jest wysoki (tysiące rekordów na sekundę).
Jeśli strumień danych do każdej tabeli jest wysoki (ponad 4 GB na godzinę), rozważ użycie wsadowego wczytywania danych.
Aby dowiedzieć się więcej na temat różnych metod pozyskiwania danych, zobacz Omówienie pozyskiwania danych.
Wybierz odpowiedni rodzaj pobierania danych strumieniowych
Obsługiwane są dwa rodzaje przesyłania strumieniowego.
| Typ pozyskiwania |
Opis |
| Połączenie danych |
Połączenia danych dla usługi Event Hub, IoT Hub i Event Grid mogą korzystać z przesyłania strumieniowego, pod warunkiem że jest ono włączone na poziomie klastra. Decyzja o zastosowaniu strumieniowego pobierania danych jest podejmowana zgodnie z polityką strumieniowego pobierania danych skonfigurowaną w tabeli docelowej. Aby uzyskać informacje na temat zarządzania połączeniami danych, zobacz Event Hub, IoT Hub i Event Grid. |
|
Niestandardowe przetwarzanie |
Niestandardowe wprowadzanie wymaga napisania aplikacji korzystającej z jednej z bibliotek klienckich usługi Azure Data Explorer. Skorzystaj z informacji w tym temacie, aby skonfigurować niestandardowe pozyskiwanie. Możesz również uznać, że przykładowa aplikacja do pozyskiwania danych przesyłanych strumieniowo w języku C# jest przydatna. |
Użyj poniższej tabeli, aby ułatwić wybór typu pozyskiwania danych, który jest odpowiedni dla twojego środowiska.
| Kryterium |
Połączenie danych |
Niestandardowe importowanie |
| Opóźnienie danych między inicjowaniem pozyskiwania i danymi dostępnymi dla zapytania |
Dłuższe opóźnienie |
Krótsze opóźnienie |
| Obciążenie związane z programowaniem |
Szybka i łatwa konfiguracja, brak obciążeń programistycznych |
Duże obciążenie programistyczne w celu utworzenia aplikacji pozyskiwania danych, obsługi błędów i zapewnienia spójności danych |
Uwaga
Możesz zarządzać procesem włączania i wyłączania strumieniowego przesyłania danych w klastrze, przy użyciu portalu Azure lub programowo w C#. Jeśli używasz języka C# do tworzenia aplikacji niestandardowej, możesz uznać korzystanie z podejścia programowego za bardziej wygodne.
Wymagania wstępne
Główne czynniki, które mogą wpływać na proces przesyłania strumieniowego, to:
-
Rozmiar maszyny wirtualnej i klastra: Wydajność i pojemność przesyłania strumieniowego skalują się wraz ze zwiększeniem rozmiarów maszyn wirtualnych i klastrów. Liczba współbieżnych żądań przesyłania jest ograniczona do sześciu na rdzeń. Na przykład w przypadku 16-rdzeniowych jednostek SKU, takich jak D14 i L16, maksymalne obsługiwane obciążenie to 96 równoczesnych żądań przyjmowania. W przypadku dwóch podstawowych jednostek SKU, takich jak D11, maksymalne obsługiwane obciążenie to 12 współbieżnych żądań przesyłania danych.
-
Limit rozmiaru danych: limit rozmiaru danych dla żądania strumieniowego przesyłania danych wynosi 4 MB. Obejmuje to wszystkie dane utworzone dla zasad aktualizowania w czasie pozyskiwania.
-
Aktualizacje schematu: aktualizacje schematu, takie jak tworzenie i modyfikowanie tabel oraz mapowania danych wejściowych, mogą potrwać do pięciu minut dla usługi transmisji strumieniowej. Aby uzyskać więcej informacji, zobacz Przesyłanie strumieniowe i zmiany w schemacie.
-
Pojemność dysków SSD: Włączenie strumieniowego przesyłania danych na klastrze, nawet gdy dane nie są przesyłane strumieniowo, wykorzystuje część lokalnego dysku SSD maszyn klastra na dane przesyłania strumieniowego i zmniejsza dostępne miejsce dla gorącej pamięci podręcznej.
Włącz strumieniowe gromadzenie danych na klastrze
Aby można było użyć pozyskiwania danych strumieniowych, należy włączyć funkcję w klastrze i zdefiniować zasady pozyskiwania danych strumieniowych. Funkcję można włączyć podczas tworzenia klastra lub dodać do istniejącego klastra.
Ostrzeżenie
Zapoznaj się z ograniczeniami przed włączeniem strumieniowego przesyłania.
Włącz ingestowanie strumieniowe podczas tworzenia nowego klastra
Strumieniowe pozyskiwanie danych można włączyć podczas tworzenia nowego klastra w portalu Azure lub programistycznie w C#.
Aby włączyć przyjmowanie danych przesyłanych strumieniowo podczas tworzenia nowego klastra Azure Data Explorer, uruchom następujący kod:
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
string location = "<location>";
string skuName = "<skuName>";
string tier = "<tier>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var cluster = new Cluster(location, new AzureSku(skuName, tier), enableStreamingIngest:true);
await kustoManagementClient.Clusters.CreateOrUpdateAsync(resourceGroupName, clusterName, cluster);
}
}
}
Włącz odbiór danych przesyłanych strumieniowo w istniejącym klastrze
Jeśli masz istniejący klaster, możesz włączyć przesyłanie strumieniowe przy użyciu portalu Azure lub programowo w języku C#.
W witrynie Azure Portal przejdź do klastra usługi Azure Data Explorer.
W obszarze Ustawienia wybierz pozycję Konfiguracje.
W okienku Konfiguracje wybierz pozycję Włączone, aby włączyć strumieniowe pozyskiwanie danych.
Wybierz Zapisz.
Ingestowanie strumieniowe można włączyć podczas tworzenia nowego klastra usługi Azure Data Explorer.
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: true);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
Tworzenie tabeli docelowej i definiowanie zasad
Utwórz tabelę do odbierania danych z przesyłania strumieniowego i zdefiniuj powiązane z nią zasady przy użyciu portalu Azure lub programowo w języku C#.
W witrynie Azure Portal przejdź do klastra.
Wybierz pozycję Query.
Aby utworzyć tabelę, która będzie odbierać dane za pomocą strumieniowego przesyłania danych, skopiuj następujące polecenie do okienka zapytań i wybierz pozycję Uruchom.
.create table TestTable (TimeStamp: datetime, Name: string, Metric: int, Source:string)
Skopiuj jedno z następujących poleceń do okienka Zapytanie i wybierz pozycję Uruchom. Definiuje politykę strumieniowego pozyskiwania danych w tabeli, którą utworzyłeś, lub w bazie danych zawierającej tę tabelę.
Wskazówka
Zasady zdefiniowane na poziomie bazy danych mają zastosowanie do wszystkich istniejących i przyszłych tabel w bazie danych. Po włączeniu zasad na poziomie bazy danych nie ma potrzeby włączania jej dla każdej tabeli.
Aby zdefiniować zasady w utworzonej tabeli, użyj:
.alter table TestTable policy streamingingestion enable
Aby zdefiniować zasady w bazie danych zawierającej utworzoną tabelę, użyj:
.alter database StreamingTestDb policy streamingingestion enable
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tableName = "<tableName>";
var tableSchema = new TableSchema(
tableName,
new ColumnSchema[]
{
new("TimeStamp", "System.DateTime"),
new("Name", "System.String"),
new("Metric", "System.int"),
new("Source", "System.String"),
});
var tableCreateCommand = CslCommandGenerator.GenerateTableCreateCommand(tableSchema);
var tablePolicyAlterCommand = CslCommandGenerator.GenerateTableAlterStreamingIngestionPolicyCommand(tableName, isEnabled: true);
await client.ExecuteControlCommandAsync(tableCreateCommand);
await client.ExecuteControlCommandAsync(tablePolicyAlterCommand);
}
}
Utwórz aplikację do przesyłania strumieniowego w celu pozyskiwania danych do klastra
Utwórz aplikację na potrzeby pozyskiwania danych do klastra przy użyciu preferowanego języka.
using System.IO;
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Ingest; // Requires Package Microsoft.Azure.Kusto.Ingest
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateStreamingIngestClient(connectionStringBuilder);
// Ingest from a compressed file
var fileStream = File.Open("MyFile.gz", FileMode.Open);
// Initialize client properties
var ingestionProperties = new KustoIngestionProperties(databaseName: "<databaseName>", tableName: "<tableName>");
// Create source options
var sourceOptions = new StreamSourceOptions { CompressionType = DataSourceCompressionType.GZip, };
// Ingest from stream
await client.IngestFromStreamAsync(fileStream, ingestionProperties, sourceOptions);
}
}
from azure.kusto.data import KustoConnectionStringBuilder, DataFormat
from azure.kusto.ingest import IngestionProperties, KustoStreamingIngestClient
clusterPath = "https://<clusterName>.<region>.kusto.windows.net"
appId = "<appId>"
appKey = "<appKey>"
appTenant = "<appTenant>"
dbName = "<dbName>"
tableName = "<tableName>"
csb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
clusterPath,
appId,
appKey,
appTenant
)
client = KustoStreamingIngestClient(csb)
ingestionProperties = IngestionProperties(
database=dbName,
table=tableName,
data_format=DataFormat.CSV
)
# Ingest from file
# Automatically detects gz format
client.ingest_from_file("MyFile.gz", ingestion_properties=ingestionProperties)
// Load modules using ES6 import statements:
import { DataFormat, IngestionProperties, StreamingIngestClient } from "azure-kusto-ingest";
import { KustoConnectionStringBuilder } from "azure-kusto-data";
// For earlier version, load modules using require statements:
// const IngestionProperties = require("azure-kusto-ingest").IngestionProperties;
// const KustoConnectionStringBuilder = require("azure-kusto-data").KustoConnectionStringBuilder;
// const {DataFormat} = require("azure-kusto-ingest").IngestionPropertiesEnums;
// const StreamingIngestClient = require("azure-kusto-ingest").StreamingIngestClient;
const clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
const appId = "<appId>";
const appKey = "<appKey>";
const appTenant = "<appTenant>";
const dbName = "<dbName>";
const tableName = "<tableName>";
const mappingName = "<mappingName>"; // Required for JSON formatted files
const ingestionProperties = new IngestionProperties({
database: dbName, // Your database
table: tableName, // Your table
format: DataFormat.JSON,
ingestionMappingReference: mappingName
});
// Initialize client with engine endpoint
const client = new StreamingIngestClient(
KustoConnectionStringBuilder.withAadApplicationKeyAuthentication(
clusterPath,
appId,
appKey,
appTenant
),
ingestionProperties
);
// Automatically detects gz format
await client.ingestFromFile("MyFile.gz", ingestionProperties);
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go//azure/data-explorer/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
Wyłącz pozyskiwanie przesyłania strumieniowego w klastrze
Ostrzeżenie
Wyłączenie pozyskiwania danych strumieniowych może potrwać kilka godzin.
Przed wyłączeniem pozyskiwania danych strumieniowych w klastrze usługi Azure Data Explorer usuń ustawienia zasad pozyskiwania danych strumieniowych ze wszystkich odpowiednich tabel i baz danych. Usunięcie zasad przesyłania danych strumieniowych powoduje przearanżowanie danych w klastrze usługi Azure Data Explorer. Dane przesyłane strumieniowo są przenoszone z początkowego magazynu do trwałego magazynu danych w strukturze kolumnowej (segmenty lub fragmenty). Ten proces może potrwać od kilku sekund do kilku godzin, w zależności od ilości danych w początkowym magazynie.
Anuluj politykę ingestii strumieniowej
Zasady dotyczące pozyskiwania danych strumieniowych można usunąć przy użyciu portalu Azure lub programowo w języku C#.
W witrynie Azure Portal przejdź do klastra usługi Azure Data Explorer i wybierz pozycję Zapytanie.
Aby usunąć politykę przesyłania danych strumieniowych z tabeli, skopiuj następujące polecenie do panelu zapytań i kliknij Uruchom.
.delete table TestTable policy streamingingestion
W obszarze Ustawienia wybierz pozycję Konfiguracje.
W okienku Konfiguracje wybierz pozycję Off, aby wyłączyć przesyłanie strumieniowe.
Wybierz Zapisz.
Aby usunąć zasady przyjmowania danych strumieniowych z tabeli, uruchom następujący kod:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand("<dbName>", "<tableName>");
await client.ExecuteControlCommandAsync(tablePolicyDropCommand);
}
}
Aby wyłączyć strumieniowe przetwarzanie danych na swoim klastrze, uruchom następujący kod:
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: false);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
Ograniczenia
-
Mapowania danych muszą zostać wstępnie utworzone do użycia w pozyskiwaniu danych przesyłanych strumieniowo. Pojedyncze żądania pozyskiwania danych strumieniowych nie obsługują wbudowanych mapowań danych.
- Nie można ustawić tagów zakresu na danych wprowadzanych strumieniowo.
-
Zaktualizuj zasady. Zasady aktualizacji mogą odwoływać się tylko do nowo pozyskanych danych w tabeli źródłowej, a nie do żadnych innych danych lub tabel w bazie danych.
- Jeśli ingestia strumieniowa jest włączona w klastrze używanym jako lider dla baz danych typu follower, ingestia strumieniowa musi być włączona w następujących klastrach, aby umożliwić śledzenie danych ingestii strumieniowej. To samo dotyczy tego, czy dane klastra są udostępniane za pośrednictwem udziału danych.
Następne kroki