Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Ten artykuł zawiera przykłady funkcji zdefiniowanej przez użytkownika (UDF). Przedstawiono w nim sposób rejestrowania funkcji zdefiniowanych przez użytkownika, wywoływania funkcji zdefiniowanych przez użytkownika i zastrzeżeń dotyczących kolejności oceny podexpressionów w usłudze Spark SQL. Aby uzyskać więcej informacji, zobacz Zewnętrzne funkcje skalarne zdefiniowane przez użytkownika (UDF).
Uwaga
Funkcje zdefiniowane przez użytkownika w języku Scala na zasobach obliczeniowych z włączonym katalogiem Unity oraz standardowym trybem dostępu (dawniej trybem współdzielonym) wymagają Databricks Runtime 14.2 lub nowszego.
Rejestrowanie funkcji jako funkcji zdefiniowanej przez użytkownika
val squared = (s: Long) => {
s * s
}
spark.udf.register("square", squared)
Wywoływanie funkcji zdefiniowanej przez użytkownika w usłudze Spark SQL
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, square(id) as id_squared from test
Używanie funkcji UDF z ramkami danych
import org.apache.spark.sql.functions.{col, udf}
val squared = udf((s: Long) => s * s)
display(spark.range(1, 20).select(squared(col("id")) as "id_squared"))
Kolejność oceny i sprawdzanie wartości null
Usługa Spark SQL (w tym sql i interfejsy API ramek danych i zestawów danych) nie gwarantuje kolejności obliczania podexpressionów. W szczególności dane wejściowe operatora lub funkcji nie muszą być oceniane od lewej do prawej ani w innej stałej kolejności. Na przykład wyrażenia logiczne AND i OR nie mają semantyki od lewej do prawej "zwarcie".
W związku z tym niebezpieczne jest poleganie na skutkach ubocznych lub kolejności obliczania wyrażeń logicznych oraz kolejności WHERE klauzul i HAVING , ponieważ takie wyrażenia i klauzule można zmienić kolejność podczas optymalizacji zapytań i planowania. W szczególności, jeśli funkcja UDF wykorzystuje semantykę zwarciową w języku SQL w celu sprawdzania wartości null, nie ma gwarancji, że sprawdzanie wartości null nastąpi przed wywołaniem funkcji UDF. Na przykład:
spark.udf.register("strlen", (s: String) => s.length)
spark.sql("select s from test1 where s is not null and strlen(s) > 1") // no guarantee
Ta WHERE klauzula nie gwarantuje wywołania funkcji zdefiniowanej strlen przez użytkownika po odfiltrowaniu wartości null.
Aby wykonać odpowiednie sprawdzanie wartości null, zalecamy wykonanie jednej z następujących czynności:
- Upewnij się, że funkcja UDF jest świadoma wartości null i sprawdza wartość null wewnątrz samej funkcji zdefiniowanej przez użytkownika
- Używanie
IFwyrażeń lubCASE WHENdo sprawdzania wartości null i wywoływanie funkcji zdefiniowanej przez użytkownika w gałęzi warunkowej
spark.udf.register("strlen_nullsafe", (s: String) => if (s != null) s.length else -1)
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
Typizowane interfejsy API zestawu danych
Uwaga
Ta funkcja jest obsługiwana w klastrach z obsługą Unity Catalog w standardowym trybie dostępu w środowisku Databricks Runtime 15.4 lub nowszym.
Typizowane interfejsy API zestawu danych umożliwiają uruchamianie przekształceń, takich jak mapowanie, filtrowanie i agregacje w wynikowych zestawach danych z funkcją zdefiniowaną przez użytkownika.
Na przykład następująca aplikacja Scala używa interfejsu map() API do modyfikowania liczby w kolumnie wynikowej na prefiksowany ciąg.
spark.range(3).map(f => s"row-$f").show()
W tym przykładzie użyto interfejsu map() API, ale dotyczy to również innych typowych interfejsów API zestawu danych, takich jak filter(), , mapPartitions()foreach(), foreachPartition(), reduce()i flatMap().
Funkcje UDF w języku Scala i zgodność środowiska Databricks Runtime
Poniższe funkcje języka Scala wymagają minimalnych wersji środowiska Databricks Runtime w przypadku użycia w klastrach z włączonym Unity Catalog w trybie standardowego (współdzielonego) dostępu.
| Funkcja | Minimalna wersja Databricks Runtime |
|---|---|
| Skalarne funkcje zdefiniowane przez użytkownika | Databricks Runtime 14.2 |
Dataset.map, , Dataset.mapPartitions, Dataset.filter, , Dataset.reduceDataset.flatMap |
Databricks Runtime 15.4 |
KeyValueGroupedDataset.flatMapGroups, KeyValueGroupedDataset.mapGroups |
Databricks Runtime 15.4 |
(Przesyłanie strumieniowe) foreachWriter Sink |
Databricks Runtime 15.4 |
(Przesyłanie strumieniowe) foreachBatch |
Databricks Runtime 16.1 |
(Przesyłanie strumieniowe) KeyValueGroupedDataset.flatMapGroupsWithState |
Databricks Runtime 16.2 |