Compartir a través de


Creación de una conexión de datos del centro de eventos en Azure Synapse Data Explorer con Python (versión preliminar)

Importante

El Explorador de datos de Azure Synapse Analytics (versión preliminar) se retirará el 7 de octubre de 2025. Después de esta fecha, se eliminarán las cargas de trabajo que se ejecutan en el Explorador de datos de Synapse y se perderán los datos de la aplicación asociados. Se recomienda migrar a Eventhouse en Microsoft Fabric.

El programa Microsoft Cloud Migration Factory (CMF) está diseñado para ayudar a los clientes a migrar a Fabric. El programa ofrece recursos de teclado prácticos sin costo alguno al cliente. Estos recursos se asignan durante un período de 6 a 8 semanas, con un ámbito predefinido y acordado. Las nominaciones de clientes se aceptan por parte del equipo de la cuenta de Microsoft o directamente enviando una solicitud de ayuda al equipo de CMF.

Azure Synapse Data Explorer es un servicio de exploración de datos rápido y altamente escalable para datos de telemetría y de registro. Data Explorer de Azure Synapse permite ingerir (cargar) datos procedentes de Event Hubs, IoT Hubs y blobs escritos en contenedores de blobs.

En este artículo va a crear una conexión de datos del centro de eventos en Azure Synapse Data Explorer con Python.

Prerrequisitos

  • Una suscripción de Azure. Cree una cuenta de Azure gratuita.

  • Creación de un grupo de Data Explorer mediante Synapse Studio o Azure Portal

  • Cree una base de datos de Data Explorer.

    1. En Synapse Studio, en el panel izquierdo, seleccione Datos.

    2. Seleccione + (Agregar un recurso nuevo) >Grupo de explorador de datos, y use la siguiente información:

      Configuración Valor sugerido Description
      Nombre de la piscina contosodataexplorer Nombre del grupo de Data Explorer que se usará.
      Nombre TestDatabase El nombre de la base de datos debe ser único dentro del clúster.
      Período de retención predeterminado 365 El intervalo de tiempo (en días) para el que se garantiza que los datos se mantengan disponibles para consultarlos. El intervalo de tiempo se mide desde el momento en que se ingieren los datos.
      Período de caché predeterminado 31 El intervalo de tiempo (en días) durante el que los datos consultados con frecuencia se van a mantener disponibles en el almacenamiento SSD o en la RAM, en lugar de en el almacenamiento a largo plazo.
    3. Seleccione Crear para crear la base de datos. La creación normalmente tarda menos de un minuto.

Creación de una tabla en el clúster de prueba

Cree una tabla que denominada StormEvents que coincida con el esquema de los datos del archivo StormEvents.csv.

Sugerencia

Los fragmentos de código siguientes crean una instancia de un cliente para casi todas las llamadas. Esto se hace para que cada fragmento de código se pueda ejecutar de forma individual. En producción, las instancias de cliente son de reentrada y deben mantenerse todo el tiempo que sea necesario. Una única instancia de cliente por URI es suficiente, aunque se trabaje con varias bases de datos (la base de datos se puede especificar en el nivel de comando).

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Definir el mapeo de ingestión

Asigna los datos de CSV entrantes a los nombres de columna utilizados al crear la tabla. Aprovisione un objeto de asignación de columnas de CSV en esa tabla.

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Instalación del paquete de Python

Para instalar el paquete de Python correspondiente a Azure Synapse Data Explorer, abra un símbolo del sistema que tenga Python en su ruta de acceso. Ejecute el siguiente comando:

pip install azure-common
pip install azure-mgmt-kusto

Autenticación

Para ejecutar el siguiente ejemplo, necesita una aplicación de Microsoft Entra y un principal de servicio que pueda acceder a los recursos. Para crear una aplicación de Microsoft Entra gratuita y agregar una asignación de roles en el nivel de la suscripción, consulte Creación de una aplicación de Microsoft Entra. También necesita el identificador de directorio (inquilino), el identificador de aplicación y el secreto de cliente.

Incorporación de una conexión de datos de Event Hubs

En el ejemplo siguiente, se explica cómo se agrega una conexión de datos de Event Hubs mediante programación. Consulte Conexión a Event Hubs para agregar una conexión de datos de Event Hubs en Azure Portal.

from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import EventHubDataConnection
from azure.common.credentials import ServicePrincipalCredentials

#Directory (tenant) ID
tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Application ID
client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Client Secret
client_secret = "xxxxxxxxxxxxxx"
subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
credentials = ServicePrincipalCredentials(
        client_id=client_id,
        secret=client_secret,
        tenant=tenant_id
    )
kusto_management_client = KustoManagementClient(credentials, subscription_id)

resource_group_name = "testrg"
#The cluster and database that are created as part of the Prerequisites
cluster_name = "mykustocluster"
database_name = "mykustodatabase"
data_connection_name = "myeventhubconnect"
#The event hub that is created as part of the Prerequisites
event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
consumer_group = "$Default"
location = "Central US"
#The table and column mapping that are created as part of the Prerequisites
table_name = "StormEvents"
mapping_rule_name = "StormEvents_CSV_Mapping"
data_format = "csv"
#Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
poller = kusto_management_client.data_connections.create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name,
                                        parameters=EventHubDataConnection(event_hub_resource_id=event_hub_resource_id, consumer_group=consumer_group, location=location,
                                                                            table_name=table_name, mapping_rule_name=mapping_rule_name, data_format=data_format))
Configuración Valor sugerido Descripción del campo
identificador_de_inquilino xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx El identificador de inquilino. También conocido como identificador de directorio.
ID de suscripción xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx Identificador de suscripción que se usa para la creación de recursos.
ID de cliente xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx Identificador del cliente de la aplicación que puede acceder a los recursos del inquilino.
secreto_del_cliente xxxxxxxxxxx Secreto del cliente de la aplicación que puede acceder a los recursos del inquilino.
nombre_del_grupo_de_recursos testrg Nombre del grupo de recursos que contiene el clúster.
nombre_del_cluster mykustocluster Nombre del clúster.
nombre de la base de datos mykustodatabase Nombre de la base de datos de destino del clúster.
nombre_de_conexión_de_datos myeventhubconnect Nombre que desea asignar a la conexión de datos.
nombre_de_tabla StormEvents Nombre de la tabla de destino de la base de datos de destino.
nombre_de_regla_de_mapeo Mapeo_CSV_EventosDeTormenta Nombre de la asignación de columnas asociada a la tabla de destino.
formato_de_datos csv Formato de datos del mensaje.
centro_de_eventos_recurso_id Identificador del recurso Identificador del recurso del centro de eventos que contiene los datos que se van a ingerir.
grupo de consumidores $Default Grupo de consumidores del centro de eventos.
location Centro de EE. UU. Ubicación del recurso de conexión de datos.

Limpieza de recursos

Para eliminar la conexión de datos, use el siguiente comando:

kusto_management_client.data_connections.delete(resource_group_name=resource_group_name, cluster_name=kusto_cluster_name, database_name=kusto_database_name, data_connection_name=kusto_data_connection_name)

Pasos siguientes