Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Vous pouvez charger des données à partir de n’importe quelle source de données prise en charge par Apache Spark sur Azure Databricks à l’aide de pipelines. Vous pouvez définir des jeux de données (tables et vues) dans des pipelines déclaratifs Spark Lakeflow sur n’importe quelle requête qui retourne un DataFrame Spark, y compris les DataFrames de streaming et Pandas pour les DataFrames Spark. Pour les tâches d’ingestion de données, Databricks recommande d’utiliser des tables de streaming pour la plupart des cas d’usage. Les tables de streaming sont utiles pour ingérer des données à partir du stockage d’objets cloud à l’aide d’Auto Loader ou de bus de messages comme Kafka.
Note
- Toutes les sources de données ne prennent pas en charge SQL pour l’ingestion. Vous pouvez combiner des sources SQL et Python dans des pipelines pour utiliser Python où il est nécessaire, et SQL pour d’autres opérations dans le même pipeline.
- Pour plus d’informations sur l’utilisation des bibliothèques non empaquetées dans les pipelines déclaratifs Spark Lakeflow par défaut, consultez Gérer les dépendances Python pour les pipelines.
- Pour obtenir des informations générales sur l’ingestion dans Azure Databricks, consultez les connecteurs Standard dans Lakeflow Connect.
Ci-dessous, des exemples de modèles courants.
Charger depuis une table existante
Chargez des données à partir de n’importe quelle table existante dans Azure Databricks. Vous pouvez transformer les données à l’aide d’une requête ou charger la table pour un traitement ultérieur dans votre pipeline.
L’exemple suivant lit les données d’une table existante :
Python
@dp.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
)
SQL
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
Charger des fichiers à partir du stockage d’objets cloud
Databricks recommande d’utiliser le chargeur automatique dans les pipelines pour la plupart des tâches d’ingestion de données à partir du stockage d’objets cloud ou à partir de fichiers dans un volume de catalogue Unity. Le chargeur automatique et les pipelines sont conçus pour charger de manière incrémentielle et idempotente des données toujours croissantes à mesure qu’elles arrivent dans le stockage cloud.
Voir Qu’est-ce que le chargeur automatique ? et charger des données à partir du stockage d’objets.
L’exemple suivant lit les données du stockage cloud à l’aide du chargeur automatique :
Python
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
)
SQL
CREATE OR REFRESH STREAMING TABLE sales
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
format => "json"
);
Les exemples suivants utilisent le chargeur automatique pour créer des jeux de données à partir de fichiers CSV dans un volume de catalogue Unity :
Python
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/Volumes/my_catalog/retail_org/customers/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/retail_org/customers/",
format => "csv"
)
Note
- Si vous utilisez Auto Loader avec les notifications de fichiers, et si vous exécutez une actualisation complète pour votre pipeline ou votre table de streaming, vous devez nettoyer manuellement vos ressources. Vous pouvez utiliser CloudFilesResourceManager dans un notebook pour effectuer le nettoyage.
- Pour charger des fichiers avec le chargeur automatique dans un pipeline compatible avec le catalogue Unity, vous devez utiliser des emplacements externes. Pour en savoir plus sur l’utilisation du catalogue Unity avec des pipelines, consultez Utiliser le catalogue Unity avec des pipelines.
Charger des données à partir d’un bus de messages
Vous pouvez configurer des pipelines pour ingérer des données à partir de bus de messages. Databricks recommande d’utiliser des tables de diffusion en continu avec une exécution continue et une mise à l’échelle automatique améliorée pour fournir l’ingestion la plus efficace pour le chargement à faible latence à partir de bus de messages. Consultez Optimiser l’utilisation du cluster des pipelines déclaratifs Spark Lakeflow avec mise à l’échelle automatique.
Par exemple, le code suivant configure une table de diffusion en continu pour ingérer des données à partir de Kafka à l’aide de la fonction read_kafka :
Python
from pyspark import pipelines as dp
@dp.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka_server:9092")
.option("subscribe", "topic1")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_raw AS
SELECT *
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'topic1'
);
Pour ingérer depuis d’autres sources de bus de messages, consultez :
- La read_kinesis
- Pub/Sub rubrique : read_pubsub
- Pulsar : read_pulsar
Charger des données à partir d’Azure Event Hubs
Azure Event Hubs est un service de streaming de données qui fournit une interface compatible Apache Kafka. Vous pouvez utiliser le connecteur Kafka pour le streaming structuré, inclus dans le runtime des pipelines déclaratifs Spark Lakeflow, afin de charger des messages à partir d’Azure Event Hubs. Pour en savoir plus sur le chargement et le traitement des messages à partir d’Azure Event Hubs, consultez Utiliser Azure Event Hubs comme source de données de pipeline.
Charger des données à partir de systèmes externes
Lakeflow Spark Declarative Pipelines prend en charge le chargement de données à partir de n’importe quelle source de données prise en charge par Azure Databricks. Consultez Se connecter aux sources de données et aux services externes. Vous pouvez également charger des données externes à l’aide de Lakehouse Federation pour les sources de données prises en charge. Étant donné que Lakehouse Federation nécessite Databricks Runtime 13.3 LTS ou une version ultérieure, pour utiliser Lakehouse Federation, votre pipeline doit être configuré pour utiliser le canal en préversion.
Certaines sources de données n’ont pas de prise en charge équivalente en SQL. Si vous ne pouvez pas utiliser Lakehouse Federation avec l’une de ces sources de données, vous pouvez utiliser Python pour ingérer des données à partir de la source. Vous pouvez ajouter des fichiers sources Python et SQL au même pipeline. L’exemple suivant déclare une vue matérialisée pour accéder à l’état actuel des données dans une table PostgreSQL distante :
import dp
@dp.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
Charger des jeux de données petits ou statiques à partir du stockage d’objets cloud
Vous pouvez charger des jeux de données petits ou statiques à l’aide de la syntaxe de chargement Apache Spark. Lakeflow Spark Declarative Pipelines prend en charge tous les formats de fichier pris en charge par Apache Spark sur Azure Databricks. Pour obtenir une liste complète, consultez les options de format de données.
Les exemples suivants illustrent le chargement de JSON pour créer une table :
Python
@dp.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
SQL
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
"/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)
Note
La fonction SQL read_files est commune à tous les environnements SQL sur Azure Databricks. Il s’agit du modèle recommandé pour l’accès direct aux fichiers à l’aide de SQL dans les pipelines. Pour plus d’informations, consultez Options.
Charger des données à partir d’une source de données personnalisée Python
Les sources de données personnalisées Python vous permettent de charger des données dans des formats personnalisés. Vous pouvez écrire du code pour lire et écrire dans une source de données externe spécifique, ou tirer parti du code Python existant dans vos systèmes existants pour lire des données à partir de vos propres systèmes internes. Pour plus d’informations sur le développement de sources de données Python, consultez les sources de données personnalisées PySpark.
Pour utiliser une source de données personnalisée Python pour charger des données dans un pipeline, inscrivez-la avec un nom de format, par exemple my_custom_datasource, puis lisez-la :
from pyspark import pipelines as dp
# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.
# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
return spark.read.format("my_custom_datasource").load()
# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
return spark.readStream.format("my_custom_datasource").load()
Configurer une table de streaming pour ignorer les modifications dans une table de streaming source
Note
- L’indicateur
skipChangeCommitsne fonctionne qu'avecspark.readStreamen utilisant la fonctionoption(). Vous ne pouvez pas utiliser cet indicateur dans une fonctiondp.read_stream(). - Vous ne pouvez pas utiliser l’indicateur
skipChangeCommitslorsque la table de diffusion en continu source est définie comme cible d’une fonction create_auto_cdc_flow().
Par défaut, les tables de streaming nécessitent des sources en ajout uniquement. Lorsqu'une table de streaming utilise une autre table de streaming comme source et que la table de streaming source nécessite des mises à jour ou des suppressions, par exemple, le traitement du « droit à l'oubli » RGPD, l'indicateur skipChangeCommits peut être défini lors de la lecture de la table de streaming pour ignorer ces modifications. Pour plus d’informations sur cet indicateur, consultez Ignorer les mises à jour et les suppressions.
@dp.table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
Accéder de manière sécurisée aux identifiants de stockage avec des secrets dans un pipeline
Vous pouvez utiliser des secrets Azure Databricks pour stocker des informations d’identification telles que des clés d’accès ou des mots de passe. Pour configurer le secret dans votre pipeline, utilisez une propriété Spark dans la configuration de cluster des paramètres de pipeline. Consultez Configurer le calcul classique pour les pipelines.
L’exemple suivant utilise un secret pour stocker une clé d’accès requise pour lire les données d’entrée à partir d’un compte de stockage Azure Data Lake Storage (ADLS) à l’aide du chargeur automatique. Vous pouvez utiliser cette même méthode pour configurer tout secret requis par votre pipeline, par exemple, des clés AWS pour accéder à S3 ou le mot de passe pour un metastore Apache Hive.
Pour en savoir plus sur l’utilisation d’Azure Data Lake Storage, consultez Se connecter à Azure Data Lake Storage et au Stockage Blob.
Note
Vous devez ajouter le préfixe spark.hadoop. à la clé de configuration spark_conf qui définit la valeur du secret.
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
}
}
],
"name": ":re[LDP] quickstart using ADLS2"
}
Replace
-
<storage-account-name>par le nom du compte de stockage ADLS. -
<scope-name>par le nom de l'étendue du secret Azure Databricks. -
<secret-name>par le nom de la clé contenant la clé d'accès du compte de stockage Azure.
from pyspark import pipelines as dp
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
Replace
-
<container-name>par le nom du conteneur de compte de stockage Azure qui stocke les données d’entrée. -
<storage-account-name>par le nom du compte de stockage ADLS. -
<path-to-input-dataset>par le chemin du jeu de données d’entrée.