Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Importante
O registro de UDTFs do Python no Catálogo do Unity está em versão prévia pública.
Uma UDTF (função de tabela definida pelo usuário) do Catálogo do Unity registra funções que retornam tabelas completas em vez de valores escalares. Ao contrário das funções escalares que retornam um único valor de resultado de cada chamada, UDTFs são invocados na cláusula de FROM uma instrução SQL e podem retornar várias linhas e colunas.
UDTFs são particularmente úteis para:
- Transformando matrizes ou estruturas de dados complexas em várias linhas
- Integrando APIs ou serviços externos em fluxos de trabalho do SQL
- Implementando a lógica de geração ou enriquecimento de dados personalizados
- Processamento de dados que exigem operações com estado entre linhas
Cada chamada UDTF aceita zero ou mais argumentos. Esses argumentos podem ser expressões escalares ou argumentos de tabela que representam tabelas de entrada inteiras.
UDTFs podem ser registrados de duas maneiras:
- Catálogo do Unity: registre o UDTF como um objeto controlado no Catálogo do Unity.
- Escopo da sessão: registre-se no local
SparkSession, isolado no notebook ou trabalho atual. Consulte UDTFs (funções de tabela definidas pelo usuário) do Python.
Requirements
Os UDTFs do Python do Catálogo do Unity têm suporte nos seguintes tipos de computação:
- Computação clássica com modo de acesso padrão (Databricks Runtime 17.1 e superior)
- SQL Warehouse (sem servidor ou profissional)
Criar um UDTF no Catálogo do Unity
Use a DDL do SQL para criar uma UDTF governada no Unity Catalog. UDTFs são invocados usando a cláusula FROM de uma instrução SQL.
CREATE OR REPLACE FUNCTION square_numbers(start INT, end INT)
RETURNS TABLE (num INT, squared INT)
LANGUAGE PYTHON
HANDLER 'SquareNumbers'
DETERMINISTIC
AS $$
class SquareNumbers:
"""
Basic UDTF that computes a sequence of integers
and includes the square of each number in the range.
"""
def eval(self, start: int, end: int):
for num in range(start, end + 1):
yield (num, num * num)
$$;
SELECT * FROM square_numbers(1, 5);
+-----+---------+
| num | squared |
+-----+---------+
| 1 | 1 |
| 2 | 4 |
| 3 | 9 |
| 4 | 16 |
| 5 | 25 |
+-----+---------+
Azure Databricks implementa UDTFs do Python como classes Python que incluem um método obrigatório eval que gera linhas de saída.
Argumentos de tabela
Observação
TABLE há suporte para argumentos no Databricks Runtime 17.2 e superior.
Os UDTFs podem aceitar tabelas inteiras como argumentos de entrada, permitindo transformações e agregações complexas com estado.
eval() e terminate() métodos de ciclo de vida
Os argumentos de tabela em UDTFs usam as seguintes funções para processar cada linha:
-
eval(): chamado uma vez para cada linha na tabela de entrada. Esse é o principal método de processamento e é necessário. -
terminate(): chamado uma vez no final de cada partição, depois que todas as linhas tiverem sido processadas poreval(). Use esse método para produzir resultados agregados finais ou executar operações de limpeza. Esse método é opcional, mas essencial para operações com estado, como agregações, contagem ou processamento em lote.
Para obter mais informações sobre eval() e terminate() métodos, consulte a documentação do Apache Spark: UDTF do Python.
Padrões de acesso a linhas
eval() recebe linhas dos argumentos de TABLE como objetos pyspark.sql.Row. Você pode acessar valores por nome de coluna (row['id'], row['name']) ou por índice (row[0], row[1]).
-
Flexibilidade de esquema: declarar TABLE argumentos sem definições de esquema (por exemplo,
data TABLE, ).t TABLEA função aceita qualquer estrutura de tabela, portanto, seu código deve validar se as colunas necessárias existem.
Veja Exemplo: Correspondência de endereços IP com blocos de rede CIDR e Exemplo: Legendagem em lote de imagens usando endpoints de visão do Azure Databricks.
Isolamento de ambiente
Observação
Ambientes de isolamento compartilhado exigem o Databricks Runtime 17.2 e superior. Em versões anteriores, todos os UDTFs do Python do Catálogo do Unity são executados no modo de isolamento estrito.
UDTFs do Python do Catálogo do Unity com o mesmo proprietário e sessão podem compartilhar um ambiente de isolamento por padrão. Isso melhora o desempenho e reduz o uso de memória reduzindo o número de ambientes separados que precisam ser iniciados.
Isolamento estrito
Para garantir que um UDTF sempre seja executado em seu próprio ambiente totalmente isolado, adicione a STRICT ISOLATION cláusula característica.
A maioria dos UDTFs não precisa de isolamento estrito. Os UDTFs de processamento de dados padrão se beneficiam do ambiente de isolamento compartilhado padrão e são executados mais rapidamente com menor consumo de memória.
Adicione a STRICT ISOLATION cláusula característica a UDTFs que:
- Execute a entrada como código usando
eval(),exec()ou funções semelhantes. - Gravar arquivos no sistema de arquivos local.
- Modificar variáveis globais ou estado do sistema.
- Acesse ou modifique variáveis de ambiente.
O exemplo de UDTF a seguir define uma variável de ambiente personalizada, lê a variável novamente e multiplica um conjunto de números usando a variável. Como o UDTF modifica o ambiente do processo, execute-o em STRICT ISOLATION. Caso contrário, ele poderá vazar ou substituir variáveis de ambiente para outros UDFs/UDTFs no mesmo ambiente, causando um comportamento incorreto.
CREATE OR REPLACE TEMPORARY FUNCTION multiply_numbers(factor STRING)
RETURNS TABLE (original INT, scaled INT)
LANGUAGE PYTHON
STRICT ISOLATION
HANDLER 'Multiplier'
AS $$
import os
class Multiplier:
def eval(self, factor: str):
# Save the factor as an environment variable
os.environ["FACTOR"] = factor
# Read it back and convert it to a number
scale = int(os.getenv("FACTOR", "1"))
# Multiply 0 through 4 by the factor
for i in range(5):
yield (i, i * scale)
$$;
SELECT * FROM multiply_numbers("3");
Definir DETERMINISTIC se sua função produz resultados consistentes
Adicione DETERMINISTIC à sua definição de função se ela produzir as mesmas saídas para as mesmas entradas. Isso permite otimizações de consulta para melhorar o desempenho.
Por padrão, as UDTFs do Python do Unity Catalog em lote são consideradas não determinísticas, a menos que explicitamente declaradas como tais. Exemplos de funções não determinísticas incluem: gerar valores aleatórios, acessar datas ou horários atuais ou fazer chamadas à API externa.
Consulte CREATE FUNCTION (SQL e Python).
Exemplos práticos
Os exemplos a seguir demonstram casos de uso do mundo real para UDTFs do Python do Catálogo do Unity, progredindo de transformações de dados simples para integrações externas complexas.
Exemplo: implementação novamente explode
Embora o Spark forneça uma função interna explode , criar sua própria versão demonstra o padrão UDTF fundamental de usar uma única entrada e produzir várias linhas de saída.
CREATE OR REPLACE FUNCTION my_explode(arr ARRAY<STRING>)
RETURNS TABLE (element STRING)
LANGUAGE PYTHON
HANDLER 'MyExplode'
DETERMINISTIC
AS $$
class MyExplode:
def eval(self, arr):
if arr is None:
return
for element in arr:
yield (element,)
$$;
Use a função diretamente em uma consulta SQL:
SELECT element FROM my_explode(array('apple', 'banana', 'cherry'));
+---------+
| element |
+---------+
| apple |
| banana |
| cherry |
+---------+
Ou aplique-o aos dados de tabela existentes com um joinLATERAL:
SELECT s.*, e.element
FROM my_items AS s,
LATERAL my_explode(s.items) AS e;
Exemplo: localização geográfica de endereço IP via API REST
Este exemplo demonstra como os UDTFs podem integrar APIs externas diretamente ao fluxo de trabalho do SQL. Os analistas podem enriquecer dados com chamadas à API em tempo real usando sintaxe SQL familiar, sem a necessidade de processos ETL separados.
CREATE OR REPLACE FUNCTION ip_to_location(ip_address STRING)
RETURNS TABLE (city STRING, country STRING)
LANGUAGE PYTHON
HANDLER 'IPToLocationAPI'
AS $$
class IPToLocationAPI:
def eval(self, ip_address):
import requests
api_url = f"https://api.ip-lookup.example.com/{ip_address}"
try:
response = requests.get(api_url)
response.raise_for_status()
data = response.json()
yield (data.get('city'), data.get('country'))
except requests.exceptions.RequestException as e:
# Return nothing if the API request fails
return
$$;
Observação
Os UDTFs do Python permitem o tráfego de rede TCP/UDP nas portas 80, 443 e 53 ao usar computação sem servidor ou computação configurada com o modo de acesso padrão.
Use a função para enriquecer dados de log da Web com informações geográficas:
SELECT
l.timestamp,
l.request_path,
geo.city,
geo.country
FROM web_logs AS l,
LATERAL ip_to_location(l.ip_address) AS geo;
Essa abordagem permite a análise geográfica em tempo real sem exigir tabelas de pesquisa pré-processadas ou pipelines de dados separados. O UDTF lida com solicitações HTTP, análise JSON e tratamento de erros, tornando as fontes de dados externas acessíveis por meio de consultas SQL padrão.
Exemplo: corresponder endereços IP com blocos de rede CIDR
Este exemplo demonstra a correspondência de endereços IP em blocos de rede CIDR, uma tarefa de engenharia de dados comum que requer lógica SQL complexa.
Primeiro, crie dados de exemplo com endereços IPv4 e IPv6:
-- An example IP logs with both IPv4 and IPv6 addresses
CREATE OR REPLACE TEMPORARY VIEW ip_logs AS
VALUES
('log1', '192.168.1.100'),
('log2', '10.0.0.5'),
('log3', '172.16.0.10'),
('log4', '8.8.8.8'),
('log5', '2001:db8::1'),
('log6', '2001:db8:85a3::8a2e:370:7334'),
('log7', 'fe80::1'),
('log8', '::1'),
('log9', '2001:db8:1234:5678::1')
t(log_id, ip_address);
Em seguida, defina e registre o UDTF. Observe a estrutura da classe Python:
- O
t TABLEparâmetro aceita uma tabela de entrada com qualquer esquema. O UDTF se adapta automaticamente para processar as colunas fornecidas. Essa flexibilidade significa que você pode usar a mesma função em tabelas diferentes sem modificar a assinatura da função. No entanto, você deve verificar cuidadosamente o esquema das linhas para garantir a compatibilidade. - O método
__init__é usado para uma configuração pesada e de único uso, como carregar a grande lista de redes. Esse trabalho ocorre uma vez por partição da tabela de entrada. - O
evalmétodo processa cada linha e contém a lógica de correspondência principal. Esse método é executado exatamente uma vez para cada linha na partição de entrada e cada execução é executada pela instância correspondente daIpMatcherclasse UDTF para essa partição. - A
HANDLERcláusula especifica o nome da classe Python que implementa a lógica UDTF.
CREATE OR REPLACE TEMPORARY FUNCTION ip_cidr_matcher(t TABLE)
RETURNS TABLE(log_id STRING, ip_address STRING, network STRING, ip_version INT)
LANGUAGE PYTHON
HANDLER 'IpMatcher'
COMMENT 'Match IP addresses against a list of network CIDR blocks'
AS $$
class IpMatcher:
def __init__(self):
import ipaddress
# Heavy initialization - load networks once per partition
self.nets = []
cidrs = ['192.168.0.0/16', '10.0.0.0/8', '172.16.0.0/12',
'2001:db8::/32', 'fe80::/10', '::1/128']
for cidr in cidrs:
self.nets.append(ipaddress.ip_network(cidr))
def eval(self, row):
import ipaddress
# Validate that required fields exist
required_fields = ['log_id', 'ip_address']
for field in required_fields:
if field not in row:
raise ValueError(f"Missing required field: {field}")
try:
ip = ipaddress.ip_address(row['ip_address'])
for net in self.nets:
if ip in net:
yield (row['log_id'], row['ip_address'], str(net), ip.version)
return
yield (row['log_id'], row['ip_address'], None, ip.version)
except ValueError:
yield (row['log_id'], row['ip_address'], 'Invalid', None)
$$;
Agora que ip_cidr_matcher está registrado no Catálogo do Unity, chame-o diretamente do SQL usando a TABLE() sintaxe:
-- Process all IP addresses
SELECT
*
FROM
ip_cidr_matcher(t => TABLE(ip_logs))
ORDER BY
log_id;
+--------+-------------------------------+-----------------+-------------+
| log_id | ip_address | network | ip_version |
+--------+-------------------------------+-----------------+-------------+
| log1 | 192.168.1.100 | 192.168.0.0/16 | 4 |
| log2 | 10.0.0.5 | 10.0.0.0/8 | 4 |
| log3 | 172.16.0.10 | 172.16.0.0/12 | 4 |
| log4 | 8.8.8.8 | null | 4 |
| log5 | 2001:db8::1 | 2001:db8::/32 | 6 |
| log6 | 2001:db8:85a3::8a2e:370:7334 | 2001:db8::/32 | 6 |
| log7 | fe80::1 | fe80::/10 | 6 |
| log8 | ::1 | ::1/128 | 6 |
| log9 | 2001:db8:1234:5678::1 | 2001:db8::/32 | 6 |
+--------+-------------------------------+-----------------+-------------+
Exemplo: Legenda de imagem em lote usando pontos de extremidade de visão do Azure Databricks
Este exemplo demonstra a geração de legendas para imagens em lote usando um ponto de extremidade de serviço de modelos de visão computacional do Azure Databricks. Ele mostra o uso terminate() para processamento em lote e execução baseada em partição.
Crie uma tabela com URLs de imagem pública:
CREATE OR REPLACE TEMPORARY VIEW sample_images AS VALUES ('https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg', 'scenery'), ('https://upload.wikimedia.org/wikipedia/commons/thumb/a/a7/Camponotus_flavomarginatus_ant.jpg/1024px-Camponotus_flavomarginatus_ant.jpg', 'animals'), ('https://upload.wikimedia.org/wikipedia/commons/thumb/1/15/Cat_August_2010-4.jpg/1200px-Cat_August_2010-4.jpg', 'animals'), ('https://upload.wikimedia.org/wikipedia/commons/thumb/c/c5/M101_hires_STScI-PRC2006-10a.jpg/1024px-M101_hires_STScI-PRC2006-10a.jpg', 'scenery') images(image_url, category);Crie um UDTF do Catálogo do Unity em Python para gerar legendas de imagem.
- Inicialize o UDTF com as configurações, incluindo o tamanho do lote, o token de API do Azure Databricks, o ponto de extremidade do modelo de visão computacional e a URL do workspace.
- No método
eval, colete as URLs das imagens em um buffer. Quando o buffer atingir o tamanho do lote, dispare o processamento em lote. Isso garante que várias imagens sejam processadas juntas em uma única chamada à API em vez de chamadas individuais por imagem. - No método de processamento em lote, baixe todas as imagens em buffer, codifique-as como base64 e envie-as para uma única solicitação de API para o Databricks VisionModel. O modelo processa todas as imagens simultaneamente e retorna legendas para todo o lote.
- O
terminatemétodo é executado exatamente uma vez no final de cada partição. No método de término, processe as imagens restantes no buffer e produza todas as legendas coletadas como resultados.
Observação
Substitua <workspace-url> pela URL real do workspace do Azure Databricks (https://your-workspace.cloud.databricks.com).
CREATE OR REPLACE TEMPORARY FUNCTION batch_inference_image_caption(data TABLE, api_token STRING)
RETURNS TABLE (caption STRING)
LANGUAGE PYTHON
HANDLER 'BatchInferenceImageCaption'
COMMENT 'batch image captioning by sending groups of image URLs to a Databricks vision endpoint and returning concise captions for each image.'
AS $$
class BatchInferenceImageCaption:
def __init__(self):
self.batch_size = 3
self.vision_endpoint = "databricks-claude-sonnet-4-5"
self.workspace_url = "<workspace-url>"
self.image_buffer = []
self.results = []
def eval(self, row, api_token):
self.image_buffer.append((str(row[0]), api_token))
if len(self.image_buffer) >= self.batch_size:
self._process_batch()
def terminate(self):
if self.image_buffer:
self._process_batch()
for caption in self.results:
yield (caption,)
def _process_batch(self):
batch_data = self.image_buffer.copy()
self.image_buffer.clear()
import base64
import httpx
import requests
# API request timeout in seconds
api_timeout = 60
# Maximum tokens for vision model response
max_response_tokens = 300
# Temperature controls randomness (lower = more deterministic)
model_temperature = 0.3
# create a batch for the images
batch_images = []
api_token = batch_data[0][1] if batch_data else None
for image_url, _ in batch_data:
image_response = httpx.get(image_url, timeout=15)
image_data = base64.standard_b64encode(image_response.content).decode("utf-8")
batch_images.append(image_data)
content_items = [{
"type": "text",
"text": "Provide brief captions for these images, one per line."
}]
for img_data in batch_images:
content_items.append({
"type": "image_url",
"image_url": {
"url": "data:image/jpeg;base64," + img_data
}
})
payload = {
"messages": [{
"role": "user",
"content": content_items
}],
"max_tokens": max_response_tokens,
"temperature": model_temperature
}
response = requests.post(
self.workspace_url + "/serving-endpoints/" +
self.vision_endpoint + "/invocations",
headers={
'Authorization': 'Bearer ' + api_token,
'Content-Type': 'application/json'
},
json=payload,
timeout=api_timeout
)
result = response.json()
batch_response = result['choices'][0]['message']['content'].strip()
lines = batch_response.split('\n')
captions = [line.strip() for line in lines if line.strip()]
while len(captions) < len(batch_data):
captions.append(batch_response)
self.results.extend(captions[:len(batch_data)])
$$;
Para usar a legenda de imagem do lote UDTF, chame-a usando a tabela de imagens de exemplo:
Observação
Substitua your_secret_scope e api_token pelo escopo do segredo e pelo nome da chave reais para o token de API do Databricks.
SELECT
caption
FROM
batch_inference_image_caption(
data => TABLE(sample_images),
api_token => secret('your_secret_scope', 'api_token')
)
+---------------------------------------------------------------------------------------------------------------+
| caption |
+---------------------------------------------------------------------------------------------------------------+
| Wooden boardwalk cutting through vibrant wetland grasses under blue skies |
| Black ant in detailed macro photography standing on a textured surface |
| Tabby cat lounging comfortably on a white ledge against a white wall |
| Stunning spiral galaxy with bright central core and sweeping blue-white arms against the black void of space. |
+---------------------------------------------------------------------------------------------------------------+
Você também pode gerar legendas de imagem categoria por categoria:
SELECT
*
FROM
batch_inference_image_caption(
TABLE(sample_images)
PARTITION BY category ORDER BY (category),
secret('your_secret_scope', 'api_token')
)
+------------------------------------------------------------------------------------------------------+
| caption |
+------------------------------------------------------------------------------------------------------+
| Black ant in detailed macro photography standing on a textured surface |
| Stunning spiral galaxy with bright center and sweeping blue-tinged arms against the black of space. |
| Tabby cat lounging comfortably on white ledge against white wall |
| Wooden boardwalk cutting through lush wetland grasses under blue skies |
+------------------------------------------------------------------------------------------------------+
Exemplo: curva ROC e computação AUC para avaliação de modelo de ML
Este exemplo demonstra o cálculo de curvas ROC (receiver operating characteristic) e as pontuações da área sob a curva (AUC) para a avaliação de modelos de classificação binária usando scikit-learn.
Este exemplo mostra vários padrões importantes:
- Uso da biblioteca externa: integra o scikit-learn para computação de curva ROC
- Agregação stateful: acumula previsões em todas as linhas antes de computar as métricas
-
terminate()uso do método: processa o conjunto de dados completo e produz resultados somente após todas as linhas terem sido avaliadas - Tratamento de erros: valida as colunas necessárias na tabela de entrada
O UDTF acumula todas as previsões na memória usando o eval() método, em seguida, calcula e produz a curva ROC completa no terminate() método. Esse padrão é útil para métricas que exigem o conjunto de dados completo para cálculo.
CREATE OR REPLACE TEMPORARY FUNCTION compute_roc_curve(t TABLE)
RETURNS TABLE (threshold DOUBLE, true_positive_rate DOUBLE, false_positive_rate DOUBLE, auc DOUBLE)
LANGUAGE PYTHON
HANDLER 'ROCCalculator'
COMMENT 'Compute ROC curve and AUC using scikit-learn'
AS $$
class ROCCalculator:
def __init__(self):
from sklearn import metrics
self._roc_curve = metrics.roc_curve
self._roc_auc_score = metrics.roc_auc_score
self._true_labels = []
self._predicted_scores = []
def eval(self, row):
if 'y_true' not in row or 'y_score' not in row:
raise KeyError("Required columns 'y_true' and 'y_score' not found")
true_label = row['y_true']
predicted_score = row['y_score']
label = float(true_label)
self._true_labels.append(label)
self._predicted_scores.append(float(predicted_score))
def terminate(self):
false_pos_rate, true_pos_rate, thresholds = self._roc_curve(
self._true_labels,
self._predicted_scores,
drop_intermediate=False
)
auc_score = float(self._roc_auc_score(self._true_labels, self._predicted_scores))
for threshold, tpr, fpr in zip(thresholds, true_pos_rate, false_pos_rate):
yield float(threshold), float(tpr), float(fpr), auc_score
$$;
Crie dados de classificação binária de exemplo com previsões:
CREATE OR REPLACE TEMPORARY VIEW binary_classification_data AS
SELECT *
FROM VALUES
( 1, 1.0, 0.95, 'high_confidence_positive'),
( 2, 1.0, 0.87, 'high_confidence_positive'),
( 3, 1.0, 0.82, 'medium_confidence_positive'),
( 4, 0.0, 0.78, 'false_positive'),
( 5, 1.0, 0.71, 'medium_confidence_positive'),
( 6, 0.0, 0.65, 'false_positive'),
( 7, 0.0, 0.58, 'true_negative'),
( 8, 1.0, 0.52, 'low_confidence_positive'),
( 9, 0.0, 0.45, 'true_negative'),
(10, 0.0, 0.38, 'true_negative'),
(11, 1.0, 0.31, 'low_confidence_positive'),
(12, 0.0, 0.15, 'true_negative'),
(13, 0.0, 0.08, 'high_confidence_negative'),
(14, 0.0, 0.03, 'high_confidence_negative')
AS data(sample_id, y_true, y_score, prediction_type);
Compute a curva ROC e a AUC:
SELECT
threshold,
true_positive_rate,
false_positive_rate,
auc
FROM compute_roc_curve(
TABLE(
SELECT y_true, y_score
FROM binary_classification_data
WHERE y_true IS NOT NULL AND y_score IS NOT NULL
ORDER BY sample_id
)
)
ORDER BY threshold DESC;
+-----------+---------------------+----------------------+-------+
| threshold | true_positive_rate | false_positive_rate | auc |
+-----------+---------------------+----------------------+-------+
| 1.95 | 0.0 | 0.0 | 0.786 |
| 0.95 | 0.167 | 0.0 | 0.786 |
| 0.87 | 0.333 | 0.0 | 0.786 |
| 0.82 | 0.5 | 0.0 | 0.786 |
| 0.78 | 0.5 | 0.125 | 0.786 |
| 0.71 | 0.667 | 0.125 | 0.786 |
| 0.65 | 0.667 | 0.25 | 0.786 |
| 0.58 | 0.667 | 0.375 | 0.786 |
| 0.52 | 0.833 | 0.375 | 0.786 |
| 0.45 | 0.833 | 0.5 | 0.786 |
| 0.38 | 0.833 | 0.625 | 0.786 |
| 0.31 | 1.0 | 0.625 | 0.786 |
| 0.15 | 1.0 | 0.75 | 0.786 |
| 0.08 | 1.0 | 0.875 | 0.786 |
| 0.03 | 1.0 | 1.0 | 0.786 |
+-----------+---------------------+----------------------+-------+
Limitações
As seguintes limitações se aplicam às UDTFs do Python do Catálogo do Unity:
- Não há suporte para funções de tabela polimórficas.
- Não há suporte para credenciais de serviço do Catálogo do Unity.
- Não há suporte para dependências personalizadas.