Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
O Apache Kafka é uma plataforma de streaming distribuída para a construção de pipelines de dados de streaming em tempo real que movem dados de forma confiável entre sistemas ou aplicativos. Kafka Connect é uma ferramenta para streaming escalável e confiável de dados entre o Apache Kafka e outros sistemas de dados. O Kusto Kafka Sink serve como o conector de Kafka e não requer o uso de código. Baixe o jar do conector de saída do repositório Git ou Confluent Connector Hub.
Este artigo mostra como processar dados com o Kafka, usando uma configuração autossuficiente do Docker para simplificar a configuração dos clusters Kafka e do conector Kafka.
Para obter mais informações, consulte o repositório Git do conector e as especificações da versão.
Pré-requisitos
- Uma assinatura do Azure. Crie uma conta do Azure gratuita.
- Um cluster e banco de dados do Azure Data Explorer com as políticas de cache e retenção padrão.
- Azure CLI (Interface de Linha de Comandos).
- Docker e Docker Compose.
Criar um principal de serviço Microsoft Entra
A entidade de serviço do Microsoft Entra pode ser criada através do portal do Azure ou programaticamente, como no exemplo a seguir.
Esta entidade de serviço é a identidade usada pelo conector para gravar dados da sua tabela no Kusto. Você concede permissões para este principal de serviço aceder a recursos do Kusto.
Inicie sessão na sua subscrição do Azure através da CLI do Azure. Em seguida, autentique-se no navegador.
az loginEscolha a assinatura para hospedar o principal. Esta etapa é necessária quando você tem várias assinaturas.
az account set --subscription YOUR_SUBSCRIPTION_GUIDCrie o principal de serviço. Neste exemplo, a entidade de serviço é chamada
my-service-principal.az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}Dos dados JSON retornados, copie o
appId,passwordetenantpara uso futuro.{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
Você criou a sua aplicação e principal de serviço do Microsoft Entra.
Criar uma tabela de destino
No ambiente de consulta, crie uma tabela chamada
Stormsusando o seguinte comando:.create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)Crie o mapeamento
Storms_CSV_Mappingde tabela correspondente para dados ingeridos usando o seguinte comando:.create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'Crie uma política de lote de ingestão na tabela para latência configurável de ingestão em fila.
Sugestão
A política de lote de ingestão é um otimizador de desempenho e inclui três parâmetros. A primeira condição satisfeita desencadeia a inserção na tabela.
.alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'Utilize a entidade de serviço de Criar uma entidade de serviço do Microsoft Entra para conceder permissão para interagir com a base de dados.
.add database YOUR_DATABASE_NAME admins ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
Iniciar o laboratório
O laboratório a seguir foi projetado para oferecer a experiência de começar a criar dados, configurar o conector Kafka e transmitir esses dados. Você pode então analisar os dados ingeridos.
Clone o repositório git
Clone o git repositório do laboratório.
Crie um diretório local na sua máquina.
mkdir ~/kafka-kusto-hol cd ~/kafka-kusto-holClone o repositório.
cd ~/kafka-kusto-hol git clone https://github.com/Azure/azure-kusto-labs cd azure-kusto-labs/kafka-integration/dockerized-quickstart
Conteúdo do repositório clonado
Execute o seguinte comando para listar o conteúdo do repositório clonado:
cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree
Este resultado desta pesquisa é:
├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│ └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
├── Dockerfile
├── StormEvents.csv
├── go.mod
├── go.sum
├── kafka
│ └── kafka.go
└── main.go
Revise os arquivos no repositório clonado
As seções a seguir explicam as partes importantes dos arquivos na árvore de arquivos.
adx-sink-config.json
Este arquivo contém o arquivo de propriedades do coletor Kusto onde você atualiza detalhes específicos de configuração:
{
"name": "storm",
"config": {
"connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
"flush.size.bytes": 10000,
"flush.interval.ms": 10000,
"tasks.max": 1,
"topics": "storm-events",
"kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
"aad.auth.authority": "<enter tenant ID>",
"aad.auth.appid": "<enter application ID>",
"aad.auth.appkey": "<enter client secret>",
"kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
"kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
Substitua os valores dos seguintes atributos de acordo com sua configuração: aad.auth.authority, aad.auth.appid, aad.auth.appkey, ( kusto.tables.topics.mapping o nome do banco de dados), kusto.ingestion.urle kusto.query.url.
Conector - Dockerfile
Este arquivo tem os comandos para gerar a imagem docker para a instância do conector. Inclui o download do conector a partir do diretório de lançamento do repositório Git.
Diretório de produtores de fenómenos de tempestade
Este diretório tem um programa Go que lê um arquivo "StormEvents.csv" local e publica os dados em um tópico Kafka.
docker-compose.yaml
version: "2"
services:
zookeeper:
image: debezium/zookeeper:1.2
ports:
- 2181:2181
kafka:
image: debezium/kafka:1.2
ports:
- 9092:9092
links:
- zookeeper
depends_on:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
kusto-connect:
build:
context: ./connector
args:
KUSTO_KAFKA_SINK_VERSION: 1.0.1
ports:
- 8083:8083
links:
- kafka
depends_on:
- kafka
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=adx
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
events-producer:
build:
context: ./storm-events-producer
links:
- kafka
depends_on:
- kafka
environment:
- KAFKA_BOOTSTRAP_SERVER=kafka:9092
- KAFKA_TOPIC=storm-events
- SOURCE_FILE=StormEvents.csv
Iniciar os contêineres
Num terminal, inicie os contentores:
docker-compose upO aplicativo produtor começa a enviar eventos para o
storm-eventstópico. Você verá logs semelhantes aos seguintes..... events-producer_1 | sent message to partition 0 offset 0 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public events-producer_1 | events-producer_1 | sent message to partition 0 offset 1 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer ....Para verificar os logs, execute o seguinte comando em um terminal separado:
docker-compose logs -f | grep kusto-connect
Inicie o conector
Use uma chamada Kafka Connect REST para iniciar o conector.
Em um terminal separado, inicie a tarefa de coletor com o seguinte comando:
curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectorsPara verificar o status, execute o seguinte comando em um terminal separado:
curl http://localhost:8083/connectors/storm/status
O conector inicia processos de ingestão em fila.
Observação
Se você tiver problemas com o conector de log, crie um problema.
Identidade gerenciada
Por padrão, o conector Kafka usa o método de aplicativo para autenticação durante a ingestão. Para autenticar usando a identidade gerenciada:
Atribua ao cluster uma identidade gerenciada e conceda permissões de leitura à sua conta de armazenamento. Para obter mais informações, consulte Ingerir dados usando autenticação de identidade gerenciada.
No arquivo adx-sink-config.json , defina
aad.auth.strategycomomanaged_identitye verifique seaad.auth.appidestá definido como o ID do cliente de identidade gerenciada (aplicativo).Use um token de serviço de metadados de instância privada em vez da identidade de serviço da Microsoft Entra.
Observação
Ao usar uma identidade gerenciada, appId e tenant são deduzidos do contexto do site de chamada e password não é necessário.
Consultar e rever dados
Confirmar a ingestão de dados
Assim que os dados chegarem à
Stormstabela, confirme a transferência de dados, verificando a contagem de linhas:Storms | countConfirme que não há falhas no processo de ingestão:
.show ingestion failuresDepois de ver os dados, experimente algumas consultas.
Consultar os dados
Para ver todos os registros, execute a seguinte consulta:
Storms | take 10Use
whereeprojectpara filtrar dados específicos.Storms | where EventType == 'Drought' and State == 'TEXAS' | project StartTime, EndTime, Source, EventIdUse o
summarizeoperador:Storms | summarize event_count=count() by State | where event_count > 10 | project State, event_count | render columnchart
Para obter mais exemplos de consulta e orientação, consulte Escrever consultas na documentação do KQL e Kusto Query Language.
Reiniciar
Para redefinir, execute as seguintes etapas:
- Parar os contentores (
docker-compose down -v) - Suprimir (
drop table Storms) - Recriar a tabela
Storms - Recriar mapeamento de tabela
- Reiniciar contêineres (
docker-compose up)
Limpeza de recursos
Para excluir os recursos do Azure Data Explorer, use az kusto cluster delete (extensão kusto) ou az kusto database delete (extensão kusto):
az kusto cluster delete --name "<cluster name>" --resource-group "<resource group name>"
az kusto database delete --cluster-name "<cluster name>" --database-name "<database name>" --resource-group "<resource group name>"
Você também pode excluir seu cluster e banco de dados por meio do portal do Azure. Para obter mais informações, consulte Excluir um cluster do Azure Data Explorer e Excluir um banco de dados no Azure Data Explorer.
Ajustando o conector Kafka Sink
Ajustar o conector Kafka Sink para funcionar com a política de agrupamento de ingestão:
- Ajuste o limite de tamanho do destino de dados
flush.size.bytesKafka a partir de 1 MB, com incrementos de 10 MB ou 100 MB. - Ao usar o Kafka Sink, os dados são agregados duas vezes. No lado do conector, os dados são agregados de acordo com as definições de esvaziamento e no lado do serviço de acordo com a política de agrupamento. Se o tempo de batching for muito curto e os dados não puderem ser ingeridos pelo conector e pelo serviço, o tempo de batching deve ser aumentado. Defina o tamanho do lote em 1 GB e aumente ou diminua em incrementos de 100 MB, conforme necessário. Por exemplo, se o tamanho da liberação for de 1 MB e o tamanho da política de lote for 100 MB, o conector Kafka Sink agregará dados em um lote de 100 MB. Esse lote é então ingerido pelo serviço. Se o tempo da política de processamento em lote for de 20 segundos e o conector Kafka Sink liberar 50 MB em um período de 20 segundos, o serviço ingerirá um lote de 50 MB.
- Você pode dimensionar adicionando instâncias e partições Kafka. Aumente
tasks.maxo número de partições. Crie uma partição se tiver dados suficientes para produzir um blob do tamanho daflush.size.bytesconfiguração. Se o blob for menor, o lote será processado quando atingir o limite de tempo, portanto, a partição não receberá taxa de transferência suficiente. Um grande número de partições significa mais sobrecarga de processamento.
Conteúdo relacionado
- Saiba mais sobre a arquitetura de Big Data.
- Saiba como ingerir dados de exemplo formatados em JSON no Azure Data Explorer.
- Saiba mais com os laboratórios Kafka: