Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Aplica-se a:
Databricks SQL
Databricks Runtime 13.3 LTS e versões superiores
Lê dados de um cluster Apache Kafka e retorna os dados em forma de tabela.
Pode ler dados de um ou mais tópicos de Kafka. Suporta tanto consultas em lote como ingestão em fluxo contínuo.
Sintaxe
read_kafka([option_key => option_value ] [, ...])
Argumentos
Esta função requer invocação de parâmetro nomeado.
-
option_key: O nome da opção a ser configurada. Você deve usar backticks () for options that contain dots (.'). -
option_value: Uma expressão constante para definir a opção. Aceita literais e funções escalares.
Devoluções
Registros lidos de um cluster Apache Kafka com o seguinte esquema:
-
key BINARY: A chave do disco de Kafka. -
value BINARY NOT NULL: O valor do registo de Kafka. -
topic STRING NOT NULL: O nome do tópico Kafka do qual o registro é lido. -
partition INT NOT NULL: O ID da partição Kafka a partir da qual o registro é lido. -
offset BIGINT NOT NULL: O número do offset do registo noTopicPartitiondo Kafka. -
timestamp TIMESTAMP NOT NULL: Um valor de carimbo de data/hora para o registro. A colunatimestampTypedefine a que corresponde este timestamp. -
timestampType INTEGER NOT NULL: O tipo da marca temporal especificada na colunatimestamp. -
headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: Valores de cabeçalho fornecidos como parte do registro (se habilitado).
Exemplos
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
Opções
Você pode encontrar uma lista detalhada de opções na documentação do Apache Spark .
Opções necessárias
Forneça a opção abaixo para se conectar ao seu cluster Kafka.
| Opção |
|---|
bootstrapServersTipo: StringUma lista separada por vírgulas de pares host/porta apontando para o cluster Kafka. Valor padrão: Nenhum |
Forneça apenas uma das opções abaixo para configurar de quais tópicos do Kafka obter dados.
| Opção |
|---|
assignTipo: StringUma string JSON que contém as partições de tópico específicas a serem consumidas. Por exemplo, para '{"topicA":[0,1],"topicB":[2,4]}', as partições 0'th e 1st do topicA serão consumidas a partir de.Valor padrão: Nenhum |
subscribeTipo: StringUma lista separada por vírgulas de tópicos de Kafka para ler. Valor padrão: Nenhum |
subscribePatternTipo: StringUma expressão regular que corresponde a tópicos para assinar. Valor padrão: Nenhum |
Opções diversas
read_kafka pode ser usado em consultas em lote e em consultas de streaming. As opções abaixo especificam a que tipo de consulta se aplicam.
| Opção |
|---|
endingOffsetsTipo: String Tipo de consulta: apenas loteOs offsets a serem lidos até para uma consulta em lote, ou "latest" para especificar os registos mais recentes, ou uma string JSON que especifica um offset final para cada TopicPartition. No JSON, -1 como um deslocamento pode ser usado para se referir ao mais recente.
-2 (mais cedo) como deslocamento não é permitido.Valor predefinido: "latest" |
endingOffsetsByTimestampTipo: String Tipo de consulta: apenas loteUma cadeia de caracteres JSON especificando um carimbo de data/hora final para leitura até para cada TopicPartition. Os carimbos de data/hora precisam ser fornecidos como um valor longo do carimbo de data/hora em milissegundos desde 1970-01-01 00:00:00 UTC, por exemplo1686444353000. Veja a nota abaixo sobre detalhes do comportamento com carimbos de data e hora.endingOffsetsByTimestamp tem precedência sobre endingOffsets.Valor padrão: Nenhum |
endingTimestampTipo: String Tipo de consulta: apenas loteUm valor de texto do timestamp em milissegundos desde 1970-01-01 00:00:00 UTC, por exemplo "1686444353000". Se o Kafka não retornar o offset correspondente, o offset será definido para o mais recente. Veja a nota abaixo sobre detalhes do comportamento com carimbos de data e hora. Nota: endingTimestamp tem precedência sobre endingOffsetsByTimestamp eendingOffsets.Valor padrão: Nenhum |
includeHeadersTipo: Boolean Tipo de consulta: streaming e loteSe os cabeçalhos de Kafka devem ser incluídos na linha. Valor predefinido: false |
kafka.<consumer_option>Tipo: String Tipo de consulta: streaming e loteQualquer opção específica do consumidor Kafka pode ser passada com o prefixo kafka. . Essas opções precisam ser cercadas por backticks quando fornecidas, caso contrário, você receberá um erro de analisador. Você pode encontrar as opções na documentação de Kafka.Nota: Não deve definir as seguintes opções com esta função: key.deserializer, value.deserializer, bootstrap.servers, group.idValor padrão: Nenhum |
maxOffsetsPerTriggerTipo: Long Tipo de consulta: apenas streamingLimite de taxa para o número máximo de deslocamentos ou linhas processadas por intervalo de ativação. O número total especificado de offsets será dividido proporcionalmente em TopicPartitions. Valor padrão: Nenhum |
startingOffsetsTipo: String Tipo de consulta: streaming e loteO ponto de partida quando uma consulta é iniciada é ou "earliest", que começa nos deslocamentos mais antigos, ou "latest", que começa a partir dos deslocamentos mais recentes, ou uma cadeia JSON especificando um deslocamento inicial para cada TopicPartition. No JSON, -2 usado como deslocamento pode referir-se ao mais antigo e -1 ao mais recente.Nota: Para consultas em lote, a versão mais recente (implicitamente ou usando -1 em JSON) não é permitida. Para consultas de streaming, isso só se aplica quando uma nova consulta é iniciada. As consultas de streaming reiniciadas irão continuar a partir dos offsets definidos no ponto de verificação da consulta. As partições recém-descobertas durante uma consulta começarão o mais cedo possível. Valor padrão: "latest" para streaming, "earliest" para lote |
startingOffsetsByTimestampTipo: String Tipo de consulta: streaming e loteUma cadeia de caracteres JSON especificando um carimbo de data/hora inicial para cada TopicPartition. Os timestamps precisam ser fornecidos como um valor longo do timestamp em milissegundos desde 1970-01-01 00:00:00 UTC, por exemplo 1686444353000. Veja a nota abaixo sobre detalhes do comportamento com carimbos de data e hora. Se Kafka não retornar o offset correspondente, o comportamento será determinado pelo valor da opção startingOffsetsByTimestampStrategy.startingOffsetsByTimestamp tem precedência sobre startingOffsets.Nota: Para consultas de streaming, isso só se aplica quando uma nova consulta é iniciada. As consultas de streaming reiniciadas irão continuar a partir dos offsets definidos no ponto de verificação da consulta. As partições recém-descobertas durante uma consulta começarão o mais cedo possível. Valor padrão: Nenhum |
startingOffsetsByTimestampStrategyTipo: String Tipo de consulta: streaming e loteEssa estratégia é utilizada quando o offset inicial especificado pelo timestamp (global ou por partição) não corresponde ao offset que o Kafka retornou. As estratégias disponíveis são:
Valor predefinido: "error" |
startingTimestampTipo: String Tipo de consulta: streaming e loteUm valor de texto do timestamp em milissegundos desde 1970-01-01 00:00:00 UTC, por exemplo "1686444353000". Veja a nota abaixo sobre detalhes do comportamento com carimbos de data e hora. Se Kafka não retornar o offset correspondente, o comportamento será determinado pelo valor da opção startingOffsetsByTimestampStrategy.startingTimestamp tem precedência sobre startingOffsetsByTimestamp e startingOffsets.Nota: Para consultas de streaming, isso só se aplica quando uma nova consulta é iniciada. As consultas de streaming reiniciadas irão continuar a partir dos offsets definidos no ponto de verificação da consulta. As partições recém-descobertas durante uma consulta começarão primeiro. Valor padrão: Nenhum |
Nota
O deslocamento retornado para cada partição é o deslocamento mais antigo cujo carimbo de data/hora é maior ou igual ao carimbo de data/hora fornecido na partição correspondente. O comportamento varia entre as opções se Kafka não retornar o deslocamento correspondente - verifique a descrição de cada opção.
O Spark simplesmente passa as informações do timestamp para o KafkaConsumer.offsetsForTimes e não interpreta ou raciocina sobre o valor. Para obter mais detalhes sobre KafkaConsumer.offsetsForTimes, consulte a documentação. Além disso, o significado de timestamp aqui pode variar de acordo com a configuração de Kafka (log.message.timestamp.type). Para obter detalhes, consulte a documentação do Apache Kafka.