Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Cet article contient des exemples de fonctions définies par l’utilisateur (UDF) Scala. Il montre comment enregistrer des UDFs, appeler des UDFs, ainsi que des précautions concernant l’ordre d’évaluation des sous-expressions dans Spark SQL. Voir Fonctions scalaires externes définies par l'utilisateur (UDF) pour plus de détails.
Remarque
Les fonctions Scala définies par l'utilisateur nécessitent Databricks Runtime 14.2 et versions ultérieures, lorsqu'elles sont exécutées sur des ressources de calcul compatibles avec le catalogue Unity en mode d'accès standard (anciennement mode d'accès partagé).
Inscrire une fonction en tant que fonction définie par l’utilisateur
val squared = (s: Long) => {
s * s
}
spark.udf.register("square", squared)
Appeler la UDF dans Spark SQL
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, square(id) as id_squared from test
Utiliser une UDF avec des DataFrames
import org.apache.spark.sql.functions.{col, udf}
val squared = udf((s: Long) => s * s)
display(spark.range(1, 20).select(squared(col("id")) as "id_squared"))
Ordre d’évaluation et vérification de valeurs nulles
Spark SQL (incluant SQL et les API TrameDonnées et Jeu de données) ne garantit pas l’ordre d’évaluation des sous-expressions. En particulier, les entrées d’un opérateur ou d’une fonction ne sont pas nécessairement évaluées de gauche à droite ou dans tout autre ordre fixe. Par exemple, les expressions logiques AND et OR n’ont pas de sémantique de « court-circuit » de gauche à droite.
Il est donc dangereux de s’appuyer sur les effets secondaires ou l’ordre d’évaluation des expressions booléennes, ainsi que sur l’ordre des clauses WHERE et HAVING, car ces expressions et clauses peuvent être réordonnées lors de l’optimisation et de la planification des requêtes. Plus précisément, si une fonction UDF s’appuie sur la sémantique de court-circuitage dans SQL pour la vérification null, il n’existe aucune garantie que la vérification null se produit avant d’appeler la fonction UDF. Par exemple,
spark.udf.register("strlen", (s: String) => s.length)
spark.sql("select s from test1 where s is not null and strlen(s) > 1") // no guarantee
Cette clause WHERE ne garantit pas que la fonction définie par l’utilisateur strlen soit appelée après le filtrage des valeurs nulles.
Pour effectuer une vérification de valeur null correcte, nous vous recommandons d’effectuer l’une des opérations suivantes :
- Rendez la fonction définie par l’utilisateur sensible aux valeurs nulles et effectuez une vérification de valeurs nulles au sein de la fonction définie par l’utilisateur
- Utilisez les expressions
IFouCASE WHENpour effectuer la vérification de valeurs nulles et appelez la fonction définie par l’utilisateur dans une branche conditionnelle
spark.udf.register("strlen_nullsafe", (s: String) => if (s != null) s.length else -1)
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
API des jeux de données typés
Remarque
Cette fonctionnalité est prise en charge sur les clusters avec catalogue Unity avec le mode d’accès standard dans Databricks Runtime 15.4 et versions ultérieures.
Les API de jeux de données typés vous permettent d’exécuter des transformations telles que mappage, filtre et agrégations sur les jeux de données résultants à l’aide d’une fonction définie par l’utilisateur.
Par exemple, l’application Scala suivante utilise l’API map() pour modifier un nombre dans une colonne de résultat en chaîne préfixée.
spark.range(3).map(f => s"row-$f").show()
Bien que cet exemple utilise l’API map(), il s’applique également aux autres API de jeu de données typé telles que filter(), mapPartitions(), foreach(), foreachPartition(), reduce(), et flatMap().
Fonctionnalités UDF Scala et compatibilité databricks Runtime
Les fonctionnalités Scala suivantes nécessitent des versions minimales de Databricks Runtime lorsqu’elles sont utilisées sur les clusters Unity Catalog activés en mode d’accès standard (partagé).
| Caractéristique | Version de Minimimum Databricks Runtime |
|---|---|
| Fonctions définies par l’utilisateur scalaire | Databricks Runtime 14.2 |
Dataset.map, , Dataset.mapPartitionsDataset.filter, , Dataset.reduceDataset.flatMap |
Databricks Runtime 15.4 |
KeyValueGroupedDataset.flatMapGroups, KeyValueGroupedDataset.mapGroups |
Databricks Runtime 15.4 |
(Streaming) foreachWriter Sink |
Databricks Runtime 15.4 |
(Streaming) foreachBatch |
Databricks Runtime 16.1 |
(Diffusion en continu) KeyValueGroupedDataset.flatMapGroupsWithState |
Version 16.2 de Databricks Runtime |