Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Importante
A API do pipeline create_sink está em Visualização Pública.
A função create_sink() grava em um serviço de streaming de eventos, como o Apache Kafka, Azure Event Hubs, ou em uma tabela Delta a partir de um pipeline declarativo. Depois de criar um coletor com a função create_sink(), use o coletor em um fluxo de acréscimo para gravar dados no coletor. o fluxo de acréscimo é o único tipo de fluxo com suporte na função create_sink(). Não há suporte para outros tipos de fluxo, como create_auto_cdc_flow.
O coletor Delta dá suporte às tabelas externas e gerenciadas do Catálogo do Unity e às tabelas gerenciadas do metastore do Hive. Os nomes de tabela devem ser totalmente qualificados. Por exemplo, as tabelas do Catálogo do Unity devem usar um identificador de três camadas: <catalog>.<schema>.<table>. As tabelas do metastore do Hive devem usar <schema>.<table>.
Observação
- A execução de uma atualização completa não limpa os dados dos coletores. Todos os dados reprocessados serão acrescentados ao coletor e os dados existentes não serão alterados.
- Não há suporte para expectativas com a
sinkAPI.
Sintaxe
from pyspark import pipelines as dp
dp.create_sink(name=<sink_name>, format=<format>, options=<options>)
Parâmetros
| Parâmetro | Tipo | Description |
|---|---|---|
name |
str |
Obrigatório Uma cadeia de caracteres que identifica o coletor e é usada para referenciar e gerenciar o coletor. Os nomes dos "sinks" devem ser exclusivos dentro do pipeline, incluindo todos os arquivos de código-fonte que são parte do pipeline. |
format |
str |
Obrigatório Uma cadeia de caracteres que define o formato de saída, kafka ou delta. |
options |
dict |
Uma lista de opções de coletor, formatada como {"key": "value"}, em que a chave e o valor são cadeias de caracteres. Todas as opções do Databricks Runtime compatíveis com os coletores Kafka e Delta têm suporte.
|
Exemplos
from pyspark import pipelines as dp
# Create a Kafka sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Create an external Delta table sink with a file path
dp.create_sink(
"my_delta_sink",
"delta",
{ "path": "/path/to/my/delta/table" }
)
# Create a Delta table sink using a table name
dp.create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)