Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Important
L’API sink est en préversion publique.
Cette page décrit l’API Pipelines déclaratifs Spark Lakeflow et explique comment l’utiliser avec sink pour écrire des enregistrements transformés par un pipeline vers un récepteur de données externe. Les récepteurs de données externes incluent les tables managées et externes du catalogue Unity, ainsi que les services de diffusion en continu d’événements tels qu’Apache Kafka ou Azure Event Hubs. Vous pouvez également utiliser des récepteurs de données pour écrire dans des sources de données personnalisées en écrivant du code Python pour cette source de données.
Note
- L’API
sinkest disponible uniquement pour Python. - Vous pouvez créer un sink personnalisé avec l’API ForEachBatch. Consultez l'utilisation de ForEachBatch pour écrire vers des destinations de données arbitraires dans des pipelines.
Qu’est-ce que des éviers ?
Les récepteurs sont des cibles pour les flux dans un pipeline. Par défaut, les flux de pipeline émettent des données vers une table de diffusion en continu ou une cible de vue matérialisée. Il s’agit des deux tables Delta gérées par Azure Databricks. Les puits de données sont une cible alternative que vous utilisez pour écrire des données transformées vers des cibles telles que des services de streaming d’événements comme Apache Kafka ou Azure Event Hubs, et des tables externes gérées par Unity Catalog. À l’aide de récepteurs, vous disposez maintenant d’autres options pour conserver la sortie de votre pipeline.
Quand dois-je utiliser des collecteurs ?
Databricks recommande d’utiliser des récepteurs si vous devez :
- Créez un cas d’usage opérationnel comme la détection des fraudes, l’analytique en temps réel et les recommandations des clients. Les cas d’usage opérationnels lisent généralement des données à partir d’un bus de messages, comme une rubrique Apache Kafka, puis traitent les données avec une faible latence et écrivent les enregistrements traités dans un bus de messages. Cette approche vous permet d’obtenir une latence inférieure en n’écrivant pas ou en lisant à partir du stockage cloud.
- Écrivez des données transformées de vos flux vers des tables gérées par une instance Delta externe, y compris les tables gérées par le catalogue Unity et les tables externes.
- Effectuer une opération ETL (extraction, transformation et chargement) inverse dans des récepteurs externes à Databricks, par exemple des rubriques Apache Kafka. Cette approche vous permet de prendre en charge efficacement les cas d’utilisation où les données doivent être lues ou utilisées en dehors des tables du catalogue Unity ou d’autres stockages gérés par Databricks.
- Vous devez écrire dans un format de données qui n’est pas pris en charge directement par Azure Databricks. Les sources de données personnalisées Python vous permettent de créer un puits qui écrit dans n’importe quelle source de données à l’aide du code Python personnalisé. Consultez les sources de données personnalisées PySpark.
Comment utiliser des éviers ?
À mesure que les données d’événement sont ingérées à partir d’une source de diffusion en continu dans votre pipeline, vous traitez et affinez ces données dans les transformations de votre pipeline. Vous utilisez ensuite le traitement de flux d’ajout pour diffuser les enregistrements de données transformés vers un puits. Vous créez ce sink à l’aide de la fonction create_sink(). Pour découvrir plus d’informations sur la fonction create_sink, consultez la référence de récepteur d’API.
Si vous disposez d'un pipeline qui crée ou traite vos données d'événements de diffusion en continu et prépare les enregistrements de données pour l'écriture, vous êtes prêt à utiliser un sink.
L’implémentation d’un puits se compose de deux étapes :
- Créez le récepteur.
- Utilisez un flux d’ajout pour écrire les enregistrements préparés dans le récepteur.
Créer un récepteur
Databricks prend en charge plusieurs types de destinations de sortie dans lesquelles vous écrivez vos enregistrements traités à partir de vos flux de données :
- Récepteurs de tables Delta (y compris les tables gérées par Unity Catalog et externes)
- Récepteurs Apache Kafka
- Récepteurs Azure Event Hubs
- Récepteurs personnalisés écrits en Python à l’aide des sources de données personnalisées Python
Vous trouverez ci-dessous des exemples de configurations pour les récepteurs Delta, Kafka et Azure Event Hubs et les sources de données personnalisées Python :
Récepteurs Delta
Pour créer un récepteur Delta par chemin d’accès de fichier :
dp.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
Pour créer un récepteur Delta par nom de table à l’aide d’un chemin d’accès de catalogue et de schéma complet :
dp.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "catalog_name.schema_name.table_name" }
)
Récepteurs Kafka et Azure Event Hubs
Ce code fonctionne à la fois pour les récepteurs Apache Kafka et les récepteurs Azure Event Hubs.
credential_name = "<service-credential>"
eh_namespace_name = "dp-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
topic_name = "dp-sink"
dp.create_sink(
name = "eh_sink",
format = "kafka",
options = {
"databricks.serviceCredential": credential_name,
"kafka.bootstrap.servers": bootstrap_servers,
"topic": topic_name
}
)
Ceci credential_name est une référence à un identifiant de service Unity Catalog. Pour plus d’informations, consultez Utiliser les informations d’identification du service catalogue Unity pour se connecter aux services cloud externes.
Sources de données personnalisées Python
En supposant que vous disposez d’une source de données personnalisée Python inscrite en tant que my_custom_datasource, le code suivant peut écrire dans cette source de données.
from pyspark import pipelines as dp
# Assume `my_custom_datasource` is a custom Python streaming
# data source that writes data to your system.
# Create LDP sink using my_custom_datasource
dp.create_sink(
name="custom_sink",
format="my_custom_datasource",
options={
<options-needed-for-custom-datasource>
}
)
# Create append flow to send data to RequestBin
@dp.append_flow(name="flow_to_custom_sink", target="custom_sink")
def flow_to_custom_sink():
return read_stream("my_source_data")
Pour plus d’informations sur la création de sources de données personnalisées dans Python, consultez les sources de données personnalisées PySpark.
Pour plus d’informations sur l’utilisation de la fonction create_sink, consultez la référence API sink.
Une fois votre récepteur créé, vous pouvez commencer à diffuser en continu des enregistrements traités dans le récepteur.
Écrire dans un récepteur avec un flux d’ajout
Une fois votre récepteur créé, l’étape suivante consiste à écrire des enregistrements traités dans celui-ci en le spécifiant comme cible pour les enregistrements générés par un flux d’ajout. Pour ce faire, spécifiez votre récepteur comme valeur target dans le décorateur append_flow.
- Pour les tables gérées et externes du catalogue Unity, utilisez le format
deltaet spécifiez le chemin d’accès ou le nom de la table dans les options. Votre pipeline doit être configuré pour utiliser le catalogue Unity. - Pour les rubriques Apache Kafka, utilisez le format
kafkaet spécifiez le nom de la rubrique, les informations de connexion et les informations d’authentification dans les options. Il s’agit des mêmes options que celles prises en charge par un récepteur Spark Structured Streaming Kafka. Consultez Configurer l’enregistreur Kafka Structured Streaming. - Pour Azure Event Hubs, utilisez le format
kafkaet spécifiez le nom d’Event Hubs, les informations de connexion et les informations d’authentification dans les options. Il s’agit des mêmes options prises en charge dans un récepteur Spark Structured Streaming Event Hubs qui utilise l’interface Kafka. Consultez Authentification du principal de service avec Microsoft Entra ID et Azure Event Hubs.
Vous trouverez ci-dessous des exemples de configuration de flux pour écrire dans des récepteurs Delta, Kafka et Azure Event Hubs avec des enregistrements traités par votre pipeline.
Récepteur Delta
@dp.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
Récepteurs Kafka et Azure Event Hubs
@dp.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
Le paramètre value est obligatoire pour un récepteur Azure Event Hubs. Des paramètres supplémentaires tels que key, partition, headerset topic sont facultatifs.
Pour découvrir plus d’informations sur l’élément décoratif append_flow, consultez Utilisation de plusieurs flux pour écrire dans une seule cible.
Limites
Seule l’API Python est prise en charge. SQL n’est pas pris en charge.
Seules les requêtes de diffusion en continu sont prises en charge. Les requêtes Batch ne sont pas prises en charge.
Vous pouvez uniquement utiliser
append_flowpour écrire dans des récepteurs. D'autres flux, tels quecreate_auto_cdc_flow, ne sont pas pris en charge et vous ne pouvez pas utiliser un puits dans une définition de jeu de données de pipeline. Par exemple, les éléments suivants ne sont pas pris en charge :@table("from_sink_table") def fromSink(): return read_stream("my_sink")Pour les récepteurs Delta, le nom de la table doit être complet. Plus précisément, pour les tables externes gérées par le catalogue Unity, le nom de la table doit être de la forme
<catalog>.<schema>.<table>. Pour le metastore Hive, il doit être au format<schema>.<table>.L’exécution d’une mise à jour d’actualisation complète ne nettoie pas les données de résultats précédemment calculées dans les récepteurs. Cela signifie que toutes les données traitées sont ajoutées au récepteur et que les données existantes ne sont pas modifiées.
Les attentes concernant le pipeline ne sont pas prises en charge.