Partilhar via


Ingerir dados do Apache Kafka no Azure Data Explorer

O Apache Kafka é uma plataforma de streaming distribuída para a construção de pipelines de dados de streaming em tempo real que movem dados de forma confiável entre sistemas ou aplicativos. Kafka Connect é uma ferramenta para streaming escalável e confiável de dados entre o Apache Kafka e outros sistemas de dados. O Kusto Kafka Sink serve como o conector de Kafka e não requer o uso de código. Baixe o jar do conector de saída do repositório Git ou Confluent Connector Hub.

Este artigo mostra como processar dados com o Kafka, usando uma configuração autossuficiente do Docker para simplificar a configuração dos clusters Kafka e do conector Kafka.

Para obter mais informações, consulte o repositório Git do conector e as especificações da versão.

Pré-requisitos

Criar um principal de serviço Microsoft Entra

A entidade de serviço do Microsoft Entra pode ser criada através do portal do Azure ou programaticamente, como no exemplo a seguir.

Esta entidade de serviço é a identidade usada pelo conector para gravar dados da sua tabela no Kusto. Você concede permissões para este principal de serviço aceder a recursos do Kusto.

  1. Inicie sessão na sua subscrição do Azure através da CLI do Azure. Em seguida, autentique-se no navegador.

    az login
    
  2. Escolha a assinatura para hospedar o principal. Esta etapa é necessária quando você tem várias assinaturas.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Crie o principal de serviço. Neste exemplo, a entidade de serviço é chamada my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Dos dados JSON retornados, copie o appId, passworde tenant para uso futuro.

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

Você criou a sua aplicação e principal de serviço do Microsoft Entra.

Criar uma tabela de destino

  1. No ambiente de consulta, crie uma tabela chamada Storms usando o seguinte comando:

    .create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
    
  2. Crie o mapeamento Storms_CSV_Mapping de tabela correspondente para dados ingeridos usando o seguinte comando:

    .create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
    
  3. Crie uma política de lote de ingestão na tabela para latência configurável de ingestão em fila.

    Sugestão

    A política de lote de ingestão é um otimizador de desempenho e inclui três parâmetros. A primeira condição satisfeita desencadeia a inserção na tabela.

    .alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
    
  4. Utilize a entidade de serviço de Criar uma entidade de serviço do Microsoft Entra para conceder permissão para interagir com a base de dados.

    .add database YOUR_DATABASE_NAME admins  ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
    

Iniciar o laboratório

O laboratório a seguir foi projetado para oferecer a experiência de começar a criar dados, configurar o conector Kafka e transmitir esses dados. Você pode então analisar os dados ingeridos.

Clone o repositório git

Clone o git repositório do laboratório.

  1. Crie um diretório local na sua máquina.

    mkdir ~/kafka-kusto-hol
    cd ~/kafka-kusto-hol
    
  2. Clone o repositório.

    cd ~/kafka-kusto-hol
    git clone https://github.com/Azure/azure-kusto-labs
    cd azure-kusto-labs/kafka-integration/dockerized-quickstart
    

Conteúdo do repositório clonado

Execute o seguinte comando para listar o conteúdo do repositório clonado:

cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree

Este resultado desta pesquisa é:

├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│   └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
    ├── Dockerfile
    ├── StormEvents.csv
    ├── go.mod
    ├── go.sum
    ├── kafka
    │   └── kafka.go
    └── main.go

Revise os arquivos no repositório clonado

As seções a seguir explicam as partes importantes dos arquivos na árvore de arquivos.

adx-sink-config.json

Este arquivo contém o arquivo de propriedades do coletor Kusto onde você atualiza detalhes específicos de configuração:

{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 10000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}

Substitua os valores dos seguintes atributos de acordo com sua configuração: aad.auth.authority, aad.auth.appid, aad.auth.appkey, ( kusto.tables.topics.mapping o nome do banco de dados), kusto.ingestion.urle kusto.query.url.

Conector - Dockerfile

Este arquivo tem os comandos para gerar a imagem docker para a instância do conector. Inclui o download do conector a partir do diretório de lançamento do repositório Git.

Diretório de produtores de fenómenos de tempestade

Este diretório tem um programa Go que lê um arquivo "StormEvents.csv" local e publica os dados em um tópico Kafka.

docker-compose.yaml

version: "2"
services:
  zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
  kusto-connect:
    build:
      context: ./connector
      args:
        KUSTO_KAFKA_SINK_VERSION: 1.0.1
    ports:
      - 8083:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=adx
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  events-producer:
    build:
      context: ./storm-events-producer
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - KAFKA_BOOTSTRAP_SERVER=kafka:9092
      - KAFKA_TOPIC=storm-events
      - SOURCE_FILE=StormEvents.csv

Iniciar os contêineres

  1. Num terminal, inicie os contentores:

    docker-compose up
    

    O aplicativo produtor começa a enviar eventos para o storm-events tópico. Você verá logs semelhantes aos seguintes.

    ....
    events-producer_1  | sent message to partition 0 offset 0
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
    events-producer_1  |
    events-producer_1  | sent message to partition 0 offset 1
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
    ....
    
  2. Para verificar os logs, execute o seguinte comando em um terminal separado:

    docker-compose logs -f | grep kusto-connect
    

Inicie o conector

Use uma chamada Kafka Connect REST para iniciar o conector.

  1. Em um terminal separado, inicie a tarefa de coletor com o seguinte comando:

    curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
    
  2. Para verificar o status, execute o seguinte comando em um terminal separado:

    curl http://localhost:8083/connectors/storm/status
    

O conector inicia processos de ingestão em fila.

Observação

Se você tiver problemas com o conector de log, crie um problema.

Identidade gerenciada

Por padrão, o conector Kafka usa o método de aplicativo para autenticação durante a ingestão. Para autenticar usando a identidade gerenciada:

  1. Atribua ao cluster uma identidade gerenciada e conceda permissões de leitura à sua conta de armazenamento. Para obter mais informações, consulte Ingerir dados usando autenticação de identidade gerenciada.

  2. No arquivo adx-sink-config.json , defina aad.auth.strategy como managed_identity e verifique se aad.auth.appid está definido como o ID do cliente de identidade gerenciada (aplicativo).

  3. Use um token de serviço de metadados de instância privada em vez da identidade de serviço da Microsoft Entra.

Observação

Ao usar uma identidade gerenciada, appId e tenant são deduzidos do contexto do site de chamada e password não é necessário.

Consultar e rever dados

Confirmar a ingestão de dados

  1. Assim que os dados chegarem à Storms tabela, confirme a transferência de dados, verificando a contagem de linhas:

    Storms 
    | count
    
  2. Confirme que não há falhas no processo de ingestão:

    .show ingestion failures
    

    Depois de ver os dados, experimente algumas consultas.

Consultar os dados

  1. Para ver todos os registros, execute a seguinte consulta:

    Storms
    | take 10
    
  2. Use where e project para filtrar dados específicos.

    Storms
    | where EventType == 'Drought' and State == 'TEXAS'
    | project StartTime, EndTime, Source, EventId
    
  3. Use o summarize operador:

    Storms
    | summarize event_count=count() by State
    | where event_count > 10
    | project State, event_count
    | render columnchart
    

    Captura de ecrã dos resultados do gráfico de colunas de consulta Kafka ligado.

Para obter mais exemplos de consulta e orientação, consulte Escrever consultas na documentação do KQL e Kusto Query Language.

Reiniciar

Para redefinir, execute as seguintes etapas:

  1. Parar os contentores (docker-compose down -v)
  2. Suprimir (drop table Storms)
  3. Recriar a tabela Storms
  4. Recriar mapeamento de tabela
  5. Reiniciar contêineres (docker-compose up)

Limpeza de recursos

Para excluir os recursos do Azure Data Explorer, use az kusto cluster delete (extensão kusto) ou az kusto database delete (extensão kusto):

az kusto cluster delete --name "<cluster name>" --resource-group "<resource group name>"
az kusto database delete --cluster-name "<cluster name>" --database-name "<database name>" --resource-group "<resource group name>"

Você também pode excluir seu cluster e banco de dados por meio do portal do Azure. Para obter mais informações, consulte Excluir um cluster do Azure Data Explorer e Excluir um banco de dados no Azure Data Explorer.

Ajustando o conector Kafka Sink

Ajustar o conector Kafka Sink para funcionar com a política de agrupamento de ingestão:

  • Ajuste o limite de tamanho do destino de dados flush.size.bytes Kafka a partir de 1 MB, com incrementos de 10 MB ou 100 MB.
  • Ao usar o Kafka Sink, os dados são agregados duas vezes. No lado do conector, os dados são agregados de acordo com as definições de esvaziamento e no lado do serviço de acordo com a política de agrupamento. Se o tempo de batching for muito curto e os dados não puderem ser ingeridos pelo conector e pelo serviço, o tempo de batching deve ser aumentado. Defina o tamanho do lote em 1 GB e aumente ou diminua em incrementos de 100 MB, conforme necessário. Por exemplo, se o tamanho da liberação for de 1 MB e o tamanho da política de lote for 100 MB, o conector Kafka Sink agregará dados em um lote de 100 MB. Esse lote é então ingerido pelo serviço. Se o tempo da política de processamento em lote for de 20 segundos e o conector Kafka Sink liberar 50 MB em um período de 20 segundos, o serviço ingerirá um lote de 50 MB.
  • Você pode dimensionar adicionando instâncias e partições Kafka. Aumente tasks.max o número de partições. Crie uma partição se tiver dados suficientes para produzir um blob do tamanho da flush.size.bytes configuração. Se o blob for menor, o lote será processado quando atingir o limite de tempo, portanto, a partição não receberá taxa de transferência suficiente. Um grande número de partições significa mais sobrecarga de processamento.