Delen via


Gegevens opnemen als semi-gestructureerd varianttype

Belangrijk

Deze functie is beschikbaar als openbare preview.

In Databricks Runtime 15.3 en hoger kunt u het VARIANT type gebruiken om semi-gestructureerde gegevens op te nemen. In dit artikel wordt het gedrag beschreven en worden voorbeeldpatronen gegeven voor het opnemen van gegevens uit de opslag van cloudobjecten met behulp van automatisch laden en COPY INTOstreamingrecords uit Kafka en SQL-opdrachten voor het maken van nieuwe tabellen met variantgegevens of het invoegen van nieuwe records met behulp van het varianttype. De volgende tabel bevat een overzicht van de ondersteunde bestandsindelingen en ondersteuning voor databricks Runtime-versies:

Bestandsformaat Ondersteunde Databricks Runtime-versie
JSON 15.3 en hoger
XML 16.4 en hoger
CSV-bestand 16.4 en hoger

Zie Gegevens van query-varianten.

Een tabel met een variantkolom maken

VARIANT is een standaard SQL-type in Databricks Runtime 15.3 en hoger en wordt ondersteund door tabellen die worden ondersteund door Delta Lake. Beheerde tabellen in Azure Databricks maken standaard gebruik van Delta Lake, zodat u een lege tabel met één VARIANT kolom kunt maken met behulp van de volgende syntaxis:

CREATE TABLE table_name (variant_column VARIANT)

U kunt de PARSE_JSON functie ook gebruiken voor een JSON-tekenreeks of de FROM_XML functie in een XML-tekenreeks om een CTAS-instructie te gebruiken om een tabel met een variantkolom te maken. In het volgende voorbeeld wordt een tabel met twee kolommen gemaakt:

  • De id-kolom is uit de JSON-tekenreeks geëxtraheerd als een STRING-type.
  • De variant_column kolom bevat de hele JSON-tekenreeks die is gecodeerd als VARIANT type.
CREATE TABLE table_name AS
  SELECT json_string:id AS id,
    PARSE_JSON(json_string) variant_column
  FROM source_data

Notitie

Databricks raadt aan velden te extraheren en op te slaan als niet-variantkolommen die u wilt gebruiken om query's te versnellen en de opslagindeling te optimaliseren.

VARIANT kolommen kunnen niet worden gebruikt voor clustersleutels, partities of Z-volgordesleutels. Het VARIANT gegevenstype kan niet worden gebruikt voor vergelijkingen, groepering, volgorde en setbewerkingen. Zie Beperkingenvoor een volledige lijst met beperkingen.

Gegevens invoegen met behulp van parse_json

Als de doeltabel al een kolom bevat die is gecodeerd als VARIANT, kunt u JSON-tekenreeksrecords invoegen met parse_json als VARIANT in het volgende voorbeeld:

SQL

INSERT INTO table_name (variant_column)
  SELECT PARSE_JSON(json_string)
  FROM source_data

Python

from pyspark.sql.functions import col, parse_json

(spark.read
  .table("source_data")
  .select(parse_json(col("json_string")))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Gegevens invoegen met behulp van from_xml

Als de doeltabel al een kolom bevat die is gecodeerd als VARIANT, kunt u xml-tekenreeksrecords from_xml invoegen als VARIANT. Voorbeeld:

SQL

INSERT INTO table_name (variant_column)
  SELECT FROM_XML(xml_string, 'variant')
  FROM source_data

Python

from pyspark.sql.functions import col, from_xml

(spark.read
  .table("source_data")
  .select(from_xml(col("xml_string"), "variant"))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Gegevens invoegen met behulp van from_csv

Als de doeltabel al een kolom bevat die is gecodeerd als VARIANT, kunt u xml-tekenreeksrecords from_csv invoegen als VARIANT. Voorbeeld:

SQL

INSERT INTO table_name (variant_column)
  SELECT FROM_CSV(csv_string, 'v variant').v
  FROM source_data

Python

from pyspark.sql.functions import col, from_csv

(spark.read
  .table("source_data")
  .select(from_csv(col("csv_string"), "v variant").v)
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Gegevens opnemen uit cloudobjectopslag als variant

Automatisch laden kan worden gebruikt om alle gegevens uit de ondersteunde bestandsbronnen te laden als één VARIANT kolom in een doeltabel. Aangezien VARIANT flexibel is voor schema- en typewijzigingen en de hoofdlettergevoeligheid behoudt en waarden van NULL in de gegevensbron, is dit patroon robuust voor de meeste opnamescenario's met de volgende kanttekeningen:

  • Ongeldige records kunnen niet worden gecodeerd met het VARIANT type.
  • VARIANT type kan alleen records tot 16 mb groot bevatten.

Notitie

Variant behandelt te grote records die vergelijkbaar zijn met beschadigde records. In de standaardverwerkingsmodus PERMISSIVE worden te grote records vastgelegd in de corruptRecordColumn.

Omdat de hele record wordt vastgelegd als één VARIANT kolom, treedt er geen schemaontwikkeling op tijdens de opname en rescuedDataColumn wordt deze niet ondersteund. In het volgende voorbeeld wordt aangenomen dat de doel tabel al bestaat met één VARIANT, kolom.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("singleVariantColumn", "variant_column")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

U kunt ook opgeven VARIANT wanneer u een schema definieert of doorgeeft schemaHints. De gegevens in het bronveld waarnaar wordt verwezen, moeten een geldige record bevatten. In de volgende voorbeelden ziet u deze syntaxis:

# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("name STRING, address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("payload VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaHints", "address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Gebruik COPY INTO met variant

Databricks raadt aan Auto Loader te gebruiken COPY INTO wanneer beschikbaar.

COPY INTO ondersteunt het opnemen van de volledige inhoud van een ondersteunde gegevensbron als één kolom. In het volgende voorbeeld wordt een nieuwe tabel met één VARIANT kolom gemaakt en vervolgens gebruikt COPY INTO voor het opnemen van records uit een JSON-bestandsbron.

CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('singleVariantColumn' = 'name')

Kafka-gegevens streamen als variant

Veel Kafka-streams coderen hun nettoladingen met behulp van JSON. Het opnemen van Kafka-streams maakt VARIANT deze workloads robuust voor schemawijzigingen.

In het volgende voorbeeld ziet u hoe u een Kafka-streamingbron leest, de key omzet naar STRING, de value omzet naar VARIANT, en wegschrijft naar een doeltabel.

from pyspark.sql.functions import col, parse_json

(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("key").cast("string"),
    parse_json(col("value").cast("string"))
  ).writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)