Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Wichtig
Dieses Feature befindet sich in der Public Preview.
In Databricks Runtime 15.3 und höher können Sie den VARIANT Typ verwenden, um halbstrukturierte Daten zu erfassen. In diesem Artikel werden das Verhalten beschrieben und Beispielmuster zum Aufnehmen von Daten aus dem Cloudobjektspeicher mithilfe von AutoLadeprogramm und COPY INTO, Streamingdatensätzen von Kafka und SQL-Befehlen zum Erstellen neuer Tabellen mit Variantendaten oder Einfügen neuer Datensätze mithilfe des Variantentyps bereitgestellt. In der folgenden Tabelle sind die unterstützten Dateiformate und die Unterstützung der Databricks-Runtime-Version zusammengefasst:
| Dateiformat | Unterstützte Databricks-Runtime-Version |
|---|---|
| JSON | 15.3 und höher |
| XML | 16.4 und höher |
| CSV-Datei | 16.4 und höher |
Siehe Abfragevariantendaten.
Erstellen einer Tabelle mit einer Variant-Spalte
VARIANT ist ein standardmäßiger SQL-Typ in Databricks Runtime 15.3 und höher und wird von Tabellen unterstützt, die von Delta Lake unterstützt werden. Verwaltete Tabellen in Azure Databricks verwenden Delta Lake standardmäßig, sodass Sie eine leere Tabelle mit einer einzelnen VARIANT Spalte mit der folgenden Syntax erstellen können:
CREATE TABLE table_name (variant_column VARIANT)
Alternativ können Sie die PARSE_JSON Funktion für eine JSON-Zeichenfolge oder die FROM_XML Funktion in einer XML-Zeichenfolge verwenden, um eine CTAS-Anweisung zum Erstellen einer Tabelle mit einer Variant-Spalte zu verwenden. Im folgenden Beispiel wird eine Tabelle mit zwei Spalten erstellt:
- Die
idSpalte wurde aus der JSON-Zeichenfolge extrahiert und alsSTRINGTyp ausgewiesen. - Die
variant_columnSpalte enthält die gesamte JSON-Zeichenfolge, die alsVARIANTTyp codiert ist.
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
Hinweis
Databricks empfiehlt das Extrahieren und Speichern von Feldern als Nicht-Variant-Spalten, die Sie verwenden möchten, um Abfragen zu beschleunigen und das Speicherlayout zu optimieren.
VARIANT Spalten können nicht für Clusterschlüssel, Partitionen oder Z-Reihenfolge-Schlüssel verwendet werden. Der VARIANT Datentyp kann nicht für Vergleiche, Gruppierung, Sortierung und Festlegen von Vorgängen verwendet werden. Eine vollständige Liste der Einschränkungen finden Sie unter "Einschränkungen".
Einfügen von Daten mithilfe von parse_json
Wenn die Zieltabelle bereits eine Spalte codiert als VARIANT enthält, können Sie parse_json benutzen, um JSON-Zeichenfolgeneinträge als VARIANT einzufügen, wie im folgenden Beispiel:
SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
Python
from pyspark.sql.functions import col, parse_json
(spark.read
.table("source_data")
.select(parse_json(col("json_string")))
.write
.mode("append")
.saveAsTable("table_name")
)
Einfügen von Daten mithilfe von from_xml
Wenn die Zieltabelle bereits eine Spalte enthält, die als VARIANT codiert ist, können Sie from_xml verwenden, um XML-Zeichenfolgeneinträge als VARIANT einzufügen. Beispiel:
SQL
INSERT INTO table_name (variant_column)
SELECT FROM_XML(xml_string, 'variant')
FROM source_data
Python
from pyspark.sql.functions import col, from_xml
(spark.read
.table("source_data")
.select(from_xml(col("xml_string"), "variant"))
.write
.mode("append")
.saveAsTable("table_name")
)
Einfügen von Daten mithilfe von from_csv
Wenn die Zieltabelle bereits eine Spalte enthält, die als VARIANT codiert ist, können Sie from_csv verwenden, um XML-Zeichenfolgeneinträge als VARIANT einzufügen. Beispiel:
SQL
INSERT INTO table_name (variant_column)
SELECT FROM_CSV(csv_string, 'v variant').v
FROM source_data
Python
from pyspark.sql.functions import col, from_csv
(spark.read
.table("source_data")
.select(from_csv(col("csv_string"), "v variant").v)
.write
.mode("append")
.saveAsTable("table_name")
)
Aufnehmen von Daten aus Cloudobjektspeicher als Variante
Das automatische Laden kann verwendet werden, um alle Daten aus den unterstützten Dateiquellen als einzelne VARIANT Spalte in einer Zieltabelle zu laden. Da VARIANT flexibel für Schema- und Typänderungen ist und die Groß-/Kleinschreibung und NULL-Werte in der Datenquelle verwaltet, ist dieses Muster robust für die meisten Aufnahmeszenarios mit den folgenden Einschränkungen:
- Falsch formatierte Datensätze können nicht mithilfe des
VARIANTTyps codiert werden. - Der
VARIANT-Typ kann Datensätze nur bis zu 16 MB groß halten.
Hinweis
Variant behandelt übermäßig große Datensätze, ähnlich wie beschädigte Datensätze. Im Standardmodus PERMISSIVE werden übermäßig große Datensätze im corruptRecordColumn erfasst.
Da der gesamte Datensatz als einzelne VARIANT Spalte aufgezeichnet wird, tritt während der Aufnahme keine Schemaentwicklung auf und rescuedDataColumn wird nicht unterstützt. Im folgenden Beispiel wird davon ausgegangen, dass die Zieltabelle bereits mit einer einzelnen VARIANT Spalte vorhanden ist.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Sie können auch VARIANT angeben, wenn Sie ein Schema definieren oder schemaHints übergeben. Die Daten im Referenzquellfeld müssen einen gültigen Datensatz enthalten. Die folgenden Beispiele veranschaulichen diese Syntax:
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Verwendung von COPY INTO mit Variante
Databricks empfiehlt die Verwendung von Auto Loader anstelle von COPY INTO, falls vorhanden.
COPY INTO unterstützt das Aufnehmen des gesamten Inhalts einer unterstützten Datenquelle als einzelne Spalte. Im folgenden Beispiel wird eine neue Tabelle mit einer einzelnen VARIANT Spalte erstellt und dann zum Erfassen von Datensätzen aus einer JSON-Dateiquelle COPY INTO verwendet.
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FORMAT_OPTIONS ('singleVariantColumn' = 'name')
Stream Kafka-Daten als Variante
Viele Kafka-Streams codieren ihre Nutzlasten mithilfe von JSON. Das Aufnehmen von Kafka-Datenströmen mithilfe von VARIANT macht diese Workloads robust gegenüber Schemaänderungen.
Das folgende Beispiel veranschaulicht das Lesen einer Kafka-Streamingquelle, das Umwandeln des key-Typs als STRING und value als VARIANT und das Schreiben in eine Zieltabelle.
from pyspark.sql.functions import col, parse_json
(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)