Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Apache Avro é um sistema de serialização de dados. Avro fornece:
- Estruturas de dados avançadas.
- Um formato de dados compacto, rápido e binário.
- Um arquivo contêiner, para armazenar dados persistentes.
- Chamada de procedimento remoto (RPC).
- Integração simples com linguagens dinâmicas. A geração de código não é necessária para ler ou gravar arquivos de dados nem para usar ou implementar protocolos RPC. Geração de código como uma otimização opcional, só vale a pena implementar para linguagens tipadas estaticamente.
A fonte de dados Avro suporta:
- Conversão de esquema: Conversão automática entre registros Apache Spark SQL e Avro.
- Particionamento: Facilmente ler e gravar dados particionados sem qualquer configuração extra.
- Compressão: Método de compressão a usar ao gravar Avro no disco. Os tipos suportados são
uncompressed,snappyedeflate. Você também pode especificar o nível de deflação. - Registar nomes: Registe o nome e o namespace passando um mapa de parâmetros com
recordNameerecordNamespace.
Consulte também Ler e gravar dados de streaming Avro.
Configuração
Você pode alterar o comportamento de uma fonte de dados Avro usando vários parâmetros de configuração.
Para ignorar ficheiros sem a extensão .avro durante a leitura, pode-se definir o parâmetro avro.mapred.ignore.inputs.without.extension na configuração do Hadoop. A predefinição é false.
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
Para configurar a compactação durante a gravação, defina as seguintes propriedades do Spark:
- Codec de compressão:
spark.sql.avro.compression.codec. Os codecs suportados sãosnappyedeflate. O codec padrão ésnappy. - Se o codec de compressão for
deflate, você pode definir o nível de compactação com:spark.sql.avro.deflate.level. O nível padrão é-1.
Você pode definir essas propriedades na configuração do cluster Spark ou em tempo de execução usando spark.conf.set(). Por exemplo:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
Para o Databricks Runtime 9.1 LTS e superiores, pode-se alterar o comportamento de inferência de esquema padrão no Avro ao fornecer a opção mergeSchema ao ler ficheiros. Definir mergeSchema como true irá inferir um esquema de um conjunto de arquivos Avro no diretório de destino e mesclá-los em vez de inferir o esquema de leitura a partir de um único arquivo.
Tipos suportados para a conversão de Avro em Spark SQL
Esta biblioteca suporta a leitura de todos os tipos Avro. Ele usa o seguinte mapeamento de tipos Avro para tipos Spark SQL:
| Tipo Avro | Tipo Spark SQL |
|---|---|
| Booleano | Tipo Booleano |
| número inteiro | Tipo inteiro |
| longo | Tipo Longo |
| float | Tipo de flutuação |
| duplo | Tipo Duplo |
| Bytes | TipoBinário |
| cadeia (de caracteres) | TipoString |
| registo | Tipo de estrutura |
| enumeração | TipoString |
| matriz | Tipo de matriz |
| mapa | Tipo de mapa |
| fixo | TipoBinário |
| união | Consulte Tipos de União. |
Tipos de União
A fonte de dados Avro suporta a leitura de tipos union. A Avro considera os seguintes três tipos como tipos union :
-
union(int, long)corresponde aLongType. -
union(float, double)corresponde aDoubleType. -
union(something, null), ondesomethingé qualquer tipo Avro suportado. Isso mapeia para o mesmo tipo de SQL do Spark que o desomething, comnullableconfigurado comotrue.
Todos os outros union tipos são tipos complexos. Eles são mapeados para StructType, onde os nomes de campo são member0, member1, e assim por diante, em conformidade com os membros do union. Isso é consistente com o comportamento ao converter entre Avro e Parquet.
Tipos lógicos
A fonte de dados Avro suporta a leitura dos seguintes tipos lógicos Avro:
| Tipo lógico Avro | Tipo Avro | Tipo Spark SQL |
|---|---|---|
| data | número inteiro | Tipo de Data |
| timestamp-millis | longo | Tipo de data/hora |
| carimbo de tempo em microssegundos | longo | Tipo de data/hora |
| decimal | fixo | Tipo decimal |
| decimal | Bytes | Tipo decimal |
Nota
A fonte de dados Avro ignora documentos, aliases e outras propriedades presentes no arquivo Avro.
Tipos suportados para Spark SQL -> Conversão Avro
Esta biblioteca suporta a escrita de todos os tipos de Spark SQL no Avro. Para a maioria dos tipos, o mapeamento de tipos Spark para tipos Avro é simples (por exemplo IntegerType , é convertido em int); a seguir está uma lista dos poucos casos especiais:
| Tipo Spark SQL | Tipo Avro | Tipo lógico Avro |
|---|---|---|
| Tipo de Byte | número inteiro | |
| Tipo abreviado | número inteiro | |
| TipoBinário | Bytes | |
| Tipo decimal | fixo | decimal |
| Tipo de data/hora | longo | carimbo de tempo em microssegundos |
| Tipo de Data | número inteiro | data |
Você também pode especificar todo o esquema Avro de saída com a opção avroSchema, para que os tipos Spark SQL possam ser convertidos em outros tipos Avro.
As conversões a seguir não são aplicadas por padrão e exigem o esquema Avro especificado pelo usuário:
| Tipo Spark SQL | Tipo Avro | Tipo lógico Avro |
|---|---|---|
| Tipo de Byte | fixo | |
| TipoString | enumeração | |
| Tipo decimal | Bytes | decimal |
| Tipo de data/hora | longo | timestamp-millis |
Exemplos
Esses exemplos usam o arquivo episodes.avro .
linguagem de programação Scala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
Este exemplo demonstra um esquema Avro personalizado:
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
Este exemplo demonstra as opções de compactação Avro:
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
Este exemplo demonstra registros Avro particionados:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
Este exemplo demonstra o nome do registro e o namespace:
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
Python
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL
Para consultar dados Avro em SQL, registre o arquivo de dados como uma tabela ou exibição temporária:
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
Exemplo de bloco de notas: ler e escrever ficheiros Avro
O bloco de anotações a seguir demonstra como ler e gravar arquivos Avro.