重要
此功能目前以公共预览版提供。
本文介绍如何查询和转换存储为 VARIANT的半结构化数据。 Databricks Runtime 15.3 和更高版本中提供了 VARIANT 数据类型。
Databricks 建议对 JSON 字符串使用 VARIANT。 对于当前使用 JSON 字符串且想要迁移的用户,请参阅变体与 JSON 字符串有何不同?。
如果你想查看查询使用 JSON 字符串存储的半结构化数据的示例,请参阅查询 JSON 字符串。
注意
VARIANT 列不能用于聚类键、分区或 Z-顺序键。
VARIANT 数据类型不能用于比较、分组、排序和设置操作。 有关限制的完整列表,请参阅 限制。
创建一个包含变体列的表
若要创建变体列,请使用 parse_json 函数(SQL 或 Python)。
运行以下命令以创建一个表,其中的数据以 VARIANT 形式高度嵌套存储。 (此页的其他示例使用此数据。
SQL
-- Create a table with a variant column
CREATE TABLE store_data AS
SELECT parse_json(
'{
"store":{
"fruit": [
{"weight":8,"type":"apple"},
{"weight":9,"type":"pear"}
],
"basket":[
[1,2,{"b":"y","a":"x"}],
[3,4],
[5,6]
],
"book":[
{
"author":"Nigel Rees",
"title":"Sayings of the Century",
"category":"reference",
"price":8.95
},
{
"author":"Herman Melville",
"title":"Moby Dick",
"category":"fiction",
"price":8.99,
"isbn":"0-553-21311-3"
},
{
"author":"J. R. R. Tolkien",
"title":"The Lord of the Rings",
"category":"fiction",
"reader":[
{"age":25,"name":"bob"},
{"age":26,"name":"jack"}
],
"price":22.99,
"isbn":"0-395-19395-8"
}
],
"bicycle":{
"price":19.95,
"color":"red"
}
},
"owner":"amy",
"zip code":"94025",
"fb:testid":"1234"
}'
) as raw
SELECT * FROM store_data
Python
# Create a table with a variant column
store_data='''
{
"store":{
"fruit":[
{"weight":8,"type":"apple"},
{"weight":9,"type":"pear"}
],
"basket":[
[1,2,{"b":"y","a":"x"}],
[3,4],
[5,6]
],
"book":[
{
"author":"Nigel Rees",
"title":"Sayings of the Century",
"category":"reference",
"price":8.95
},
{
"author":"Herman Melville",
"title":"Moby Dick",
"category":"fiction",
"price":8.99,
"isbn":"0-553-21311-3"
},
{
"author":"J. R. R. Tolkien",
"title":"The Lord of the Rings",
"category":"fiction",
"reader":[
{"age":25,"name":"bob"},
{"age":26,"name":"jack"}
],
"price":22.99,
"isbn":"0-395-19395-8"
}
],
"bicycle":{
"price":19.95,
"color":"red"
}
},
"owner":"amy",
"zip code":"94025",
"fb:testid":"1234"
}
'''
# Create a DataFrame
df = spark.createDataFrame([(store_data,)], ["json"])
# Convert to a variant
df_variant = df.select(parse_json(col("json")).alias("raw"))
# Alternatively, create the DataFrame directly
# df_variant = spark.range(1).select(parse_json(lit(store_data)))
df_variant.display()
# Write out as a table
df_variant.write.saveAsTable("store_data")
查询变体列中的字段
若要从变体列中提取字段,请使用 variant_get 函数(SQL 或 Python)指定提取路径中的 JSON 字段的名称。 字段名称始终区分大小写。
SQL
-- Extract a top-level field
SELECT variant_get(store_data.raw, '$.owner') AS owner FROM store_data
还可以使用 SQL 语法查询变体列中的字段。 请参阅 SQL 中 variant_get 的简写形式。
Python
# Extract a top-level field
df_variant.select(variant_get(col("raw"), "$.owner", "string")).display()
variant_get的 SQL 速记
用于查询 Azure Databricks 上的 JSON 字符串和其他复杂数据类型的 SQL 语法适用于 VARIANT 数据,其中包括:
- 使用
:选择顶级字段。 - 使用
.或[<key>]选择具有命名键的嵌套字段。 - 使用
[<index>]从数组中选择值。
SELECT raw:owner FROM store_data
+-------+
| owner |
+-------+
| "amy" |
+-------+
-- Use backticks to escape special characters.
SELECT raw:`zip code`, raw:`fb:testid` FROM store_data
+----------+-----------+
| zip code | fb:testid |
+----------+-----------+
| "94025" | "1234" |
+----------+-----------+
如果字段名称包含句点 (.),则必须使用方括号 ([ ]) 对其进行转义。 例如,以下查询选择一个名为 zip.code 的字段:
SELECT raw:['zip.code'] FROM store_data
提取变体嵌套字段
若要从变体列中提取嵌套字段,请使用点表示法或括号指定它们。 字段名称始终区分大小写。
SQL
-- Use dot notation
SELECT raw:store.bicycle FROM store_data
-- Use brackets
SELECT raw:store['bicycle'] FROM store_data
如果找不到路径,则结果是类型 NULL 为 VARIANT。
Python
# Use dot notation
df_variant.select(variant_get(col("raw"), "$.store.bicycle", "string")).display()
# Use brackets
df_variant.select(variant_get(col("raw"), "$.store['bicycle']", "string")).display()
如果找不到路径,则结果是类型 null 为 VariantVal。
+-----------------+
| bicycle |
+-----------------+
| { |
| "color":"red", |
| "price":19.95 |
| } |
+-----------------+
从变体数组中提取值
若要从数组中提取元素,请使用括号编制索引。 索引从 0 开始。
SQL
-- Index elements
SELECT raw:store.fruit[0], raw:store.fruit[1] FROM store_data
Python
# Index elements
df_variant.select((variant_get(col("raw"), "$.store.fruit[0]", "string")),(variant_get(col("raw"), "$.store.fruit[1]", "string"))).display()
+-------------------+------------------+
| fruit | fruit |
+-------------------+------------------+
| { | { |
| "type":"apple", | "type":"pear", |
| "weight":8 | "weight":9 |
| } | } |
+-------------------+------------------+
如果找不到路径,或者如果数组索引超出边界,则结果为 null。
在 Python 中使用变体
可以将变体从 Spark 数据帧提取为 Python 中的VariantVal,然后使用toPython 和toJson 方法单独处理它们。
# toPython
data = [
('{"name": "Alice", "age": 25}',),
('["person", "electronic"]',),
('1',)
]
df_person = spark.createDataFrame(data, ["json"])
# Collect variants into a VariantVal
variants = df_person.select(parse_json(col("json")).alias("v")).collect()
输出 VariantVal 为 JSON 字符串:
print(variants[0].v.toJson())
{"age":25,"name":"Alice"}
将 a VariantVal 转换为 Python 对象:
# First element is a dictionary
print(variants[0].v.toPython()["age"])
25
# Second element is a List
print(variants[1].v.toPython()[1])
electronic
# Third element is an Integer
print(variants[2].v.toPython())
1
还可以使用VariantVal函数构造VariantVal.parseJson。
# parseJson to construct VariantVal's in Python
from pyspark.sql.types import VariantVal
variant = VariantVal.parseJson('{"a": 1}')
将变体打印为 JSON 字符串:
print(variant.toJson())
{"a":1}
将变体转换为 Python 对象并输出值:
print(variant.toPython()["a"])
1
返回变体的架构
若要返回变体的架构,请使用 schema_of_variant 函数(SQL 或 Python)。
SQL
-- Return the schema of the variant
SELECT schema_of_variant(raw) FROM store_data;
Python
# Return the schema of the variant
df_variant.select(schema_of_variant(col("raw"))).display()
若要返回组中所有变体的组合架构,请使用 schema_of_variant_agg 函数(SQL 或 Python)。
以下示例返回该架构,然后返回示例数据 json_data的组合架构。
SQL
CREATE OR REPLACE TEMP VIEW json_data AS
SELECT '{"name": "Alice", "age": 25}' AS json UNION ALL
SELECT '{"id": 101, "department": "HR"}' UNION ALL
SELECT '{"product": "Laptop", "price": 1200.50, "in_stock": true}';
-- Return the schema
SELECT schema_of_variant(parse_json(json)) FROM json_data;
Python
json_data = [
('{"name": "Alice", "age": 25}',),
('{"id": 101, "department": "HR"}',),
('{"product": "Laptop", "price": 1200.50, "in_stock": true}',)
]
df_item = spark.createDataFrame(json_data, ["json"])
# Return the schema
df_item.select(parse_json(col("json")).alias("v")).select(schema_of_variant(col("v"))).display()
+-----------------------------------------------------------------+
| schema_of_variant(v) |
+-----------------------------------------------------------------+
| OBJECT<age: BIGINT, name: STRING> |
| OBJECT<department: STRING, id: BIGINT> |
| OBJECT<in_stock: BOOLEAN, price: DECIMAL(5,1), product: STRING> |
+-----------------------------------------------------------------+
SQL
-- Return the combined schema
SELECT schema_of_variant_agg(parse_json(json)) FROM json_data;
Python
# Return the combined schema
df.select(parse_json(col("json")).alias("v")).select(schema_of_variant_agg(col("v"))).display()
+----------------------------------------------------------------------------------------------------------------------------+
| schema_of_variant(v) |
+----------------------------------------------------------------------------------------------------------------------------+
| OBJECT<age: BIGINT, department: STRING, id: BIGINT, in_stock: BOOLEAN, name: STRING, price: DECIMAL(5,1), product: STRING> |
+----------------------------------------------------------------------------------------------------------------------------+
平展变体对象和数组
variant_explode表值生成器函数(SQL 或 Python)可用于平展变体数组和对象。
SQL
由于 variant_explode 是生成器函数,因此可将其用作 FROM 子句的一部分,而不是在 SELECT 列表中使用,如以下示例所示:
SELECT key, value
FROM store_data,
LATERAL variant_explode(store_data.raw);
SELECT pos, value
FROM store_data,
LATERAL variant_explode(store_data.raw:store.basket[0]);
Python
使用 表值函数 (TVF) 数据帧 API 将变体扩展到多个行:
spark.tvf.variant_explode(parse_json(lit(store_data))).display()
# To explode a nested field, first create a DataFrame with just the field
df_store_col = df_variant.select(variant_get(col("raw"), "$.store", "variant").alias("store"))
# Perform the explode with a lateral join and the outer function to return the new exploded DataFrame
df_store_exploded_lj = df_store_col.lateralJoin(spark.tvf.variant_explode(col("store").outer()))
df_store_exploded = df_store_exploded_lj.drop("store")
df_store_exploded.display()
变体类型转换规则
你可以使用 VARIANT 类型存储数组和标量。 尝试将变体类型强制转换为其他类型时,普通强制转换规则适用于单个值和字段,并使用以下附加规则。
注意
variant_get 和 try_variant_get 采用类型参数并遵循这些强制转换规则。
| 源类型 | 行为 |
|---|---|
VOID |
结果是类型 NULL 为 VARIANT。 |
ARRAY<elementType> |
elementType 必须是可强制转换为 VARIANT 的类型。 |
当使用 schema_of_variant 或 schema_of_variant_agg 推断类型时,如果存在无法解决的冲突类型,函数会回退到 VARIANT 类型而不是 STRING 类型。
SQL
使用 try_variant_get 函数(SQL)转换:
-- price is returned as a double, not a string
SELECT try_variant_get(raw, '$.store.bicycle.price', 'double') as price FROM store_data
+------------------+
| price |
+------------------+
| 19.95 |
+------------------+
还可以使用 :: 或 cast 将值转换为支持的数据类型:
-- cast into more complex types
SELECT cast(raw:store.bicycle AS STRUCT<price DOUBLE, color STRING>) bicycle FROM store_data;
-- `::` also supported
SELECT raw:store.bicycle::STRUCT<price DOUBLE, color STRING> bicycle FROM store_data;
+------------------+
| bicycle |
+------------------+
| { |
| "price":19.95, |
| "color":"red" |
| } |
+------------------+
Python
使用 try_variant_get 函数(Python)转换:
# price is returned as a double, not a string
df_variant.select(try_variant_get(col("raw"), "$.store.bicycle.price", "double").alias("price"))
+------------------+
| price |
+------------------+
| 19.95 |
+------------------+
此外,还可使用 try_variant_get 函数(SQL 或 Python)来处理强制转换失败:
SQL
SELECT try_variant_get(
parse_json('{"a" : "c", "b" : 2}'),
'$.a',
'boolean'
)
Python
spark.range(1).select(parse_json(lit('{"a" : "c", "b" : 2}')).alias("v")).select(try_variant_get(col('v'), '$.a', 'boolean')).display()
变体 null 规则
使用 is_variant_null 函数(SQL 或 Python)确定变量值是否为 null 变量。
SQL
变体可以包含两种类型的 null:
-
NULLSQL :SQLNULL表示值缺失。 这些是与处理结构化数据时相同的NULL。 -
NULL变体 :变体NULL表示变体显式包含NULL值。 这些与 SQLNULL不同,因为NULL值存储在数据中。
SELECT
is_variant_null(parse_json(NULL)) AS sql_null,
is_variant_null(parse_json('null')) AS variant_null,
is_variant_null(parse_json('{ "field_a": null }'):field_a) AS variant_null_value,
is_variant_null(parse_json('{ "field_a": null }'):missing) AS missing_sql_value_null
+--------+------------+------------------+----------------------+
|sql_null|variant_null|variant_null_value|missing_sql_value_null|
+--------+------------+------------------+----------------------+
| false| true| true| false|
+--------+------------+------------------+----------------------+
Python
data = [
('null',),
(None,),
('{"field_a" : 1, "field_b" : 2}',)
]
df = spark.createDataFrame(data, ["null_data"])
df.select(parse_json(col("null_data")).alias("v")).select(is_variant_null(col("v"))).display()
+------------------+
|is_variant_null(v)|
+------------------+
| true|
+------------------+
| false|
+------------------+
| false|
+------------------+