Partilhar via


Utilização de sumidouros em pipelines

Importante

A sink API está em Visualização Pública.

Esta página descreve a API Lakeflow Spark Declarative Pipelines sink e como a utilizar com fluxos para escrever registos transformados por um pipeline em um destino de dados externo. Os coletores de dados externos incluem tabelas gerenciadas e externas do Unity Catalog e serviços de streaming de eventos, como Apache Kafka ou Hubs de Eventos do Azure. Também podes usar dissipadores de dados para escrever em fontes de dados personalizadas, escrevendo código Python para essa fonte de dados.

Observação

O que são pias?

Os sumidouros são alvos de fluxos em um gasoduto. Por padrão, os fluxos de pipeline emitem dados para uma tabela de streaming ou para um destino de exibição materializado. Estas são ambas as tabelas Delta geridas pelo Azure Databricks. Os coletores são um destino alternativo que você usa para gravar dados transformados em destinos, como serviços de streaming de eventos, como Apache Kafka ou Hubs de Eventos do Azure, e tabelas externas gerenciadas pelo Unity Catalog. Usando coletores, agora você tem mais opções para persistir a saída do seu pipeline.

Quando devo usar lavatórios?

A Databricks recomenda o uso de pias se você precisar:

  • Crie um caso de uso operacional, como deteção de fraudes, análises em tempo real e recomendações de clientes. Os casos de uso operacionais normalmente leem dados de um barramento de mensagens, como um tópico no Apache Kafka, processam dados com baixa latência e escrevem os registos processados de volta para um barramento de mensagens. Essa abordagem permite que você obtenha latência mais baixa ao não escrever ou ler a partir do armazenamento em nuvem.
  • Escreva dados transformados de seus fluxos em tabelas gerenciadas por uma instância Delta externa, incluindo tabelas gerenciadas pelo Unity Catalog e tabelas externas.
  • Execute ETL (reverse extract-transform-load) em coletores externos ao Databricks, como tópicos do Apache Kafka. Essa abordagem permite que você ofereça suporte eficaz a casos de uso em que os dados precisam ser lidos ou usados fora das tabelas do Unity Catalog ou de outro armazenamento gerenciado pelo Databricks.
  • Precisa gravar em um formato de dados que não é suportado diretamente pelo Azure Databricks. As fontes de dados personalizadas do Python permitem que você crie um coletor que grava em qualquer fonte de dados usando código Python personalizado. Consulte Fontes de dados personalizadas do PySpark.

Como usar lavatórios?

À medida que os dados de eventos são ingeridos de uma fonte de streaming para o seu pipeline, processa e refina esses dados nas suas transformações no pipeline. Depois, utiliza-se o processamento de fluxo de adição para transmitir os registos de dados transformados para um destino. Você cria esse coletor usando a função create_sink(). Para obter mais detalhes sobre a create_sink função, consulte a referência da API do coletor.

Se você tiver um pipeline que cria ou processa seus dados de eventos de streaming e prepara registros de dados para gravação, então você está pronto para usar um coletor.

A implementação de um coletor consiste em duas etapas:

  1. Crie a pia.
  2. Use um fluxo de anexação para gravar os registos preparados no coletor.

Criar um lavatório

O Databricks suporta vários tipos de coletores de destino nos quais você grava seus registros processados a partir de seus dados de fluxo:

  • Repositórios de tabelas Delta (incluindo tabelas do Unity Catalog geridos e externos)
  • Pias Apache Kafka
  • Destinos dos Hubs de Eventos do Azure
  • Coletores personalizados escritos em Python, usando as fontes de dados personalizadas do Python

Abaixo estão exemplos de configurações para coletores de Hubs de Eventos Delta, Kafka e Azure e fontes de dados personalizadas do Python:

Delta afunda

Para criar um depósito Delta usando o caminho do arquivo:

dp.create_sink(
  name = "delta_sink",
  format = "delta",
  options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)

Para criar um coletor Delta por nome de tabela usando um catálogo totalmente qualificado e um caminho de esquema:

dp.create_sink(
  name = "delta_sink",
  format = "delta",
  options = { "tableName": "catalog_name.schema_name.table_name" }
)

Kafka e pontos de destino do Azure Event Hubs

Esse código funciona para coletores do Apache Kafka e dos Hubs de Eventos do Azure.

credential_name = "<service-credential>"
eh_namespace_name = "dp-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
topic_name = "dp-sink"

dp.create_sink(
name = "eh_sink",
format = "kafka",
options = {
    "databricks.serviceCredential": credential_name,
    "kafka.bootstrap.servers": bootstrap_servers,
    "topic": topic_name
  }
)

O credential_name é uma referência a uma credencial de serviço do Catálogo Unity. Para obter mais informações, consulte Usar credenciais de serviço do Unity Catalog para se conectar a serviços de nuvem externos.

Fontes de dados personalizadas do Python

Supondo que você tenha uma fonte de dados personalizada do Python registrada como my_custom_datasource, o código a seguir pode gravar nessa fonte de dados.

from pyspark import pipelines as dp

# Assume `my_custom_datasource` is a custom Python streaming
# data source that writes data to your system.

# Create LDP sink using my_custom_datasource
dp.create_sink(
    name="custom_sink",
    format="my_custom_datasource",
    options={
        <options-needed-for-custom-datasource>
    }
)

# Create append flow to send data to RequestBin
@dp.append_flow(name="flow_to_custom_sink", target="custom_sink")
def flow_to_custom_sink():
    return read_stream("my_source_data")

Para detalhes sobre a criação de fontes de dados personalizadas em Python, consulte PySpark de fontes de dados personalizadas.

Para obter mais detalhes sobre como usar a função create_sink, consulte a referência da API do coletor .

Depois que o coletor for criado, você poderá começar a transmitir os registros processados para o coletor.

Gravar em um coletor com um fluxo de acréscimo

Com o seu destino de dados criado, a próxima etapa é gravar registos processados nele, especificando-o como o alvo para a saída de registos num fluxo de acrescento. Você faz isso especificando sua pia como o valor target no decorador de append_flow.

  • Para tabelas gerenciadas e externas do Unity Catalog, use o formato delta e especifique o caminho ou o nome da tabela nas opções. O seu pipeline deve ser configurado para usar o Unity Catalog.
  • Para tópicos do Apache Kafka, use o formato kafka e especifique o nome do tópico, as informações de conexão e as informações de autenticação nas opções. Estas são as mesmas opções que um coletor Spark Structured Streaming Kafka suporta. Consulte Configuração do gravador de Kafka Structured Streaming.
  • Para Hubs de Eventos do Azure, use o formato kafka e especifique o nome, as informações de conexão e as informações de autenticação dos Hubs de Eventos nas opções. Essas são as mesmas opções suportadas em um coletor de Hubs de Eventos do Spark Structured Streaming que usa a interface Kafka. Consulte Autenticação do Principal do Serviço com o Microsoft Entra ID e Azure Event Hubs.

Abaixo estão exemplos de como configurar fluxos para gravar em coletores de Hubs de Eventos Delta, Kafka e Azure com registros processados pelo seu pipeline.

Lava-louça Delta

@dp.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
  return(
  spark.readStream.table("spark_referrers")
  .selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)

Kafka e pontos de destino do Azure Event Hubs

@dp.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
  spark.readStream.table("spark_referrers")
  .selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)

O parâmetro value é obrigatório para um coletor de Hubs de Eventos do Azure. Parâmetros adicionais, como key, partition, headerse topic são opcionais.

Para obter mais detalhes sobre o append_flow decorador, consulte Usando vários fluxos para gravar em um único destino.

Limitações

  • Apenas a API Python é suportada. SQL não é suportado.

  • Apenas consultas de streaming são suportadas. Não há suporte para consultas em lote.

  • Apenas append_flow pode ser usado para gravar em pias. Outros fluxos, como create_auto_cdc_flow, não são suportados, e não pode usar um sumidouro na definição de um conjunto de dados de pipeline. Por exemplo, o seguinte não é suportado:

    @table("from_sink_table")
    def fromSink():
      return read_stream("my_sink")
    
  • Para dissipadores Delta, o nome da tabela deve ser totalmente qualificado. Especificamente, para tabelas externas gerenciadas pelo Unity Catalog, o nome da tabela deve ter o formato <catalog>.<schema>.<table>. Para o metastore do Hive, ele deve estar na forma <schema>.<table>.

  • A execução de uma atualização completa não limpa os dados de resultados computados anteriormente nos destinos de dados. Isto significa que quaisquer dados reprocessados são adicionados ao sumidouro, e os dados existentes não são alterados.

  • As expectativas do pipeline não são suportadas.

Recursos