다음을 통해 공유


Azure Event Hubs를 사용하여 Apache Spark 애플리케이션 연결

이 자습서에서는 실시간 스트리밍을 위해 Spark 애플리케이션을 Event Hubs에 연결하는 방법을 안내합니다. 이 통합을 통해 프로토콜 클라이언트를 변경하거나 고유한 Kafka 또는 Zookeeper 클러스터를 실행할 필요 없이 스트리밍할 수 있습니다. 이 자습서에는 Apache Spark v2.4 이상 및 Apache Kafka v2.0 이상이 필요합니다.

비고

이 샘플은 GitHub에서 사용할 수 있습니다.

이 튜토리얼에서는 다음을 배우게 됩니다:

  • Event Hubs 네임스페이스 만들기
  • 예제 프로젝트 복제
  • Spark 실행
  • Kafka용 Event Hubs에서 읽기
  • Kafka용 Event Hubs에 쓰기

필수 조건

이 자습서를 시작하기 전에 필요한 다음 사항이 있는지 확인하십시오.

비고

Spark-Kafka 어댑터는 Spark v2.4를 기준으로 Kafka v2.0을 지원하도록 업데이트되었습니다. Spark의 이전 릴리스에서 어댑터는 Kafka v0.10 이상을 지원했지만 특히 Kafka v0.10 API에 의존했습니다. Kafka용 Event Hubs는 Kafka v0.10을 지원하지 않으므로 v2.4 이전 버전의 Spark의 Spark-Kafka 어댑터는 Kafka 에코시스템용 Event Hubs에서 지원되지 않습니다.

Event Hubs 네임스페이스 만들기

Event Hubs 서비스를 보내고 받으려면 Event Hubs 네임스페이스가 필요합니다. 네임스페이스 및 이벤트 허브를 만드는 지침은 이벤트 허브 만들기 를 참조하세요. 나중에 사용할 수 있도록 Event Hubs 연결 문자열 및 FQDN(정규화된 도메인 이름)을 가져옵니다. 자세한 내용은 Event Hubs 연결 문자열 가져오기를 참조하세요.

예제 프로젝트 복제

Azure Event Hubs 리포지토리를 복제하고 하위 폴더로 tutorials/spark 이동합니다.

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark

Kafka용 Event Hubs에서 읽기

몇 가지 구성을 변경하면 Kafka용 Event Hubs에서 읽기를 시작할 수 있습니다. 네임스페이스의 세부 정보로 BOOTSTRAP_SERVERS 업데이트하고 EH_SASL Kafka와 마찬가지로 Event Hubs로 스트리밍을 시작할 수 있습니다. 전체 샘플 코드는 GitHub의 sparkConsumer.scala 파일을 참조하세요.

//Read from your Event Hub!
val df = spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    .option("kafka.group.id", GROUP_ID)
    .option("failOnDataLoss", "true")
    .load()

//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
    .outputMode("append")
    .format("console")
    .start()

다음 오류와 유사한 오류가 표시되면 호출에 .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")를 추가한 후 spark.readStream를 다시 시도합니다.

IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets 

Kafka용 Event Hubs에 쓰기

Kafka에 쓰는 것과 동일한 방식으로 Event Hubs에 쓸 수도 있습니다. Event Hubs 네임스페이스의 정보로 BOOTSTRAP_SERVERS 변경하고 EH_SASL 구성을 업데이트하는 것을 잊지 마세요. 전체 샘플 코드는 GitHub의 sparkProducer.scala 파일을 참조하세요.

df = /**Dataframe**/

//Write to your Event Hub!
df.writeStream
    .format("kafka")
    .option("topic", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("checkpointLocation", "./checkpoint")
    .start()

다음 단계

Kafka용 Event Hubs 및 Event Hubs에 대해 자세한 정보를 원하시면 다음 문서를 참조하세요.