Freigeben über


Erstellen einer Event-Hub-Datenverbindung für Azure Synapse Data Explorer mit Python (Vorschau)

Von Bedeutung

Azure Synapse Analytics Data Explorer (Vorschau) wird am 7. Oktober 2025 eingestellt. Nach diesem Datum werden Arbeitslasten, die im Synapse-Daten-Explorer ausgeführt werden, gelöscht, und die zugehörigen Anwendungsdaten gehen verloren. Es wird dringend empfohlen , zu Eventhouse in Microsoft Fabric zu migrieren.

Das Microsoft Cloud Migration Factory (CMF)-Programm wurde entwickelt, um Kunden bei der Migration zu Fabric zu unterstützen. Das Programm bietet dem Kunden kostenlos praktische Tastaturressourcen. Diese Ressourcen werden für einen Zeitraum von 6-8 Wochen mit einem vordefinierten und vereinbarten Umfang zugewiesen. Kunden nominierungen werden vom Microsoft-Kontoteam oder direkt durch Senden einer Anfrage zur Hilfe an das CMF-Team akzeptiert.

Azure Synapse Data Explorer ist ein schneller und hochgradig skalierbarer Dienst zur Untersuchung von Protokoll- und Telemetriedaten. Azure Synapse-Daten-Explorer bietet die Erfassung von Daten aus Event Hubs, IoT Hubs und Blobs, die in Blobcontainer geschrieben werden.

In diesem Artikel erstellen Sie eine Event-Hub-Datenverbindung für Azure Synapse Data Explorer mithilfe von Python.

Voraussetzungen

  • Ein Azure-Abonnement. Erstellen Sie ein kostenloses Azure-Konto.

  • Erstellen eines Data Explorer-Pools über Synapse Studio oder im Azure-Portal

  • Erstellen Sie eine Data Explorer-Datenbank.

    1. Wählen Sie in Synapse Studio im linken Bereich Daten aus.

    2. Wählen Sie + (Neue Ressource hinzufügen) >Data Explorer-Pool aus, und verwenden Sie die folgenden Informationen:

      Setting Vorgeschlagener Wert Description
      Poolname contosodataexplorer Name des zu verwendende Data Explorer-Pools
      Name TestDatabase Der Datenbankname muss innerhalb des Clusters eindeutig sein.
      Standardaufbewahrungszeitraum 365 Die Zeitspanne (in Tagen), für die garantiert wird, dass die Daten für Abfragen verfügbar bleiben. Die Zeitspanne wird ab dem Zeitpunkt gemessen, zu dem die Daten erfasst werden.
      Standardcachezeitraum 31 Die Zeitspanne (in Tagen), wie lange häufig abgefragte Daten im SSD-Speicher oder RAM (und nicht im längerfristigen Speicher) verfügbar bleiben.
    3. Wählen Sie Erstellen, um die Datenbank zu erstellen. Die Erstellung dauert in der Regel weniger als eine Minute.

Erstellen einer Tabelle im Testcluster

Erstellen Sie eine Tabelle mit dem Namen StormEvents, die dem Schema der Daten in der Datei StormEvents.csv entspricht.

Tipp

Die folgenden Codeausschnitte erstellen eine Instanz eines Clients für fast jeden Aufruf. Dies wird getan, damit jeder Ausschnitt einzeln ausführbar ist. In der Produktionsumgebung sind die Clientinstanzen wiedereintrittsfähig und sollten so lange wie nötig aufbewahrt werden. Eine einzelne Clientinstanz pro URI ist ausreichend, auch wenn Sie mit mehreren Datenbanken arbeiten (Datenbanken können auf Befehlsebene angegeben werden).

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Definieren der Erfassungszuordnung

Ordnen Sie die eingehenden CSV-Daten den beim Erstellen der Tabelle verwendeten Spaltennamen zu. Stellen Sie ein Objekt für die CSV-Spaltenzuordnung in dieser Tabelle bereit.

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Installieren des Python-Pakets

Um das Python-Paket für Azure Synapse Data Explorer zu installieren, öffnen Sie eine Eingabeaufforderung, deren Pfad Python enthält. Führen Sie den folgenden Befehl aus:

pip install azure-common
pip install azure-mgmt-kusto

Authentifizierung

Um das folgende Beispiel ausführen zu können, benötigen Sie eine Microsoft Entra-Anwendung und einen Dienstprinzipal, der auf Ressourcen zugreifen kann. Informationen zum Erstellen einer kostenlosen Microsoft Entra-Anwendung und Hinzufügen einer Rollenzuweisung auf Abonnementebene finden Sie unter Erstellen einer Microsoft Entra-Anwendung. Außerdem benötigen Sie die Verzeichnis-ID (Mandanten-ID), die Anwendungs-ID und den geheimen Clientschlüssel.

Hinzufügen einer Event Hub-Datenverbindung

Im folgenden Beispiel wird gezeigt, wie eine Event Hub-Datenverbindung programmgesteuert hinzugefügt wird. Weitere Informationen zum Hinzufügen einer Event Hub-Datenverbindung mit dem Azure-Portal finden Sie unter Herstellen einer Verbindung mit dem Event Hub.

from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import EventHubDataConnection
from azure.common.credentials import ServicePrincipalCredentials

#Directory (tenant) ID
tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Application ID
client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Client Secret
client_secret = "xxxxxxxxxxxxxx"
subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
credentials = ServicePrincipalCredentials(
        client_id=client_id,
        secret=client_secret,
        tenant=tenant_id
    )
kusto_management_client = KustoManagementClient(credentials, subscription_id)

resource_group_name = "testrg"
#The cluster and database that are created as part of the Prerequisites
cluster_name = "mykustocluster"
database_name = "mykustodatabase"
data_connection_name = "myeventhubconnect"
#The event hub that is created as part of the Prerequisites
event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
consumer_group = "$Default"
location = "Central US"
#The table and column mapping that are created as part of the Prerequisites
table_name = "StormEvents"
mapping_rule_name = "StormEvents_CSV_Mapping"
data_format = "csv"
#Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
poller = kusto_management_client.data_connections.create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name,
                                        parameters=EventHubDataConnection(event_hub_resource_id=event_hub_resource_id, consumer_group=consumer_group, location=location,
                                                                            table_name=table_name, mapping_rule_name=mapping_rule_name, data_format=data_format))
Einstellung Empfohlener Wert Feldbeschreibung
Mieter_ID xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx Ihre Mandanten-ID. Wird auch als Verzeichnis-ID bezeichnet.
subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx Die Abonnement-ID, die Sie für die Ressourcenerstellung verwenden.
Kunden-ID xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx Die Client-ID der Anwendung, die auf Ressourcen in Ihrem Mandanten zugreifen kann.
client_secret xxxxxxxxxxxx Das Clientgeheimnis der Anwendung, die auf Ressourcen in Ihrem Mandanten zugreifen kann.
Ressourcengruppenname testrg Der Name der Ressourcengruppe, die Ihren Cluster enthält.
cluster_name mykustocluster Der Name Ihres Clusters.
Datenbankname mykustodatabase Der Name der Zieldatenbank in Ihrem Cluster.
Datenverbindungsname myeventhubconnect Der gewünschte Name Ihrer Datenverbindung.
Tabellenname StormEvents Der Name der Zieltabelle in der Zieldatenbank.
Zuordnungsregel_Name StormEvents_CSV_Mapping Der Name der Spaltenzuordnung, die mit der Zieltabelle verknüpft ist.
Datenformat CSV Das Datenformat der Nachricht.
Event-Hub-Resource-ID Ressourcen-ID Die Ressourcen-ID Ihres Event Hubs mit den Daten für die Erfassung.
Verbrauchergruppe $Default Die Consumergruppe Ihres Event Hubs.
location Zentral-USA Der Speicherort der Datenverbindungsressource.

Bereinigen von Ressourcen

Um die Datenverbindung zu löschen, verwenden Sie den folgenden Befehl:

kusto_management_client.data_connections.delete(resource_group_name=resource_group_name, cluster_name=kusto_cluster_name, database_name=kusto_database_name, data_connection_name=kusto_data_connection_name)

Nächste Schritte