Partager via


Fonctions définies par l’utilisateur dans Databricks Connect pour Scala

Remarque

Cet article traite de Databricks Connect pour Databricks Runtime 14.1 et versions ultérieures.

Databricks Connect pour Scala prend en charge l’exécution de fonctions définies par l’utilisateur sur des clusters Databricks à partir de votre environnement de développement local.

Cette page explique comment exécuter des fonctions définies par l’utilisateur avec Databricks Connect pour Scala.

Pour obtenir la version Python de cet article, consultez fonctions définies par l’utilisateur dans Databricks Connect pour Python.

Téléverser les classes compilées et les JARs

Pour que les fonctions définies par l’utilisateur fonctionnent, les classes compilées et les JARs doivent être chargés sur le cluster à l’aide de l’API addCompiledArtifacts().

Remarque

La scala utilisée par le client doit correspondre à la version Scala sur le cluster Azure Databricks. Pour vérifier la version Scala du cluster, consultez la section « Environnement système » de la version Databricks Runtime du cluster dans les notes de publication sur les versions et la compatibilité de Databricks Runtime.

Le programme Scala suivant configure une fonction UDF simple qui place les valeurs dans une colonne.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val spark = getSession()

    val squared = udf((x: Long) => x * x)

    spark.range(3)
      .withColumn("squared", squared(col("id")))
      .select("squared")
      .show()

    }
  }

  def getSession(): SparkSession = {
    if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
      // On a Databricks cluster — reuse the active session
      SparkSession.active
    } else {
      // Locally with Databricks Connect — upload local JARs and classes
      DatabricksSession
        .builder()
        .addCompiledArtifacts(
          Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI
        )
        .getOrCreate()
    }
  }
}

Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI pointe vers le même emplacement que la sortie compilée du projet (par exemple, target/classes ou le JAR compilé). Toutes les classes compilées sont chargées dans Databricks, pas seulement Main.

target/scala-2.13/classes/
├── com/
│   ├── examples/
│   │   ├── Main.class
│   │   └── MyUdfs.class
│   └── utils/
│       └── Helper.class

Lorsque la session Spark est déjà initialisée, des classes compilées et des fichiers JAR supplémentaires peuvent être chargés à l’aide de l’API spark.addArtifact().

Remarque

Lors de l'envoi de JARs, tous les JARs de dépendance transitive doivent être inclus dans le téléchargement. Les API n’effectuent aucune détection automatique des dépendances transitives.

Fonctions définies par l'utilisateur avec des dépendances tierces

Si vous avez ajouté une dépendance Maven dans build.sbt qui est utilisée dans une fonction UDF mais qui n'est pas disponible sur le cluster Databricks, par exemple :

// In build.sbt
libraryDependencies += "org.apache.commons" % "commons-text" % "1.10.0"
// In your code
import org.apache.commons.text.StringEscapeUtils

// ClassNotFoundException thrown during UDF execution of this function on the server side
val escapeUdf = udf((text: String) => {
  StringEscapeUtils.escapeHtml4(text)
})

Utilisez spark.addArtifact() avec ivy:// pour télécharger des dépendances à partir de Maven.

  1. Ajouter la oro bibliothèque à votre build.sbt fichier

    libraryDependencies ++= Seq(
      "org.apache.commons" % "commons-text" % "1.10.0" % Provided,
      "oro" % "oro" % "2.0.8"  // Required for ivy:// to work
    )
    
  2. Ajoutez l’artefact après avoir créé la session avec l’API addArtifact() :

    def getSession(): SparkSession = {
      if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
        SparkSession.active
      } else {
        val spark = DatabricksSession.builder()
          .addCompiledArtifacts(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI)
          .getOrCreate()
    
        // Convert Maven coordinates to ivy:// format
        // From: "org.apache.commons" % "commons-text" % "1.10.0"
        // To:   ivy://org.apache.commons:commons-text:1.10.0
        spark.addArtifact("ivy://org.apache.commons:commons-text:1.10.0")
    
        spark
      }
    }
    

API de jeux de données typés

Les API de jeu de données typées permettent d’exécuter des transformations telles que map(), filter(), mapPartitions()et des agrégations sur les jeux de données résultants. Le chargement de la classe compilée et des R JAR sur le cluster à l’aide de l’API addCompiledArtifacts() s’applique également à ceux-ci. Votre code doit donc se comporter différemment en fonction de l’emplacement où il s’exécute :

  • Développement local avec Databricks Connect : charger des artefacts sur le cluster distant.
  • Déployé sur Databricks en cours d’exécution sur le cluster : il n’est pas nécessaire de charger quoi que ce soit, car les classes sont déjà là.

L’application Scala suivante utilise l’API map() pour modifier un nombre dans une colonne de résultat en chaîne préfixée.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    spark.range(3).map(f => s"row-$f").show()
  }
}

Dépendances JAR externes

Si vous utilisez une bibliothèque privée ou tierce qui n’est pas sur le cluster :

import com.mycompany.privatelib.DataProcessor

// ClassNotFoundException thrown during UDF execution of this function on the server side
val myUdf = udf((data: String) => {
  DataProcessor.process(data)
})

Chargez des fichiers JAR externes à partir de votre lib/ dossier lors de la création de la session :

def getSession(): SparkSession = {
  if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
    SparkSession.active
  } else {
    val builder = DatabricksSession.builder()
      .addCompiledArtifacts(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI)

     // Add all JARs from lib/ folder
     val libFolder = new java.io.File("lib")
     builder.addCompiledArtifacts(libFolder.toURI)


   builder.getOrCreate()
  }
}

Cela charge automatiquement tous les fichiers JAR de votre répertoire lib/répertoire dans Databricks lors de l’exécution locale.

Projets avec plusieurs modules

Dans un projet SBT multimodèle, getClass.getProtectionDomain.getCodeSource.getLocation.toURI retourne uniquement l’emplacement du module actuel. Si votre UDF utilise des classes à partir d’autres modules, vous obtiendrez ClassNotFoundException.

my-project/
├── module-a/  (main application)
├── module-b/  (utilities - module-a depends on this)

Utilisez getClass à partir d'une classe dans chaque module pour obtenir tous leurs emplacements et les télécharger séparément.

// In module-a/src/main/scala/Main.scala
import com.company.moduleb.DataProcessor  // From module-b

def getSession(): SparkSession = {
  if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
    SparkSession.active
  } else {
    // Get location using a class FROM module-a
    val moduleALocation = Main.getClass
      .getProtectionDomain.getCodeSource.getLocation.toURI

    // Get location using a class FROM module-b
    val moduleBLocation = DataProcessor.getClass
      .getProtectionDomain.getCodeSource.getLocation.toURI

    DatabricksSession.builder()
      .addCompiledArtifacts(moduleALocation)  // Upload module-a
      .addCompiledArtifacts(moduleBLocation)  // Upload module-b
      .getOrCreate()
  }
}