Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
O Azure Data Explorer é um serviço de exploração de dados rápido e altamente dimensionável para dados telemétricos e de registo. A biblioteca de cliente Java pode ser utilizada para ingerir dados, comandos de gestão de problemas e consultar dados em clusters do Azure Data Explorer.
Neste artigo, saiba como ingerir dados com a biblioteca Java do Azure Data Explorer. Primeiro, vai criar uma tabela e um mapeamento de dados num cluster de teste. Em seguida, colocará em fila uma ingestão do armazenamento de blobs para o cluster com o SDK Java e validará os resultados.
Pré-requisitos
- Uma conta Microsoft ou uma identidade de utilizador Microsoft Entra. Não é necessária uma subscrição do Azure.
- Um cluster e uma base de dados do Azure Data Explorer. Criar um cluster e uma base de dados.
- Git.
- JDK versão 1.8 ou posterior.
- Maven.
- Crie um Registo de Aplicações e conceda-lhe permissões para a base de dados. Guarde o ID de cliente e o segredo do cliente para utilização posterior.
Rever o código
Esta secção é opcional. Reveja os seguintes fragmentos de código para saber como funciona o código. Para ignorar esta secção, aceda a executar a aplicação.
Autenticação
O programa utiliza Microsoft Entra credenciais de autenticação com ConnectionStringBuilder".
Crie um
com.microsoft.azure.kusto.data.Clientpara consulta e gestão.static Client getClient() throws Exception { ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID); return ClientFactory.createClient(csb); }Crie e utilize um
com.microsoft.azure.kusto.ingest.IngestClientpara colocar em fila a ingestão de dados no Azure Data Explorer:static IngestClient getIngestionClient() throws Exception { String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost(); ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret); return IngestClientFactory.createClient(csb); }
Comandos de gestão
Os comandos de gestão, como .drop e .create, são executados ao chamar execute num com.microsoft.azure.kusto.data.Client objeto.
Por exemplo, a tabela é criada da StormEvents seguinte forma:
static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";
static void createTable(String database) {
try {
getClient().execute(database, createTableCommand);
System.out.println("Table created");
} catch (Exception e) {
System.out.println("Failed to create table: " + e.getMessage());
return;
}
}
Ingestão de dados
Ingestão de filas com um ficheiro de um contentor de Armazenamento de Blobs do Azure existente.
- Utilize
BlobSourceInfopara especificar o caminho do Armazenamento de Blobs. - Utilize
IngestionPropertiespara definir a tabela, a base de dados, o nome do mapeamento e o tipo de dados. No exemplo seguinte, o tipo de dados éCSV.
...
static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
static final String blobStorageAccountName = "kustosamples";
static final String blobStorageContainer = "samplefiles";
static final String fileName = "StormEvents.csv";
static final String blobStorageToken = ""; //If relevant add SAS token
....
static void ingestFile(String database) throws InterruptedException {
String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
fileName, blobStorageToken);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);
IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
ingestionProperties.setDataFormat(DATA_FORMAT.csv);
ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
....
O processo de ingestão começa num thread separado e o main thread aguarda que o thread de ingestão seja concluído. Este processo utiliza CountdownLatch. A API de ingestão (IngestClient#ingestFromBlob) não é assíncrona. É while utilizado um ciclo para consultar o estado atual a cada 5 segundos e aguarda que o estado da ingestão mude de Pending para um estado diferente. O estado final pode ser Succeeded, Failedou PartiallySucceeded.
....
CountDownLatch ingestionLatch = new CountDownLatch(1);
new Thread(new Runnable() {
@Override
public void run() {
IngestionResult result = null;
try {
result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
} catch (Exception e) {
ingestionLatch.countDown();
}
try {
IngestionStatus status = result.getIngestionStatusCollection().get(0);
while (status.status == OperationStatus.Pending) {
Thread.sleep(5000);
status = result.getIngestionStatusCollection().get(0);
}
ingestionLatch.countDown();
} catch (Exception e) {
ingestionLatch.countDown();
}
}
}).start();
ingestionLatch.await();
}
Dica
Existem outros métodos para processar a ingestão de forma assíncrona para diferentes aplicações. Por exemplo, pode utilizar CompletableFuture para criar um pipeline que defina a ação após a ingestão, como consultar a tabela ou processar exceções comunicadas ao IngestionStatus.
Executar a aplicação
Geral
Quando executa o código de exemplo, são executadas as seguintes ações:
-
Tabela pendente:
StormEventsa tabela é removida (se existir). -
Criação de tabelas:
StormEventsa tabela é criada. -
Criação do mapeamento:
StormEvents_CSV_Mappingo mapeamento é criado. - Ingestão de ficheiros: um ficheiro CSV (no Armazenamento de Blobs do Azure) está em fila de espera para ingestão.
O seguinte código de exemplo é de App.java:
public static void main(final String[] args) throws Exception {
dropTable(database);
createTable(database);
createMapping(database);
ingestFile(database);
}
Dica
Para experimentar diferentes combinações de operações, anule o comentário/comente os respetivos métodos em App.java.
Executar a aplicação
Clone o código de exemplo do GitHub:
git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git cd azure-data-explorer-java-sdk-ingestDefina as informações do principal de serviço com as seguintes informações como variáveis de ambiente utilizadas pelo programa:
- Ponto final do cluster
- Nome da base de dados
export AZURE_SP_CLIENT_ID="<replace with appID>" export AZURE_SP_CLIENT_SECRET="<replace with password>" export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net" export KUSTO_DB="name of the database"Compilar e executar:
mvn clean package java -jar target/adx-java-ingest-jar-with-dependencies.jarO resultado será semelhante a:
Table dropped Table created Mapping created Waiting for ingestion to complete...
Aguarde alguns minutos até que o processo de ingestão seja concluído. Após a conclusão com êxito, verá a seguinte mensagem de registo: Ingestion completed successfully. Pode sair do programa neste momento e avançar para o passo seguinte sem afetar o processo de ingestão, que já foi colocado em fila.
Validação
Aguarde 5 a 10 minutos para que a ingestão em fila agende o processo de ingestão e carregue os dados para o Azure Data Explorer.
Inicie sessão no https://dataexplorer.azure.com e ligue ao cluster.
Execute o seguinte comando para obter a contagem de registos na
StormEventstabela:StormEvents | count
Resolução de problemas
Para ver falhas de ingestão nas últimas quatro horas, execute o seguinte comando na base de dados:
.show ingestion failures | where FailedOn > ago(4h) and Database == "<DatabaseName>"Para ver o estado de todas as operações de ingestão nas últimas quatro horas, execute o seguinte comando:
.show operations | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull" | summarize arg_max(LastUpdatedOn, *) by OperationId
Limpar os recursos
Se não planear utilizar os recursos que criou, execute o seguinte comando na base de dados para remover a StormEvents tabela.
.drop table StormEvents