Delen via


Gegevens uit Apache Kafka opnemen in Azure Data Explorer

Apache Kafka is een gedistribueerd streamingplatform voor het bouwen van pijplijnen voor realtime streaminggegevens waarmee gegevens betrouwbaar worden verplaatst tussen systemen of toepassingen. Kafka Connect is een hulpprogramma voor schaalbare en betrouwbare streaming van gegevens tussen Apache Kafka en andere gegevenssystemen. De Kusto Kafka-sink fungeert als de connector van Kafka en vereist geen code. Download het sink connector-jarbestand vanuit de Git-repo of Confluent Connector Hub.

In dit artikel wordt beschreven hoe u gegevens opneemt met Kafka, met behulp van een zelfstandige Docker-installatie om de installatie van het Kafka-cluster en het Kafka-connectorcluster te vereenvoudigen.

Zie de git-opslagplaats en versiedetails van de connector voor meer informatie.

Vereiste voorwaarden

Een Microsoft Entra-service-principal maken

De Microsoft Entra-service-principal kan worden gemaakt via Azure Portal of programmatisch, zoals in het volgende voorbeeld.

Deze service-principal is de identiteit die door de connector wordt gebruikt voor het schrijven van gegevens in uw tabel in Kusto. U verleent machtigingen aan deze service-principal om toegang te krijgen tot Kusto-resources.

  1. Meld u aan bij uw Azure-abonnement via Azure CLI. Verifieer vervolgens in de browser.

    az login
    
  2. Kies het abonnement om de principal te hosten. Deze stap is nodig wanneer u meerdere abonnementen hebt.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Maak de service-principal. In dit voorbeeld wordt de service-principal genoemd my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Kopieer de appId, password en tenant voor toekomstig gebruik vanuit de geretourneerde JSON-gegevens.

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

U hebt uw Microsoft Entra-toepassing en service-principal gemaakt.

Een doelgericht tabel maken

  1. Maak vanuit uw queryomgeving een tabel met de naam Storms met behulp van de volgende opdracht:

    .create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
    
  2. Maak de bijbehorende tabeltoewijzing Storms_CSV_Mapping voor opgenomen gegevens met behulp van de volgende opdracht:

    .create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
    
  3. Maak een gegevensinvoerbatchbeleid in de tabel voor configureerbare wachtrijlatentie van gegevensinvoer.

    Aanbeveling

    Het opnamebatchbeleid is een optimalisatie voor prestaties en bevat drie parameters. De eerste voorwaarden die wordt voldaan, leidt tot opname in de tabel.

    .alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
    
  4. Gebruik de service-principal van Create a Microsoft Entra service principal om toestemming te verlenen voor het werken met de database.

    .add database YOUR_DATABASE_NAME admins  ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
    

Start het lab

Het volgende lab is ontworpen om u te laten beginnen met het maken van gegevens, het instellen van de Kafka-connector en het streamen van deze gegevens. Vervolgens kunt u de opgenomen gegevens bekijken.

De Git-opslagplaats klonen

Kloon de Git-opslagplaats van het lab.

  1. Maak een lokale map op uw computer.

    mkdir ~/kafka-kusto-hol
    cd ~/kafka-kusto-hol
    
  2. Kloon de repository.

    cd ~/kafka-kusto-hol
    git clone https://github.com/Azure/azure-kusto-labs
    cd azure-kusto-labs/kafka-integration/dockerized-quickstart
    

Inhoud van de gekloonde repository

Voer de volgende opdracht uit om de inhoud van de gekloonde opslagplaats weer te geven:

cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree

Dit resultaat van deze zoekopdracht is:

├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│   └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
    ├── Dockerfile
    ├── StormEvents.csv
    ├── go.mod
    ├── go.sum
    ├── kafka
    │   └── kafka.go
    └── main.go

Controleer de bestanden in de gekloonde opslagplaats

In de volgende secties worden de belangrijke onderdelen van de bestanden in de bestandsstructuur uitgelegd.

adx-sink-config.json

Dit bestand bevat het kusto-sinkeigenschappenbestand waarin u specifieke configuratiedetails bijwerkt:

{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 10000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}

Vervang de waarden voor de volgende kenmerken op basis van uw installatie: aad.auth.authority, aad.auth.appid, , aad.auth.appkeykusto.tables.topics.mapping(de databasenaam), kusto.ingestion.urlen kusto.query.url.

Connector - Dockerfile

Dit bestand bevat de opdrachten om de Docker-image voor de connector-instantie te genereren. Het omvat de download van de connector vanuit de git-opslagplaats in de releases-map.

Map storm-events-producer

Deze map heeft een Go-programma dat een lokaal bestand 'StormEvents.csv' leest en de gegevens publiceert naar een Kafka-onderwerp.

docker-compose.yaml

version: "2"
services:
  zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
  kusto-connect:
    build:
      context: ./connector
      args:
        KUSTO_KAFKA_SINK_VERSION: 1.0.1
    ports:
      - 8083:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=adx
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  events-producer:
    build:
      context: ./storm-events-producer
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - KAFKA_BOOTSTRAP_SERVER=kafka:9092
      - KAFKA_TOPIC=storm-events
      - SOURCE_FILE=StormEvents.csv

De containers starten

  1. Start de containers in een terminal:

    docker-compose up
    

    De producenttoepassing begint met het verzenden van gebeurtenissen naar het storm-events onderwerp. U ziet logboeken die vergelijkbaar zijn met de volgende logboeken:

    ....
    events-producer_1  | sent message to partition 0 offset 0
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
    events-producer_1  |
    events-producer_1  | sent message to partition 0 offset 1
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
    ....
    
  2. Voer de volgende opdracht uit in een afzonderlijke terminal om de logboeken te controleren:

    docker-compose logs -f | grep kusto-connect
    

De connector starten

Gebruik een Kafka Connect REST-aanroep om de connector te starten.

  1. Start in een afzonderlijke terminal de sink-taak met de volgende opdracht:

    curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
    
  2. Voer de volgende opdracht uit in een afzonderlijke terminal om de status te controleren:

    curl http://localhost:8083/connectors/storm/status
    

De connector begint met het in wachtrij plaatsen van opnameprocessen.

Opmerking

Als u problemen hebt met de logboekconnector, maak een melding.

Beheerde identiteit

De Kafka-connector maakt standaard gebruik van de toepassingsmethode voor verificatie tijdens de opname. Verifiëren met beheerde identiteit:

  1. Wijs uw cluster een beheerde identiteit toe en verleent uw opslagaccount leesrechten. Zie Gegevens opnemen met behulp van verificatie van beheerde identiteiten voor meer informatie.

  2. Stel aad.auth.strategy in op managed_identity in uw adx-sink-config.json bestand en zorg ervoor dat aad.auth.appid is ingesteld op de client-id (toepassings-id) van de beheerde identiteit.

  3. Gebruik een token voor metagegevens van een privé-exemplaar in plaats van de Microsoft Entra-service-principal.

Opmerking

Wanneer u een beheerde identiteit gebruikt, worden appId en tenant afgeleid uit de context van de oproepsite en is password niet nodig.

Gegevens opvragen en controleren

Gegevensopname bevestigen

  1. Zodra de gegevens in de Storms tabel zijn aangekomen, bevestigt u de overdracht van gegevens door het aantal rijen te controleren:

    Storms 
    | count
    
  2. Controleer of er geen fouten zijn in het opnameproces:

    .show ingestion failures
    

    Zodra u gegevens ziet, kunt u een paar query's uitproberen.

Query’s uitvoeren voor de gegevens

  1. Als u alle records wilt zien, voert u de volgende query uit:

    Storms
    | take 10
    
  2. Gebruik where en project om specifieke gegevens te filteren.

    Storms
    | where EventType == 'Drought' and State == 'TEXAS'
    | project StartTime, EndTime, Source, EventId
    
  3. Gebruik de summarize operator:

    Storms
    | summarize event_count=count() by State
    | where event_count > 10
    | project State, event_count
    | render columnchart
    

    Schermopname van de resultaten van verbonden Kafka-querykolommen.

Zie de documentatie over KQL - en Kusto-querytaal voor meer queryvoorbeelden en richtlijnen.

Opnieuw instellen

Ga als volgt te werk om het opnieuw in te stellen:

  1. De containers stoppen (docker-compose down -v)
  2. Verwijderen (drop table Storms)
  3. De Storms tabel opnieuw maken
  4. Tabeltoewijzing opnieuw maken
  5. Containers opnieuw opstarten (docker-compose up)

De hulpbronnen opschonen

Als u de Azure Data Explorer-resources wilt verwijderen, gebruikt u az kusto cluster delete (kusto extension) of az kusto database delete (kusto extension):

az kusto cluster delete --name "<cluster name>" --resource-group "<resource group name>"
az kusto database delete --cluster-name "<cluster name>" --database-name "<database name>" --resource-group "<resource group name>"

U kunt uw cluster en database ook verwijderen via Azure Portal. Zie Een Azure Data Explorer-cluster verwijderen en een database verwijderen in Azure Data Explorer voor meer informatie.

Het afstemmen van de Kafka Sink-connector

Configureer de Kafka Sink-connector om te werken met het verzamelbeleid voor gegevensverwerking:

  • Stel de limiet voor de Kafka-sink flush.size.bytes in vanaf 1 MB, waarbij de grootte toeneemt met stappen van 10 MB of 100 MB.
  • Wanneer u Kafka Sink gebruikt, worden gegevens twee keer samengevoegd. Gegevens aan de connectorzijde worden samengevoegd volgens de flush-instellingen en aan de servicezijde volgens het batchverwerkingsbeleid. Als de batchtijd te kort is, zodat gegevens niet kunnen worden opgenomen door zowel de connector als de service, moet de batchtijd worden verhoogd. Stel de batchgrootte in op 1 GB en verhoog of verklein indien nodig met 100 MB stappen. Als de grootte voor flushen bijvoorbeeld 1 MB is en de batchbeleidsgrootte 100 MB is, worden met de Kafka Sink-connector gegevens samengevoegd in een batch van 100 MB. Die batch wordt vervolgens door de service opgenomen. Als de batchverwerkingsbeleidstijd 20 seconden is en de Kafka Sink-connector 50 MB in een periode van 20 seconden leeg maakt, neemt de service een batch van 50 MB op.
  • U kunt schalen door exemplaren en Kafka-partities toe te voegen. Verhoog tasks.max het aantal partities. Maak een partitie als u voldoende gegevens hebt om een blob de grootte van de flush.size.bytes instelling te produceren. Als de blob kleiner is, wordt de batch verwerkt wanneer deze de tijdslimiet bereikt, zodat de partitie niet voldoende doorvoer ontvangt. Een groot aantal partities betekent meer verwerkingsoverhead.