Partager via


Fonctions définies par l’utilisateur dans Databricks Connect pour Python

Remarque

Cet article traite de Databricks Connect pour Databricks Runtime 13.3 et versions ultérieures.

Databricks Connect pour Python prend en charge les fonctions définies par l’utilisateur (UDF). Lorsqu’une opération DataFrame qui inclut des fonctions définies par l’utilisateur est exécutée, les fonctions définies par l’utilisateur sont sérialisées par Databricks Connect et envoyées au serveur dans le cadre de la requête.

Pour plus d’informations sur les UDF pour Databricks Connect pour Scala, consultez Fonctions définies par l’utilisateur dans Databricks Connect pour Scala.

Remarque

Étant donné que la fonction définie par l’utilisateur est sérialisée et désérialisée, la version Python du client doit correspondre à la version Python sur le calcul Azure Databricks. Pour connaître les versions prises en charge, consultez la matrice de prise en charge des versions.

Définir une UDF

Pour créer une fonction UDF dans Databricks Connect pour Python, utilisez l’une des fonctions prises en charge suivantes :

Par exemple, le script Python suivant configure une fonction UDF simple qui élève au carré les valeurs dans une colonne.

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession

@udf(returnType=IntegerType())
def double(x):
    return x * x

spark = DatabricksSession.builder.getOrCreate()

df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))

df.show()

UDF avec dépendances

Importante

Cette fonctionnalité est disponible en préversion publique et nécessite Databricks Connect pour Python 16.4 ou version ultérieure, et un cluster exécutant Databricks Runtime 16.4 ou version ultérieure. Pour utiliser cette fonctionnalité, activez la préversion des UDF Python dans Unity Catalog dans votre espace de travail.

Databricks Connect prend en charge la spécification des dépendances Python requises pour les fonctions UDF. Ces dépendances sont installées sur la capacité de calcul Databricks dans le cadre de l’environnement Python des UDF.

Cette fonctionnalité permet aux utilisateurs de spécifier des dépendances dont l’UDF a besoin en plus des packages fournis dans l’environnement de base. Il peut également être utilisé pour installer une autre version du package à partir de ce qui est fourni dans l’environnement de base.

Les dépendances peuvent être installées à partir des sources suivantes :

  • Packages PyPI
    • Les packages PyPI peuvent être spécifiés en fonction du PEP 508, par exemple, dicepyjokes<1 ou simplejson==3.19.*.
  • Fichiers stockés dans des volumes de catalogue Unity
    • Les packages wheel (.whl) et les fichiers tar.gz (.tar.gz) sont pris en charge. L’utilisateur doit avoir une autorisation READ_FILE sur le fichier dans le volume re:[UC].
    • Lors de l’installation de packages à partir de volumes de catalogue Unity, pour appeler les UDF, les utilisateurs ont besoin READ VOLUME d’autorisations sur le volume source. L’octroi de cette autorisation à tous les utilisateurs de compte active automatiquement cette autorisation pour les nouveaux utilisateurs.
    • Les fichiers de volumes catalogue Unity doivent être spécifiés en tant que dbfs:<path>, par exemple, dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl ou dbfs:/Volumes/users/someone@example.com/tars/my_private_deps.tar.gz.

Pour inclure des dépendances personnalisées dans votre fonction UDF, spécifiez-les dans un environnement à l’aide withDependenciesde cet environnement, puis utilisez cet environnement pour créer une session Spark. Les dépendances sont installées sur votre capacité de calcul Databricks et seront disponibles dans toutes les UDF qui utilisent cette session Spark.

Le code suivant déclare le package dice PyPI en tant que dépendance :

from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Ou, pour spécifier une dépendance d’une roue dans un volume :

from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Comportement dans les notebooks et tâches Databricks

Dans les notebooks et les travaux, les dépendances UDF doivent être installées directement dans l'environnement REPL. Databricks Connect valide l’environnement Python REPL en vérifiant que toutes les dépendances spécifiées sont déjà installées et lève une exception si elles ne sont pas installées.

La validation de l’environnement de notebook se produit pour les dépendances de volume du catalogue PyPI et Unity. Les dépendances de volume doivent être empaquetées en suivant les spécifications d’empaquetage Python standard de PEP-427 ou version ultérieure pour les fichiers wheel, et PEP-241 ou version ultérieure pour les fichiers de distribution source. Pour plus d’informations sur les normes d’empaquetage Python, consultez la documentation PyPA.

Limites

  • Les fichiers tels que la roulette Python ou la distribution source sur votre ordinateur de développement local ne peuvent pas être spécifiés directement en tant que dépendance. Ils doivent d’abord être chargés dans des volumes de catalogue Unity.
  • Les dépendances UDF pour pyspark.sql.streaming.DataStreamWriter.foreach et pyspark.sql.streaming.DataStreamWriter.foreachBatch nécessitent Databricks Connect pour Python 18.0 ou une version ultérieure, ainsi qu'un cluster exécutant Databricks Runtime 18.0 ou une version ultérieure.
  • Les dépendances UDF ne sont pas prises en charge pour les fonctions UDF d’agrégation Pandas appliquées sur des fonctions de fenêtre.

Exemples

L’exemple suivant définit des dépendances PyPI et volumes dans un environnement, crée une session avec cet environnement, puis définit et appelle des UDF qui utilisent ces dépendances :

from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd

pypi_deps = ["pyjokes>=0.8,<1"]

volumes_deps = [
    # Example library from: https://pypi.org/project/dice/#files
    "dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0-py3-none-any.whl"
    # Example library from: https://pypi.org/project/simplejson/#files
    "dbfs:/Volumes/main/someone@example.com/test/simplejson-3.19.3.tar.gz",
]

env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

# UDFs
@udf(returnType=StringType())
def get_joke():
    from pyjokes import get_joke
    return get_joke()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
    import simplejson
    return simplejson.loads(simplejson.dumps(x * 2))


@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
    import dice
    return a * b + dice.roll(f"1d10")[0]


df = spark.range(1, 10)
df = df.withColumns({
    "joke": get_joke(),
    "doubled": double_and_json_parse(col("id")),
    "mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()

Environnement de base Python

Les UDF sont exécutées sur la capacité de calcul Databricks et non sur le client. L'environnement Python de base dans lequel les UDF sont exécutées dépend de la capacité de calcul Databricks.

Pour les clusters, l’environnement Python de base est l’environnement Python de la version databricks Runtime s’exécutant sur le cluster. La version de Python et la liste des packages de cet environnement de base se trouvent sous l’environnement système et les sections bibliothèques Python installées des notes de publication databricks Runtime.

Pour le calcul serverless, l’environnement Python de base correspond à la version de l’environnement serverless en fonction du tableau suivant.

Version de Databricks Connect Environnement sans serveur UDF
17.0 à 17.3, Python 3.12 Version 4
16.4.1 à moins de 17, Python 3.12 Version 3
15.4.10 à moins de 16, Python 3.12 Version 3
15.4.10 jusqu'à la version inférieure à 16, Python 3.11 Version 2
15.4.0 à 15.4.9 et 16.0 à 16.3 Nouveau service de calcul serverless. Migrez vers 15.4.10 LTS et versions ultérieures ou 16.4.1 LTS et versions ultérieures pour un environnement Python stable.