Udostępnij przez


Tworzenie połączenia danych usługi Event Hubs dla usługi Azure Synapse Data Explorer przy użyciu języka C# (wersja zapoznawcza)

Ważne

Eksplorator danych usługi Azure Synapse Analytics (wersja zapoznawcza) zostanie wycofany 7 października 2025 r. Po tej dacie obciążenia uruchomione w usłudze Synapse Data Explorer zostaną usunięte, a skojarzone dane aplikacji zostaną utracone. Zdecydowanie zalecamy migrację do usługi Eventhouse w usłudze Microsoft Fabric.

Program Microsoft Cloud Migration Factory (CMF) ma na celu pomoc klientom w migracji do sieci szkieletowej. Program oferuje praktyczne zasoby klawiaturowe bez ponoszenia kosztów dla klienta. Te zasoby są przypisywane przez okres 6–8 tygodni ze wstępnie zdefiniowanym i uzgodnionym zakresem. Nominacje klientów są akceptowane przez zespół ds. kont Microsoft lub bezpośrednio, przesyłając wniosek o pomoc zespołowi CMF.

Usługa Azure Synapse Data Explorer to szybka i wysoce skalowalna usługa eksploracji danych na potrzeby danych dzienników i danych telemetrycznych. Usługa Azure Synapse Data Explorer oferuje pozyskiwanie danych (ładowanie danych) z usług Event Hubs, IoT Hubs oraz blobów zapisanych w kontenerach.

W tym artykule utworzysz połączenie danych usługi Event Hubs dla usługi Azure Synapse Data Explorer przy użyciu języka C#.

Wymagania wstępne

  • Subskrypcja platformy Azure. Utwórz bezpłatne konto platformy Azure.

  • Utwórz pulę Eksploratora Danych, korzystając z programu Synapse Studio lub portalu Azure

  • Utwórz bazę danych eksploratora danych.

    1. W programie Synapse Studio w okienku po lewej stronie wybierz pozycję Dane.

    2. Wybierz + (Dodaj nowy zasób) >pulę Eksploratora danych i skorzystaj z poniższych informacji:

      Setting Sugerowana wartość Description
      Nazwa puli contosodataexplorer Nazwa puli Eksploratora danych, której użyć
      Name TestDatabase Nazwa bazy danych musi być unikatowa w obrębie klastra.
      Domyślny okres przechowywania 365 Przedział czasu (w dniach), w którym gwarantowana jest dostępność danych dla zapytania. Przedział czasu jest mierzony od momentu pozyskania danych.
      Domyślny okres pamięci podręcznej 31 Przedział czasu (w dniach), w którym często używane w zapytaniach dane mają być dostępne na dysku SSD lub w pamięci RAM zamiast w magazynie długoterminowym.
    3. Wybierz pozycję Utwórz, aby utworzyć bazę danych. Tworzenie zazwyczaj zajmuje mniej niż minutę.

Uwaga / Notatka

Aportowanie danych z Event Hub do zasobników Eksploratora Danych nie będzie działać, jeśli przestrzeń robocza Synapse używa zarządzanej sieci wirtualnej z włączoną ochroną przed eksfiltracją danych.

Tworzenie tabeli w klastrze testowym

Utwórz tabelę o nazwie StormEvents, która będzie zgodna ze schematem danych w pliku StormEvents.csv.

Wskazówka

Poniższe fragmenty kodu tworzą instancję klienta dla prawie każdego wywołania. Należy to zrobić, aby każdy fragment kodu był uruchamiany indywidualnie. W środowisku produkcyjnym wystąpienia klienta są ponowne i powinny być przechowywane tak długo, jak to konieczne. Pojedyncze wystąpienie klienta na URI wystarcza, nawet podczas pracy z wieloma bazami danych (bazę danych można określić na poziomie komendy).

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Zdefiniuj mapowanie przyjmowania

Zamapuj przychodzące dane CSV na nazwy kolumn używane podczas tworzenia tabeli. Utwórz obiekt mapowania kolumn CSV w tej tabeli.

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Instalowanie narzędzia NuGet w języku C#

Authentication

Aby uruchomić poniższy przykład, potrzebujesz aplikacji Microsoft Entra i głównego użytkownika usługi, który może uzyskiwać dostęp do zasobów. Aby utworzyć bezpłatną aplikację Microsoft Entra i dodać przypisanie roli na poziomie subskrypcji, zobacz Utworzyć aplikację Microsoft Entra. Potrzebny jest również identyfikator katalogu (dzierżawy), identyfikator aplikacji i sekret klienta.

Dodaj połączenie danych usługi Event Hubs

W poniższym przykładzie pokazano, jak programowo dodać połączenie danych usługi Event Hubs. Aby uzyskać informacje na temat dodawania połączenia danych usługi Event Hubs przy użyciu witryny Azure Portal, zobacz nawiązywanie połączenia z usługą Event Hubs .

var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);

var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);

var kustoManagementClient = new KustoManagementClient(credentials)
{
    SubscriptionId = subscriptionId
};

var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The Event Hub that is created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var consumerGroup = "$Default";
var location = "Central US";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var compression = "None";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
    new EventHubDataConnection(eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression));
Ustawienie Sugerowana wartość Opis pola
tenantId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx Identyfikator najemcy. Znany również jako identyfikator katalogu.
Identyfikator subskrypcji xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx Identyfikator subskrypcji używany do tworzenia zasobów.
clientId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx Identyfikator klienta aplikacji, która może uzyskiwać dostęp do zasobów w Twoim dzierżawcy.
clientSecret xxxxxxxxxxxxxxxx Klucz tajny klienta aplikacji, który może uzyskiwać dostęp do zasobów w dzierżawie.
resourceGroupName testrg Nazwa grupy zasobów zawierającej klaster.
nazwa klastra mykustocluster Nazwa klastra.
databaseName mykustodatabase Nazwa docelowej bazy danych w klastrze.
nazwaPołączeniaDanych myeventhubconnect Żądana nazwa połączenia danych.
tableName StormEvents Nazwa tabeli docelowej w docelowej bazie danych.
NazwaRegułyMapowania Mapowanie_CSV_Zdarzeń_Burzowych Nazwa mapowania kolumn, które odnosi się do tabeli docelowej.
Format danych csv Format danych wiadomości.
eventHubResourceId identyfikator zasobu Identyfikator zasobu centrum zdarzeń, które przechowuje dane do przetwarzania.
grupa konsumencka $Default Grupa odbiorców twojego centrum zdarzeń.
lokalizacja Środkowe stany USA Lokalizacja zasobu połączenia danych.
kompresja Gzip lub Brak Typ kompresji danych.

Generuj dane

Zobacz przykładową aplikację , która generuje dane i wysyła je do centrum zdarzeń.

Zdarzenie może zawierać jeden lub więcej rekordów, do osiągnięcia swojego limitu rozmiaru. W poniższym przykładzie wysyłamy dwa zdarzenia, z których każdy ma pięć dołączonych rekordów:

var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;

for (var i = 0; i < 10; i++)
{
    // Create the data
    var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) };
    var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
    counter++;

    // Create the event
    if (counter == recordsPerEvent)
    {
        var eventData = new EventData(Encoding.UTF8.GetBytes(data));
        events.Add(eventData);

        counter = 0;
        data = string.Empty;
    }
}

// Send events
eventHubClient.SendAsync(events).Wait();

Uprzątnij zasoby

Aby usunąć połączenie danych, użyj następującego polecenia:

kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);

Dalsze kroki