Azure Data Explorer は、ログと利用統計情報データのための高速で拡張性に優れたデータ探索サービスです。 Azure Data Explorer サービスとやり取りするための Go SDK クライアント ライブラリが提供されます。 Go SDK を使用して、Azure Data Explorer クラスター内のデータを取り込み、制御し、クエリを実行できます。
この記事ではまず、テスト クラスター内にテーブルとデータ マッピングを作成します。 その後、Go SDK を使用してクラスターに対するインジェストをキューに登録し、結果を確認します。
前提条件
- Microsoft アカウントまたは Microsoft Entra ユーザー ID。 Azure サブスクリプションは不要です。
- Azure Data Explorer クラスターとデータベース。 クラスターとデータベースを作成します。
- Git のインストール。
- この Go SDK の最小要件に従って、Go をインストールします。
- アプリの登録を作成し、データベースに対するアクセス許可を付与します。 後で使用するために、クライアント ID とクライアント シークレットを保存します。
Go SDK をインストールする
Go モジュールを使用するサンプル アプリケーションを実行すると、Azure Data Explorer Go SDK が自動的にインストールされます。 別のアプリケーション用の Go SDK をインストールした場合は、次の例のように、Go モジュールを作成し、(go get を使用して) Azure Data Explorer パッケージをフェッチします。
go mod init foo.com/bar
go get github.com/Azure/azure-kusto-go/kusto
パッケージの依存関係は go.mod ファイルに追加されます。 これは Go アプリケーションで使用します。
コードの確認
この「コードの確認」セクションは省略可能です。 コードのしくみに関心がある場合は、次のコード スニペットで確認できます。 それ以外の場合は、「アプリケーションの実行」に進んでください。
認証
何らかの操作を実行する前に、プログラムから Azure Data Explorer サービスに対して認証を行う必要があります。
auth := kusto.Authorization{Config: auth.NewClientCredentialsConfig(clientID, clientSecret, tenantID)}
client, err := kusto.New(kustoEndpoint, auth)
kusto.Authorization のインスタンスは、サービス プリンシパルの資格情報を使用して作成されます。 その後、kusto.Client を作成するために使用されます。その際に、クラスター エンドポイントも受け入れる新しい関数が使用されます。
テーブルを作成する
create table コマンドは、Kusto ステートメントによって表されます。Mgmt 関数は、管理コマンドを実行するために使用されます。 これは、コマンドを実行してテーブルを作成するために使用されます。
func createTable(kc *kusto.Client, kustoDB string) {
_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createTableCommand))
if err != nil {
log.Fatal("failed to create table", err)
}
log.Printf("Table %s created in DB %s\n", kustoTable, kustoDB)
}
ヒント
セキュリティを強化するため、既定では、Kusto ステートメントは定数となります。 NewStmt で文字列定数が受け入れられます。 UnsafeStmt API で非定数のステートメント セグメントを使用できますが、推奨されません。
Kusto の create table コマンドは、次のようになります。
.create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)
マッピングを作成する
インジェスト中にデータ マッピングが使用され、受信データが Azure Data Explorer テーブル内の列にマップされます。 詳細については、「データ マッピング」を参照してください。 テーブルと同じ方法でマッピングが作成されます。その場合、Mgmt 関数を使用し、データベース名と適切なコマンドを指定します。 完全なコマンドは、サンプルの GitHub リポジトリで入手できます。
func createMapping(kc *kusto.Client, kustoDB string) {
_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createMappingCommand))
if err != nil {
log.Fatal("failed to create mapping - ", err)
}
log.Printf("Mapping %s created\n", kustoMappingRefName)
}
データを取り込む
インジェストは、既存の Azure Blob Storage コンテナーのファイルを使用してキューに登録されます。
func ingestFile(kc *kusto.Client, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable string) {
kIngest, err := ingest.New(kc, kustoDB, kustoTable)
if err != nil {
log.Fatal("failed to create ingestion client", err)
}
blobStorePath := fmt.Sprintf(blobStorePathFormat, blobStoreAccountName, blobStoreContainer, blobStoreFileName, blobStoreToken)
err = kIngest.FromFile(context.Background(), blobStorePath, ingest.FileFormat(ingest.CSV), ingest.IngestionMappingRef(kustoMappingRefName, ingest.CSV))
if err != nil {
log.Fatal("failed to ingest file", err)
}
log.Println("Ingested file from -", blobStorePath)
}
インジェスト クライアントは、ingest.New を使用して作成されます。 FromFile 関数は、Azure Blob Storage URI を参照するために使用されます。 マッピング参照名とデータ型は FileOption の形式で渡されます。
アプリケーションの実行
GitHub から次のサンプル コードをクローンします。
git clone https://github.com/Azure-Samples/Azure-Data-Explorer-Go-SDK-example-to-ingest-data.git cd Azure-Data-Explorer-Go-SDK-example-to-ingest-datamain.goのこのスニペットに示されているサンプル コードを実行します。func main { ... dropTable(kc, kustoDB) createTable(kc, kustoDB) createMapping(kc, kustoDB) ingestFile(kc, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable) ... }ヒント
操作のさまざまな組み合わせを試すために、
main.go内の各関数をコメント解除したり、コメント化したりすることができます。サンプル コードを実行すると、次のアクションが行われます。
- テーブルを削除する:
StormEventsテーブルが削除されます (存在する場合)。 - テーブルの作成:
StormEventsテーブルが作成されます。 - マッピングの作成:
StormEvents_CSV_Mappingマッピングが作成されます。 - ファイル インジェスト: (Azure Blob Storage 内の) CSV ファイルがインジェストのためにキューに入れられます。
- テーブルを削除する:
認証用のサービス プリンシパルを作成するには、Azure CLI を使用して az ad sp create-for-rbac コマンドを指定します。 プログラムによって使用される環境変数の形式で、クラスター エンドポイントとデータベース名を指定してサービス プリンシパル情報を設定します。
export AZURE_SP_CLIENT_ID="<replace with appID>" export AZURE_SP_CLIENT_SECRET="<replace with password>" export AZURE_SP_TENANT_ID="<replace with tenant>" export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net" export KUSTO_DB="name of the database"以下のプログラムを実行します。
go run main.go次のような出力が表示されます。
Connected to Azure Data Explorer Using database - testkustodb Failed to drop StormEvents table. Maybe it does not exist? Table StormEvents created in DB testkustodb Mapping StormEvents_CSV_Mapping created Ingested file from - https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv
確認してトラブルシューティングを行う
キューに登録されたインジェストでインジェスト プロセスがスケジュールされ、Azure Data Explorer にデータが読み込まれるまで、5 分から 10 分待ちます。
https://dataexplorer.azure.com にサインインして、クラスターに接続します。 その後、次のコマンドを実行して、
StormEventsテーブル内のレコードの数を取得します。StormEvents | countデータベースで次のコマンドを実行し、過去 4 時間以内にインジェスト エラーがあったかどうかを調べます。 実行する前にデータベース名を置き換えてください。
.show ingestion failures | where FailedOn > ago(4h) and Database == "<DatabaseName>"次のコマンドを実行し、過去 4 時間以内のすべてのインジェスト操作の状態を表示します。 実行する前にデータベース名を置き換えてください。
.show operations | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull" | summarize arg_max(LastUpdatedOn, *) by OperationId
リソースをクリーンアップする
他の記事に進む場合は、作成したリソースをそのままにします。 そのようにしない場合は、データベースで次のコマンドを実行し、StormEvents テーブルを削除します。
.drop table StormEvents