Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Nota
W tym artykule opisano program Databricks Connect dla środowiska Databricks Runtime 14.1 lub nowszego.
Program Databricks Connect dla języka Scala obsługuje uruchamianie funkcji zdefiniowanych przez użytkownika (UDF) w klastrach usługi Databricks z lokalnego środowiska deweloperskiego.
Na tej stronie opisano sposób wykonywania funkcji zdefiniowanych przez użytkownika za pomocą programu Databricks Connect dla języka Scala.
Aby uzyskać wersję tego artykułu w języku Python, zobacz funkcje zdefiniowane przez użytkownika w programie Databricks Connect dla języka Python.
Przekazywanie skompilowanej klasy i JAR-ów
Aby funkcje zdefiniowane przez użytkownika (UDF) działały, skompilowane klasy oraz pliki JAR muszą zostać przekazane do klastra przy użyciu interfejsu API addCompiledArtifacts().
Nota
Język Scala używany przez klienta musi być zgodny z wersją języka Scala w klastrze usługi Azure Databricks. Aby sprawdzić wersję Scala klastra, zobacz sekcję "Środowisko systemowe" dla wersji środowiska uruchomieniowego Databricks Runtime klastra w uwagach o wersji i zgodności Databricks Runtime.
Poniższy program Scala konfiguruje prostą funkcję użytkownika (UDF), która kwadratuje wartości w kolumnie.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val spark = getSession()
val squared = udf((x: Long) => x * x)
spark.range(3)
.withColumn("squared", squared(col("id")))
.select("squared")
.show()
}
}
def getSession(): SparkSession = {
if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
// On a Databricks cluster — reuse the active session
SparkSession.active
} else {
// Locally with Databricks Connect — upload local JARs and classes
DatabricksSession
.builder()
.addCompiledArtifacts(
Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI
)
.getOrCreate()
}
}
}
Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI wskazuje tę samą lokalizację co skompilowane dane wyjściowe projektu (na przykład target/classes lub skompilowany plik JAR). Wszystkie skompilowane klasy są przekazywane do usługi Databricks, a nie tylko Main.
target/scala-2.13/classes/
├── com/
│ ├── examples/
│ │ ├── Main.class
│ │ └── MyUdfs.class
│ └── utils/
│ └── Helper.class
Po zainicjowaniu sesji Spark można przesyłać kolejne skompilowane klasy i pliki JAR przy użyciu interfejsu API spark.addArtifact().
Nota
Podczas przesyłania plików JAR wszystkie przechodnie zależności JAR muszą być dołączone do załadowania. Interfejsy API nie wykonują żadnego automatycznego wykrywania zależności przechodnich.
Funkcje definiowane przez użytkownika z zależnościami zewnętrznymi
Jeśli dodasz zależność Maven build.sbt, która jest używana w funkcji zdefiniowanej przez użytkownika, ale nie jest dostępna w klastrze usługi Databricks, na przykład:
// In build.sbt
libraryDependencies += "org.apache.commons" % "commons-text" % "1.10.0"
// In your code
import org.apache.commons.text.StringEscapeUtils
// ClassNotFoundException thrown during UDF execution of this function on the server side
val escapeUdf = udf((text: String) => {
StringEscapeUtils.escapeHtml4(text)
})
Użyj spark.addArtifact() wraz z ivy:// do pobierania zależności z Maven.
Dodawanie biblioteki
orodobuild.sbtplikulibraryDependencies ++= Seq( "org.apache.commons" % "commons-text" % "1.10.0" % Provided, "oro" % "oro" % "2.0.8" // Required for ivy:// to work )Dodaj artefakt po utworzeniu sesji za pomocą interfejsu
addArtifact()API:def getSession(): SparkSession = { if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) { SparkSession.active } else { val spark = DatabricksSession.builder() .addCompiledArtifacts(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI) .getOrCreate() // Convert Maven coordinates to ivy:// format // From: "org.apache.commons" % "commons-text" % "1.10.0" // To: ivy://org.apache.commons:commons-text:1.10.0 spark.addArtifact("ivy://org.apache.commons:commons-text:1.10.0") spark } }
Typizowane interfejsy API zestawu danych
Typizowane interfejsy API zestawu danych umożliwiają uruchamianie przekształceń, takich jak map(), filter(), mapPartitions()i agregacje dla wynikowych zestawów danych. Przekazywanie skompilowanej klasy i plików JAR do klastra za pomocą API addCompiledArtifacts() ma także zastosowanie do tych przypadków, więc kod musi działać inaczej w zależności od miejsca uruchomienia.
- Programowanie lokalne za pomocą usługi Databricks Connect: przekazywanie artefaktów do klastra zdalnego.
- Wdrożone na platformie Databricks działające na klastrze: nie ma potrzeby przesyłania czegokolwiek, ponieważ klasy są już dostępne.
Poniższa aplikacja Scala używa interfejsu API map() do modyfikowania liczby w kolumnie wynikowej na prefiksowany ciąg.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
spark.range(3).map(f => s"row-$f").show()
}
}
Zewnętrzne zależności JAR
Jeśli używasz prywatnej lub innej biblioteki, która nie znajduje się w klastrze:
import com.mycompany.privatelib.DataProcessor
// ClassNotFoundException thrown during UDF execution of this function on the server side
val myUdf = udf((data: String) => {
DataProcessor.process(data)
})
Załaduj zewnętrzne pliki JAR z folderu lib/ podczas tworzenia sesji.
def getSession(): SparkSession = {
if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
SparkSession.active
} else {
val builder = DatabricksSession.builder()
.addCompiledArtifacts(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI)
// Add all JARs from lib/ folder
val libFolder = new java.io.File("lib")
builder.addCompiledArtifacts(libFolder.toURI)
builder.getOrCreate()
}
}
Spowoduje to automatyczne przekazanie wszystkich jednostek JAR w katalogu lib/do usługi Databricks podczas uruchamiania lokalnego.
Projekty z wieloma modułami
W projekcie SBT z wieloma modułami, getClass.getProtectionDomain.getCodeSource.getLocation.toURI zwraca wyłącznie lokalizację aktualnego modułu. Jeśli funkcja UDF używa klas z innych modułów, uzyskasz wartość ClassNotFoundException.
my-project/
├── module-a/ (main application)
├── module-b/ (utilities - module-a depends on this)
Użyj getClass z klasy w każdym module, aby uzyskać ich lokalizacje i przesłać je oddzielnie.
// In module-a/src/main/scala/Main.scala
import com.company.moduleb.DataProcessor // From module-b
def getSession(): SparkSession = {
if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
SparkSession.active
} else {
// Get location using a class FROM module-a
val moduleALocation = Main.getClass
.getProtectionDomain.getCodeSource.getLocation.toURI
// Get location using a class FROM module-b
val moduleBLocation = DataProcessor.getClass
.getProtectionDomain.getCodeSource.getLocation.toURI
DatabricksSession.builder()
.addCompiledArtifacts(moduleALocation) // Upload module-a
.addCompiledArtifacts(moduleBLocation) // Upload module-b
.getOrCreate()
}
}