Partilhar via


read_kafka função que retorna um valor de tabela

Aplica-se a:verificado sim Databricks SQL verificado sim Databricks Runtime 13.3 LTS e versões superiores

Lê dados de um cluster Apache Kafka e retorna os dados em forma de tabela.

Pode ler dados de um ou mais tópicos de Kafka. Suporta tanto consultas em lote como ingestão em fluxo contínuo.

Sintaxe

read_kafka([option_key => option_value ] [, ...])

Argumentos

Esta função requer invocação de parâmetro nomeado.

  • option_key: O nome da opção a ser configurada. Você deve usar backticks () for options that contain dots (.').
  • option_value: Uma expressão constante para definir a opção. Aceita literais e funções escalares.

Devoluções

Registros lidos de um cluster Apache Kafka com o seguinte esquema:

  • key BINARY: A chave do disco de Kafka.
  • value BINARY NOT NULL: O valor do registo de Kafka.
  • topic STRING NOT NULL: O nome do tópico Kafka do qual o registro é lido.
  • partition INT NOT NULL: O ID da partição Kafka a partir da qual o registro é lido.
  • offset BIGINT NOT NULL: O número do offset do registo no TopicPartitiondo Kafka.
  • timestamp TIMESTAMP NOT NULL: Um valor de carimbo de data/hora para o registro. A coluna timestampType define a que corresponde este timestamp.
  • timestampType INTEGER NOT NULL: O tipo da marca temporal especificada na coluna timestamp.
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: Valores de cabeçalho fornecidos como parte do registro (se habilitado).

Exemplos

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

Opções

Você pode encontrar uma lista detalhada de opções na documentação do Apache Spark .

Opções necessárias

Forneça a opção abaixo para se conectar ao seu cluster Kafka.

Opção
bootstrapServers
Tipo: String
Uma lista separada por vírgulas de pares host/porta apontando para o cluster Kafka.
Valor padrão: Nenhum

Forneça apenas uma das opções abaixo para configurar de quais tópicos do Kafka obter dados.

Opção
assign
Tipo: String
Uma string JSON que contém as partições de tópico específicas a serem consumidas. Por exemplo, para '{"topicA":[0,1],"topicB":[2,4]}', as partições 0'th e 1st do topicA serão consumidas a partir de.
Valor padrão: Nenhum
subscribe
Tipo: String
Uma lista separada por vírgulas de tópicos de Kafka para ler.
Valor padrão: Nenhum
subscribePattern
Tipo: String
Uma expressão regular que corresponde a tópicos para assinar.
Valor padrão: Nenhum

Opções diversas

read_kafka pode ser usado em consultas em lote e em consultas de streaming. As opções abaixo especificam a que tipo de consulta se aplicam.

Opção
endingOffsets
Tipo: String Tipo de consulta: apenas lote
Os offsets a serem lidos até para uma consulta em lote, ou "latest" para especificar os registos mais recentes, ou uma string JSON que especifica um offset final para cada TopicPartition. No JSON, -1 como um deslocamento pode ser usado para se referir ao mais recente. -2 (mais cedo) como deslocamento não é permitido.
Valor predefinido: "latest"
endingOffsetsByTimestamp
Tipo: String Tipo de consulta: apenas lote
Uma cadeia de caracteres JSON especificando um carimbo de data/hora final para leitura até para cada TopicPartition. Os carimbos de data/hora precisam ser fornecidos como um valor longo do carimbo de data/hora em milissegundos desde 1970-01-01 00:00:00 UTC, por exemplo
1686444353000. Veja a nota abaixo sobre detalhes do comportamento com carimbos de data e hora.
endingOffsetsByTimestamp tem precedência sobre endingOffsets.
Valor padrão: Nenhum
endingTimestamp
Tipo: String Tipo de consulta: apenas lote
Um valor de texto do timestamp em milissegundos desde
1970-01-01 00:00:00 UTC, por exemplo "1686444353000". Se o Kafka não retornar o offset correspondente, o offset será definido para o mais recente. Veja a nota abaixo sobre detalhes do comportamento com carimbos de data e hora. Nota: endingTimestamp tem precedência sobre endingOffsetsByTimestamp e
endingOffsets.
Valor padrão: Nenhum
includeHeaders
Tipo: Boolean Tipo de consulta: streaming e lote
Se os cabeçalhos de Kafka devem ser incluídos na linha.
Valor predefinido: false
kafka.<consumer_option>
Tipo: String Tipo de consulta: streaming e lote
Qualquer opção específica do consumidor Kafka pode ser passada com o prefixo kafka. . Essas opções precisam ser cercadas por backticks quando fornecidas, caso contrário, você receberá um erro de analisador. Você pode encontrar as opções na documentação de Kafka.
Nota: Não deve definir as seguintes opções com esta função:
key.deserializer, value.deserializer, bootstrap.servers, group.id
Valor padrão: Nenhum
maxOffsetsPerTrigger
Tipo: Long Tipo de consulta: apenas streaming
Limite de taxa para o número máximo de deslocamentos ou linhas processadas por intervalo de ativação. O número total especificado de offsets será dividido proporcionalmente em TopicPartitions.
Valor padrão: Nenhum
startingOffsets
Tipo: String Tipo de consulta: streaming e lote
O ponto de partida quando uma consulta é iniciada é ou "earliest", que começa nos deslocamentos mais antigos, ou "latest", que começa a partir dos deslocamentos mais recentes, ou uma cadeia JSON especificando um deslocamento inicial para cada TopicPartition. No JSON, -2 usado como deslocamento pode referir-se ao mais antigo e -1 ao mais recente.
Nota: Para consultas em lote, a versão mais recente (implicitamente ou usando -1 em JSON) não é permitida. Para consultas de streaming, isso só se aplica quando uma nova consulta é iniciada. As consultas de streaming reiniciadas irão continuar a partir dos offsets definidos no ponto de verificação da consulta. As partições recém-descobertas durante uma consulta começarão o mais cedo possível.
Valor padrão: "latest" para streaming, "earliest" para lote
startingOffsetsByTimestamp
Tipo: String Tipo de consulta: streaming e lote
Uma cadeia de caracteres JSON especificando um carimbo de data/hora inicial para cada TopicPartition. Os timestamps precisam ser fornecidos como um valor longo do timestamp em milissegundos desde 1970-01-01 00:00:00 UTC, por exemplo 1686444353000. Veja a nota abaixo sobre detalhes do comportamento com carimbos de data e hora. Se Kafka não retornar o offset correspondente, o comportamento será determinado pelo valor da opção startingOffsetsByTimestampStrategy.
startingOffsetsByTimestamp tem precedência sobre startingOffsets.
Nota: Para consultas de streaming, isso só se aplica quando uma nova consulta é iniciada. As consultas de streaming reiniciadas irão continuar a partir dos offsets definidos no ponto de verificação da consulta. As partições recém-descobertas durante uma consulta começarão o mais cedo possível.
Valor padrão: Nenhum
startingOffsetsByTimestampStrategy
Tipo: String Tipo de consulta: streaming e lote
Essa estratégia é utilizada quando o offset inicial especificado pelo timestamp (global ou por partição) não corresponde ao offset que o Kafka retornou. As estratégias disponíveis são:
  • "error": falha na consulta
  • "latest": atribui o offset mais recente para essas partições para que o Spark possa ler registos mais recentes dessas partições em micro-lotes posteriores.

Valor predefinido: "error"
startingTimestamp
Tipo: String Tipo de consulta: streaming e lote
Um valor de texto do timestamp em milissegundos desde
1970-01-01 00:00:00 UTC, por exemplo "1686444353000". Veja a nota abaixo sobre detalhes do comportamento com carimbos de data e hora. Se Kafka não retornar o offset correspondente, o comportamento será determinado pelo valor da opção startingOffsetsByTimestampStrategy.
startingTimestamp tem precedência sobre startingOffsetsByTimestamp e startingOffsets.
Nota: Para consultas de streaming, isso só se aplica quando uma nova consulta é iniciada. As consultas de streaming reiniciadas irão continuar a partir dos offsets definidos no ponto de verificação da consulta. As partições recém-descobertas durante uma consulta começarão primeiro.
Valor padrão: Nenhum

Nota

O deslocamento retornado para cada partição é o deslocamento mais antigo cujo carimbo de data/hora é maior ou igual ao carimbo de data/hora fornecido na partição correspondente. O comportamento varia entre as opções se Kafka não retornar o deslocamento correspondente - verifique a descrição de cada opção.

O Spark simplesmente passa as informações do timestamp para o KafkaConsumer.offsetsForTimes e não interpreta ou raciocina sobre o valor. Para obter mais detalhes sobre KafkaConsumer.offsetsForTimes, consulte a documentação. Além disso, o significado de timestamp aqui pode variar de acordo com a configuração de Kafka (log.message.timestamp.type). Para obter detalhes, consulte a documentação do Apache Kafka.