Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Importante
A sink API está em Visualização Pública.
Esta página descreve a API de Pipelines Declarativos sink do Lakeflow Spark e como usá-la com fluxos para gravar registros transformados por um pipeline em um coletor de dados externo. Os destinos de dados externos incluem tabelas gerenciadas e externas do Unity Catalog e serviços de streaming de eventos, como Apache Kafka ou Hubs de Eventos do Azure. Você também pode usar coletores de dados para gravar em fontes de dados personalizadas escrevendo código Python para essa fonte de dados.
Observação
- A
sinkAPI só está disponível para Python. - Você pode criar um coletor personalizado com a API ForEachBatch. Consulte Use ForEachBatch para gravar em coletores de dados arbitrários em pipelines.
O que são coletores?
Coletores são destinos para fluxos em um pipeline. Por padrão, os fluxos de pipeline emitem dados para uma tabela de streaming ou um destino de exibição materializado. Essas duas são tabelas Delta gerenciadas do Azure Databricks. Sinks são um alvo alternativo que você usa para gravar dados transformados em alvos como serviços de streaming de eventos como Apache Kafka ou Hubs de Eventos do Azure e tabelas externas gerenciadas pelo Unity Catalog. Usando coletores, agora você tem mais opções para persistir a saída do pipeline.
Quando devo usar sinks?
O Databricks recomenda o uso de coletores quando você precisar:
- Crie um caso de uso operacional, como detecção de fraude, análise em tempo real e recomendações do cliente. Os casos de uso operacional normalmente leem dados de um barramento de mensagens, como um tópico do Apache Kafka, e processam dados com baixa latência e gravam os registros processados de volta em um barramento de mensagens. Essa abordagem permite que você obtenha uma latência menor não gravando ou lendo do armazenamento em nuvem.
- Escreva dados transformados de seus fluxos para tabelas gerenciadas por uma instância Delta externa, incluindo tabelas externas e gerenciadas pelo Unity Catalog.
- Executar extração, transformação e carregamento (ETL) reversa em coletores externos ao Databricks, como tópicos do Apache Kafka. Essa abordagem permite que você dê suporte efetivamente a casos de uso em que os dados precisam ser lidos ou usados fora das tabelas do Catálogo do Unity ou de outro armazenamento gerenciado pelo Databricks.
- É necessário gravar em um formato de dados que não tenha suporte diretamente pelo Azure Databricks. As fontes de dados personalizadas do Python permitem criar um coletor que grava em qualquer fonte de dados usando código Python personalizado. Consulte fontes de dados personalizadas do PySpark.
Como posso usar sumidouros?
À medida que os dados de evento são ingeridos de uma fonte de streaming em seu pipeline, você processa e refina esses dados em transformações em seu pipeline. Em seguida, você usa o processamento de fluxo de acréscimo para transmitir os registros de dados transformados para um coletor. Você cria esse coletor usando a função create_sink(). Para obter mais detalhes sobre a create_sink função, consulte a referência da API do coletor.
Se você tiver um pipeline que crie ou processe seus dados de evento de streaming e prepare registros de dados para gravação, estará pronto para usar um coletor.
A implementação de um coletor consiste em duas etapas:
- Crie o coletor.
- Use um fluxo de acréscimo para gravar os registros preparados no coletor.
Criar um coletor
O Databricks dá suporte a vários tipos de coletores de destino nos quais você grava seus registros processados de seus dados de fluxo:
- Coletores de tabela delta (incluindo tabelas gerenciadas e externas do Catálogo do Unity)
- Coletores do Apache Kafka
- Coletores dos Hubs de Eventos do Azure
- Sinks personalizados escritos em Python, utilizando fontes de dados personalizadas do Python
Abaixo estão exemplos de configurações para coletores Delta, Kafka e Hubs de Eventos do Azure e fontes de dados personalizadas do Python:
Coletores Delta
Para criar um coletor Delta por caminho de arquivo:
dp.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
Para criar um coletor Delta por nome de tabela usando um catálogo e caminho de esquema totalmente qualificados:
dp.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "catalog_name.schema_name.table_name" }
)
Coletores do Kafka e Hubs de Eventos do Azure
Este código funciona tanto para sinks do Apache Kafka quanto do Azure Event Hubs.
credential_name = "<service-credential>"
eh_namespace_name = "dp-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
topic_name = "dp-sink"
dp.create_sink(
name = "eh_sink",
format = "kafka",
options = {
"databricks.serviceCredential": credential_name,
"kafka.bootstrap.servers": bootstrap_servers,
"topic": topic_name
}
)
O credential_name é uma referência a uma credencial de serviço do Catálogo do Unity. Para obter mais informações, consulte Usar as credenciais de serviço do Catálogo do Unity para se conectar a serviços de nuvem externos.
Fontes de dados personalizadas do Python
Supondo que você tenha uma fonte de dados personalizada do Python registrada como my_custom_datasource, o código a seguir pode gravar nessa fonte de dados.
from pyspark import pipelines as dp
# Assume `my_custom_datasource` is a custom Python streaming
# data source that writes data to your system.
# Create LDP sink using my_custom_datasource
dp.create_sink(
name="custom_sink",
format="my_custom_datasource",
options={
<options-needed-for-custom-datasource>
}
)
# Create append flow to send data to RequestBin
@dp.append_flow(name="flow_to_custom_sink", target="custom_sink")
def flow_to_custom_sink():
return read_stream("my_source_data")
Para obter detalhes sobre como criar fontes de dados personalizadas no Python, consulte fontes de dados personalizadas do PySpark.
Para obter mais detalhes sobre como usar a função create_sink, confira a referência de API do coletor.
Depois que o coletor for criado, você poderá começar a transmitir registros processados para o coletor.
Gravar em um coletor com um fluxo de acréscimo
Com o coletor criado, a próxima etapa é gravar registros processados nele especificando-o como o destino para a saída de registros por um fluxo de acréscimo. Faça isso especificando o coletor como o valor target no decorador append_flow.
- Para tabelas gerenciadas e externas do Catálogo do Unity, use o formato
deltae especifique o caminho ou o nome da tabela nas opções. Seu pipeline deve ser configurado para usar o Catálogo do Unity. - Para tópicos do Apache Kafka, use o formato
kafkae especifique o nome do tópico, as informações de conexão e as informações de autenticação nas opções. Essas são as mesmas opções às quais um coletor do Kafka de Streaming Estruturado do Spark dá suporte. Confira Configurar o gravador de Streaming Estruturado do Kafka. - Para Hubs de Eventos do Azure, use o formato
kafkae especifique o nome dos Hubs de Eventos, as informações de conexão e as informações de autenticação nas opções. Essas são as mesmas opções com suporte em um coletor de Hubs de Eventos de Streaming Estruturado do Spark que usa a interface do Kafka. Confira Autenticação de Entidade de Serviço com Microsoft Entra ID e Hubs de Eventos do Azure.
Abaixo estão exemplos de como configurar fluxos para gravar em coletores Delta, Kafka e Hubs de Eventos do Azure com registros processados pelo pipeline.
Coletor Delta
@dp.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
Coletores do Kafka e Hubs de Eventos do Azure
@dp.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
O parâmetro value é obrigatório para um coletor dos Hubs de Eventos do Azure. Parâmetros adicionais como key, partition, headerse topic são opcionais.
Para mais detalhes sobre o decorador append_flow, consulte Como usar vários fluxos para gravar em um único destino.
Limitações
Há suporte apenas para a API do Python. Não há suporte para SQL.
Há suporte apenas para consultas de streaming. Não há suporte para consultas em lote.
Somente
append_flowpode ser usado para gravar em coletores. Outros fluxos, comocreate_auto_cdc_flow, não têm suporte e você não pode usar um coletor em uma definição de conjunto de dados de pipeline. Por exemplo, não há suporte para o seguinte:@table("from_sink_table") def fromSink(): return read_stream("my_sink")Para os coletores Delta, o nome da tabela deve ser totalmente qualificado. Especificamente, para tabelas externas gerenciadas do Unity Catalog, o nome da tabela deve ser no formato
<catalog>.<schema>.<table>. Para o metastore do Hive, ele deve estar no formato<schema>.<table>.A execução de uma atualização completa não limpa os dados de resultados computados anteriormente nos coletores. Isso significa que todos os dados reprocessados são acrescentados ao coletor e os dados existentes não são alterados.
Não há suporte para expectativas de pipeline.