Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Belangrijk
Deze functie is beschikbaar als openbare preview.
In Databricks Runtime 15.3 en hoger kunt u het VARIANT type gebruiken om semi-gestructureerde gegevens op te nemen. In dit artikel wordt het gedrag beschreven en worden voorbeeldpatronen gegeven voor het opnemen van gegevens uit de opslag van cloudobjecten met behulp van automatisch laden en COPY INTOstreamingrecords uit Kafka en SQL-opdrachten voor het maken van nieuwe tabellen met variantgegevens of het invoegen van nieuwe records met behulp van het varianttype. De volgende tabel bevat een overzicht van de ondersteunde bestandsindelingen en ondersteuning voor databricks Runtime-versies:
| Bestandsformaat | Ondersteunde Databricks Runtime-versie |
|---|---|
| JSON | 15.3 en hoger |
| XML | 16.4 en hoger |
| CSV-bestand | 16.4 en hoger |
Zie Gegevens van query-varianten.
Een tabel met een variantkolom maken
VARIANT is een standaard SQL-type in Databricks Runtime 15.3 en hoger en wordt ondersteund door tabellen die worden ondersteund door Delta Lake. Beheerde tabellen in Azure Databricks maken standaard gebruik van Delta Lake, zodat u een lege tabel met één VARIANT kolom kunt maken met behulp van de volgende syntaxis:
CREATE TABLE table_name (variant_column VARIANT)
U kunt de PARSE_JSON functie ook gebruiken voor een JSON-tekenreeks of de FROM_XML functie in een XML-tekenreeks om een CTAS-instructie te gebruiken om een tabel met een variantkolom te maken. In het volgende voorbeeld wordt een tabel met twee kolommen gemaakt:
- De
id-kolom is uit de JSON-tekenreeks geëxtraheerd als eenSTRING-type. - De
variant_columnkolom bevat de hele JSON-tekenreeks die is gecodeerd alsVARIANTtype.
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
Notitie
Databricks raadt aan velden te extraheren en op te slaan als niet-variantkolommen die u wilt gebruiken om query's te versnellen en de opslagindeling te optimaliseren.
VARIANT kolommen kunnen niet worden gebruikt voor clustersleutels, partities of Z-volgordesleutels. Het VARIANT gegevenstype kan niet worden gebruikt voor vergelijkingen, groepering, volgorde en setbewerkingen. Zie Beperkingenvoor een volledige lijst met beperkingen.
Gegevens invoegen met behulp van parse_json
Als de doeltabel al een kolom bevat die is gecodeerd als VARIANT, kunt u JSON-tekenreeksrecords invoegen met parse_json als VARIANT in het volgende voorbeeld:
SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
Python
from pyspark.sql.functions import col, parse_json
(spark.read
.table("source_data")
.select(parse_json(col("json_string")))
.write
.mode("append")
.saveAsTable("table_name")
)
Gegevens invoegen met behulp van from_xml
Als de doeltabel al een kolom bevat die is gecodeerd als VARIANT, kunt u xml-tekenreeksrecords from_xml invoegen als VARIANT. Voorbeeld:
SQL
INSERT INTO table_name (variant_column)
SELECT FROM_XML(xml_string, 'variant')
FROM source_data
Python
from pyspark.sql.functions import col, from_xml
(spark.read
.table("source_data")
.select(from_xml(col("xml_string"), "variant"))
.write
.mode("append")
.saveAsTable("table_name")
)
Gegevens invoegen met behulp van from_csv
Als de doeltabel al een kolom bevat die is gecodeerd als VARIANT, kunt u xml-tekenreeksrecords from_csv invoegen als VARIANT. Voorbeeld:
SQL
INSERT INTO table_name (variant_column)
SELECT FROM_CSV(csv_string, 'v variant').v
FROM source_data
Python
from pyspark.sql.functions import col, from_csv
(spark.read
.table("source_data")
.select(from_csv(col("csv_string"), "v variant").v)
.write
.mode("append")
.saveAsTable("table_name")
)
Gegevens opnemen uit cloudobjectopslag als variant
Automatisch laden kan worden gebruikt om alle gegevens uit de ondersteunde bestandsbronnen te laden als één VARIANT kolom in een doeltabel. Aangezien VARIANT flexibel is voor schema- en typewijzigingen en de hoofdlettergevoeligheid behoudt en waarden van NULL in de gegevensbron, is dit patroon robuust voor de meeste opnamescenario's met de volgende kanttekeningen:
- Ongeldige records kunnen niet worden gecodeerd met het
VARIANTtype. -
VARIANTtype kan alleen records tot 16 mb groot bevatten.
Notitie
Variant behandelt te grote records die vergelijkbaar zijn met beschadigde records. In de standaardverwerkingsmodus PERMISSIVE worden te grote records vastgelegd in de corruptRecordColumn.
Omdat de hele record wordt vastgelegd als één VARIANT kolom, treedt er geen schemaontwikkeling op tijdens de opname en rescuedDataColumn wordt deze niet ondersteund. In het volgende voorbeeld wordt aangenomen dat de doel tabel al bestaat met één VARIANT, kolom.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
U kunt ook opgeven VARIANT wanneer u een schema definieert of doorgeeft schemaHints. De gegevens in het bronveld waarnaar wordt verwezen, moeten een geldige record bevatten. In de volgende voorbeelden ziet u deze syntaxis:
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Gebruik COPY INTO met variant
Databricks raadt aan Auto Loader te gebruiken COPY INTO wanneer beschikbaar.
COPY INTO ondersteunt het opnemen van de volledige inhoud van een ondersteunde gegevensbron als één kolom. In het volgende voorbeeld wordt een nieuwe tabel met één VARIANT kolom gemaakt en vervolgens gebruikt COPY INTO voor het opnemen van records uit een JSON-bestandsbron.
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FORMAT_OPTIONS ('singleVariantColumn' = 'name')
Kafka-gegevens streamen als variant
Veel Kafka-streams coderen hun nettoladingen met behulp van JSON. Het opnemen van Kafka-streams maakt VARIANT deze workloads robuust voor schemawijzigingen.
In het volgende voorbeeld ziet u hoe u een Kafka-streamingbron leest, de key omzet naar STRING, de value omzet naar VARIANT, en wegschrijft naar een doeltabel.
from pyspark.sql.functions import col, parse_json
(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)