Utiliser Spark pour travailler avec des fichiers de données
Après avoir configuré un notebook et l’attacher à un cluster, vous pouvez utiliser Spark pour lire et traiter des fichiers de données. Spark prend en charge un large éventail de formats, tels que CSV, JSON, Parquet, ORC, Avro et Delta, et Databricks fournit des connecteurs intégrés pour accéder aux fichiers stockés dans l’espace de travail, dans Azure Data Lake ou Stockage Blob, ou dans d’autres systèmes externes.
Le flux de travail suit généralement trois étapes :
Lisez un fichier dans un DataFrame Spark à l’aide de spark.read avec le format et le chemin appropriés. Lors de la lecture de formats de texte brut comme CSV ou JSON, Spark peut déduire le schéma (noms de colonnes et types de données), mais cela est parfois lent ou non fiable. Une meilleure pratique en production consiste à définir le schéma explicitement afin que les données soient chargées de manière cohérente et efficace.
Explorez et transformez le DataFrame à l’aide d’opérations SQL ou DataFrame (par exemple, filtrage de lignes, sélection de colonnes, agrégation de valeurs).
Réécrire les résultats dans le stockage dans un format choisi.
L’utilisation de fichiers dans Spark est conçue pour être cohérente entre les jeux de données petits et volumineux. Le même code utilisé pour tester un petit fichier CSV fonctionnera également sur des jeux de données beaucoup plus volumineux, car Spark distribue le travail sur le cluster. Cela facilite le scale-up de l’exploration rapide vers le traitement des données plus complexe.
Chargement des données dans un dataframe
Explorons un exemple hypothétique afin de voir comment utiliser un dataframe pour travailler avec des données. Supposons que vous disposiez des données suivantes dans un fichier texte délimité par des virgules appelé products.csv dans le dossier data de votre stockage Databricks File System (DBFS) :
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
Dans un notebook Spark, vous pouvez utiliser le code PySpark suivant pour charger les données dans un dataframe et afficher les 10 premières lignes :
%pyspark
df = spark.read.load('/data/products.csv',
format='csv',
header=True
)
display(df.limit(10))
La ligne %pyspark au début est appelée magic et indique à Spark que le langage utilisé dans cette cellule est PySpark. Voici le code Scala équivalent pour l’exemple de données des produits :
%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))
La commande magic %spark est utilisée pour spécifier Scala.
Conseil
Vous pouvez également sélectionner le langage que vous souhaitez utiliser pour chaque cellule dans l’interface Notebook.
Les deux exemples présentés précédemment produisent une sortie comme suit :
| Identifiant produit | Nom de produit | Catégorie | ListPrice |
|---|---|---|---|
| 771 | Mountain-100 Silver, 38 | VTT | 3399.9900 |
| 772 | Mountain-100 Silver, 42 | VTT | 3399.9900 |
| 773 | Mountain-100 Silver, 44 | VTT | 3399.9900 |
| ... | ... | ... | ... |
Spécification d’un schéma de dataframe
Dans l’exemple précédent, la première ligne du fichier CSV contenait les noms de colonne, et Spark pouvait déduire le type de données de chaque colonne en se basant sur les données qu’elle contenait. Vous pouvez également spécifier un schéma explicite pour les données, ce qui est utile lorsque les noms de colonne ne sont pas inclus dans le fichier de données, comme cet exemple CSV :
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
L’exemple PySpark suivant montre comment spécifier un schéma pour que le dataframe soit chargé à partir d’un fichier appelé product-data.csv dans ce format :
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('/data/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
Les résultats seraient une fois de plus similaires à :
| Identifiant produit | Nom de produit | Catégorie | ListPrice |
|---|---|---|---|
| 771 | Mountain-100 Silver, 38 | VTT | 3399.9900 |
| 772 | Mountain-100 Silver, 42 | VTT | 3399.9900 |
| 773 | Mountain-100 Silver, 44 | VTT | 3399.9900 |
| ... | ... | ... | ... |
Filtrage et regroupement des dataframes
Vous pouvez utiliser les méthodes de la classe Dataframe pour filtrer, trier, regrouper et manipuler les données qu’elle contient. Par exemple, l’exemple de code suivant utilise la select méthode pour récupérer les colonnes ProductName et ListPrice à partir du dataframe df contenant des données de produit dans l’exemple précédent :
pricelist_df = df.select("ProductID", "ListPrice")
Les résultats de cet exemple de code devraient ressembler à ceci :
| Identifiant produit | ListPrice |
|---|---|
| 771 | 3399.9900 |
| 772 | 3399.9900 |
| 773 | 3399.9900 |
| ... | ... |
En commun avec la plupart des méthodes de manipulation de données, select retourne un nouvel objet de trame de données.
Conseil
La sélection d’une partie des colonnes d’un dataframe est une opération courante, qui peut également être réalisée à l’aide de la syntaxe plus courte suivante :
pricelist_df = df["ProductID", "ListPrice"]
Vous pouvez « chaîner » les méthodes ensemble pour effectuer une série de manipulations qui entraînent un dataframe transformé. Par exemple, cet exemple de code enchaîne les méthodes select et where pour créer un nouveau dataframe contenant les colonnes ProductName et ListPrice pour les produits dans les catégories Mountain Bikes ou Road Bikes :
bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
Les résultats de cet exemple de code devraient ressembler à ceci :
| Nom de produit | ListPrice |
|---|---|
| Mountain-100 Silver, 38 | 3399.9900 |
| Road-750 Noir, 52 | 539.9900 |
| ... | ... |
Pour regrouper et agréger des données, vous pouvez utiliser la méthode et les groupby fonctions d’agrégation. Par exemple, le code PySpark suivant compte le nombre de produits de chaque catégorie :
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
Les résultats de cet exemple de code devraient ressembler à ceci :
| Catégorie | nombre |
|---|---|
| Headsets | 3 |
| Roues | 14 |
| VTT | 32 |
| ... | ... |
Remarque
Les DataFrames Spark sont déclaratifs et immuables. Chaque transformation (par exempleselect, ou filtergroupBy) crée un DataFrame qui représente ce que vous souhaitez, et non la façon dont elle s’exécute. Cela rend le code réutilisable, optimisé et exempt d’effets secondaires. Mais aucune de ces transformations ne s’exécute réellement jusqu’à ce que vous déclenchez une action (par exemple, display, collect, write), à quel point Spark exécute le plan optimisé complet.
Utilisation d’expressions SQL dans Spark
L’API Dataframe fait partie d’une bibliothèque Spark appelée Spark SQL, qui permet aux analystes Données d’utiliser des expressions SQL pour interroger et manipuler des données.
Création d’objets de base de données dans le catalogue Spark
Le catalogue Spark est un metastore pour les objets de données relationnelles tels que les vues et les tables. Le runtime Spark peut utiliser le catalogue pour intégrer de façon fluide le code écrit dans n’importe quel langage pris en charge par Spark avec des expressions SQL qui peuvent être plus naturelles pour certains analystes Données ou développeurs.
L’une des méthodes les plus simples pour rendre les données d’un dataframe disponibles pour pouvoir les interroger dans le catalogue Spark consiste à créer une vue temporaire, comme illustré dans l’exemple de code suivant :
df.createOrReplaceTempView("products")
Une vue est temporaire, ce qui signifie qu’elle est automatiquement supprimée à la fin de la session active. Vous pouvez également créer des tables persistantes dans le catalogue pour définir une base de données pouvant être interrogée à l’aide de Spark SQL.
Remarque
Nous n’allons pas explorer les tables de catalogue Spark en profondeur dans ce module, mais cela vaut la peine de prendre le temps de mettre en évidence quelques points clés :
- Vous pouvez créer une table vide à l’aide de la méthode
spark.catalog.createTable. Les tables sont des structures de métadonnées qui stockent leurs données sous-jacentes dans l’emplacement de stockage associé au catalogue. La suppression d’une table supprime également ses données sous-jacentes. - Vous pouvez enregistrer un dataframe en tant que table en utilisant sa méthode
saveAsTable. - Vous pouvez créer une table externe en utilisant la méthode
spark.catalog.createExternalTable. Les tables externes définissent des métadonnées dans le catalogue, mais obtiennent leurs données sous-jacentes d’un emplacement de stockage externe, généralement un dossier dans un lac de données. La suppression d’une table externe ne supprime pas les données sous-jacentes.
Utilisation de l’API Spark SQL pour interroger des données
Vous pouvez utiliser l’API Spark SQL dans le code écrit dans n’importe quel langage pour interroger les données du catalogue. Par exemple, le code PySpark suivant utilise une requête SQL pour retourner les données de la vue produits en tant que dataframe.
bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
FROM products \
WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)
Les résultats de l’exemple de code ressembleraient au tableau suivant :
| Nom de produit | ListPrice |
|---|---|
| Mountain-100 Silver, 38 | 3399.9900 |
| Road-750 Noir, 52 | 539.9900 |
| ... | ... |
Utilisation du code SQL
L’exemple précédent a montré comment utiliser l’API Spark SQL pour incorporer des expressions SQL dans le code Spark. Dans un notebook, vous pouvez également utiliser la commande magic %sql pour exécuter le code SQL qui interroge les objets du catalogue, comme suit :
%sql
SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category
L’exemple de code SQL retourne un jeu de résultats qui s’affiche automatiquement dans le notebook sous forme de tableau, comme celui ci-dessous :
| Catégorie | NombreDeProduit |
|---|---|
| Bib Shorts | 3 |
| Racks à vélo | 1 |
| Bike Stands | 1 |
| ... | ... |