Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Dotyczy:
Databricks SQL
Databricks Runtime 13.3 LTS i nowsze
Odczytuje dane z klastra platformy Apache Kafka i zwraca dane w postaci tabelarycznej.
Może odczytywać dane z co najmniej jednego tematu platformy Kafka. Obsługuje zarówno zapytania wsadowe, jak i przetwarzanie strumieniowe.
Składnia
read_kafka([option_key => option_value ] [, ...])
Argumenty
Ta funkcja wymaga wywołania nazwanego parametru.
-
option_key: nazwa opcji do skonfigurowania. Należy użyć backticks () for options that contain dots (.'). -
option_value: wyrażenie stałe do ustawienia opcji. Akceptuje literały i funkcje skalarne.
Zwraca
Rekordy odczytane z klastra platformy Apache Kafka z następującym schematem:
-
key BINARY: Klucz rekordu Kafka. -
value BINARY NOT NULL: wartość rekordu Kafka. -
topic STRING NOT NULL: nazwa tematu w Kafka, z którego jest odczytywany rekord. -
partition INT NOT NULL: Identyfikator partycji Kafka, z której jest odczytywany rekord. -
offset BIGINT NOT NULL: numer offsetu rekordu w KafkaTopicPartition. -
timestamp TIMESTAMP NOT NULL: wartość znacznika czasu dla rekordu. KolumnatimestampTypedefiniuje, co odpowiada znacznikowi czasu. -
timestampType INTEGER NOT NULL: typ znacznika czasu określonego w kolumnietimestamp. -
headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: wartości nagłówka podane jako część rekordu (jeśli jest włączone).
Przykłady
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
Opcje
Szczegółową listę opcji można znaleźć w dokumentacji platformy Apache Spark .
Wymagane opcje
Podaj poniższą opcję nawiązywania połączenia z klastrem platformy Kafka.
| Opcja |
|---|
bootstrapServersTyp: StringRozdzielona przecinkami lista par hostów/portów wskazująca klaster platformy Kafka. Wartość domyślna: Brak |
Podaj tylko jedną z poniższych opcji, aby skonfigurować tematy platformy Kafka do ściągania danych.
| Opcja |
|---|
assignTyp: StringCiąg JSON zawierający określone partycje tematu, z których mają być konsumowane. Na przykład, w przypadku '{"topicA":[0,1],"topicB":[2,4]}', partycje 0 i 1 tematu "topicA" będą konsumowane.Wartość domyślna: Brak |
subscribeTyp: StringRozdzielona przecinkami lista tematów Kafka do odczytu. Wartość domyślna: Brak |
subscribePatternTyp: StringWyrażenie regularne pasujące do tematów do subskrybowania. Wartość domyślna: Brak |
Różne opcje
read_kafka można używać w zapytaniach wsadowych i w zapytaniach przesyłanych strumieniowo. Poniższe opcje określają, do którego typu zapytania mają zastosowanie.
| Opcja |
|---|
endingOffsetsTyp zapytania: String tylko zbiorczyPrzesunięcia do odczytu w zapytaniu wsadowym, albo "latest", aby wybrać najnowsze rekordy, albo ciąg JSON określający przesunięcie końcowe dla każdego TopicPartition. W formacie JSON -1 jako przesunięcie może służyć do odwoływania się do najnowszej wersji.
-2 (najwcześniejsze) jako przesunięcie jest niedozwolone.Wartość domyślna: "latest" |
endingOffsetsByTimestampTyp zapytania: String tylko zbiorczyCiąg JSON określający znacznik czasu zakończenia do odczytania do każdego elementu TopicPartition. Znaczniki czasu muszą być podane jako długa wartość znacznika czasu w milisekundach od 1970-01-01 00:00:00 UTC, na przykład1686444353000. Zobacz uwagę poniżej na temat szczegółów dotyczących zachowania ze znacznikami czasu.endingOffsetsByTimestamp ma pierwszeństwo przed endingOffsets.Wartość domyślna: Brak |
endingTimestampTyp zapytania: String tylko zbiorczyWartość ciągu znacznika czasu w milisekundach od 1970-01-01 00:00:00 UTC, na przykład "1686444353000". Jeśli Kafka nie zwróci dopasowanego offsetu, offset zostanie ustawiony na najnowszy. Zobacz uwagę poniżej na temat szczegółów dotyczących zachowania ze znacznikami czasu. Uwaga: endingTimestamp ma pierwszeństwo przed endingOffsetsByTimestamp iendingOffsets.Wartość domyślna: Brak |
includeHeadersTyp: Boolean Typ zapytania: przesyłanie strumieniowe i wsadoweOkreślenie, czy uwzględnić nagłówki Kafka w wierszu. Wartość domyślna: false |
kafka.<consumer_option>Typ: String Typ zapytania: przesyłanie strumieniowe i wsadoweWszystkie opcje specyficzne dla konsumentów platformy Kafka można przekazać za pomocą prefiksu kafka. . Te opcje muszą być otoczone apostrofami odwrotnymi, w przeciwnym razie pojawi się błąd parsera. Opcje można znaleźć w dokumentacji platformy Kafka.Uwaga: nie należy ustawiać następujących opcji za pomocą tej funkcji: key.deserializer, , value.deserializer, , bootstrap.serversgroup.idWartość domyślna: Brak |
maxOffsetsPerTriggerTyp: Long Typ zapytania: tylko przesyłanie strumienioweLimit liczby przesunięć lub wierszy przetworzonych w każdym interwale wyzwalania. Określona łączna liczba przesunięć zostanie proporcjonalnie podzielona między elementy TopicPartitions. Wartość domyślna: Brak |
startingOffsetsTyp: String Typ zapytania: przesyłanie strumieniowe i wsadowePunkt początkowy, kiedy zapytanie jest uruchamiane: "earliest", czyli od pierwszych przesunięć, "latest", jako od najnowszych przesunięć lub ciąg JSON określający przesunięcie początkowe dla każdej partycji tematycznej. W formacie JSON -2 jako punkt odniesienia może służyć do odwoływania się do najwcześniejszego zapisu, a -1 do najnowszego.Uwaga: w przypadku zapytań wsadowych najnowsza wersja (niejawnie lub przy użyciu -1 w formacie JSON) jest niedozwolona. W przypadku zapytań przesyłanych strumieniowo ma to zastosowanie tylko wtedy, gdy zostanie uruchomione nowe zapytanie. Ponownie uruchomione zapytania strumieniowe będą kontynuowane od przesunięć zdefiniowanych w punkcie kontrolnym zapytania. Nowo odnalezione partycje podczas zapytania będą uruchamiane najwcześniej. Wartość domyślna: "latest" dla przesyłania strumieniowego dla "earliest" partii |
startingOffsetsByTimestampTyp: String Typ zapytania: przesyłanie strumieniowe i wsadoweCiąg JSON określający znacznik czasu rozpoczęcia dla każdego elementu TopicPartition. Znaczniki czasu muszą być podane jako długa wartość znacznika czasu w milisekundach od 1970-01-01 00:00:00 UTC, na przykład 1686444353000. Zobacz uwagę poniżej na temat szczegółów dotyczących zachowania ze znacznikami czasu. Jeśli platforma Kafka nie zwróci dopasowanego przesunięcia, zachowanie będzie zgodne z wartością opcji startingOffsetsByTimestampStrategy.startingOffsetsByTimestamp ma pierwszeństwo przed startingOffsets.Uwaga: w przypadku zapytań przesyłanych strumieniowo ma to zastosowanie tylko wtedy, gdy zostanie uruchomione nowe zapytanie. Ponownie uruchomione zapytania strumieniowe będą kontynuowane od przesunięć zdefiniowanych w punkcie kontrolnym zapytania. Nowo odnalezione partycje podczas zapytania będą uruchamiane najwcześniej. Wartość domyślna: Brak |
startingOffsetsByTimestampStrategyTyp: String Typ zapytania: przesyłanie strumieniowe i wsadoweTa strategia jest używana, gdy określone początkowe przesunięcie według znacznika czasu (globalne lub dla partycji) nie jest zgodne ze zwróconym przesunięciem przez Kafka. Dostępne strategie to:
Wartość domyślna: "error" |
startingTimestampTyp: String Typ zapytania: przesyłanie strumieniowe i wsadoweWartość ciągu znacznika czasu w milisekundach od 1970-01-01 00:00:00 UTC, na przykład "1686444353000". Zobacz uwagę poniżej na temat szczegółów dotyczących zachowania ze znacznikami czasu. Jeśli platforma Kafka nie zwróci dopasowanego przesunięcia, zachowanie będzie zgodne z wartością opcji startingOffsetsByTimestampStrategy.startingTimestamp ma pierwszeństwo przed startingOffsetsByTimestamp i startingOffsets.Uwaga: w przypadku zapytań przesyłanych strumieniowo ma to zastosowanie tylko wtedy, gdy zostanie uruchomione nowe zapytanie. Ponownie uruchomione zapytania strumieniowe będą kontynuowane od przesunięć zdefiniowanych w punkcie kontrolnym zapytania. Nowo odnalezione partycje podczas zapytania będą uruchamiane najwcześniej. Wartość domyślna: Brak |
Uwaga
Zwrócone przesunięcie dla każdej partycji to najwcześniejsze przesunięcie, którego znacznik czasu jest większy lub równy podanemu znacznikowi czasu w odpowiedniej partycji. Zachowanie różni się w zależności od opcji, jeśli Kafka nie zwraca dopasowanego przesunięcia — zapoznaj się z opisem każdej opcji.
Platforma Spark po prostu przekazuje informacje znacznika czasu do elementu KafkaConsumer.offsetsForTimes, i nie interpretuje ani nie analizuje wartości. Aby uzyskać więcej informacji na temat KafkaConsumer.offsetsForTimes, zapoznaj się z dokumentacją. Ponadto znaczenie znacznika czasu w tym miejscu może się różnić w zależności od konfiguracji platformy Kafka (log.message.timestamp.type). Aby uzyskać szczegółowe informacje, zobacz dokumentację platformy Apache Kafka.