Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Apache Avro ist ein Datenserialisierungssystem. Avro bietet:
- Umfangreiche Datenstrukturen.
- Ein kompaktes, schnelles binäres Datenformat.
- Eine Containerdatei zum Speichern persistenter Daten.
- Remoteprozeduraufruf (RPC).
- Einfache Integration in dynamische Sprachen. Die Codegenerierung ist weder zum Lesen oder Schreiben von Datendateien noch zum Verwenden oder Implementieren von RPC-Protokollen erforderlich. Codegenerierung als optionale Optimierung, deren Implementierung nur für statisch typisierte Sprachen sinnvoll ist.
Die Avro-Datenquelle unterstützt Folgendes:
- Schemakonvertierung: Automatische Konvertierung zwischen Apache Spark SQL- und Avro-Datensätzen.
- Partitionierung: Einfaches Lesen und Schreiben partitionierter Daten ohne zusätzliche Konfiguration.
- Komprimierung: Komprimierung, die beim Schreiben von Avro auf den Datenträger verwendet werden soll. Die unterstützten Typen sind
uncompressed,snappyunddeflate. Sie können auch die Deflate-Ebene angeben. - Datensatznamen: Datensatzname und Namespace durch Übergabe einer Zuordnung von Parametern mit
recordNameundrecordNamespace.
Weitere Informationen finden Sie auch unter Lesen und Schreiben von Avro-Streamingdaten.
Konfiguration
Sie können das Verhalten einer Avro-Datenquelle mithilfe verschiedener Konfigurationsparameter ändern.
Um Dateien ohne die Erweiterung .avro beim Lesen zu ignorieren, können Sie den Parameter avro.mapred.ignore.inputs.without.extension in der Hadoop-Konfiguration festlegen. Der Standardwert lautet false.
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
Legen Sie die folgenden Spark-Eigenschaften fest, um die Komprimierung beim Schreiben zu konfigurieren:
- Komprimierungscodec:
spark.sql.avro.compression.codec. Unterstützte Codecs sindsnappyunddeflate. Der Standardcodec istsnappy. - Wenn der Komprimierungscodec
deflateist, können Sie die Komprimierungsebene mitspark.sql.avro.deflate.levelfestlegen. Die Standardebene ist-1.
Sie können diese Eigenschaften in der Spark-Konfiguration des Clusters oder zur Laufzeit mit spark.conf.set() festlegen. Beispiele:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
Für Databricks Runtime 9.1 LTS und höher können Sie das standardmäßige Schemarückschlussverhalten in Avro ändern, indem Sie die Option mergeSchema beim Lesen von Dateien bereitstellen. Wenn Sie mergeSchema auf true festlegen, wird ein Schema aus einem Satz von Avro-Dateien im Zielverzeichnis abgeleitet und zusammengeführt, anstatt das Leseschema aus einer einzelnen Datei abzuleiten.
Unterstützte Typen für Avro –> Spark-SQL-Konvertierung
Diese Bibliothek unterstützt das Lesen aller Avro-Typen. Dabei wird die folgende Zuordnung von Avro-Typen zu Spark-SQL-Typen verwendet:
| Avro-Typ | Spark SQL-Typ |
|---|---|
| Boolescher Wert | Boolescher Typ |
| INT | Integer-Typ |
| lang | LongType |
| Schweben | FloatType |
| doppelt | DoubleType |
| Byte | Binärtyp |
| Zeichenfolge | Zeichenkettentyp |
| Datensatz (record) | StructType |
| Enumeration | Zeichenkettentyp |
| Array | Array-Typ |
| Karte | MapType |
| behoben | Binärtyp |
| Vereinigung | Siehe Union-Typen. |
Union-Datentypen
Die Avro-Datenquelle unterstützt das Lesen von union-Typen. Avro betrachtet die folgenden drei Typen als union-Typen:
-
union(int, long)istLongTypezugeordnet. -
union(float, double)istDoubleTypezugeordnet. -
union(something, null), wobeisomethingein beliebiger unterstützter Avro-Typ ist. Dies wird demselben Spark SQL-Typ zugeordnet wie der vonsomething, wobeinullableauftruefestgelegt ist.
Alle anderen union-Typen sind komplexe Typen. Sie werden StructType zugeordnet, wobei die Feldnamen in Übereinstimmung mit den Mitgliedern von member0member1, union usw. lauten. Dies entspricht dem Verhalten beim Konvertieren zwischen Avro und Parquet.
Logische Typen
Die Avro-Datenquelle unterstützt das Lesen der folgenden logischen Avro-Typen:
| Logischer Avro-Typ | Avro-Typ | Spark SQL-Typ |
|---|---|---|
| Datum | INT | Datumstyp |
| timestamp-millis | lang | Zeitstempeltyp |
| Zeitstempel-Mikros | lang | Zeitstempeltyp |
| Decimal | behoben | Dezimaltyp |
| Decimal | Byte | Dezimaltyp |
Hinweis
Die Avro-Datenquelle ignoriert Dokumente, Aliase und andere Eigenschaften, die in der Avro-Datei vorhanden sind.
Unterstützte Typen für Spark SQL – Konvertierung von > Avro
Diese Bibliothek unterstützt das Schreiben aller Spark SQL-Typen in Avro. Bei den meisten Typen ist die Zuordnung von Spark-Typen zu Avro-Typen unkompliziert (z. B. wird IntegerType in int konvertiert). Im Folgenden finden Sie eine Liste der wenigen Sonderfälle:
| Spark SQL-Typ | Avro-Typ | Logischer Avro-Typ |
|---|---|---|
| ByteType | INT | |
| ShortType | INT | |
| Binärtyp | Byte | |
| Dezimaltyp | behoben | Decimal |
| Zeitstempeltyp | lang | Zeitstempel-Mikros |
| Datumstyp | INT | Datum |
Sie können auch das gesamte Avro-Ausgabeschema mit der Option avroSchema festlegen, damit Spark SQL-Typen in andere Avro-Typen konvertiert werden können.
Die folgenden Konvertierungen werden standardmäßig nicht angewendet und erfordern ein vom Benutzer angegebenes Avro-Schema:
| Spark SQL-Typ | Avro-Typ | Logischer Avro-Typ |
|---|---|---|
| ByteType | behoben | |
| Zeichenkettentyp | Enumeration | |
| Dezimaltyp | Byte | Decimal |
| Zeitstempeltyp | lang | timestamp-millis |
Beispiele
In diesen Beispielen wird die Datei episodes.avro verwendet.
Scala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
Dieses Beispiel veranschaulicht ein benutzerdefiniertes Avro-Schema:
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
Dieses Beispiel veranschaulicht Avro-Komprimierungsoptionen:
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
Dieses Beispiel veranschaulicht partitionierte Avro-Datensätze:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
Dieses Beispiel veranschaulicht den Datensatznamen und den Namespace:
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
Python
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL
Um Avro-Daten in SQL abfragen zu können, registrieren Sie die Datendatei als Tabelle oder temporäre Ansicht:
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
Notebookbeispiel: Lesen und Schreiben von Avro-Dateien
Das folgende Notebook veranschaulicht das Lesen und Schreiben von Avro-Dateien.