Freigeben über


Avro-Datei

Apache Avro ist ein Datenserialisierungssystem. Avro bietet:

  • Umfangreiche Datenstrukturen.
  • Ein kompaktes, schnelles binäres Datenformat.
  • Eine Containerdatei zum Speichern persistenter Daten.
  • Remoteprozeduraufruf (RPC).
  • Einfache Integration in dynamische Sprachen. Die Codegenerierung ist weder zum Lesen oder Schreiben von Datendateien noch zum Verwenden oder Implementieren von RPC-Protokollen erforderlich. Codegenerierung als optionale Optimierung, deren Implementierung nur für statisch typisierte Sprachen sinnvoll ist.

Die Avro-Datenquelle unterstützt Folgendes:

  • Schemakonvertierung: Automatische Konvertierung zwischen Apache Spark SQL- und Avro-Datensätzen.
  • Partitionierung: Einfaches Lesen und Schreiben partitionierter Daten ohne zusätzliche Konfiguration.
  • Komprimierung: Komprimierung, die beim Schreiben von Avro auf den Datenträger verwendet werden soll. Die unterstützten Typen sind uncompressed, snappy und deflate. Sie können auch die Deflate-Ebene angeben.
  • Datensatznamen: Datensatzname und Namespace durch Übergabe einer Zuordnung von Parametern mit recordName und recordNamespace.

Weitere Informationen finden Sie auch unter Lesen und Schreiben von Avro-Streamingdaten.

Konfiguration

Sie können das Verhalten einer Avro-Datenquelle mithilfe verschiedener Konfigurationsparameter ändern.

Um Dateien ohne die Erweiterung .avro beim Lesen zu ignorieren, können Sie den Parameter avro.mapred.ignore.inputs.without.extension in der Hadoop-Konfiguration festlegen. Der Standardwert lautet false.

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

Legen Sie die folgenden Spark-Eigenschaften fest, um die Komprimierung beim Schreiben zu konfigurieren:

  • Komprimierungscodec: spark.sql.avro.compression.codec. Unterstützte Codecs sind snappy und deflate. Der Standardcodec ist snappy.
  • Wenn der Komprimierungscodec deflate ist, können Sie die Komprimierungsebene mit spark.sql.avro.deflate.level festlegen. Die Standardebene ist -1.

Sie können diese Eigenschaften in der Spark-Konfiguration des Clusters oder zur Laufzeit mit spark.conf.set() festlegen. Beispiele:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

Für Databricks Runtime 9.1 LTS und höher können Sie das standardmäßige Schemarückschlussverhalten in Avro ändern, indem Sie die Option mergeSchema beim Lesen von Dateien bereitstellen. Wenn Sie mergeSchema auf true festlegen, wird ein Schema aus einem Satz von Avro-Dateien im Zielverzeichnis abgeleitet und zusammengeführt, anstatt das Leseschema aus einer einzelnen Datei abzuleiten.

Unterstützte Typen für Avro –> Spark-SQL-Konvertierung

Diese Bibliothek unterstützt das Lesen aller Avro-Typen. Dabei wird die folgende Zuordnung von Avro-Typen zu Spark-SQL-Typen verwendet:

Avro-Typ Spark SQL-Typ
Boolescher Wert Boolescher Typ
INT Integer-Typ
lang LongType
Schweben FloatType
doppelt DoubleType
Byte Binärtyp
Zeichenfolge Zeichenkettentyp
Datensatz (record) StructType
Enumeration Zeichenkettentyp
Array Array-Typ
Karte MapType
behoben Binärtyp
Vereinigung Siehe Union-Typen.

Union-Datentypen

Die Avro-Datenquelle unterstützt das Lesen von union-Typen. Avro betrachtet die folgenden drei Typen als union-Typen:

  • union(int, long) ist LongType zugeordnet.
  • union(float, double) ist DoubleType zugeordnet.
  • union(something, null), wobei something ein beliebiger unterstützter Avro-Typ ist. Dies wird demselben Spark SQL-Typ zugeordnet wie der von something, wobei nullable auf true festgelegt ist.

Alle anderen union-Typen sind komplexe Typen. Sie werden StructType zugeordnet, wobei die Feldnamen in Übereinstimmung mit den Mitgliedern von member0member1, union usw. lauten. Dies entspricht dem Verhalten beim Konvertieren zwischen Avro und Parquet.

Logische Typen

Die Avro-Datenquelle unterstützt das Lesen der folgenden logischen Avro-Typen:

Logischer Avro-Typ Avro-Typ Spark SQL-Typ
Datum INT Datumstyp
timestamp-millis lang Zeitstempeltyp
Zeitstempel-Mikros lang Zeitstempeltyp
Decimal behoben Dezimaltyp
Decimal Byte Dezimaltyp

Hinweis

Die Avro-Datenquelle ignoriert Dokumente, Aliase und andere Eigenschaften, die in der Avro-Datei vorhanden sind.

Unterstützte Typen für Spark SQL – Konvertierung von > Avro

Diese Bibliothek unterstützt das Schreiben aller Spark SQL-Typen in Avro. Bei den meisten Typen ist die Zuordnung von Spark-Typen zu Avro-Typen unkompliziert (z. B. wird IntegerType in int konvertiert). Im Folgenden finden Sie eine Liste der wenigen Sonderfälle:

Spark SQL-Typ Avro-Typ Logischer Avro-Typ
ByteType INT
ShortType INT
Binärtyp Byte
Dezimaltyp behoben Decimal
Zeitstempeltyp lang Zeitstempel-Mikros
Datumstyp INT Datum

Sie können auch das gesamte Avro-Ausgabeschema mit der Option avroSchema festlegen, damit Spark SQL-Typen in andere Avro-Typen konvertiert werden können. Die folgenden Konvertierungen werden standardmäßig nicht angewendet und erfordern ein vom Benutzer angegebenes Avro-Schema:

Spark SQL-Typ Avro-Typ Logischer Avro-Typ
ByteType behoben
Zeichenkettentyp Enumeration
Dezimaltyp Byte Decimal
Zeitstempeltyp lang timestamp-millis

Beispiele

In diesen Beispielen wird die Datei episodes.avro verwendet.

Scala

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

Dieses Beispiel veranschaulicht ein benutzerdefiniertes Avro-Schema:

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

Dieses Beispiel veranschaulicht Avro-Komprimierungsoptionen:

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

Dieses Beispiel veranschaulicht partitionierte Avro-Datensätze:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

Dieses Beispiel veranschaulicht den Datensatznamen und den Namespace:

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Python

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

Um Avro-Daten in SQL abfragen zu können, registrieren Sie die Datendatei als Tabelle oder temporäre Ansicht:

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

Notebookbeispiel: Lesen und Schreiben von Avro-Dateien

Das folgende Notebook veranschaulicht das Lesen und Schreiben von Avro-Dateien.

Notebook zum Lesen und Schreiben von Avro-Dateien

Notebook abrufen