Freigeben über


read_kafka-Tabellenwertfunktion

Gilt für:Häkchen gesetzt ja Databricks SQL Häkchen gesetzt ja Databricks Runtime 13.3 LTS und höher

Liest Daten aus einem Apache Kafka-Cluster und gibt die Daten in tabellarischer Form zurück.

Kann Daten aus einem oder mehreren Kafka-Themen lesen. Es unterstützt sowohl Batchabfragen als auch Streamingerfassung.

Syntax

read_kafka([option_key => option_value ] [, ...])

Argumente

Diese Funktion erfordert einen Aufruf benannter Parameter für die Optionsschlüssel.

  • option_key: Der Name der zu konfigurierenden Option. Sie müssen Backticks () for options that contain dots (.') verwenden.
  • option_value: Ein konstanter Ausdruck zum Festlegen der Option. Akzeptiert Literale und Skalarfunktionen.

Gibt zurück

Datensätze, die aus einem Apache Kafka-Cluster gelesen werden, mit dem folgenden Schema:

  • key BINARY: Der Schlüssel des Kafka-Datensatzes.
  • value BINARY NOT NULL: Der Wert des Kafka-Datensatzes.
  • topic STRING NOT NULL: Der Name des Kafka-Themas, aus dem der Datensatz gelesen wird.
  • partition INT NOT NULL: Die ID der Kafka-Partition, aus der der Datensatz gelesen wird.
  • offset BIGINT NOT NULL: Die Offsetnummer des Datensatzes im Kafka TopicPartition.
  • timestamp TIMESTAMP NOT NULL: Ein Zeitstempelwert für den Datensatz. Die Spalte timestampType definiert, was dieser Zeitstempel entspricht.
  • timestampType INTEGER NOT NULL: Der Typ des in der Spalte timestamp angegebenen Zeitstempels.
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: Headerwerte, die als Teil des Datensatzes bereitgestellt werden (sofern aktiviert).

Beispiele

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

Optionen

Eine ausführliche Liste der Optionen finden Sie in der Apache Spark-Dokumentation.

Erforderliche Optionen

Geben Sie die folgende Option an, um eine Verbindung mit Ihrem Kafka-Cluster herzustellen.

Auswahlmöglichkeit
bootstrapServers
Typ: String
Eine durch Trennzeichen getrennte Liste von Host/Port-Paaren, die auf den Kafka-Cluster verweisen.
Standardwert: None

Stellen Sie nur eine der folgenden Optionen bereit, um zu konfigurieren, aus welchen Kafka-Themen Daten abgerufen werden sollen.

Auswahlmöglichkeit
assign
Typ: String
Eine JSON-Zeichenfolge, die die spezifischen Themenpartitionen enthält, von denen konsumiert werden soll. Zum Beispiel werden die 0. und 1. Partitionen von ThemaA verwendet.
Standardwert: None
subscribe
Typ: String
Eine durch Kommas getrennte Liste von Kafka-Themen, aus der gelesen werden soll.
Standardwert: None
subscribePattern
Typ: String
Ein regulärer Ausdruck, der themengleich ist, die abonniert werden sollen.
Standardwert: None

Sonstige Optionen

read_kafka kann in Batchabfragen und in Streamingabfragen verwendet werden. Die folgenden Optionen geben an, auf welchen Abfragetyp sie angewendet werden.

Auswahlmöglichkeit
endingOffsets
Typ: String-Abfragetyp: nur Batch
Die Offsets, die für eine Batchabfrage gelesen werden sollen, entweder "latest", um die neuesten Datensätze anzugeben, oder eine JSON-Zeichenfolge, die einen endenden Offset für jede TopicPartition angibt. In der JSON-Zeichenfolge kann mit dem Offset -1 auf den spätestmöglichen Offset verwiesen werden. -2 (frühestens), da ein Offset nicht zulässig ist.
Standardwert: "latest"
endingOffsetsByTimestamp
Typ: String-Abfragetyp: nur Batch
Eine JSON-Zeichenfolge, die einen Endzeitstempel angibt, der für jede TopicPartition gelesen werden soll. Die Zeitstempel müssen als langer Wert des Zeitstempels in Millisekunden angegeben werden, z. B. 1970-01-01 00:00:00 UTC
1686444353000. Weitere Informationen zum Verhalten mit Zeitstempeln finden Sie weiter unten.
endingOffsetsByTimestamp hat Vorrang vor endingOffsets.
Standardwert: None
endingTimestamp
Typ: String-Abfragetyp: nur Batch
Ein Zeichenfolgenwert des Zeitstempels in Millisekunden seit
1970-01-01 00:00:00 UTC, z. B. "1686444353000". Wenn Kafka den abgeglichenen Offset nicht zurückgibt, wird der Offset auf den neuesten Stand festgelegt. Weitere Informationen zum Verhalten mit Zeitstempeln finden Sie weiter unten. Hinweis: endingTimestamp hat Vorrang vor endingOffsetsByTimestamp und
endingOffsets.
Standardwert: None
includeHeaders
Typ: Boolean-Abfragetyp: Streaming und Batch
Gibt an, ob die Kafka-Header in die Zeile eingeschlossen werden sollen.
Standardwert: false
kafka.<consumer_option>
Typ: String-Abfragetyp: Streaming und Batch
Alle verbraucherspezifischen Kafka-Optionen können mit dem Präfix kafka. übergeben werden. Diese Optionen müssen bei der Bereitstellung von Backticks umgeben sein, andernfalls erhalten Sie einen Parserfehler. Die Optionen finden Sie in der Kafka-Dokumentation.
Hinweis: Sie sollten die folgenden Optionen nicht mit dieser Funktion festlegen:
key.deserializer value.deserializer bootstrap.servers group.id
Standardwert: None
maxOffsetsPerTrigger
Typ: Long-Abfragetyp: nur Streaming
Ratenbegrenzung für die maximale Anzahl von Offsets oder Zeilen, die pro Triggerintervall verarbeitet werden. Die angegebene Gesamtanzahl von Offests wird proportional auf TopicPartitions aufgeteilt.
Standardwert: None
startingOffsets
Typ: String-Abfragetyp: Streaming und Batch
Der Startpunkt einer Abfrage – entweder "earliest" (also ab den frühestmöglichen Offsets), "latest" (also ob den spätestmöglichen Offsets) oder eine JSON-Zeichenfolge, die einen Startversatz für jede Themenpartition (TopicPartition) angibt. In der JSON-Zeichenfolge kann mit dem Offset -2 auf den frühestmöglichen und mit -1 auf den spätestmöglichen Offset verwiesen werden.
Hinweis: Bei Batchabfragen ist „latest“ nicht zulässig (weder implizit noch in Form von „-1“ in JSON). Für Streamingabfragen gilt dies nur, wenn eine neue Abfrage gestartet wird. Neu gestartete Streamingabfragen werden von den im Abfrageprüfpunkt definierten Offsets fortgesetzt. Bei einer Abfrage neu entdeckte Partitionen beginnen beim frühestmöglichen Startpunkt.
Standardwert: "latest" für Streaming, "earliest" für Batch
startingOffsetsByTimestamp
Typ: String-Abfragetyp: Streaming und Batch
Eine JSON-Zeichenfolge, die einen Startzeitstempel für jede TopicPartition angibt. Die Zeitstempel müssen als langer Wert des Zeitstempels in Millisekunden seit 1970-01-01 00:00:00 UTC angegeben werden, z. B. 1686444353000 Weitere Informationen zum Verhalten mit Zeitstempeln finden Sie weiter unten. Wenn Kafka den abgeglichenen Offset nicht zurückgibt, folgt das Verhalten dem Wert der Option startingOffsetsByTimestampStrategy.
startingOffsetsByTimestamp hat Vorrang vor startingOffsets.
Hinweis: Für Streamingabfragen gilt dies nur, wenn eine neue Abfrage gestartet wird. Neu gestartete Streamingabfragen werden von den im Abfrageprüfpunkt definierten Offsets fortgesetzt. Bei einer Abfrage neu entdeckte Partitionen beginnen beim frühestmöglichen Startpunkt.
Standardwert: None
startingOffsetsByTimestampStrategy
Typ: String-Abfragetyp: Streaming und Batch
Diese Strategie wird verwendet, wenn der angegebene Anfangsoffset gemäß Zeitstempel (entweder global oder pro Partition) nicht mit dem zurückgegebenen Offset von Kafka übereinstimmt. Die verfügbaren Strategien sind:
  • "error": Schlägt die Abfrage fehl
  • "latest": weist den neuesten Offset für diese Partitionen zu, damit Spark neuere Datensätze aus diesen Partitionen in späteren Mikrobatches lesen kann.

Standardwert: "error"
startingTimestamp
Typ: String-Abfragetyp: Streaming und Batch
Ein Zeichenfolgenwert des Zeitstempels in Millisekunden seit
1970-01-01 00:00:00 UTC, z. B. "1686444353000". Weitere Informationen zum Verhalten mit Zeitstempeln finden Sie weiter unten. Wenn Kafka den abgeglichenen Offset nicht zurückgibt, folgt das Verhalten dem Wert der Option startingOffsetsByTimestampStrategy.
startingTimestamp hat Vorrang vor startingOffsetsByTimestamp und startingOffsets.
Hinweis: Für Streamingabfragen gilt dies nur, wenn eine neue Abfrage gestartet wird. Neu gestartete Streamingabfragen werden von den im Abfrageprüfpunkt definierten Offsets fortgesetzt. Bei einer Abfrage neu entdeckte Partitionen beginnen am frühestmöglichen Startpunkt.
Standardwert: None

Hinweis

Der zurückgegebene Offset für jede Partition ist der früheste Offset, dessen Zeitstempel größer oder gleich dem angegebenen Zeitstempel in der entsprechenden Partition ist. Das Verhalten variiert je nach Optionen, wenn Kafka den abgeglichenen Offset nicht zurückgibt. Überprüfen Sie die Beschreibung der einzelnen Optionen.

Spark übergibt einfach die Zeitstempelinformationen an KafkaConsumer.offsetsForTimesund interpretiert oder begründet den Wert nicht. Weitere Informationen zu KafkaConsumer.offsetsForTimes finden Sie in der Dokumentation. Auch die Bedeutung von Zeitstempel kann hier je nach Kafka-Konfiguration (log.message.timestamp.type) variieren. Weitere Informationen finden Sie in der Apache Kafka-Dokumentation.