Partilhar via


Ingerir dados como variante semiestruturada

Importante

Esta funcionalidade está em Pré-visualização Pública.

No Databricks Runtime 15.3 e superior, você pode usar o VARIANT tipo para ingerir dados semiestruturados. Este artigo descreve o comportamento e fornece padrões de exemplo para a ingestão de dados do armazenamento de objetos na nuvem usando o Auto Loader e COPY INTO, registros de streaming do Kafka e comandos SQL para criar novas tabelas com dados de variantes ou inserir novos registros usando o tipo de variante. A tabela a seguir resume os formatos de arquivo suportados e o suporte à versão do Databricks Runtime:

Formato de ficheiro Versão suportada do Databricks Runtime
JSON 15.3 e superior
XML 16.4 e superior
CSV 16.4 e superior

Consulte Dados de variantes de consulta.

Criar uma tabela com uma coluna variante

VARIANT é um tipo SQL padrão no Databricks Runtime 15.3 e superior e suportado por tabelas apoiadas pelo Delta Lake. As tabelas gerenciadas no Azure Databricks usam o Delta Lake por padrão, para que você possa criar uma tabela vazia com uma única coluna VARIANT usando a seguinte sintaxe:

CREATE TABLE table_name (variant_column VARIANT)

Como alternativa, você pode usar a PARSE_JSON função em uma cadeia de caracteres JSON ou a FROM_XML função em uma cadeia de caracteres XML para usar uma instrução CTAS para criar uma tabela com uma coluna variante. O exemplo a seguir cria uma tabela com duas colunas:

  • A coluna id é extraída da cadeia de caracteres JSON como um tipo STRING.
  • A coluna variant_column contém toda a cadeia de caracteres JSON codificada como VARIANT tipo.
CREATE TABLE table_name AS
  SELECT json_string:id AS id,
    PARSE_JSON(json_string) variant_column
  FROM source_data

Nota

O Databricks recomenda extrair e armazenar campos como colunas não variantes que você planeja usar para acelerar consultas e otimizar o layout de armazenamento.

VARIANT colunas não podem ser usadas para agrupar chaves, partições ou chaves de ordem Z. O tipo de dados VARIANT não pode ser usado para comparações, agrupamento, ordenação e operações de conjunto. Para obter uma lista completa de limitações, consulte Limitações.

Inserir dados usando parse_json

Se a tabela de destino já contiver uma coluna codificada como VARIANT, você poderá usar parse_json para inserir registros de cadeia de caracteres JSON como VARIANT, como no exemplo a seguir:

SQL

INSERT INTO table_name (variant_column)
  SELECT PARSE_JSON(json_string)
  FROM source_data

Python

from pyspark.sql.functions import col, parse_json

(spark.read
  .table("source_data")
  .select(parse_json(col("json_string")))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Inserir dados usando from_xml

Se a tabela de destino já contiver uma coluna codificada como VARIANT, você poderá usar from_xml para inserir registros de cadeia de caracteres XML como VARIANT. Por exemplo:

SQL

INSERT INTO table_name (variant_column)
  SELECT FROM_XML(xml_string, 'variant')
  FROM source_data

Python

from pyspark.sql.functions import col, from_xml

(spark.read
  .table("source_data")
  .select(from_xml(col("xml_string"), "variant"))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Inserir dados usando from_csv

Se a tabela de destino já contiver uma coluna codificada como VARIANT, você poderá usar from_csv para inserir registros de cadeia de caracteres XML como VARIANT. Por exemplo:

SQL

INSERT INTO table_name (variant_column)
  SELECT FROM_CSV(csv_string, 'v variant').v
  FROM source_data

Python

from pyspark.sql.functions import col, from_csv

(spark.read
  .table("source_data")
  .select(from_csv(col("csv_string"), "v variant").v)
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Ingerir dados do armazenamento de objetos na nuvem como variante

O Auto Loader pode ser usado para carregar todos os dados das fontes de arquivos suportadas como uma única VARIANT coluna em uma tabela de destino. Como VARIANT é flexível para alterações de esquema e tipo e mantém a diferenciação de maiúsculas e minúsculas e os valores de NULL presentes na fonte de dados, esse padrão é robusto para a maioria dos cenários de ingestão com as seguintes ressalvas:

  • Registros malformados não podem ser codificados usando VARIANT tipo.
  • VARIANT O tipo só pode conter registros de até 16 MB de tamanho.

Nota

Variant trata registos excessivamente grandes da mesma forma que faz com registos corrompidos. No modo padrão de processamento PERMISSIVE, registos demasiadamente grandes são capturados no corruptRecordColumn.

Como todo o registro é registrado como uma única VARIANT coluna, nenhuma evolução do esquema ocorre durante a ingestão e rescuedDataColumn não é suportada. O exemplo a seguir pressupõe que a tabela de destino já exista com uma única coluna VARIANT.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("singleVariantColumn", "variant_column")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Você também pode especificar VARIANT ao definir um esquema ou passar schemaHints. Os dados no campo de origem referenciada devem conter um registo válido. Os exemplos a seguir demonstram essa sintaxe:

# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("name STRING, address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("payload VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaHints", "address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Utilize COPY INTO com variante

A Databricks recomenda o uso do Auto Loader em vez de COPY INTO quando disponível.

COPY INTO suporta a ingestão de todo o conteúdo de uma fonte de dados suportada como uma única coluna. O exemplo a seguir cria uma nova tabela com uma única coluna VARIANT e, em seguida, usa COPY INTO para ingerir registros de uma fonte de arquivo JSON.

CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('singleVariantColumn' = 'name')

Transmitir dados do Kafka como variante

Muitos fluxos Kafka codificam suas cargas usando JSON. A ingestão de fluxos Kafka usando VARIANT torna essas cargas de trabalho robustas para alterações de esquema.

O exemplo a seguir demonstra a leitura de uma fonte de streaming de Kafka, convertendo o key em um STRING e o value em VARIANT, e escrevendo numa tabela de destino.

from pyspark.sql.functions import col, parse_json

(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("key").cast("string"),
    parse_json(col("value").cast("string"))
  ).writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)