Compartilhar via


Funções escalares definidas pelo usuário – Scala

Este artigo contém exemplos de UDF (função definida pelo usuário) do Scala. Ele mostra como registrar UDFs, como invocar UDFs e advertências sobre a ordem de avaliação de subexpressões no Spark SQL. Consulte Fnções escalares definidas pelo usuário (UDFs) para obter mais detalhes.

Observação

Os UDFs do Scala em recursos de computação habilitados para Catálogo do Unity com modo de acesso padrão (antigo modo de acesso compartilhado) exigem o Databricks Runtime 14.2 e superior.

Registrar uma função como uma UDF

val squared = (s: Long) => {
  s * s
}
spark.udf.register("square", squared)

Chamar a UDF no Spark SQL

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, square(id) as id_squared from test

Usar UDF com DataFrames

import org.apache.spark.sql.functions.{col, udf}
val squared = udf((s: Long) => s * s)
display(spark.range(1, 20).select(squared(col("id")) as "id_squared"))

Ordem de avaliação e verificação nula

O Spark SQL (inclusive a SQL, as APIs do conjunto de dados e do DataFrame) não garante a ordem de avaliação de subexpressões. Em especial, as entradas de um operador ou função não são necessariamente avaliadas da esquerda para a direita ou em qualquer outra ordem fixa. Por exemplo, as expressões lógicas AND e OR não têm semântica de "curto-circuito" da esquerda para a direita.

Portanto, é perigoso confiar nos efeitos colaterais ou na ordem de avaliação de expressões booleanas, e na ordem das cláusulas WHERE e HAVING, pois essas expressões e cláusulas podem ser reordenadas durante a otimização e o planejamento da consulta. Especificamente, se uma UDF depender de semântica de curto-circuito no SQL para verificação nula, não haverá garantia de que a verificação nula acontecerá antes de invocar a UDF. Por exemplo,

spark.udf.register("strlen", (s: String) => s.length)
spark.sql("select s from test1 where s is not null and strlen(s) > 1") // no guarantee

Essa cláusula WHERE não garante que a UDF strlen seja invocada após a filtragem de nulos.

Para executar a verificação nula adequada, é recomendável que você faça o seguinte:

  • Fazer a própria UDF reconhecer nulos e verificar nulos dentro da própria UDF
  • Usar IF expressões ou CASE WHEN para fazer a verificação nula e invocar a UDF em uma ramificação condicional
spark.udf.register("strlen_nullsafe", (s: String) => if (s != null) s.length else -1)
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok

APIs do conjunto de dados tipado

Observação

Esse recurso tem suporte em clusters com o Catálogo do Unity habilitado, no modo de acesso padrão, no Databricks Runtime 15.4 e superior.

As APIs do conjunto de dados tipado permitem a você executar transformações como mapa, filtro e agregações nos conjuntos de dados resultantes com uma função definida pelo usuário.

Por exemplo, o aplicativo Scala a seguir usa a API map() para modificar um número em uma coluna de resultado para uma sequência de caracteres prefixada.

spark.range(3).map(f => s"row-$f").show()

Embora este exemplo use a API map(), ele também se aplica a outras APIs do conjunto de dados tipado, como filter(), mapPartitions(), foreach(), foreachPartition(), reduce() e flatMap().

Recursos do Scala UDF e compatibilidade do Databricks Runtime

Os recursos do Scala a seguir exigem versões mínimas do Databricks Runtime quando usados em clusters habilitados para o Catálogo do Unity no modo de acesso padrão (compartilhado).

Característica Versão mínima do Databricks Runtime
UDFs escalares Databricks Runtime 14.2
Dataset.map, Dataset.mapPartitions, Dataset.filter, , Dataset.reduceDataset.flatMap Databricks Runtime 15.4
KeyValueGroupedDataset.flatMapGroups, KeyValueGroupedDataset.mapGroups Databricks Runtime 15.4
(Streaming) foreachWriter Sink Databricks Runtime 15.4
(Streaming) foreachBatch Databricks Runtime 16.1
(Streaming) KeyValueGroupedDataset.flatMapGroupsWithState Databricks Runtime 16.2