Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Importante
Esse recurso está em uma versão prévia.
No Databricks Runtime 15.3 e superior, você pode usar o tipo VARIANT para ingerir dados semiestruturados. Este artigo descreve o comportamento e fornece padrões de exemplo para ingerir dados do armazenamento de objetos de nuvem usando o Carregador Automático e COPY INTO, os registros de streaming do Kafka e comandos SQL para criar novas tabelas com dados variantes ou inserir novos registros usando o tipo variante. A tabela a seguir resume os formatos de arquivo com suporte e o suporte à versão do Databricks Runtime:
| Formato de arquivo | Versão do Databricks Runtime com suporte |
|---|---|
| JSON | 15.3 e superior |
| XML | 16.4 e superior |
| CSV | 16.4 e superior |
Consulte Dados de variante da consulta.
Criar uma tabela com uma coluna variante
VARIANT é um tipo SQL padrão no Databricks Runtime 15.3 e superior e suportado por tabelas apoiadas pelo Delta Lake. As tabelas gerenciadas no Azure Databricks usam o Delta Lake por padrão, para que você possa criar uma tabela vazia com uma única coluna VARIANT usando a seguinte sintaxe:
CREATE TABLE table_name (variant_column VARIANT)
Como alternativa, você pode usar a PARSE_JSON função em uma cadeia de caracteres JSON ou a FROM_XML função em uma cadeia de caracteres XML para usar uma instrução CTAS para criar uma tabela com uma coluna variante. O exemplo a seguir cria uma tabela com duas colunas:
- A coluna
idextraída da cadeia de caracteres JSON foi do tipoSTRING. - A coluna
variant_columncontém toda a cadeia de caracteres JSON codificada como tipoVARIANT.
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
Observação
O Databricks recomenda extrair e armazenar campos como colunas não variantes que você planeja usar para acelerar consultas e otimizar o layout de armazenamento.
As colunas VARIANT não podem ser usadas para chaves de clustering, partições ou chaves de ordem Z. O tipo de dados VARIANT não pode ser usado para comparações, agrupamento, ordenação e operações de conjunto. Para ver uma lista completa das limitações, confira Limitações.
Inserir dados usando parse_json
Se a tabela de destino já contiver uma coluna codificada como VARIANT, você poderá usar parse_json para inserir registros de cadeia de caracteres JSON como VARIANT, como no exemplo a seguir:
SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
Python
from pyspark.sql.functions import col, parse_json
(spark.read
.table("source_data")
.select(parse_json(col("json_string")))
.write
.mode("append")
.saveAsTable("table_name")
)
Inserir dados usando from_xml
Se a tabela de destino já contiver uma coluna codificada como VARIANT, você poderá usar from_xml para inserir registros de cadeia de caracteres XML como VARIANT. Por exemplo:
SQL
INSERT INTO table_name (variant_column)
SELECT FROM_XML(xml_string, 'variant')
FROM source_data
Python
from pyspark.sql.functions import col, from_xml
(spark.read
.table("source_data")
.select(from_xml(col("xml_string"), "variant"))
.write
.mode("append")
.saveAsTable("table_name")
)
Inserir dados usando from_csv
Se a tabela de destino já contiver uma coluna codificada como VARIANT, você poderá usar from_csv para inserir registros de cadeia de caracteres XML como VARIANT. Por exemplo:
SQL
INSERT INTO table_name (variant_column)
SELECT FROM_CSV(csv_string, 'v variant').v
FROM source_data
Python
from pyspark.sql.functions import col, from_csv
(spark.read
.table("source_data")
.select(from_csv(col("csv_string"), "v variant").v)
.write
.mode("append")
.saveAsTable("table_name")
)
Ingerir dados do armazenamento de objetos de nuvem como variante
O Carregador Automático pode ser usado para carregar todos os dados das fontes de arquivo com suporte como uma única VARIANT coluna em uma tabela de destino. Como VARIANT é flexível a alterações de esquema e tipo, e mantém a diferenciação de maiúsculas e minúsculas e os valores NULL presentes na fonte de dados, esse padrão é robusto para a maioria dos cenários de ingestão, com as seguintes ressalvas:
- Registros malformados não podem ser codificados usando
VARIANTo tipo. - O tipo
VARIANTsó pode conter registros de até 16 mb de tamanho.
Observação
Variant trata registros excessivamente grandes de forma semelhante a registros corrompidos. No modo de processamento padrão PERMISSIVE , registros excessivamente grandes são capturados no corruptRecordColumn.
Como todo o registro é registrado como uma única VARIANT coluna, nenhuma evolução de esquema ocorre durante a ingestão e rescuedDataColumn não é suportado. O exemplo a seguir pressupõe que a tabela de destino já existe com uma única coluna VARIANT.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Você também pode especificar VARIANT ao definir um esquema ou passar schemaHints. Os dados no campo de origem referenciado devem conter um registro válido. Os exemplos a seguir demonstram essa sintaxe:
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Usar COPY INTO com variante
Databricks recomenda usar o Carregador Automático ao invés de COPY INTO quando disponível.
COPY INTO suporta a ingestão de todo o conteúdo de uma fonte de dados compatível como uma única coluna. O exemplo a seguir cria uma nova tabela com uma única coluna VARIANT e, em seguida, usa COPY INTO para ingerir registros de uma fonte de arquivo JSON.
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FORMAT_OPTIONS ('singleVariantColumn' = 'name')
Transmitir dados do Kafka como variante
Muitos fluxos Kafka codificam suas cargas usando JSON. A ingestão de fluxos Kafka usando VARIANT torna essas cargas de trabalho robustas para alterações de esquema.
O exemplo a seguir demonstra a leitura de uma fonte de streaming do Kafka, convertendo o key como STRING e o valuecomo VARIANT, e gravando na tabela de destino.
from pyspark.sql.functions import col, parse_json
(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)