구조적 스트리밍은 Spark SQL 엔진을 기반으로 제작된 확장성 있는 내결함성 스트림 처리 엔진입니다. Spark는 데이터가 계속 도착함에 따라 스트리밍 작업을 증분 및 지속적으로 실행합니다.
구조적 스트리밍은 Spark 2.2에서 사용할 수 있게 되었습니다. 그 이후로 데이터 스트리밍에 권장되는 접근 방식이 되었습니다. 구조적 스트림의 기본 원칙은 라이브 데이터 스트림을 테이블의 새 행처럼 항상 지속적으로 추가되는 테이블로 처리하는 것입니다. CSV, JSON, ORC, Parquet 및 Kafka 및 Event Hubs와 같은 메시징 서비스에 대한 기본 제공 지원과 같은 몇 가지 정의된 기본 제공 스트리밍 파일 원본이 있습니다.
이 문서에서는 처리량이 높은 프로덕션 환경에서 Spark 구조적 스트리밍을 통해 이벤트의 처리 및 수집을 최적화하는 방법에 대한 인사이트를 제공합니다. 제안된 방법은 다음과 같습니다.
- 데이터 스트리밍 처리량 최적화
- 델타 테이블에서의 쓰기 작업 최적화
- 이벤트 일괄 처리
Spark 작업 정의 및 Spark Notebook
Spark Notebook은 아이디어의 유효성을 검사하고 데이터 또는 코드에서 인사이트를 얻기 위한 실험을 수행하기 위한 훌륭한 도구입니다. Notebook은 데이터 준비, 데이터 시각화, 기계 학습 및 기타 빅 데이터 시나리오에서도 널리 사용됩니다. Spark 작업 정의는 오랜 기간 동안 Spark 클러스터에서 실행되는 비대화형 코드 지향 작업입니다. Spark 작업 정의는 견고성과 가용성을 제공합니다.
Spark Notebook은 코드의 논리를 테스트하고 모든 비즈니스 요구 사항을 해결하는 데 탁월한 소스입니다. 그러나 프로덕션 시나리오에서 계속 실행하려면 재시도 정책을 사용하도록 설정된 Spark 작업 정의가 가장 좋은 솔루션입니다.
Spark 작업 정의에 대한 재시도 정책
Microsoft Fabric에서 사용자는 Spark 작업 정의 작업에 대한 재시도 정책을 설정할 수 있습니다. 작업의 스크립트는 무한할 수 있지만 스크립트를 실행하는 인프라에 작업을 중지해야 하는 문제가 발생할 수 있습니다. 또는 기본 인프라 패치 요구 사항으로 인해 작업이 제거될 수 있습니다. 재시도 정책을 사용하면 기본 문제로 인해 작업이 중지되는 경우 작업을 자동으로 다시 시작하는 규칙을 설정할 수 있습니다. 매개 변수는 작업을 다시 시작하는 빈도, 최대 무한 재시도, 재시도 사이의 시간 설정을 지정합니다. 이렇게 하면 사용자가 Spark 작업 정의 작업을 중지하기로 결정할 때까지 Spark 작업 정의 작업이 무한히 계속 실행되도록 할 수 있습니다.
스트리밍 원본
Event Hubs를 사용하여 스트리밍을 설정하려면 Event Hubs 네임스페이스 이름, 허브 이름, 공유 액세스 키 이름 및 소비자 그룹을 포함하는 기본 구성이 필요합니다. 소비자 그룹은 전체 이벤트 허브의 관점입니다. 이를 통해 여러 소비 애플리케이션이 이벤트 스트림에 대한 별도의 보기를 가지며 자체 속도와 오프셋을 사용하여 독립적으로 스트림을 읽을 수 있습니다.
파티션은 대량의 데이터를 처리할 수 있는 필수적인 부분입니다. 단일 프로세서는 초당 이벤트를 처리할 수 있는 용량이 제한되어 있지만 여러 프로세서는 병렬로 실행될 때 더 나은 작업을 수행할 수 있습니다. 파티션을 사용하면 대량의 이벤트를 병렬로 처리할 수 있습니다.
너무 많은 파티션이 낮은 수집 속도로 사용되는 경우 파티션 판독기는 이 데이터의 작은 부분을 처리하므로 최적이 아닌 처리가 발생합니다. 이상적인 파티션 수는 원하는 처리 속도에 따라 직접 달라집니다. 이벤트 처리 크기를 조정하려면 파티션을 더 추가하는 것이 좋습니다. 파티션에는 특정 처리량 제한이 없습니다. 그러나 네임스페이스의 집계 처리량은 처리량 단위 수로 제한됩니다. 네임스페이스의 처리량 단위 수를 늘리면 동시 판독기가 최대 처리량을 달성할 수 있도록 추가 파티션이 필요할 수 있습니다.
처리량 시나리오에 가장 적합한 파티션 수를 조사하고 테스트하는 것이 좋습니다. 그러나 32개 이상의 파티션을 사용하여 처리량이 높은 시나리오를 보는 것이 일반적입니다.
Spark 애플리케이션을 Azure Event Hubs에 연결하려면 Apache Spark용 Azure Event Hubs 커넥터(azure-event-hubs-spark)를 사용하는 것이 좋습니다.
Lakehouse를 스트리밍 싱크로 사용하기
Delta Lake는 빅 데이터 워크로드에 ACID(원자성, 일관성, 격리 및 내구성) 트랜잭션을 제공하는 오픈 소스 스토리지 레이어입니다. Delta Lake는 확장 가능한 메타데이터 처리, 스키마 진화, 시간 이동(데이터 버전 관리), 개방형 형식 및 기타 기능도 지원합니다.
Fabric 데이터 엔지니어에서는 Delta Lake를 사용하여 다음을 수행합니다.
- Spark SQL을 사용하여 데이터를 쉽게 업서트(삽입/업데이트)하고 삭제합니다.
- 데이터를 압축하여 데이터를 쿼리하는 데 소요된 시간을 최소화합니다.
- 작업이 실행되기 전과 후에 테이블의 상태를 봅니다.
- 테이블에서 수행된 작업의 기록을 검색합니다.
Delta는 writeStream에 사용되는 가능한 출력 싱크 형식 중 하나로 추가됩니다. 기존 출력 싱크에 대한 자세한 정보는 Spark 구조적 스트리밍 프로그래밍 가이드를 참조하세요.
다음 예제에서는 데이터를 Delta Lake로 스트리밍하는 방법을 보여 줍니다.
import pyspark.sql.functions as f
from pyspark.sql.types import *
df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
Schema = StructType([StructField("<column_name_01>", StringType(), False),
StructField("<column_name_02>", StringType(), False),
StructField("<column_name_03>", DoubleType(), True),
StructField("<column_name_04>", LongType(), True),
StructField("<column_name_05>", LongType(), True)])
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(f.from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint") \
.outputMode("append") \
.toTable("deltaeventstable")
예제에 있는 코드의 일부에 대한 정보:
- format() 은 데이터의 출력 형식을 정의하는 명령입니다.
- outputMode() 는 스트리밍의 새 행(즉, 추가, 덮어쓰기)을 작성하는 방법을 정의합니다.
- toTable()은 스트리밍된 데이터를 매개변수로 전달된 값을 사용하여 생성된 Delta 테이블에 유지합니다.
Delta 쓰기 최적화
데이터 분할은 강력한 스트리밍 솔루션을 만드는 데 중요한 부분입니다. 분할은 데이터 구성 방식을 개선하고 처리량도 향상시킵니다. Delta 작업 후 파일이 쉽게 조각화되어 너무 많은 작은 파일이 생성됩니다. 디스크에 파일을 쓰는 데 시간이 오래 걸리기 때문에 너무 큰 파일도 문제입니다. 데이터 분할의 과제는 최적의 파일 크기를 생성하는 적절한 균형을 찾는 것입니다. Spark는 메모리 및 디스크의 분할을 지원합니다. 올바르게 분할된 데이터는 Delta Lake에 데이터를 보관하고 Delta Lake에서 데이터를 쿼리할 때 최상의 성능을 제공할 수 있습니다.
- 디스크에서 데이터를 분할할 때 partitionBy()를 사용하여 열을 기준으로 데이터를 분할하는 방법을 선택할 수 있습니다. partitionBy() 는 디스크에 쓰는 동안 제공된 하나 이상의 열을 기반으로 큰 의미 체계 모델을 더 작은 파일로 분할하는 데 사용되는 함수입니다. 분할은 대규모 의미 체계 모델로 작업할 때 쿼리 성능을 향상시키는 방법입니다. 너무 작거나 너무 큰 파티션을 생성하는 열을 선택하지 마십시오. 카디널리티가 좋은 열 집합을 기반으로 파티션을 정의하고 데이터를 최적의 크기의 파일로 분할합니다.
- 메모리의 데이터 분할은 repartition() 또는 coalesce() 변환을 사용하여 수행하고, 여러 작업자 노드에 데이터를 배포하고, RDD(복원 분산 데이터 세트)의 기본 사항을 사용하여 데이터를 병렬로 읽고 처리할 수 있는 여러 작업을 만들 수 있습니다. 이를 통해 의미 체계 모델을 논리 파티션으로 나눌 수 있으며 클러스터의 다른 노드에서 계산할 수 있습니다.
- repartition() 은 메모리의 파티션 수를 늘리거나 줄이는 데 사용됩니다. 리파티션은 네트워크를 통해 전체 데이터를 다시 분산하여 모든 파티션에 균등하게 분배합니다.
- coalesce() 는 파티션 수를 효율적으로 줄이는 데만 사용됩니다. 이는 모든 파티션에서 데이터 이동이 coalesce()를 사용하여 더 낮은 수준으로 이루어지는 최적화된 버전의 repartition()입니다.
두 분할 방법을 결합하는 것은 처리량이 높은 시나리오에서 좋은 솔루션입니다. repartition() 은 메모리에 특정 수의 파티션을 만들고 partitionBy() 는 각 메모리 파티션 및 분할 열에 대한 파일을 디스크에 씁니다. 다음 예제에서는 동일한 Spark 작업에서 두 분할 전략을 모두 사용하는 방법을 보여 줍니다. 데이터는 먼저 메모리의 48개 파티션(총 48개의 CPU 코어가 있다고 가정)으로 분할된 다음 페이로드의 두 기존 열을 기반으로 디스크에 분할됩니다.
import pyspark.sql.functions as f
from pyspark.sql.types import *
import json
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(f.from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.repartition(48) \
.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint") \
.outputMode("append") \
.partitionBy("<column_name_01>", "<column_name_02>") \
.toTable("deltaeventstable")
최적화된 쓰기
Delta Lake에 쓰기를 최적화하는 또 다른 옵션은 최적화된 쓰기를 사용하는 것입니다. 최적화된 쓰기는 Delta 테이블에 데이터를 쓰는 방식을 개선하는 선택적 기능입니다. Spark는 데이터를 쓰기 전에 파티션을 병합하거나 분할하여 디스크에 기록되는 데이터의 처리량을 최대화합니다. 그러나 전체 셔플이 발생하므로 일부 워크로드의 경우 성능 저하가 발생할 수 있습니다. 디스크의 데이터를 분할하기 위해 coalesce() 및/또는 repartition()을 사용하는 작업을 리팩터링하여 최적화된 쓰기를 대신 사용할 수 있습니다.
다음 코드는 최적화된 쓰기를 사용하는 예제입니다. partitionBy()는 여전히 사용됩니다.
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", true)
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(f.from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint") \
.outputMode("append") \
.partitionBy("<column_name_01>", "<column_name_02>") \
.toTable("deltaeventstable")
이벤트 일괄 처리
Delta Lake로 데이터를 수집하는 데 소요되는 시간을 개선하기 위해 작업 수를 최소화하기 위해 이벤트를 일괄 처리하는 것이 실용적인 대안입니다.
트리거는 스트리밍 쿼리를 실행(트리거)하고 새 데이터를 내보내는 빈도를 정의합니다. 이를 설정하면 항상 디스크에 기록하는 대신, 데이터를 모으고 이벤트를 그룹화하여 몇 가지 지속적인 작업이 아닌, 임시적으로 실행되는 마이크로배치의 주기적인 처리 시간 간격을 정의합니다.
다음 예제에서는 이벤트가 1분 간격으로 주기적으로 처리되는 스트리밍 쿼리를 보여 줍니다.
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(f.from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.repartition(48) \
.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint") \
.outputMode("append") \
.partitionBy("<column_name_01>", "<column_name_02>") \
.trigger(processingTime="1 minute") \
.toTable("deltaeventstable")
Delta 테이블 쓰기 작업에서 이벤트 일괄 처리를 결합하면 더 많은 데이터가 포함된 더 큰 Delta 파일을 만들어 작은 파일을 피할 수 있다는 장점이 있습니다. 수집되는 데이터의 양을 분석하고 Delta 라이브러리에서 만든 Parquet 파일의 크기를 최적화하는 가장 적합한 처리 시간을 찾아야 합니다.
모니터링
Spark 3.1 이상 버전에는 다음 스트리밍 메트릭을 포함하는 기본 제공 구조적 스트리밍 UI 가 있습니다.
- 입력 속도
- 프로세스 속도
- 입력 행
- 일괄 처리 기간
- 작업 기간
관련 콘텐츠
- 스트리밍 데이터를 Lakehouse 로 가져와 SQL 분석 엔드포인트를 사용하여 액세스합니다.