Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Van toepassing op:
Databricks SQL
Databricks Runtime 13.3 LTS en hoger
Leest gegevens uit een Apache Kafka-cluster en retourneert de gegevens in tabelvorm.
Kan gegevens lezen uit een of meer Kafka-onderwerpen. Het ondersteunt zowel batchquery's als streaming-opname.
Syntaxis
read_kafka([option_key => option_value ] [, ...])
Argumenten
Voor deze functie is aanroepen van benoemde parameters vereist.
-
option_key: De naam van de optie die u wilt configureren. U moet backticks () for options that contain dots (.') gebruiken. -
option_value: Een constante expressie om de optie in te stellen. Accepteert letterlijke en scalaire functies.
Retouren
Records die zijn gelezen uit een Apache Kafka-cluster met het volgende schema:
-
key BINARY: De sleutel van het Kafka-record. -
value BINARY NOT NULL: De waarde van de Kafka-record. -
topic STRING NOT NULL: De naam van het Kafka-onderwerp waaruit de record is gelezen. -
partition INT NOT NULL: De ID van de Kafka-partitie waaruit het record is gelezen. -
offset BIGINT NOT NULL: Het offsetnummer van het record in het Kafka-systeemTopicPartition. -
timestamp TIMESTAMP NOT NULL: Een tijdstempelwaarde voor de record. In detimestampTypekolom wordt gedefinieerd waar deze tijdstempel op betrekking heeft. -
timestampType INTEGER NOT NULL: Het type tijdstempel dat is opgegeven in detimestampkolom. -
headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: Koptekstwaarden die zijn opgegeven als onderdeel van de record (indien ingeschakeld).
Voorbeelden
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
Opties
U vindt een gedetailleerde lijst met opties in de Apache Spark-documentatie.
Vereiste opties
Geef de onderstaande optie op om verbinding te maken met uw Kafka-cluster.
| Optie |
|---|
bootstrapServersTyp: StringEen door komma's gescheiden lijst met host-/poortparen die verwijzen naar kafka-cluster. Standaardwaarde: Geen |
Geef slechts een van de onderstaande opties op om te configureren uit welke Kafka-onderwerpen gegevens moeten worden opgehaald.
| Optie |
|---|
assignTyp: StringEen JSON-tekenreeks die de specifieke topic-partities bevat waaruit moet worden geconsumeerd. Voor '{"topicA":[0,1],"topicB":[2,4]}' worden bijvoorbeeld de 0e en 1e partities van topicA geconsumeerd.Standaardwaarde: Geen |
subscribeTyp: StringEen door komma's gescheiden lijst met Kafka-onderwerpen waaruit moet worden gelezen. Standaardwaarde: Geen |
subscribePatternTyp: StringEen reguliere expressie die overeenkomt met onderwerpen waarop u zich kunt abonneren. Standaardwaarde: Geen |
Diverse opties
read_kafka kan worden gebruikt in batch- en streamingquery's. Met de onderstaande opties kunt u opgeven op welk type query ze van toepassing zijn.
| Optie |
|---|
endingOffsetsType: String Querytype: alleen batchDe offsets die moeten worden gelezen tot voor een batchquery, "latest" om de meest recente records op te geven, of een JSON-tekenreeks die een eindoffset voor elke TopicPartition specificeert. In de JSON kan -1 als een offset worden gebruikt om te verwijzen naar het nieuwste.
-2 Het gebruik van (vroegst) als offset is niet toegestaan.Standaardwaarde: "latest" |
endingOffsetsByTimestampType: String Querytype: alleen batchEen JSON-tekenreeks die een eindtijdstempel specificeert om tot te lezen voor elke TopicPartition. De tijdstempels moeten worden opgegeven als een lange waarde van de tijdstempel in milliseconden sinds 1970-01-01 00:00:00 UTC, bijvoorbeeld.1686444353000. Zie hieronder informatie over het gedrag met tijdstempels.endingOffsetsByTimestamp heeft voorrang op endingOffsets.Standaardwaarde: Geen |
endingTimestampType: String Querytype: alleen batchEen tekenreekswaarde van de tijdstempel in milliseconden sinds 1970-01-01 00:00:00 UTCbijvoorbeeld "1686444353000". Als Kafka de overeenkomende offset niet retourneert, wordt de offset ingesteld op de meest recente. Zie hieronder informatie over het gedrag met tijdstempels. Opmerking: endingTimestamp heeft voorrang op endingOffsetsByTimestamp enendingOffsets.Standaardwaarde: Geen |
includeHeadersType: Boolean Querytype: streaming en batchOf u de Kafka-headers in de rij wilt opnemen. Standaardwaarde: false |
kafka.<consumer_option>Type: String Querytype: streaming en batchAlle specifieke opties voor Kafka-consumenten kunnen worden doorgegeven met het kafka. voorvoegsel. Deze opties moeten worden omgeven door backticks wanneer deze worden opgegeven, anders krijgt u een parserfout. U vindt de opties in de Kafka-documentatie.Opmerking: U moet de volgende opties niet instellen met deze functie: key.deserializer,value.deserializer,bootstrap.servers,group.idStandaardwaarde: Geen |
maxOffsetsPerTriggerType: Long Querytype: alleen streamingFrequentielimiet voor het maximum aantal offsets of rijen dat per triggerinterval wordt verwerkt. Het opgegeven totale aantal offsets wordt proportioneel verdeeld over TopicPartitions. Standaardwaarde: Geen |
startingOffsetsType: String Querytype: streaming en batchHet beginpunt waarop een query wordt gestart, "earliest" ofwel van de vroegste offsets, "latest" die alleen afkomstig is van de meest recente offsets, of een JSON-tekenreeks die een beginverschil voor elke TopicPartition opgeeft. In de JSON kan -2 als offset worden gebruikt om te verwijzen naar de vroegste, -1 naar de meest recente.Opmerking: voor batchquery's is de meest recente (impliciet of met behulp van -1 in JSON) niet toegestaan. Voor streamingquery's geldt dit alleen wanneer een nieuwe query wordt gestart. Opnieuw gestarte streamingquery's worden voortgezet vanaf de offsets die zijn gedefinieerd in het querycontrolepunt. Nieuw ontdekte partities starten tijdens een query op het vroegst mogelijke moment. Standaardwaarde: "latest" voor streaming, "earliest" voor batch |
startingOffsetsByTimestampType: String Querytype: streaming en batchEen JSON-tekenreeks die een begintijdstempel voor elke TopicPartition opgeeft. De tijdstempels moeten worden opgegeven als een lange waarde van de tijdstempel in milliseconden sinds 1970-01-01 00:00:00 UTC, bijvoorbeeld 1686444353000. Zie hieronder informatie over het gedrag met tijdstempels. Als Kafka de overeenkomende offset niet retourneert, richt het gedrag zich op de waarde van de optie startingOffsetsByTimestampStrategy.startingOffsetsByTimestamp heeft voorrang op startingOffsets.Opmerking: Voor streamingquery's geldt dit alleen wanneer een nieuwe query wordt gestart. Opnieuw gestarte streamingquery's worden voortgezet vanaf de offsets die zijn gedefinieerd in het querycontrolepunt. Nieuw ontdekte partities starten tijdens een query op het vroegst mogelijke moment. Standaardwaarde: Geen |
startingOffsetsByTimestampStrategyType: String Querytype: streaming en batchDeze strategie wordt gebruikt wanneer het opgegeven startoffset op basis van tijdstempel (hetzij globaal of per partitie) niet overeenstemt met de offset die Kafka retourneerde. De beschikbare strategieƫn zijn:
Standaardwaarde: "error" |
startingTimestampType: String Querytype: streaming en batchEen tekenreekswaarde van de tijdstempel in milliseconden sinds 1970-01-01 00:00:00 UTCbijvoorbeeld "1686444353000". Zie hieronder informatie over het gedrag met tijdstempels. Als Kafka de overeenkomende offset niet retourneert, richt het gedrag zich op de waarde van de optie startingOffsetsByTimestampStrategy.startingTimestamp heeft voorrang op startingOffsetsByTimestamp en startingOffsets.Opmerking: Voor streamingquery's geldt dit alleen wanneer een nieuwe query wordt gestart. Opnieuw gestarte streamingquery's worden voortgezet vanaf de offsets die zijn gedefinieerd in het querycontrolepunt. Zojuist gedetecteerde partities tijdens een query worden vroegst gestart. Standaardwaarde: Geen |
Notitie
De geretourneerde offset voor elke partitie is de vroegste offset waarvan de tijdstempel groter is dan of gelijk is aan de opgegeven tijdstempel in de bijbehorende partitie. Het gedrag verschilt per optie als Kafka de overeenkomende offset niet retourneert. Controleer de beschrijving van elke optie.
Spark geeft de tijdstempelgegevens gewoon door aan KafkaConsumer.offsetsForTimesen interpreteert of redeneert niet over de waarde. Raadpleeg de KafkaConsumer.offsetsForTimes voor meer details over . Ook kan de betekenis van tijdstempel hier variƫren afhankelijk van de Kafka-configuratie (log.message.timestamp.type). Zie de Documentatie van Apache Kafka voor meer informatie.