Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Importante
La sink API está en versión preliminar pública.
En esta página se describe la API de canalizaciones declarativas de Spark de Lakeflow y cómo usarla con sink para escribir registros transformados por una canalización en un receptor de datos externo. Los receptores de datos externos incluyen tablas externas y administradas de Unity Catalog, y servicios de streaming de eventos como Apache Kafka o Azure Event Hubs. También puede usar sumideros de datos para escribir en destinos de datos personalizados escribiendo código Python para ese destino de datos.
Nota:
- La
sinkAPI solo está disponible para Python. - Puede crear un sink personalizado con la API ForEachBatch. Consulte Uso de ForEachBatch para escribir en receptores de datos arbitrarios en canalizaciones.
¿Qué son los sumideros?
Los sumideros son destinos para los flujos de una tubería. De forma predeterminada, los flujos de canalización emiten datos a una tabla de transmisión o una vista materializada como destino. Estas son tablas delta administradas de Azure Databricks. Los sink son un destino alternativo que se utilizan para escribir datos transformados en destinos tales como servicios de transmisión de eventos, Apache Kafka o Azure Event Hubs, y tablas externas administradas por Unity Catalog. Con los sumideros, ahora dispone de más opciones para persistir el resultado de la canalización.
¿Cuándo debo usar sumideros?
Databricks recomienda usar sinks si necesita:
- Cree un caso de uso operativo, como la detección de fraudes, el análisis en tiempo real y las recomendaciones de los clientes. Los casos de uso operativos suelen leer datos de un bus de mensajes, como un tema de Apache Kafka, y luego procesar datos con baja latencia y volver a escribir los registros procesados en un bus de mensajes. Este enfoque le permite lograr una menor latencia al no escribir ni leer desde el almacenamiento en la nube.
- Escriba datos transformados desde sus flujos a tablas gestionadas por una instancia Delta externa, incluidas las tablas administradas por el Unity Catalog y las tablas externas.
- Realice la carga inversa de extracción y transformación (ETL) en receptores externos a Databricks, como temas de Apache Kafka. Este enfoque le permite admitir eficazmente casos de uso en los que los datos deben leerse o usarse fuera de las tablas del Catálogo de Unity u otro almacenamiento administrado por Databricks.
- Debe escribir en un formato de datos que Azure Databricks no admita directamente. Los orígenes de datos personalizados de Python permiten crear un receptor que escriba en cualquier origen de datos mediante código de Python personalizado. Consulte Orígenes de datos personalizados de PySpark.
¿Cómo uso los sumideros?
A medida que los datos de eventos se ingieren desde un origen de streaming en su canalización, usted procesa y refina estos datos mediante las transformaciones de su canalización. Después, use el procesamiento de flujo de anexión para transmitir los registros de datos transformados a un receptor. Cree este receptor mediante la función create_sink(). Para obtener más información sobre la función create_sink, consulte la referencia de API de receptor.
Si tiene una canalización que crea o procesa los datos de eventos de streaming y prepara los registros de datos para escribirlos, entonces estará listo para usar un sumidero.
La implementación de un sumidero consta de dos pasos:
- Cree el sumidero.
- Use un flujo de anexión para escribir los registros preparados en el receptor.
Creación de un receptor
Databricks admite varios tipos de receptores de destino en los que se escriben los registros procesados a partir de los datos de flujo:
- Receptores de tablas delta (incluidas las tablas externas y administradas por Unity Catalog)
- Receptores de Apache Kafka
- Receptores de Azure Event Hubs
- Sumideros personalizados escritos en Python mediante los orígenes de datos personalizados de Python
A continuación se muestran ejemplos de configuraciones para receptores de Delta, Kafka y Azure Event Hubs y orígenes de datos personalizados de Python:
Receptor delta
Para crear un receptor Delta por ruta de acceso de archivo:
dp.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
Para crear un receptor Delta por nombre de tabla mediante un catálogo completo y una ruta de acceso de esquema:
dp.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "catalog_name.schema_name.table_name" }
)
Receptores de Kafka y Azure Event Hubs
Este código funciona tanto para los receptores de Apache Kafka como para Azure Event Hubs.
credential_name = "<service-credential>"
eh_namespace_name = "dp-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
topic_name = "dp-sink"
dp.create_sink(
name = "eh_sink",
format = "kafka",
options = {
"databricks.serviceCredential": credential_name,
"kafka.bootstrap.servers": bootstrap_servers,
"topic": topic_name
}
)
El credential_name es una referencia a una credencial de servicio del Catálogo de Unity. Para obtener más información, consulte Uso de credenciales de servicio del catálogo de Unity para conectarse a servicios en la nube externos.
Orígenes de datos personalizados de Python
Suponiendo que tiene un origen de datos personalizado de Python registrado como my_custom_datasource, el código siguiente puede escribir en ese origen de datos.
from pyspark import pipelines as dp
# Assume `my_custom_datasource` is a custom Python streaming
# data source that writes data to your system.
# Create LDP sink using my_custom_datasource
dp.create_sink(
name="custom_sink",
format="my_custom_datasource",
options={
<options-needed-for-custom-datasource>
}
)
# Create append flow to send data to RequestBin
@dp.append_flow(name="flow_to_custom_sink", target="custom_sink")
def flow_to_custom_sink():
return read_stream("my_source_data")
Para más información sobre cómo crear orígenes de datos personalizados en Python, consulte Orígenes de datos personalizados de PySpark.
Para obtener más información sobre el uso de la función create_sink, consulte la Referencia de API de receptor .
Una vez creado el receptor, puede comenzar a transmitir los registros procesados al receptor.
Escritura en un receptor con un flujo de anexión
Una vez creado el receptor, el siguiente paso consiste en escribir en él los registros procesados especificándolo como destino de los registros emitidos por un flujo de anexión. Para ello, especifique el receptor como targetvalor en el decorador append_flow.
- En el caso de las tablas externas y administradas del catálogo de Unity, use el formato
deltay especifique la ruta de acceso o el nombre de la tabla en las opciones. La canalización debe configurarse para usar el catálogo de Unity. - Para temas de Apache Kafka, use el formato
kafkay especifique el nombre del tema, la información de conexión y la información de autenticación en las opciones. Estas son las mismas opciones que admite un receptor de Kafka de Spark Structured Streaming. Consulte Configurar el escritor de Kafka Structured Streaming. - Para Azure Event Hubs, use el formato
kafkay especifique el nombre, la información de conexión y la información de autenticación de Event Hubs en las opciones. Estas son las mismas opciones admitidas en un receptor de Event Hubs de Spark Structured Streaming que usa la interfaz de Kafka. Consulte Autenticación de entidad de servicio con Microsoft Entra ID y Azure Event Hubs.
A continuación se muestran ejemplos de cómo configurar flujos para escribir en destinos Delta, Kafka y Azure Event Hubs con registros procesados por su canalización.
Receptor delta
@dp.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
Receptores de Kafka y Azure Event Hubs
@dp.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
El parámetro value es obligatorio para un receptor de Azure Event Hubs. Los parámetros adicionales, como key, partition, headersy topic son opcionales.
Para obtener más información sobre el decorador append_flow, consulte Usar varios flujos para escribir en un único destino.
Limitaciones
Solo se admite la API de Python. SQL no se admite.
Solo se admiten consultas de streaming. No se admiten consultas por lotes.
Solo se pueden usar
append_flowpara escribir en receptores. No se admiten otros flujos, comocreate_auto_cdc_flow, y no se puede usar un sumidero en una definición de conjunto de datos para canalizaciones. Por ejemplo, no se admite lo siguiente:@table("from_sink_table") def fromSink(): return read_stream("my_sink")En el caso de los receptores delta, el nombre de la tabla debe estar completo. En concreto, para las tablas externas administradas por el catálogo de Unity, el nombre de la tabla debe tener el formato
<catalog>.<schema>.<table>. Para el metastore de Hive, debe tener el formato<schema>.<table>.La ejecución de una actualización completa no limpia los datos de resultados calculados previamente en los receptores. Esto significa que los datos reprocesados se anexan al receptor y no se modifican los datos existentes.
No se admiten las expectativas de canalización.