Compartilhar via


Criar uma conexão de dados dos Hubs de Eventos com o Azure Synapse Data Explorer usando C# (versão prévia)

Importante

O Azure Synapse Analytics Data Explorer (versão prévia) será desativado em 7 de outubro de 2025. Após essa data, as cargas de trabalho em execução no Synapse Data Explorer serão excluídas e os dados do aplicativo associado serão perdidos. É altamente recomendável migrar para o Eventhouse no Microsoft Fabric.

O programa CMF (Microsoft Cloud Migration Factory) foi projetado para ajudar os clientes na migração para o Fabric. O programa oferece recursos práticos de teclado sem custo para o cliente. Esses recursos são atribuídos por um período de 6 a 8 semanas, com um escopo predefinido e acordado. As nomeações de clientes são aceitas da equipe de conta da Microsoft ou diretamente enviando uma solicitação de ajuda à equipe do CMF.

O Azure Synapse Data Explorer é um serviço de exploração de dados rápido e altamente escalonável para dados de log e telemetria. O Data Explorer do Azure Synapse oferece ingestão (carregamento de dados) dos Hubs de Eventos, Hubs IoT e blobs gravados nos contêineres de blob.

Neste artigo, você criará uma conexão de dados dos Hubs de Eventos para o Azure Synapse Data Explorer usando C#.

Pré-requisitos

  • Uma assinatura do Azure. Criar uma conta gratuita do Azure.

  • Criar um pool do Data Explorer usando o Synapse Studio ou o portal do Azure

  • Criar um banco de dados do Data Explorer.

    1. No Synapse Studio, no painel esquerdo, selecione Dados.

    2. Selecione + (Adicionar novo recurso) >Pool do Data Explorer e use as seguintes informações:

      Configurações Valor sugerido Description
      Nome da piscina contosodataexplorer O nome do pool do Data Explorer que será utilizado
      Nome TestDatabase O nome do banco de dados deve ser exclusivo dentro do cluster.
      Período de retenção padrão 365 O período de tempo (em dias) durante o qual há a garantia de que os dados serão mantidos disponíveis para consulta. O intervalo de tempo é medido a partir do momento em que os dados são ingeridos.
      Período de cache padrão 31 O período de tempo (em dias) durante o qual os dados consultados com frequência devem ser mantidos disponíveis no armazenamento SSD ou RAM, em vez de no armazenamento de longo prazo.
    3. Selecione Criar para criar o banco de dados. A criação geralmente leva menos de um minuto.

Observação

A ingestão de dados de um Hub de Eventos em pools do Data Explorer não funcionará se o seu espaço de trabalho do Synapse usar uma rede virtual gerenciada com proteção contra exfiltração de dados ativada.

  • Visual Studio 2019, baixe e use o Visual Studio 2019 Community Editiongratuito. Habilite o desenvolvimento do Azure durante a instalação do Visual Studio.

Criar uma tabela em seu cluster de teste

Crie uma tabela chamada StormEvents que corresponde ao esquema dos dados no arquivo StormEvents.csv.

Dica

Os snippets de código a seguir criam uma instância de um cliente para quase todas as chamadas. Isso é feito para tornar cada snippet de código individualmente executável. Na produção, as instâncias do cliente são reentrantes e devem ser mantidas pelo tempo que for necessário. Uma só instância de cliente por URI é suficiente, mesmo quando você trabalha com vários bancos de dados (o banco de dados pode ser especificado em um nível de comando).

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Definir mapeamento de ingestão

Mapear os dados CSV de entrada para os nomes de colunas usados ao criar a tabela. Provisione um objeto de mapeamento de coluna CSV nessa tabela.

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Instalar NuGet do C#

Authentication

Para executar o exemplo a seguir, você precisará de um aplicativo Microsoft Entra e um principal de serviço que possam acessar recursos. Para criar um aplicativo gratuito do Microsoft Entra e adicionar a atribuição de função no nível da assinatura, consulte Como criar um aplicativo do Microsoft Entra. Você também precisará da ID do diretório (locatário), da ID do aplicativo e do segredo do cliente.

Adicionar uma conexão de dados dos Hubs de Eventos

O exemplo a seguir mostra como adicionar uma conexão de dados dos Hubs de Eventos programaticamente. Confira conectar-se aos Hubs de Eventos para obter informações sobre como adicionar uma conexão de dados dos Hubs de Eventos usando o portal do Azure.

var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);

var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);

var kustoManagementClient = new KustoManagementClient(credentials)
{
    SubscriptionId = subscriptionId
};

var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The Event Hub that is created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var consumerGroup = "$Default";
var location = "Central US";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var compression = "None";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
    new EventHubDataConnection(eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression));
Configurações Valor sugerido Descrição do campo
tenantId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx ID do locatário. Também conhecida como ID do diretório.
subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx A ID da assinatura que você usa para a criação de recursos.
clientId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx A ID do cliente do aplicativo que pode acessar recursos em seu locatário.
clientSecret xxxxxxxxxxxxxx O segredo do cliente do aplicativo que pode acessar recursos no seu locatário.
resourceGroupName testrg O nome do grupo de recursos que contém o seu cluster.
nome do cluster mykustocluster O nome do cluster.
databaseName mykustodatabase O nome do banco de dados de destino no cluster.
Nome da Conexão de Dados myeventhubconnect O nome desejado da conexão de dados.
tableName StormEvents O nome da tabela de destino no banco de dados de destino.
Nome da Regra de Mapeamento StormEvents_CSV_Mapping (Mapeamento_StormEvents_CSV) O nome do mapeamento de coluna relacionado à tabela de destino.
formato de dados csv O formato de dados da mensagem.
eventHubResourceId ID do recurso A ID do recurso do hub de eventos que contém os dados para ingestão.
grupo de consumidores $Default O grupo de consumidores do hub de eventos.
local Centro dos EUA A localização do recurso de conexão de dados.
compactação Gzip ou Nenhum O tipo de compactação de dados.

Gerar dados

Confira o aplicativo de exemplo que gera dados e os envia para um hub de eventos.

Um evento pode conter um ou mais registros, até o limite de tamanho. No exemplo a seguir, enviamos dois eventos, cada um tem cinco registros em anexo:

var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;

for (var i = 0; i < 10; i++)
{
    // Create the data
    var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) };
    var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
    counter++;

    // Create the event
    if (counter == recordsPerEvent)
    {
        var eventData = new EventData(Encoding.UTF8.GetBytes(data));
        events.Add(eventData);

        counter = 0;
        data = string.Empty;
    }
}

// Send events
eventHubClient.SendAsync(events).Wait();

Limpar os recursos

Para excluir a conexão de dados, use o seguinte comando:

kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);

Próximas etapas