Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Função com valor de tabela
Aplica-se a:
SQL do Databricks
Databricks Runtime 13.3 LTS e versões posteriores
Lê dados de um cluster do Apache Kafka e retorna os dados em formato tabular.
Pode ler dados de um ou mais tópicos do Kafka. É compatível com consultas em lote e ingestão de streaming.
Sintaxe
read_kafka([option_key => option_value ] [, ...])
Argumentos
Essa função requer invocação de parâmetro nomeada.
-
option_key: o nome da opção a ser configurada. Você deve usar "backticks" (`) for options that contain dots (.`). -
option_value: uma expressão constante para definir a opção. Aceita literais e funções escalares.
Retornos
Registros lidos de um cluster do Apache Kafka com o seguinte esquema:
-
key BINARY: a chave do registro Kafka. -
value BINARY NOT NULL: o valor do registro Kafka. -
topic STRING NOT NULL: o nome do tópico Kafka do qual o registro é lido. -
partition INT NOT NULL: a ID da partição Kafka da qual o registro é lido. -
offset BIGINT NOT NULL: o número de deslocamento do registro no KafkaTopicPartition. -
timestamp TIMESTAMP NOT NULL: um valor de carimbo de data/hora para o registro. A colunatimestampTypedefine ao que esse carimbo de data/hora corresponde. -
timestampType INTEGER NOT NULL: o tipo do carimbo de data/hora especificado na colunatimestamp. -
headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: valores de cabeçalho fornecidos como parte do registro (se habilitado).
Exemplos
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
Opções
Você pode encontrar uma lista detalhada de opções na Documentação do Apache Spark.
Opções necessárias
Forneça a opção abaixo para se conectar ao cluster do Kafka.
| Opção |
|---|
bootstrapServersDigite: StringUma lista separada por vírgulas de pares de host/porta apontando para o cluster do Kafka. Valor padrão: nenhum |
Forneça apenas uma das opções abaixo para configurar quais tópicos do Kafka serão usados para efetuar pull de dados.
| Opção |
|---|
assignDigite: StringUma cadeia de caracteres JSON que contém as partições de tópico específicas a serem consumidas. Por exemplo, para '{"topicA":[0,1],"topicB":[2,4]}', as partições 0 e 1 do topicA serão consumidas a partir de.Valor padrão: nenhum |
subscribeDigite: StringUma lista separada por vírgulas de tópicos do Kafka para leitura. Valor padrão: nenhum |
subscribePatternDigite: StringUma expressão regular que corresponde a tópicos a serem assinados. Valor padrão: nenhum |
Opções diversas
read_kafka podem ser usados nas consultas em lote e nas consultas de streaming. As opções a seguir especificam a que tipo de consulta elas se aplicam.
| Opção |
|---|
endingOffsetsTipo: String Tipo de Consulta: somente loteOs deslocamentos a serem lidos até para uma consulta em lote, seja "latest" para especificar os registros mais recentes ou uma cadeia de caracteres JSON especificando um deslocamento final para cada TopicPartition. No JSON, -1 como um deslocamento pode ser usado para se referir ao mais recente.
-2 (mais antigo) como um deslocamento não é permitido.Valor padrão: "latest" |
endingOffsetsByTimestampTipo: String Tipo de Consulta: somente loteUma cadeia de caracteres JSON que especifica um carimbo de data/hora final para leitura até para cada TopicPartition. Os carimbos de data/hora precisam ser fornecidos como um valor longo do carimbo de data/hora em milissegundos desde 1970-01-01 00:00:00 UTC, por exemplo1686444353000. Veja a observação abaixo sobre os detalhes do comportamento com carimbos de data/hora.endingOffsetsByTimestamp tem precedência sobre endingOffsets.Valor padrão: nenhum |
endingTimestampTipo: String Tipo de Consulta: somente loteUm valor de cadeia de caracteres do carimbo de data/hora em milissegundos desde 1970-01-01 00:00:00 UTC, por exemplo "1686444353000". Se o Kafka não retornar o deslocamento correspondente, o deslocamento será definido como mais recente. Veja a observação abaixo sobre os detalhes do comportamento com carimbos de data/hora. Observação: endingTimestamp tem precedência sobre endingOffsetsByTimestamp eendingOffsets.Valor padrão: nenhum |
includeHeadersTipo: Boolean Tipo de Consulta: streaming e loteSe os cabeçalhos Kafka devem ser incluídos na linha. Valor padrão: false |
kafka.<consumer_option>Tipo: String Tipo de Consulta: streaming e loteTodas as opções específicas do consumidor do Kafka podem ser passadas com o prefixo kafka.. Essas opções precisam ser acompanhadas por acentos graves quando fornecidas; caso contrário, você receberá um erro do analisador. Você pode encontrar as opções na documentação do Kafka.Observação: você não deve definir as seguintes opções com essa função: key.deserializer, value.deserializer, , bootstrap.serversgroup.idValor padrão: nenhum |
maxOffsetsPerTriggerTipo: Long Tipo de Consulta: somente streamingLimite de taxa do número máximo de deslocamentos ou linhas processadas por intervalo de acionamento. O número total especificado de deslocamentos será dividido proporcionalmente entre TopicPartitions. Valor padrão: nenhum |
startingOffsetsTipo: String Tipo de Consulta: streaming e loteO ponto de partida quando uma consulta é iniciada, seja "earliest", que é a partir dos primeiros deslocamentos, ou "latest", que é apenas a partir dos últimos deslocamentos, ou uma cadeia de caracteres JSON especificando um deslocamento inicial para cada TopicPartition. No JSON, -2 como um deslocamento pode ser usado para se referir ao mais antigo, e -1 ao mais recente.Observação: para as consultas em lote, a opção "mais recente" (implicitamente ou usando -1 em JSON) não é permitida. Para consultas de streaming, isso só se aplica quando uma nova consulta é iniciada. As consultas de streaming reiniciadas continuarão a partir dos deslocamentos definidos no ponto de verificação da consulta. As partições recém-descobertas durante uma consulta serão iniciadas na opção “mais antigo”. Valor padrão: "latest" para streaming, "earliest" para lote |
startingOffsetsByTimestampTipo: String Tipo de Consulta: streaming e loteUma cadeia de caracteres JSON especificando um carimbo de data/hora inicial para cada TopicPartition. Os carimbos de data/hora precisam ser fornecidos como um valor longo do carimbo de data/hora em milissegundos desde 1970-01-01 00:00:00 UTC, por exemplo 1686444353000. Veja a observação abaixo sobre os detalhes do comportamento com carimbos de data/hora. Se o Kafka não retornar o deslocamento correspondente, o comportamento será definido pelo valor da opção startingOffsetsByTimestampStrategy.startingOffsetsByTimestamp tem precedência sobre startingOffsets.Observação: para consultas de streaming, isso só se aplica quando uma nova consulta é iniciada. As consultas de streaming reiniciadas continuarão a partir dos deslocamentos definidos no ponto de verificação da consulta. As partições recém-descobertas durante uma consulta serão iniciadas na opção “mais antigo”. Valor padrão: nenhum |
startingOffsetsByTimestampStrategyTipo: String Tipo de Consulta: streaming e loteEssa estratégia é usada quando o deslocamento inicial especificado por timestamp (global ou por partição) não corresponde ao deslocamento retornado pelo Kafka. As estratégias disponíveis são:
Valor padrão: "error" |
startingTimestampTipo: String Tipo de Consulta: streaming e loteUm valor de cadeia de caracteres do carimbo de data/hora em milissegundos desde 1970-01-01 00:00:00 UTC, por exemplo "1686444353000". Veja a observação abaixo sobre os detalhes do comportamento com carimbos de data/hora. Se o Kafka não retornar o deslocamento correspondente, o comportamento será definido pelo valor da opção startingOffsetsByTimestampStrategy.startingTimestamp tem precedência sobre startingOffsetsByTimestamp e startingOffsets.Observação: para consultas de streaming, isso só se aplica quando uma nova consulta é iniciada. As consultas de streaming reiniciadas continuarão a partir dos deslocamentos definidos no ponto de verificação da consulta. As partições recém-descobertas em uma consulta serão iniciadas mais cedo. Valor padrão: nenhum |
Observação
O deslocamento retornado para cada partição é o deslocamento mais antigo cujo carimbo de data/hora é maior ou igual ao carimbo de data/hora fornecido na partição correspondente. O comportamento varia entre as opções se o Kafka não retornar o deslocamento correspondente – verifique a descrição de cada opção.
O Spark simplesmente passa as informações do carimbo de data/hora para KafkaConsumer.offsetsForTimes, e não interpreta ou raciocina sobre o valor. Para obter mais detalhes sobre KafkaConsumer.offsetsForTimes, confira a documentação. Além disso, o significado do carimbo de data/hora pode variar de acordo com a configuração do Kafka (log.message.timestamp.type). Para obter detalhes, confira a Documentação do Apache Kafka.