Partilhar via


Criar uma conexão de dados de Hubs de Eventos para o Azure Synapse Data Explorer usando C# (Visualização)

Importante

O Azure Synapse Analytics Data Explorer (Visualização) será desativado em 7 de outubro de 2025. Após essa data, as cargas de trabalho em execução no Synapse Data Explorer serão excluídas e os dados do aplicativo associados serão perdidos. É altamente recomendável migrar para o Eventhouse no Microsoft Fabric.

O programa Microsoft Cloud Migration Factory (CMF) foi projetado para ajudar os clientes na migração para o Fabric. O programa oferece recursos práticos de teclado sem nenhum custo para o cliente. Estes recursos são atribuídos por um período de 6-8 semanas, com um âmbito pré-definido e acordado. As nomeações de clientes são aceites pela equipa da conta Microsoft ou diretamente através do envio de um pedido de ajuda à equipa CMF.

O Azure Synapse Data Explorer é um serviço de exploração de dados rápido e altamente escalável para dados de log e telemetria. O Azure Synapse Data Explorer oferece ingestão (carregamento de dados) de Hubs de Eventos, Hubs IoT e blobs gravados em contentores de blob.

Neste artigo, você cria uma conexão de dados de Hubs de Eventos para o Azure Synapse Data Explorer usando C#.

Pré-requisitos

  • Uma assinatura do Azure. Crie uma conta do Azure gratuita.

  • Criar um pool do Data Explorer usando o Synapse Studio ou o portal do Azure

  • Crie um banco de dados do Data Explorer.

    1. No Synapse Studio, no painel do lado esquerdo, selecione Dados.

    2. Selecione + (Adicionar novo recurso) >Pool do Data Explorer e use as seguintes informações:

      Configuração Valor sugerido Description
      Nome do grupo contosodataexplorer O nome do grupo do Data Explorer a ser usado
      Nome TestDatabase O nome da base de dados tem de ser exclusivo dentro do cluster.
      Período de retenção predefinido 365 O período de tempo (em dias) durante o qual é garantido que os dados são mantidos disponíveis para consulta. O intervalo de tempo é medido desde o momento em que os dados são ingeridos.
      Período de cache padrão 31 O período de tempo (em dias) durante o qual manter os dados frequentemente consultados disponíveis no armazenamento SSD ou RAM, em vez de no armazenamento a longo prazo.
    3. Selecione Criar para criar o banco de dados. Normalmente, a criação demora menos de um minuto.

Observação

A ingestão de dados de um Hub de Eventos em pools do Data Explorer não funcionará se o seu espaço de trabalho Synapse usar uma rede virtual gerida com proteção contra exfiltração de dados ativada.

Criar uma tabela no cluster de teste

Crie uma tabela nomeada StormEvents que corresponda ao esquema dos dados no StormEvents.csv arquivo.

Sugestão

Os trechos de código a seguir criam uma instância de um cliente para quase todas as chamadas. Isso é feito para tornar cada trecho executável individualmente. Na produção, as instâncias do cliente são reentrantes e devem ser mantidas pelo tempo necessário. Uma única instância de cliente por URI é suficiente, mesmo quando se trabalha com vários bancos de dados (o banco de dados pode ser especificado em um nível de comando).

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Definir o mapeamento de ingestão

Mapeie os dados CSV de entrada para os nomes de coluna usados ao criar a tabela. Provisionar um objeto de mapeamento de coluna CSV nessa tabela.

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Instalar o C# NuGet

Authentication

Para executar o exemplo seguinte, necessita-se de uma aplicação Microsoft Entra e um principal de serviço que possa aceder a recursos. Para criar um aplicativo Microsoft Entra gratuito e adicionar atribuição de função no nível da assinatura, consulte Criar um aplicativo Microsoft Entra. Você também precisa do ID do diretório (locatário), do identificador da aplicação e da chave do cliente.

Adicionar uma conexão de dados de Hubs de Eventos

O exemplo a seguir mostra como adicionar uma conexão de dados de Hubs de Eventos programaticamente. Consulte ligar-se aos Hubs de Eventos para obter informações sobre como adicionar uma conexão de dados aos Hubs de Eventos usando o portal do Azure.

var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);

var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);

var kustoManagementClient = new KustoManagementClient(credentials)
{
    SubscriptionId = subscriptionId
};

var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The Event Hub that is created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var consumerGroup = "$Default";
var location = "Central US";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var compression = "None";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
    new EventHubDataConnection(eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression));
Setting Valor sugerido Descrição do campo
tenantId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx O ID do inquilino. Também conhecido como ID de diretório.
ID de subscrição xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx A ID de assinatura que você usa para a criação de recursos.
clientId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx A ID do cliente do aplicativo que pode acessar recursos em seu locatário.
clientSecret xxxxxxxxxxxxxx O segredo do cliente do aplicativo que pode acessar recursos em seu locatário.
resourceGroupName Testrg O nome do grupo de recursos que contém o cluster.
nome_do_cluster Mykustocluster O nome do seu cluster.
databaseName mykustodatabase O nome do banco de dados de destino no cluster.
dataConnectionName MyEventHubConnect O nome desejado da sua conexão de dados.
tableName StormEvents O nome da tabela de destino no banco de dados de destino.
nomeDaRegraDeMapeamento StormEvents_CSV_Mapping O nome do mapeamento de colunas relacionado à tabela de destino.
formato de dados CSV O formato de dados da mensagem.
ID do recurso do Event Hub ID do recurso O ID do recurso do hub de eventos que contém os dados para ingestão.
Grupo de consumidores $Default O grupo de consumidores do seu hub de eventos.
localização Centro dos EUA O local do recurso de conexão de dados.
compressão Gzip ou Nenhum O tipo de compressão de dados.

Gerar dados

Veja o aplicativo de exemplo que gera dados e os envia para um hub de eventos.

Um evento pode conter um ou mais registros, até seu limite de tamanho. No exemplo a seguir, enviamos dois eventos, cada um com cinco registros anexados:

var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;

for (var i = 0; i < 10; i++)
{
    // Create the data
    var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) };
    var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
    counter++;

    // Create the event
    if (counter == recordsPerEvent)
    {
        var eventData = new EventData(Encoding.UTF8.GetBytes(data));
        events.Add(eventData);

        counter = 0;
        data = string.Empty;
    }
}

// Send events
eventHubClient.SendAsync(events).Wait();

Limpeza de recursos

Para excluir a conexão de dados, use o seguinte comando:

kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);

Próximos passos