Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Importante
Esta funcionalidade está em Pré-visualização Pública.
No Databricks Runtime 15.3 e superior, você pode usar o VARIANT tipo para ingerir dados semiestruturados. Este artigo descreve o comportamento e fornece padrões de exemplo para a ingestão de dados do armazenamento de objetos na nuvem usando o Auto Loader e COPY INTO, registros de streaming do Kafka e comandos SQL para criar novas tabelas com dados de variantes ou inserir novos registros usando o tipo de variante. A tabela a seguir resume os formatos de arquivo suportados e o suporte à versão do Databricks Runtime:
| Formato de ficheiro | Versão suportada do Databricks Runtime |
|---|---|
| JSON | 15.3 e superior |
| XML | 16.4 e superior |
| CSV | 16.4 e superior |
Consulte Dados de variantes de consulta.
Criar uma tabela com uma coluna variante
VARIANT é um tipo SQL padrão no Databricks Runtime 15.3 e superior e suportado por tabelas apoiadas pelo Delta Lake. As tabelas gerenciadas no Azure Databricks usam o Delta Lake por padrão, para que você possa criar uma tabela vazia com uma única coluna VARIANT usando a seguinte sintaxe:
CREATE TABLE table_name (variant_column VARIANT)
Como alternativa, você pode usar a PARSE_JSON função em uma cadeia de caracteres JSON ou a FROM_XML função em uma cadeia de caracteres XML para usar uma instrução CTAS para criar uma tabela com uma coluna variante. O exemplo a seguir cria uma tabela com duas colunas:
- A coluna
idé extraída da cadeia de caracteres JSON como um tipoSTRING. - A coluna
variant_columncontém toda a cadeia de caracteres JSON codificada comoVARIANTtipo.
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
Nota
O Databricks recomenda extrair e armazenar campos como colunas não variantes que você planeja usar para acelerar consultas e otimizar o layout de armazenamento.
VARIANT colunas não podem ser usadas para agrupar chaves, partições ou chaves de ordem Z. O tipo de dados VARIANT não pode ser usado para comparações, agrupamento, ordenação e operações de conjunto. Para obter uma lista completa de limitações, consulte Limitações.
Inserir dados usando parse_json
Se a tabela de destino já contiver uma coluna codificada como VARIANT, você poderá usar parse_json para inserir registros de cadeia de caracteres JSON como VARIANT, como no exemplo a seguir:
SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
Python
from pyspark.sql.functions import col, parse_json
(spark.read
.table("source_data")
.select(parse_json(col("json_string")))
.write
.mode("append")
.saveAsTable("table_name")
)
Inserir dados usando from_xml
Se a tabela de destino já contiver uma coluna codificada como VARIANT, você poderá usar from_xml para inserir registros de cadeia de caracteres XML como VARIANT. Por exemplo:
SQL
INSERT INTO table_name (variant_column)
SELECT FROM_XML(xml_string, 'variant')
FROM source_data
Python
from pyspark.sql.functions import col, from_xml
(spark.read
.table("source_data")
.select(from_xml(col("xml_string"), "variant"))
.write
.mode("append")
.saveAsTable("table_name")
)
Inserir dados usando from_csv
Se a tabela de destino já contiver uma coluna codificada como VARIANT, você poderá usar from_csv para inserir registros de cadeia de caracteres XML como VARIANT. Por exemplo:
SQL
INSERT INTO table_name (variant_column)
SELECT FROM_CSV(csv_string, 'v variant').v
FROM source_data
Python
from pyspark.sql.functions import col, from_csv
(spark.read
.table("source_data")
.select(from_csv(col("csv_string"), "v variant").v)
.write
.mode("append")
.saveAsTable("table_name")
)
Ingerir dados do armazenamento de objetos na nuvem como variante
O Auto Loader pode ser usado para carregar todos os dados das fontes de arquivos suportadas como uma única VARIANT coluna em uma tabela de destino. Como VARIANT é flexível para alterações de esquema e tipo e mantém a diferenciação de maiúsculas e minúsculas e os valores de NULL presentes na fonte de dados, esse padrão é robusto para a maioria dos cenários de ingestão com as seguintes ressalvas:
- Registros malformados não podem ser codificados usando
VARIANTtipo. -
VARIANTO tipo só pode conter registros de até 16 MB de tamanho.
Nota
Variant trata registos excessivamente grandes da mesma forma que faz com registos corrompidos. No modo padrão de processamento PERMISSIVE, registos demasiadamente grandes são capturados no corruptRecordColumn.
Como todo o registro é registrado como uma única VARIANT coluna, nenhuma evolução do esquema ocorre durante a ingestão e rescuedDataColumn não é suportada. O exemplo a seguir pressupõe que a tabela de destino já exista com uma única coluna VARIANT.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Você também pode especificar VARIANT ao definir um esquema ou passar schemaHints. Os dados no campo de origem referenciada devem conter um registo válido. Os exemplos a seguir demonstram essa sintaxe:
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Utilize COPY INTO com variante
A Databricks recomenda o uso do Auto Loader em vez de COPY INTO quando disponível.
COPY INTO suporta a ingestão de todo o conteúdo de uma fonte de dados suportada como uma única coluna. O exemplo a seguir cria uma nova tabela com uma única coluna VARIANT e, em seguida, usa COPY INTO para ingerir registros de uma fonte de arquivo JSON.
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FORMAT_OPTIONS ('singleVariantColumn' = 'name')
Transmitir dados do Kafka como variante
Muitos fluxos Kafka codificam suas cargas usando JSON. A ingestão de fluxos Kafka usando VARIANT torna essas cargas de trabalho robustas para alterações de esquema.
O exemplo a seguir demonstra a leitura de uma fonte de streaming de Kafka, convertendo o key em um STRING e o value em VARIANT, e escrevendo numa tabela de destino.
from pyspark.sql.functions import col, parse_json
(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)