Apache Avro 는 데이터 직렬화 시스템입니다. Avro는 다음을 제공합니다.
- 풍부한 데이터 구조.
- 작고 빠른 이진 데이터 형식입니다.
- 영구 데이터를 저장하기 위한 컨테이너 파일입니다.
- RPC(원격 프로시저 호출)입니다.
- 동적 언어와의 간단한 통합. 데이터 파일을 읽거나 쓰거나 RPC 프로토콜을 사용하거나 구현하기 위해 코드 생성이 필요하지 않습니다. 선택적인 최적화로서의 코드 생성은 정적으로 유형이 지정된 언어에 대해서만 구현할 가치가 있습니다.
Avro 데이터 원본은 다음을 지원합니다.
- 스키마 변환: Apache Spark SQL과 Avro 레코드 간의 자동 변환.
- 분할: 추가 구성 없이 파티션된 데이터를 쉽게 읽고 쓸 수 있습니다.
- 압축: Avro를 디스크에 쓸 때 사용할 압축입니다. 지원되는 형식은
uncompressed,snappy및deflate입니다. 수축 수준을 지정할 수도 있습니다. - 레코드 이름:
recordName및recordNamespace를 사용하여 매개 변수 맵을 전달하여 이름 및 네임스페이스를 기록합니다.
스트리밍 Avro 데이터 읽기 및 쓰기도 참조하세요.
구성
다양한 구성 매개 변수를 사용하여 Avro 데이터 원본의 동작을 변경할 수 있습니다.
읽을 때 .avro 확장자가 없는 파일을 무시하려면 Hadoop 구성에서 매개 변수 avro.mapred.ignore.inputs.without.extension를 설정할 수 있습니다. 기본값은 false입니다.
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
작성 시 압축을 구성하려면 다음 Spark 속성을 설정합니다.
- 압축 코덱:
spark.sql.avro.compression.codec. 지원되는 코덱은snappy및deflate입니다. 기본 코덱은snappy입니다. - 압축 코덱이
deflate인 경우spark.sql.avro.deflate.level을 사용하여 압축 수준을 설정할 수 있습니다. 기본 수준은-1입니다.
클러스터 Spark 구성 또는 런타임 시 .를 사용하여 spark.conf.set()이러한 속성을 설정할 수 있습니다. 예시:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
Databricks Runtime 9.1 LTS 이상의 경우 파일을 읽을 때 옵션을 제공하여 Avro에서 기본 스키마 유추 동작을 mergeSchema 변경할 수 있습니다.
mergeSchema를 true로 설정하면 단일 파일에서 읽기 스키마를 유추하는 대신 대상 디렉터리의 Avro 파일 집합에서 스키마를 유추하고 병합합니다.
Avro -> Spark SQL 변환에 지원되는 유형
이 라이브러리는 모든 Avro 유형 읽기를 지원합니다. Avro 유형에서 Spark SQL 형식으로의 다음 매핑을 사용합니다.
| Avro 유형 | Spark SQL 형식 |
|---|---|
| 부울형 | 불린 타입 |
| 정수 (int) | 인터저타입 |
| 긴 | 롱타입 |
| 부동 소수점 | 플롯타입 |
| 두 배 | 더블타입 |
| 바이트 | 바이너리 타입 |
| 문자열 | 문자열 타입 |
| 녹화 | 구조형 타입 |
| enum | 문자열 타입 |
| 배열 | 배열 유형 |
| 지도 | 맵타입 |
| 고정된 | 바이너리 타입 |
| 노조 | 유니언 타입을 참조하세요. |
유니언 타입
Avro 데이터 원본은 읽기 union 유형을 지원합니다. Avro는 다음 세 가지 유형을 union 유형으로 간주합니다.
-
union(int, long)는LongType에 매핑됩니다. -
union(float, double)는DoubleType에 매핑됩니다. -
union(something, null), 여기서something은 지원되는 Avro 유형입니다. 이는something과 동일한 Spark SQL 형식에 매핑되며nullable은true로 설정됩니다.
다른 모든 union 유형은 복합 형식입니다. 필드 이름이 StructType, member0 등인 항목을 member1의 멤버에 따라 union에 매핑합니다. 이는 Avro와 Parquet 간에 변환할 때의 동작과 일치합니다.
논리적 형식
Avro 데이터 원본은 다음 Avro 논리 형식 읽기를 지원합니다.
| Avro 논리 유형 | Avro 유형 | Spark SQL 형식 |
|---|---|---|
| 날짜 | 정수 (int) | 데이트타입 |
| timestamp-millis (시간 표시 - 밀리초) | 긴 | 타임스탬프 타입 |
| 마이크로초 타임스탬프 | 긴 | 타임스탬프 타입 |
| 십진법 | 고정된 | 데시말타입 |
| 십진법 | 바이트 | 데시말타입 |
참고
Avro 데이터 원본은 Avro 파일에 있는 문서, 별칭 및 기타 속성을 무시합니다.
Spark SQL에 지원되는 유형 -> Avro 변환
이 라이브러리는 Avro에 모든 Spark SQL 형식 쓰기를 지원합니다. 대부분의 유형에서 Spark 유형에서 Avro 유형으로의 매핑은 간단합니다(예: IntegerType은 int로 변환됨). 다음은 몇 가지 특별한 경우의 목록입니다.
| Spark SQL 형식 | Avro 유형 | Avro 논리 유형 |
|---|---|---|
| 바이트타입 | 정수 (int) | |
| 쇼트타입 | 정수 (int) | |
| 바이너리 타입 | 바이트 | |
| 데시말타입 | 고정된 | 십진법 |
| 타임스탬프 타입 | 긴 | 마이크로초 타임스탬프 |
| 데이트타입 | 정수 (int) | 날짜 |
Spark SQL 형식을 다른 Avro 유형으로 변환할 수 있도록 avroSchema 옵션을 사용하여 전체 출력 Avro 스키마를 지정할 수도 있습니다.
다음 변환은 기본적으로 적용되지 않으며 사용자 지정 Avro 스키마가 필요합니다.
| Spark SQL 형식 | Avro 유형 | Avro 논리 유형 |
|---|---|---|
| 바이트타입 | 고정된 | |
| 문자열 타입 | enum | |
| 데시말타입 | 바이트 | 십진법 |
| 타임스탬프 타입 | 긴 | timestamp-millis (시간 표시 - 밀리초) |
예제
이러한 예제에서는 episodes.avro 파일을 사용합니다.
스칼라
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
이 예에서는 사용자 지정 Avro 스키마를 보여 줍니다.
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
이 예에서는 Avro 압축 옵션을 보여 줍니다.
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
이 예는 분할된 Avro 레코드를 보여 줍니다.
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
이 예에서는 레코드 이름과 네임스페이스를 보여 줍니다.
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
파이썬
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL
SQL에서 Avro 데이터를 쿼리하려면 데이터 파일을 테이블 또는 임시 보기로 등록합니다.
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
Notebook 예제: Avro 파일 읽기 및 쓰기
다음 Notebook은 Avro 파일을 읽고 쓰는 방법을 보여 줍니다.