Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Apache Kafka is een gedistribueerd streamingplatform voor het bouwen van pijplijnen voor realtime streaminggegevens waarmee gegevens betrouwbaar worden verplaatst tussen systemen of toepassingen. Kafka Connect is een hulpprogramma voor schaalbare en betrouwbare streaming van gegevens tussen Apache Kafka en andere gegevenssystemen. De Kusto Kafka-sink fungeert als de connector van Kafka en vereist geen code. Download het sink connector-jarbestand vanuit de Git-repo of Confluent Connector Hub.
In dit artikel wordt beschreven hoe u gegevens opneemt met Kafka, met behulp van een zelfstandige Docker-installatie om de installatie van het Kafka-cluster en het Kafka-connectorcluster te vereenvoudigen.
Zie de git-opslagplaats en versiedetails van de connector voor meer informatie.
Vereiste voorwaarden
- Een Azure-abonnement. Maak een gratis Azure-account.
- Een Azure Data Explorer-cluster en -database met het standaardcache- en bewaarbeleid.
- Azure CLI.
- Docker en Docker Compose.
Een Microsoft Entra-service-principal maken
De Microsoft Entra-service-principal kan worden gemaakt via Azure Portal of programmatisch, zoals in het volgende voorbeeld.
Deze service-principal is de identiteit die door de connector wordt gebruikt voor het schrijven van gegevens in uw tabel in Kusto. U verleent machtigingen aan deze service-principal om toegang te krijgen tot Kusto-resources.
Meld u aan bij uw Azure-abonnement via Azure CLI. Verifieer vervolgens in de browser.
az loginKies het abonnement om de principal te hosten. Deze stap is nodig wanneer u meerdere abonnementen hebt.
az account set --subscription YOUR_SUBSCRIPTION_GUIDMaak de service-principal. In dit voorbeeld wordt de service-principal genoemd
my-service-principal.az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}Kopieer de
appId,passwordentenantvoor toekomstig gebruik vanuit de geretourneerde JSON-gegevens.{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
U hebt uw Microsoft Entra-toepassing en service-principal gemaakt.
Een doelgericht tabel maken
Maak vanuit uw queryomgeving een tabel met de naam
Stormsmet behulp van de volgende opdracht:.create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)Maak de bijbehorende tabeltoewijzing
Storms_CSV_Mappingvoor opgenomen gegevens met behulp van de volgende opdracht:.create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'Maak een gegevensinvoerbatchbeleid in de tabel voor configureerbare wachtrijlatentie van gegevensinvoer.
Aanbeveling
Het opnamebatchbeleid is een optimalisatie voor prestaties en bevat drie parameters. De eerste voorwaarden die wordt voldaan, leidt tot opname in de tabel.
.alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'Gebruik de service-principal van Create a Microsoft Entra service principal om toestemming te verlenen voor het werken met de database.
.add database YOUR_DATABASE_NAME admins ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
Start het lab
Het volgende lab is ontworpen om u te laten beginnen met het maken van gegevens, het instellen van de Kafka-connector en het streamen van deze gegevens. Vervolgens kunt u de opgenomen gegevens bekijken.
De Git-opslagplaats klonen
Kloon de Git-opslagplaats van het lab.
Maak een lokale map op uw computer.
mkdir ~/kafka-kusto-hol cd ~/kafka-kusto-holKloon de repository.
cd ~/kafka-kusto-hol git clone https://github.com/Azure/azure-kusto-labs cd azure-kusto-labs/kafka-integration/dockerized-quickstart
Inhoud van de gekloonde repository
Voer de volgende opdracht uit om de inhoud van de gekloonde opslagplaats weer te geven:
cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree
Dit resultaat van deze zoekopdracht is:
├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│ └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
├── Dockerfile
├── StormEvents.csv
├── go.mod
├── go.sum
├── kafka
│ └── kafka.go
└── main.go
Controleer de bestanden in de gekloonde opslagplaats
In de volgende secties worden de belangrijke onderdelen van de bestanden in de bestandsstructuur uitgelegd.
adx-sink-config.json
Dit bestand bevat het kusto-sinkeigenschappenbestand waarin u specifieke configuratiedetails bijwerkt:
{
"name": "storm",
"config": {
"connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
"flush.size.bytes": 10000,
"flush.interval.ms": 10000,
"tasks.max": 1,
"topics": "storm-events",
"kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
"aad.auth.authority": "<enter tenant ID>",
"aad.auth.appid": "<enter application ID>",
"aad.auth.appkey": "<enter client secret>",
"kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
"kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
Vervang de waarden voor de volgende kenmerken op basis van uw installatie: aad.auth.authority, aad.auth.appid, , aad.auth.appkeykusto.tables.topics.mapping(de databasenaam), kusto.ingestion.urlen kusto.query.url.
Connector - Dockerfile
Dit bestand bevat de opdrachten om de Docker-image voor de connector-instantie te genereren. Het omvat de download van de connector vanuit de git-opslagplaats in de releases-map.
Map storm-events-producer
Deze map heeft een Go-programma dat een lokaal bestand 'StormEvents.csv' leest en de gegevens publiceert naar een Kafka-onderwerp.
docker-compose.yaml
version: "2"
services:
zookeeper:
image: debezium/zookeeper:1.2
ports:
- 2181:2181
kafka:
image: debezium/kafka:1.2
ports:
- 9092:9092
links:
- zookeeper
depends_on:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
kusto-connect:
build:
context: ./connector
args:
KUSTO_KAFKA_SINK_VERSION: 1.0.1
ports:
- 8083:8083
links:
- kafka
depends_on:
- kafka
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=adx
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
events-producer:
build:
context: ./storm-events-producer
links:
- kafka
depends_on:
- kafka
environment:
- KAFKA_BOOTSTRAP_SERVER=kafka:9092
- KAFKA_TOPIC=storm-events
- SOURCE_FILE=StormEvents.csv
De containers starten
Start de containers in een terminal:
docker-compose upDe producenttoepassing begint met het verzenden van gebeurtenissen naar het
storm-eventsonderwerp. U ziet logboeken die vergelijkbaar zijn met de volgende logboeken:.... events-producer_1 | sent message to partition 0 offset 0 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public events-producer_1 | events-producer_1 | sent message to partition 0 offset 1 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer ....Voer de volgende opdracht uit in een afzonderlijke terminal om de logboeken te controleren:
docker-compose logs -f | grep kusto-connect
De connector starten
Gebruik een Kafka Connect REST-aanroep om de connector te starten.
Start in een afzonderlijke terminal de sink-taak met de volgende opdracht:
curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectorsVoer de volgende opdracht uit in een afzonderlijke terminal om de status te controleren:
curl http://localhost:8083/connectors/storm/status
De connector begint met het in wachtrij plaatsen van opnameprocessen.
Opmerking
Als u problemen hebt met de logboekconnector, maak een melding.
Beheerde identiteit
De Kafka-connector maakt standaard gebruik van de toepassingsmethode voor verificatie tijdens de opname. Verifiëren met beheerde identiteit:
Wijs uw cluster een beheerde identiteit toe en verleent uw opslagaccount leesrechten. Zie Gegevens opnemen met behulp van verificatie van beheerde identiteiten voor meer informatie.
Stel
aad.auth.strategyin opmanaged_identityin uw adx-sink-config.json bestand en zorg ervoor dataad.auth.appidis ingesteld op de client-id (toepassings-id) van de beheerde identiteit.Gebruik een token voor metagegevens van een privé-exemplaar in plaats van de Microsoft Entra-service-principal.
Opmerking
Wanneer u een beheerde identiteit gebruikt, worden appId en tenant afgeleid uit de context van de oproepsite en is password niet nodig.
Gegevens opvragen en controleren
Gegevensopname bevestigen
Zodra de gegevens in de
Stormstabel zijn aangekomen, bevestigt u de overdracht van gegevens door het aantal rijen te controleren:Storms | countControleer of er geen fouten zijn in het opnameproces:
.show ingestion failuresZodra u gegevens ziet, kunt u een paar query's uitproberen.
Query’s uitvoeren voor de gegevens
Als u alle records wilt zien, voert u de volgende query uit:
Storms | take 10Gebruik
whereenprojectom specifieke gegevens te filteren.Storms | where EventType == 'Drought' and State == 'TEXAS' | project StartTime, EndTime, Source, EventIdGebruik de
summarizeoperator:Storms | summarize event_count=count() by State | where event_count > 10 | project State, event_count | render columnchart
Zie de documentatie over KQL - en Kusto-querytaal voor meer queryvoorbeelden en richtlijnen.
Opnieuw instellen
Ga als volgt te werk om het opnieuw in te stellen:
- De containers stoppen (
docker-compose down -v) - Verwijderen (
drop table Storms) - De
Stormstabel opnieuw maken - Tabeltoewijzing opnieuw maken
- Containers opnieuw opstarten (
docker-compose up)
De hulpbronnen opschonen
Als u de Azure Data Explorer-resources wilt verwijderen, gebruikt u az kusto cluster delete (kusto extension) of az kusto database delete (kusto extension):
az kusto cluster delete --name "<cluster name>" --resource-group "<resource group name>"
az kusto database delete --cluster-name "<cluster name>" --database-name "<database name>" --resource-group "<resource group name>"
U kunt uw cluster en database ook verwijderen via Azure Portal. Zie Een Azure Data Explorer-cluster verwijderen en een database verwijderen in Azure Data Explorer voor meer informatie.
Het afstemmen van de Kafka Sink-connector
Configureer de Kafka Sink-connector om te werken met het verzamelbeleid voor gegevensverwerking:
- Stel de limiet voor de Kafka-sink
flush.size.bytesin vanaf 1 MB, waarbij de grootte toeneemt met stappen van 10 MB of 100 MB. - Wanneer u Kafka Sink gebruikt, worden gegevens twee keer samengevoegd. Gegevens aan de connectorzijde worden samengevoegd volgens de flush-instellingen en aan de servicezijde volgens het batchverwerkingsbeleid. Als de batchtijd te kort is, zodat gegevens niet kunnen worden opgenomen door zowel de connector als de service, moet de batchtijd worden verhoogd. Stel de batchgrootte in op 1 GB en verhoog of verklein indien nodig met 100 MB stappen. Als de grootte voor flushen bijvoorbeeld 1 MB is en de batchbeleidsgrootte 100 MB is, worden met de Kafka Sink-connector gegevens samengevoegd in een batch van 100 MB. Die batch wordt vervolgens door de service opgenomen. Als de batchverwerkingsbeleidstijd 20 seconden is en de Kafka Sink-connector 50 MB in een periode van 20 seconden leeg maakt, neemt de service een batch van 50 MB op.
- U kunt schalen door exemplaren en Kafka-partities toe te voegen. Verhoog
tasks.maxhet aantal partities. Maak een partitie als u voldoende gegevens hebt om een blob de grootte van deflush.size.bytesinstelling te produceren. Als de blob kleiner is, wordt de batch verwerkt wanneer deze de tijdslimiet bereikt, zodat de partitie niet voldoende doorvoer ontvangt. Een groot aantal partities betekent meer verwerkingsoverhead.
Verwante inhoud
- Meer informatie over big data-architectuur.
- Meer informatie over het opnemen van voorbeeldgegevens in JSON-indeling in Azure Data Explorer.
- Meer informatie met Kafka-labs: