Udostępnij przez


Funkcje zdefiniowane przez użytkownika w narzędziu Databricks Connect dla języka Python

Notatka

W tym artykule opisano program Databricks Connect dla środowiska Databricks Runtime w wersji 13.3 lub nowszej.

Program Databricks Connect dla języka Python obsługuje funkcje zdefiniowane przez użytkownika (UDF). Gdy wykonywana jest operacja DataFrame zawierająca funkcje zdefiniowane przez użytkownika (UDF), są one serializowane przez Databricks Connect i wysyłane do serwera jako część żądania.

Aby uzyskać informacje o funkcjach zdefiniowanych przez użytkownika dla programu Databricks Connect dla języka Scala, zobacz Funkcje zdefiniowane przez użytkownika w programie Databricks Connect dla języka Scala.

Notatka

Ponieważ funkcja zdefiniowana przez użytkownika jest serializowana i deserializowana, wersja języka Python klienta musi być zgodna z wersją języka Python w obliczeniach usługi Azure Databricks. Aby uzyskać informacje o obsługiwanych wersjach, zobacz macierz obsługi wersji.

Zdefiniuj funkcję zdefiniowaną przez użytkownika

Aby utworzyć UDF (funkcję zdefiniowaną przez użytkownika) w Databricks Connect dla języka Python, użyj jednej z następujących obsługiwanych funkcji:

Na przykład poniższy kod w Pythonie konfiguruje prostą funkcję UDF, która kwadratuje wartości w kolumnie.

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession

@udf(returnType=IntegerType())
def double(x):
    return x * x

spark = DatabricksSession.builder.getOrCreate()

df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))

df.show()

Funkcje zdefiniowane przez użytkownika z zależnościami

Ważne

Ta funkcja jest dostępna w publicznej wersji zapoznawczej i wymaga programu Databricks Connect dla języka Python w wersji 16.4 lub nowszej oraz klastra z uruchomionym środowiskiem Databricks Runtime 16.4 lub nowszym. Aby użyć tej funkcji, włącz podgląd funkcji ulepszonych funkcji zdefiniowanych przez użytkownika (UDF) języka Python w katalogu Unity w swoim obszarze roboczym.

Usługa Databricks Connect obsługuje określanie zależności języka Python, które są wymagane dla funkcji zdefiniowanych przez użytkownika. Te zależności są instalowane na obliczeniach usługi Databricks w ramach środowiska języka Python usługi UDF.

Ta funkcja umożliwia użytkownikom określanie zależności potrzebnych dla UDF (funkcja definiowana przez użytkownika) oprócz pakietów dostępnych w środowisku podstawowym. Można go również użyć do zainstalowania innej wersji pakietu z tego, co jest dostępne w środowisku podstawowym.

Zależności można instalować z następujących źródeł:

  • Pakiety PyPI
    • Pakiety PyPI można określić zgodnie z PEP 508, na przykład , dicepyjokes<1 lub simplejson==3.19.*.
  • Pliki przechowywane w wolumenach Unity Catalog
    • Obsługiwane są pakiety wheel (.whl) oraz pliki tar skompresowane gzipem (.tar.gz). Użytkownik musi mieć uprawnienie READ_FILE do pliku w woluminie re:[UC].
    • Podczas instalowania pakietów z woluminów katalogu Unity, aby wywołać funkcje zdefiniowane przez użytkownika, użytkownicy muszą mieć READ VOLUME uprawnienia do woluminu źródłowego. Przyznanie tego uprawnienia wszystkim użytkownikom konta powoduje automatyczne włączenie tego ustawienia dla nowych użytkowników.
    • Pliki woluminów katalogu Unity powinny być określone jako dbfs:<path>, na przykład dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl lub dbfs:/Volumes/users/someone@example.com/tars/my_private_deps.tar.gz.

Aby uwzględnić niestandardowe zależności w funkcji zdefiniowanej przez użytkownika, określ je w środowisku przy użyciu withDependencies, a następnie użyj tego środowiska do utworzenia sesji Spark. Zależności są instalowane w środowisku obliczeniowym Databricks i będą dostępne we wszystkich funkcjach użytkownika korzystających z tej sesji Spark.

Poniższy kod deklaruje pakiet dice PyPI jako zależność:

from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Lub, aby określić zależność koła w woluminie:

from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Zachowanie w notesach i zadaniach w usłudze Databricks

W notesach i zadaniach zależności od funkcji definiowanych przez użytkownika muszą być instalowane bezpośrednio w środowisku uruchomieniowym REPL. Usługa Databricks Connect weryfikuje środowisko JĘZYKA Python REPL, sprawdzając, czy wszystkie określone zależności są już zainstalowane i zgłasza wyjątek, jeśli nie zostały zainstalowane.

Walidacja środowiska notesu odbywa się zarówno dla zależności woluminów PyPI, jak i Unity Catalog. Zależności woluminów należy spakować zgodnie ze standardowymi specyfikacjami pakowania języka Python według PEP-427 lub nowszych dla plików typu wheel oraz PEP-241 lub nowszych dla plików dystrybucji źródłowej. Aby uzyskać więcej informacji na temat standardów pakowania języka Python, zobacz dokumentację PyPA.

Ograniczenia

  • Pliki, takie jak koło języka Python lub dystrybucja źródłowa na lokalnej maszynie dewelopera, nie mogą być określane bezpośrednio jako zależność. Najpierw muszą zostać przekazane do woluminów katalogu Unity.
  • Obsługa zależności funkcji UDF dla pyspark.sql.streaming.DataStreamWriter.foreach i pyspark.sql.streaming.DataStreamWriter.foreachBatch wymaga programu Databricks Connect dla języka Python 18.0 lub nowszego oraz klastra z uruchomionym środowiskiem Databricks Runtime 18.0 lub nowszym.
  • Zależności w funkcjach UDF nie są obsługiwane w przypadku funkcji agregujących biblioteki pandas nad funkcjami okien.

Przykłady

W poniższym przykładzie zdefiniowano zależności PyPI i zależności od wolumenów w środowisku, utworzono sesję z tym środowiskiem, a następnie zdefiniowano i wywołano UDF, które wykorzystują te zależności:

from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd

pypi_deps = ["pyjokes>=0.8,<1"]

volumes_deps = [
    # Example library from: https://pypi.org/project/dice/#files
    "dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0-py3-none-any.whl"
    # Example library from: https://pypi.org/project/simplejson/#files
    "dbfs:/Volumes/main/someone@example.com/test/simplejson-3.19.3.tar.gz",
]

env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

# UDFs
@udf(returnType=StringType())
def get_joke():
    from pyjokes import get_joke
    return get_joke()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
    import simplejson
    return simplejson.loads(simplejson.dumps(x * 2))


@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
    import dice
    return a * b + dice.roll(f"1d10")[0]


df = spark.range(1, 10)
df = df.withColumns({
    "joke": get_joke(),
    "doubled": double_and_json_parse(col("id")),
    "mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()

Środowisko podstawowe języka Python

Funkcje zdefiniowane przez użytkownika są wykonywane na platformie obliczeniowej Databricks, a nie na kliencie. Podstawowe środowisko języka Python, w którym wykonywane są funkcje UDF, zależy od środowiska obliczeniowego Databricks.

W przypadku klastrów podstawowe środowisko języka Python to środowisko języka Python w wersji środowiska Databricks Runtime uruchomionej w klastrze. Wersja języka Python i lista pakietów w tym środowisku podstawowym znajdują się w sekcjach Środowisko systemowe i Zainstalowane biblioteki języka Python w informacjach o wersji środowiska Databricks Runtime.

W przypadku obliczeń bezserwerowych podstawowe środowisko języka Python odpowiada wersji środowiska bezserwerowego zgodnie z poniższą tabelą.

Wersja programu Databricks Connect Środowisko bezserwerowe funkcji UDF
Od 17.0 do 17.3, Python 3.12 Wersja 4
16.4.1 do poniżej 17, Python 3.12 Wersja 3
15.4.10 do poniżej 16, Python 3.12 Wersja 3
15.4.10 do poniżej 16, Python 3.11 Wersja 2
Od 15.4.0 do 15.4.9 i 16.0 do 16.3 Najnowsze zasoby obliczeniowe bezserwerowe. Przeprowadź migrację do wersji 15.4.10 LTS lub nowszej lub 16.4.1 LTS i nowszej, aby zapewnić stabilne środowisko języka Python.