메모
이 문서에서는 Databricks Runtime 14.1 이상용 Databricks Connect에 대해 설명합니다.
Scala용 Databricks Connect 는 로컬 개발 환경의 Databricks 클러스터에서 UDF(사용자 정의 함수) 실행을 지원합니다.
이 페이지에서는 Scala용 Databricks Connect를 사용하여 사용자 정의 함수를 실행하는 방법을 설명합니다.
Python 버전의 이 문서에 대해서는 Python용 Databricks Connect의 사용자 정의 함수을 참조하세요.
컴파일된 클래스 및 JAR 업로드
UDF가 작동하려면 API를 사용하여 컴파일된 클래스 및 JAR을 클러스터에 addCompiledArtifacts() 업로드해야 합니다.
메모
클라이언트에서 사용하는 Scala는 Azure Databricks 클러스터의 Scala 버전과 일치해야 합니다. 클러스터의 Scala 버전을 확인하려면 Databricks 런타임 릴리스 정보 버전 및 호환성에서 클러스터의 Databricks 런타임 버전에 대한 "시스템 환경" 섹션을 참조하세요.
다음 Scala 프로그램은 열의 값을 제곱하는 간단한 UDF를 설정합니다.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val spark = getSession()
val squared = udf((x: Long) => x * x)
spark.range(3)
.withColumn("squared", squared(col("id")))
.select("squared")
.show()
}
}
def getSession(): SparkSession = {
if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
// On a Databricks cluster — reuse the active session
SparkSession.active
} else {
// Locally with Databricks Connect — upload local JARs and classes
DatabricksSession
.builder()
.addCompiledArtifacts(
Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI
)
.getOrCreate()
}
}
}
Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI 는 프로젝트의 컴파일된 출력(예: 대상/클래스 또는 빌드된 JAR)과 동일한 위치를 가리킵니다. 컴파일된 모든 클래스는 단지 Main뿐만 아니라 Databricks에 업로드됩니다.
target/scala-2.13/classes/
├── com/
│ ├── examples/
│ │ ├── Main.class
│ │ └── MyUdfs.class
│ └── utils/
│ └── Helper.class
Spark 세션이 이미 초기화되면 spark.addArtifact() API를 사용하여 추가로 컴파일된 클래스 및 JAR을 업로드할 수 있습니다.
메모
JAR을 업로드할 때 모든 전이적 종속성 JAR을 포함해야 합니다. API는 전이적 종속성의 자동 검색을 수행하지 않습니다.
타사 종속성이 있는 UDF
Maven 종속성을 build.sbt에 추가했지만 UDF에서 사용되고 Databricks 클러스터에서 사용할 수 없는 경우, 예를 들어:
// In build.sbt
libraryDependencies += "org.apache.commons" % "commons-text" % "1.10.0"
// In your code
import org.apache.commons.text.StringEscapeUtils
// ClassNotFoundException thrown during UDF execution of this function on the server side
val escapeUdf = udf((text: String) => {
StringEscapeUtils.escapeHtml4(text)
})
spark.addArtifact() 및 ivy://를 사용하여 Maven에서 종속성을 다운로드합니다.
라이브러리
oro을build.sbt파일에 추가libraryDependencies ++= Seq( "org.apache.commons" % "commons-text" % "1.10.0" % Provided, "oro" % "oro" % "2.0.8" // Required for ivy:// to work )API를 사용하여 세션을 만든 후 아티팩트를
addArtifact()추가합니다.def getSession(): SparkSession = { if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) { SparkSession.active } else { val spark = DatabricksSession.builder() .addCompiledArtifacts(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI) .getOrCreate() // Convert Maven coordinates to ivy:// format // From: "org.apache.commons" % "commons-text" % "1.10.0" // To: ivy://org.apache.commons:commons-text:1.10.0 spark.addArtifact("ivy://org.apache.commons:commons-text:1.10.0") spark } }
형식화된 데이터 세트 API
형식화된 데이터 세트 API를 사용하면 결과 데이터 세트에 대한 집계와 같은 map()filter()mapPartitions()변환을 실행할 수 있습니다. API를 사용하여 addCompiledArtifacts() 컴파일된 클래스 및 JAR을 클러스터에 업로드하는 것도 이러한 클래스에 적용되므로 코드가 실행되는 위치에 따라 다르게 동작해야 합니다.
- Databricks Connect를 사용한 로컬 개발: 원격 클러스터에 아티팩트 업로드
- 클러스터에서 실행되는 Databricks에 배포됨: 클래스가 이미 있으므로 아무것도 업로드할 필요가 없습니다.
다음 Scala 애플리케이션은 map() API를 사용하여 결과 열의 숫자를 접두사 문자열로 수정합니다.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
spark.range(3).map(f => s"row-$f").show()
}
}
외부 JAR 종속성
클러스터에 없는 프라이빗 또는 타사 라이브러리를 사용하는 경우:
import com.mycompany.privatelib.DataProcessor
// ClassNotFoundException thrown during UDF execution of this function on the server side
val myUdf = udf((data: String) => {
DataProcessor.process(data)
})
세션을 만들 때 lib/ 폴더에서 외부 JAR 파일을 업로드하세요.
def getSession(): SparkSession = {
if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
SparkSession.active
} else {
val builder = DatabricksSession.builder()
.addCompiledArtifacts(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI)
// Add all JARs from lib/ folder
val libFolder = new java.io.File("lib")
builder.addCompiledArtifacts(libFolder.toURI)
builder.getOrCreate()
}
}
그러면 로컬로 실행할 때 lib/디렉터리의 모든 JAR이 Databricks에 자동으로 업로드됩니다.
여러 모듈이 있는 프로젝트
다중 모듈 SBT 프로젝트에서 getClass.getProtectionDomain.getCodeSource.getLocation.toURI 는 현재 모듈의 위치만 반환합니다. UDF가 다른 모듈의 클래스를 사용하면 ClassNotFoundException를 받습니다.
my-project/
├── module-a/ (main application)
├── module-b/ (utilities - module-a depends on this)
각 모듈의 클래스에서 모든 getClass 위치를 가져와서 별도로 업로드합니다.
// In module-a/src/main/scala/Main.scala
import com.company.moduleb.DataProcessor // From module-b
def getSession(): SparkSession = {
if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
SparkSession.active
} else {
// Get location using a class FROM module-a
val moduleALocation = Main.getClass
.getProtectionDomain.getCodeSource.getLocation.toURI
// Get location using a class FROM module-b
val moduleBLocation = DataProcessor.getClass
.getProtectionDomain.getCodeSource.getLocation.toURI
DatabricksSession.builder()
.addCompiledArtifacts(moduleALocation) // Upload module-a
.addCompiledArtifacts(moduleBLocation) // Upload module-b
.getOrCreate()
}
}