Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Nota:
En este artículo se describe Databricks Connect para Databricks Runtime 13.3 y versiones posteriores.
Databricks Connect para Python admite funciones definidas por el usuario (UDF). Cuando se ejecuta una operación DataFrame que incluye UDF, Databricks Connect serializa las UDF y se envían al servidor como parte de la solicitud.
Para obtener información sobre las UDF para Databricks Connect para Scala, consulte Funciones definidas por el usuario en Databricks Connect para Scala.
Nota:
Dado que la función definida por el usuario se serializa y deserializa, la versión de Python del cliente debe coincidir con la versión de Python en el proceso de Azure Databricks. Para ver las versiones admitidas, consulte la matriz de compatibilidad de versiones.
Definir una UDF
Para crear una UDF en Databricks Connect para Python, use una de las siguientes funciones admitidas:
- Funciones definidas por el usuario de PySpark
- Funciones de streaming de PySpark
Por ejemplo, en el siguiente código Python se configura una UDF simple que cuadra los valores de una columna.
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession
@udf(returnType=IntegerType())
def double(x):
return x * x
spark = DatabricksSession.builder.getOrCreate()
df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))
df.show()
UDF con dependencias
Importante
Esta característica está en versión preliminar pública y requiere Databricks Connect para Python 16.4 o superior, y un clúster que ejecuta Databricks Runtime 16.4 o superior. Para usar esta característica, habilite la vista previa de UDFs mejoradas de Python en el catálogo de Unity en su espacio de trabajo.
Databricks Connect admite la especificación de dependencias de Python necesarias para las UDF. Estas dependencias se instalan en el proceso de Databricks como parte del entorno de Python de UDF.
Esta característica permite a los usuarios especificar las dependencias que necesita la UDF además de los paquetes proporcionados en el entorno base. También se puede usar para instalar una versión diferente del paquete a partir de lo que se proporciona en el entorno base.
Las dependencias se pueden instalar desde los siguientes orígenes:
- Paquetes PyPI
- Los paquetes PyPI se pueden especificar según PEP 508, por ejemplo,
dice,pyjokes<1osimplejson==3.19.*.
- Los paquetes PyPI se pueden especificar según PEP 508, por ejemplo,
- Archivos almacenados en volúmenes de Catálogo de Unity
- Se admiten los paquetes wheel (
.whl) y los archivos tar comprimidos con gzip (.tar.gz). Se debe conceder al usuario el permisoREAD_FILEen el archivo del volumen re:[UC]. - Cuando se van a instalar paquetes a partir de volúmenes de Unity Catalog, para invocar las UDF, los usuarios necesitan el permiso
READ VOLUMEen el volumen de origen. Conceder este permiso a todos los usuarios de la cuenta permite esto automáticamente para los nuevos usuarios. - Los archivos de volúmenes de catálogo de Unity deben especificarse como
dbfs:<path>, por ejemplo,dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whlodbfs:/Volumes/users/someone@example.com/tars/my_private_deps.tar.gz.
- Se admiten los paquetes wheel (
Para incluir dependencias personalizadas en la UDF, especifíquelas en un entorno mediante withDependenciesy, a continuación, use ese entorno para crear una sesión de Spark. Las dependencias se instalan en el entorno de Databricks y estarán disponibles en cualquier UDF que use esta sesión de Spark.
El código siguiente declara el paquete dice pyPI como una dependencia:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
O bien, para indicar la dependencia de una rueda en un volumen:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Modo de uso en cuadernos y trabajos de Databricks
En cuadernos y trabajos, las dependencias de UDF deben instalarse directamente en REPL. Databricks Connect valida el entorno de Python de REPL comprobando que todas las dependencias especificadas ya están instaladas y produce una excepción si no están instaladas.
La validación del entorno del cuaderno se lleva a cabo tanto para las dependencias de volumen de datos de PyPI como para Unity Catalog. Las dependencias relacionadas con el volumen deben empaquetarse siguiendo las especificaciones estándar de empaquetado de Python de PEP-427 o posterior para los archivos de rueda (wheel) y PEP-241 o posterior para los archivos de distribución de origen. Para más información sobre los estándares de empaquetado de Python, consulte la documentación de PyPA.
Limitaciones
- Los archivos como la rueda de Python o la distribución de origen en la máquina de desarrollo local no se pueden especificar directamente como una dependencia. Primero deben ser subidos a los volúmenes del Catálogo de Unity.
- La compatibilidad de las dependencias de UDF con
pyspark.sql.streaming.DataStreamWriter.foreachypyspark.sql.streaming.DataStreamWriter.foreachBatchrequiere Databricks Connect para Python 18.0 o posterior, y un clúster en ejecución con Databricks Runtime 18.0 o superior. - Las dependencias de funciones definidas por el usuario (UDF) no se admiten para las UDF de agregación de pandas sobre funciones de ventana.
Ejemplos
En el siguiente ejemplo se definen las dependencias de PyPI y volúmenes en un entorno, se crea una sesión con ese entorno y se definen y llaman a las UDF que usan esas dependencias.
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd
pypi_deps = ["pyjokes>=0.8,<1"]
volumes_deps = [
# Example library from: https://pypi.org/project/dice/#files
"dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0-py3-none-any.whl"
# Example library from: https://pypi.org/project/simplejson/#files
"dbfs:/Volumes/main/someone@example.com/test/simplejson-3.19.3.tar.gz",
]
env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
# UDFs
@udf(returnType=StringType())
def get_joke():
from pyjokes import get_joke
return get_joke()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
import simplejson
return simplejson.loads(simplejson.dumps(x * 2))
@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
import dice
return a * b + dice.roll(f"1d10")[0]
df = spark.range(1, 10)
df = df.withColumns({
"joke": get_joke(),
"doubled": double_and_json_parse(col("id")),
"mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()
Entorno base de Python
Las UDF se ejecutan en el proceso de Databricks y no en el cliente. El entorno base de Python en el que se ejecutan las UDFs depende del cómputo de Databricks.
En el caso de los clústeres, el entorno de Python base es el entorno de Python de la versión de Databricks Runtime que se ejecuta en el clúster. La versión de Python y la lista de paquetes de este entorno base se encuentran en las secciones Entorno del sistema y Bibliotecas de Python instaladas de las notas de la versión de Databricks Runtime.
Para el proceso sin servidor, el entorno de Python base corresponde a la versión del entorno sin servidor según la tabla siguiente.
| Versión de Databricks Connect | Entorno sin servidor UDF |
|---|---|
| De 17.0 a 17.3, Python 3.12 | Versión 4 |
| 16.4.1 a menos de 17, Python 3.12 | Versión 3 |
| 15.4.10 a menos de 16, Python 3.12 | Versión 3 |
| 15.4.10 a menos de 16, Python 3.11 | Versión 2 |
| De 15.4.0 a 15.4.9 y de 16.0 a 16.3 | Proceso sin servidor más reciente. Migre a 15.4.10 LTS y versiones posteriores o 16.4.1 LTS y versiones posteriores para garantizar un entorno Python estable. |