次の方法で共有


C# を使用して Azure Synapse Data Explorer 用の Event Hubs データベース接続を作成する (プレビュー)

Important

Azure Synapse Analytics データ エクスプローラー (プレビュー) は、2025 年 10 月 7 日に廃止されます。 この日以降、Synapse Data Explorer で実行されているワークロードは削除され、関連付けられているアプリケーション データは失われます。 Microsoft Fabric の Eventhouse に移行 することを強くお勧めします。

Microsoft Cloud Migration Factory (CMF) プログラムは、お客様が Fabric に移行できるように設計されています。 このプログラムは、顧客に無料でハンズオン キーボード リソースを提供します。 これらのリソースは、定義済みの合意されたスコープで、6 ~ 8 週間割り当てられます。 顧客の指名は、Microsoft アカウント チームから受け入れられるか、CMF チームに ヘルプの要求 を送信することによって直接受け入れられます。

Azure Synapse Data Explorer は、ログと利用統計情報のための高速で拡張性に優れたデータ探索サービスです。 Azure Synapse Data Explorer では、Event Hubs、IoT Hubs、および blob コンテナーに書き込まれた blob からのデータのインジェスト (読み込み) を提供します。

この記事では、C# を使用して Azure Synapse Data Explorer 用の Event Hubs データベース接続を作成します。

[前提条件]

  • Azure サブスクリプション。 無料の Azure アカウントを作成します。

  • Synapse Studio または Azure portal を使用して Data Explorer プールを作成します

  • Data Explorer データベースを作成します。

    1. Synapse Studio の左側のペインで、 [データ] を選択します。

    2. + (新しいリソースの追加) >[Data Explorer プール] を選択し、次の情報を使用します。

      Setting 推奨値 Description
      プール名 contosodataexplorer 使用する Data Explorer プールの名前
      名前 TestDatabase データベース名はクラスター内で一意である必要があります。
      既定のリテンション期間 365 クエリにデータを使用できることが保証される期間 (日数) です。 期間は、データが取り込まれた時点から測定されます。
      既定のキャッシュ期間 31 頻繁にクエリされるデータが、長期ストレージではなく SSD ストレージまたは RAM で利用できるように保持される期間 (日数) です。
    3. [作成] を選択してデータベースを作成します。 通常、作成にかかる時間は 1 分未満です。

Synapse ワークスペースでデータ流出防止が有効になっているマネージド仮想ネットワークを使用している場合、イベント ハブから Data Explorer プールへのデータの取り込みは機能しません。

  • Visual Studio 2019。無料のVisual Studio 2019 Community Edition をダウンロードして使用します。 Visual Studio のセットアップ中に、 [Azure の開発] を有効にしてください。

テスト クラスターにテーブルを作成する

StormEvents ファイル内のデータのスキーマと一致する、StormEvents.csv という名前のテーブルを作成します。

ヒント

次のコード スニペットは、ほぼすべての呼び出しに対してクライアントのインスタンスを作成します。 これは、各スニペットを個別に実行できるようにするためです。 実稼働環境では、クライアント インスタンスは再入可能であり、必要な限り保持する必要があります。 複数のデータベースを使用する場合でも、URI ごとに 1 つのクライアントインスタンスで十分です (データベースはコマンド レベルで指定できます)。

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

インジェスト マッピングを定義する

受信した CSV データを、テーブル作成時に使用される列名にマップします。 CSV 列マッピング オブジェクトをそのテーブルにプロビジョニングします。

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

C# NuGet をインストールする

Authentication

次の例を実行するには、リソースにアクセスできる Microsoft Entra アプリケーションとサービス プリンシパルが必要です。 無料の Microsoft Entra アプリケーションを作成し、サブスクリプション レベルでロールの割り当てを追加するには、Microsoft Entra アプリケーションの作成に関する記事を参照してください。 また、ディレクトリ (テナント) ID、アプリケーション ID、およびクライアント シークレットも必要です。

Event Hubs データ接続の追加

次の例は、Event Hubs データ接続をプログラムを使用して追加する方法を示しています。 Azure portal を使用して Event Hubs データベース接続を追加する方法については、「Event Hubs への接続」を参照してください。

var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);

var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);

var kustoManagementClient = new KustoManagementClient(credentials)
{
    SubscriptionId = subscriptionId
};

var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The Event Hub that is created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var consumerGroup = "$Default";
var location = "Central US";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var compression = "None";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
    new EventHubDataConnection(eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression));
設定 推奨値 フィールドの説明
tenantId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx テナント ID。 ディレクトリ ID とも呼ばれます。
subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx リソースの作成に使用するサブスクリプション ID。
clientId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx ご利用のテナント内のリソースにアクセスできるアプリケーションのクライアント ID。
clientSecret xxxxxxxxxxxxxx ご利用のテナント内のリソースにアクセスできるアプリケーションのクライアント シークレット。
resourceGroupName testrg ご利用のクラスターを含むリソース グループの名前。
クラスターネーム mykustocluster ご利用のクラスターの名前。
databaseName mykustodatabase ご利用のクラスター内のターゲット データベースの名前。
データ接続名 myeventhubconnect データ接続の任意の名前。
tableName StormEvents ターゲット データベース内のターゲット テーブルの名前。
マッピング規則名 StormEvents_CSV_Mapping ターゲット テーブルに関連付けられている列マッピングの名前。
データフォーマット csv メッセージのデータ形式。
eventHubリソースID リソース ID インジェスト用のデータを保持しているイベント ハブのリソース ID。
コンシューマーグループ $デフォルト ご利用のイベント ハブのコンシューマー グループ。
位置 米国中部 データ接続リソースの場所。
圧縮 Gzip または None データ圧縮の種類。

データの生成

データを生成してイベント ハブに送信するサンプル アプリをご覧ください。

イベントには、そのサイズ制限を上限として、1 つ以上のレコードを含めることができます。 次の例では、2 つのイベントを送信します。それぞれに 5 つのレコードが追加されています。

var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;

for (var i = 0; i < 10; i++)
{
    // Create the data
    var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) };
    var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
    counter++;

    // Create the event
    if (counter == recordsPerEvent)
    {
        var eventData = new EventData(Encoding.UTF8.GetBytes(data));
        events.Add(eventData);

        counter = 0;
        data = string.Empty;
    }
}

// Send events
eventHubClient.SendAsync(events).Wait();

リソースをクリーンアップする

データ接続を削除するには、次のコマンドを使用します。

kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);

次のステップ