Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Cet article propose des exemples simples pour illustrer l’utilisation de PySpark. Il part du principe que vous comprenez les concepts fondamentaux d'Apache Spark et que vous exécutez des commandes dans un notebook Azure Databricks connecté à un ordinateur. Vous créez des DataFrames à l’aide d’exemples de données, effectuez des transformations de base, notamment des opérations de ligne et de colonne sur ces données, combinez plusieurs DataFrames et agréger ces données, visualisez ces données, puis enregistrez-les dans une table ou un fichier.
Charger des données
Certains exemples de cet article utilisent des exemples de données fournis par Databricks pour illustrer l’utilisation de DataFrames dans le but charger, transformer et enregistrer des données. Si vous souhaitez utiliser vos propres données qui ne sont pas encore dans Databricks, vous pouvez commencer par les charger, puis créer un DataFrame à partir de celles-ci. Consultez Créer ou modifier une table à l'aide du téléversement de fichiers et Téléverser des fichiers dans un volume de catalogue Unity.
À propos des exemples de données Databricks
Databricks fournit des exemples de données dans le samples catalogue et dans le /databricks-datasets répertoire.
- Pour accéder aux exemples de données du
samplescatalogue, utilisez le formatsamples.<schema-name>.<table-name>. Cet article utilise des tables dans lesamples.tpchschéma, qui contient des données d’une entreprise fictive. Lecustomertableau contient des informations sur les clients etorderscontient des informations sur les commandes passées par ces clients. - Utilisez
dbutils.fs.lspour explorer les données dans/databricks-datasets. Utilisez Spark SQL ou DataFrames pour interroger des données dans cet emplacement à l’aide des chemins d’accès aux fichiers. Pour en savoir plus sur les exemples de données fournis par Databricks, consultez Exemples de jeux de données.
Importer des types de données
De nombreuses opérations PySpark nécessitent que vous utilisiez des fonctions SQL ou que vous interagissiez avec des types Spark natifs. Importez directement les fonctions et les types dont vous avez besoin, ou pour éviter de remplacer les fonctions intégrées Python, importez ces modules à l’aide d’un alias commun.
# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round
# import modules using an alias
import pyspark.sql.types as T
import pyspark.sql.functions as F
Pour obtenir la liste complète des types de données, consultez PySpark Data Types.
Pour obtenir la liste complète des fonctions SQL PySpark, consultez Fonctions PySpark.
Créer un DataFrame
Il existe plusieurs façons de créer un DataFrame. En règle générale, vous définissez un DataFrame sur une source de données telle qu’une table ou une collection de fichiers. Ensuite, comme décrit dans la section consacrée aux concepts fondamentaux d’Apache Spark, utilisez une action telle que display pour déclencher les transformations à exécuter. La méthode display génère des DataFrames.
Créer un DataFrame avec des valeurs spécifiées
Pour créer un DataFrame avec des valeurs spécifiées, utilisez la createDataFrame méthode, où les lignes sont exprimées sous forme de liste de tuples :
df_children = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = ['name', 'age'])
display(df_children)
Notez dans la sortie que les types de données des colonnes de df_children sont déduits automatiquement. Vous pouvez également spécifier les types en ajoutant un schéma. Les schémas sont définis à l’aide du type StructType, lequel est constitué de champs StructFields qui spécifient le nom, le type de données et un indicateur booléen indiquant si elles contiennent ou non une valeur Null.. Vous devez importer les types de données à partir de pyspark.sql.types.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
df_children_with_schema = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = StructType([
StructField('name', StringType(), True),
StructField('age', IntegerType(), True)
])
)
display(df_children_with_schema)
Créer un DataFrame à partir d’une table dans le catalogue Unity
Pour créer un DataFrame à partir d’une table dans le catalogue Unity, utilisez la table méthode identifiant la table au format <catalog-name>.<schema-name>.<table-name>. Cliquez sur Catalogue dans la barre de navigation de gauche pour utiliser l’Explorateur de catalogues pour accéder à votre table. Cliquez dessus, puis sélectionnez Copier le chemin de la table pour insérer le chemin d’accès au tableau dans le bloc-notes.
L’exemple suivant charge la table samples.tpch.customer, mais vous pouvez également fournir le chemin d’accès à votre propre table.
df_customer = spark.table('samples.tpch.customer')
display(df_customer)
Créer un DataFrame à partir d’un fichier chargé
Pour créer un DataFrame à partir d’un fichier que vous avez chargé dans des volumes de catalogue Unity, utilisez la read propriété. Cette méthode retourne un DataFrameReader, que vous pouvez ensuite utiliser pour lire le format approprié. Cliquez sur l’option de catalogue sur la petite barre latérale à gauche et utilisez le navigateur du catalogue pour localiser votre fichier. Sélectionnez-le, puis cliquez sur Copier le chemin du fichier de volume.
L’exemple ci-dessous lit à partir d’un fichier *.csv, mais DataFrameReader prend en charge le chargement de fichiers dans de nombreux autres formats. Consultez les méthodes DataFrameReader.
# Assign this variable your full volume file path
volume_file_path = ""
df_csv = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load(volume_file_path)
)
display(df_csv)
Pour plus d’informations sur les volumes de catalogue Unity, consultez Qu’est-ce que les volumes de catalogue Unity ?.
Créer un DataFrame à partir d'une réponse JSON.
Pour créer un DataFrame à partir d’une charge utile de réponse JSON retournée par une API REST, utilisez le package Python requests pour interroger et analyser la réponse. Vous devez importer le package pour l’utiliser. Cet exemple utilise des données de la base de données sur l’application de médicaments de l’Administration des aliments et des drogues des États-Unis.
import requests
# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)
# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)
Pour plus d’informations sur l’utilisation de JSON et d’autres données semi-structurées sur Databricks, consultez Modéliser des données semi-structurées.
Sélectionner un champ ou un objet JSON
Pour sélectionner un champ ou un objet spécifique à partir du JSON converti, utilisez la [] notation. Par exemple, pour sélectionner le products champ qui lui-même est un tableau de produits :
display(df_drugs.select(df_drugs["products"]))
Vous pouvez également chaîner des appels de méthode pour parcourir plusieurs champs. Par exemple, pour générer le nom de marque du premier produit dans une application de médicaments :
display(df_drugs.select(df_drugs["products"][0]["brand_name"]))
Créer un DataFrame à partir d’un fichier
Pour illustrer la création d’un DataFrame à partir d’un fichier, cet exemple charge les données CSV dans le répertoire /databricks-datasets.
Pour accéder aux exemples de jeux de données, vous pouvez utiliser les commandes du système de fichiers Databricks Utilties . L’exemple suivant utilise dbutils pour répertorier les jeux de données disponibles dans /databricks-datasets:
display(dbutils.fs.ls('/databricks-datasets'))
Vous pouvez également utiliser %fs pour accéder aux commandes de système de fichiers CLI Databricks, comme illustré dans l’exemple suivant :
%fs ls '/databricks-datasets'
Pour créer un DataFrame à partir d’un fichier ou d’un répertoire de fichiers, spécifiez le chemin d’accès dans la méthode load :
df_population = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)
Transformer des données avec des DataFrames
Les DataFrames simplifient la transformation des données à l’aide de méthodes intégrées pour trier, filtrer et agréger des données. De nombreuses transformations ne sont pas spécifiées en tant que méthodes dans les DataFrames, mais sont plutôt fournies dans le package pyspark.sql.functions. Consultez Databricks PySpark SQL Functions.
Opérations de colonne
Spark fournit de nombreuses opérations de colonne de base :
- Sélectionner des colonnes
- Créer des colonnes
- Renommer des colonnes
- Convertir des types de colonnes
- Supprimer des colonnes
Tip
Pour générer toutes les colonnes d’un DataFrame, utilisez columns, par exemple df_customer.columns.
Sélectionner des colonnes
Vous pouvez sélectionner des colonnes spécifiques à l’aide de select et col. La fonction col se trouve dans le sous-module pyspark.sql.functions.
from pyspark.sql.functions import col
df_customer.select(
col("c_custkey"),
col("c_acctbal")
)
Vous pouvez également faire référence à une colonne en utilisant expr, dans laquelle une expression est définie sous forme de chaîne :
from pyspark.sql.functions import expr
df_customer.select(
expr("c_custkey"),
expr("c_acctbal")
)
Vous pouvez également utiliser selectExpr, qui accepte les expressions SQL :
df_customer.selectExpr(
"c_custkey as key",
"round(c_acctbal) as account_rounded"
)
Pour sélectionner des colonnes à l’aide d’un littéral de chaîne, procédez comme suit :
df_customer.select(
"c_custkey",
"c_acctbal"
)
Pour sélectionner explicitement une colonne à partir d’un DataFrame spécifique, vous pouvez utiliser l’opérateur [] ou l’opérateur . . (L’opérateur . ne peut pas être utilisé pour sélectionner des colonnes commençant par un entier, ou celles qui contiennent un espace ou un caractère spécial.) Cela peut être particulièrement utile lorsque vous joignez des DataFrames où certaines colonnes ont le même nom.
df_customer.select(
df_customer["c_custkey"],
df_customer["c_acctbal"]
)
df_customer.select(
df_customer.c_custkey,
df_customer.c_acctbal
)
Créer des colonnes
Pour créer une colonne, utilisez la withColumn méthode. L’exemple suivant crée une colonne qui contient une valeur booléenne en fonction du dépassement du solde c_acctbal1000du compte client :
df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)
Renommer des colonnes
Pour renommer une colonne, utilisez la withColumnRenamed méthode, qui accepte les noms de colonnes existants et nouveaux :
df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")
La alias méthode est particulièrement utile lorsque vous souhaitez renommer vos colonnes dans le cadre d’agrégations :
from pyspark.sql.functions import avg
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)
display(df_segment_balance)
Convertir des types de colonnes
Dans certains cas, vous pouvez modifier le type de données pour une ou plusieurs colonnes de votre DataFrame. Pour ce faire, utilisez la cast méthode pour convertir entre les types de données de colonne. L’exemple suivant montre comment convertir une colonne d’un entier en type de chaîne, à l’aide de la col méthode pour référencer une colonne :
from pyspark.sql.functions import col
df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))
Supprimer des colonnes
Pour supprimer des colonnes, vous pouvez omettre des colonnes pendant une sélection select(*) except ou utiliser la méthode drop :
df_customer_flag_renamed.drop("balance_flag_renamed")
Vous pouvez également supprimer plusieurs colonnes à la fois :
df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")
Opérations sur les lignes
Spark fournit de nombreuses opérations de ligne de base :
- Lignes de filtre
- Supprimer les lignes dupliquées
- Gérer les valeurs Null
- Ajouter des lignes
- Trier les lignes
- Lignes de filtre
Filtrer les lignes
Pour filtrer les lignes, utilisez la méthode filter ou where dans un DataFrame pour retourner uniquement certaines lignes. Pour identifier une colonne à filtrer, utilisez la méthode col ou une expression qui évalue une colonne.
from pyspark.sql.functions import col
df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)
Pour filtrer selon plusieurs conditions, utilisez des opérateurs logiques. Par exemple, & et | vous permettent respectivement d'utiliser les conditions AND et OR. L’exemple suivant filtre les lignes dont la valeur c_nationkey est égale à 20 et c_acctbal est supérieure à 1000.
df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))
Supprimer les lignes dupliquées
Pour dédupliquer des lignes, utilisez distinct, qui retourne uniquement les lignes uniques.
df_unique = df_customer.distinct()
Gérer les valeurs nulles
Pour gérer les valeurs Null, supprimez les lignes qui contiennent des valeurs Null à l’aide de la na.drop méthode. Cette méthode vous permet de spécifier si vous souhaitez supprimer des lignes contenant des any valeurs Null ou all des valeurs Null.
Pour supprimer les valeurs Null, utilisez l’un des exemples suivants.
df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")
Si, au lieu de cela, vous souhaitez uniquement filtrer les lignes qui contiennent toutes les valeurs Null utilisent les éléments suivants :
df_customer_no_nulls = df_customer.na.drop("all")
Vous pouvez l’appliquer à un sous-ensemble de colonnes en spécifiant ceci, comme indiqué ci-dessous :
df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])
Pour remplir les valeurs manquantes, utilisez la fill méthode. Vous pouvez choisir de l’appliquer à toutes les colonnes ou à un sous-ensemble de colonnes. Dans l’exemple ci-dessous, les soldes de compte dont la valeur est Null c_acctbal sont remplacés par 0.
df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])
Pour remplacer des chaînes par d’autres valeurs, utilisez la replace méthode. Dans l’exemple ci-dessous, toutes les chaînes d’adresses vides sont remplacées par le mot UNKNOWN :
df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])
Ajouter des lignes
Pour ajouter des lignes, vous devez utiliser la méthode union afin de créer un DataFrame. Dans l’exemple suivant, le DataFrame df_that_one_customer créé précédemment et df_filtered_customer sont combinés, ce qui retourne un DataFrame avec trois clients :
df_appended_rows = df_that_one_customer.union(df_filtered_customer)
display(df_appended_rows)
Note
Vous pouvez également combiner des DataFrames en les écrivant dans une table, puis en ajoutant de nouvelles lignes. Pour les charges de travail de production, le traitement incrémentiel des sources de données vers une table cible peut réduire considérablement la latence et les coûts de calcul à mesure que les données augmentent en taille. Consultez les connecteurs Standard dans Lakeflow Connect.
Trier les lignes
Important
Le tri peut s'avérer onéreux à grande échelle et, si vous stockez des données triées et rechargez les données avec Spark, l’ordre n’est pas garanti. Veillez à utiliser le tri de manière réfléchie.
Pour trier des lignes selon une ou plusieurs colonnes, utilisez la méthode sort ou orderBy. Par défaut, ces méthodes effectuent le tri dans l’ordre croissant :
df_customer.orderBy(col("c_acctbal"))
Pour filtrer dans l’ordre décroissant, utilisez desc :
df_customer.sort(col("c_custkey").desc())
L’exemple suivant montre comment trier sur deux colonnes :
df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())
Pour limiter le nombre de lignes à retourner une fois le DataFrame trié, utilisez la limit méthode. L'exemple suivant permet d'afficher uniquement les 10 premiers résultats :
display(df_sorted.limit(10))
Joindre des DataFrames
Pour joindre au moins deux DataFrames, utilisez la join méthode. Vous pouvez spécifier la façon dont vous souhaitez que les DataFrames soient joints aux how paramètres (type de jointure) et on (sur quelles colonnes baser la jointure). Les types de jointure courants sont les suivants :
-
inner: il s’agit de la valeur par défaut du type de jointure, qui retourne un DataFrame qui conserve uniquement les lignes où il existe une correspondance pour leonparamètre dans les DataFrames. -
left: ce type de jointure conserve toutes les lignes du premier DataFrame spécifié et uniquement les lignes du second DataFrame spécifié qui ont une correspondance avec le premier. -
outer: une jointure externe conserve toutes les lignes des deux DataFrames, quelle que soit la correspondance.
Pour plus d’informations sur les jointures, consultez Utiliser des jointures sur Azure Databricks. Pour obtenir la liste des jointures prises en charge dans PySpark, consultez jointures DataFrame.
L’exemple suivant retourne un DataFrame unique où chaque ligne du orders DataFrame est jointe à la ligne correspondante à partir du customers DataFrame. Une jointure interne est utilisée, car l’attente est que chaque commande correspond exactement à un client.
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_joined = df_order.join(
df_customer,
on = df_order["o_custkey"] == df_customer["c_custkey"],
how = "inner"
)
display(df_joined)
Pour se joindre à plusieurs conditions, utilisez des opérateurs booléens tels que & et | pour spécifier AND et OR, respectivement. L’exemple suivant permet d'ajouter une condition supplémentaire, en filtrant uniquement les lignes qui présentent un o_totalprice supérieur à 500,000 :
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_complex_joined = df_order.join(
df_customer,
on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
how = "inner"
)
display(df_complex_joined)
Données agrégées
Pour agréger des données dans un DataFrame, comme dans SQL GROUP BY , utilisez la groupBy méthode pour spécifier des colonnes à regrouper et la agg méthode pour spécifier des agrégations. Importez des agrégations courantes, notamment avg, sum, max et min à partir de pyspark.sql.functions. L’exemple suivant montre le solde client moyen par segment de marché :
from pyspark.sql.functions import avg
# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_balance)
from pyspark.sql.functions import avg
# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_nation_balance)
Certaines agrégations correspondent à des actions, ce qui signifie qu’elles déclenchent des calculs. Dans ce cas, vous n’avez pas besoin d’utiliser d’autres actions pour afficher les résultats.
Pour compter les lignes d’un DataFrame, utilisez la méthode count :
df_customer.count()
Appels de chaînage
Les méthodes qui transforment les DataFrames retournent des DataFrames, et Spark n’applique les transformations qu'une fois les actions appelées. Cette évaluation différée signifie que vous pouvez chaîner plusieurs méthodes pour des raisons pratiques et de lisibilité. L’exemple suivant montre comment chaîner le filtrage, l’agrégation et l’ordre :
from pyspark.sql.functions import count
df_chained = (
df_order.filter(col("o_orderstatus") == "F")
.groupBy(col("o_orderpriority"))
.agg(count(col("o_orderkey")).alias("n_orders"))
.sort(col("n_orders").desc())
)
display(df_chained)
Visualiser votre DataFrame
Pour visualiser un DataFrame dans un bloc-notes, cliquez sur le signe en regard de la + table en haut à gauche du DataFrame, puis sélectionnez Visualisation pour ajouter un ou plusieurs graphiques en fonction de votre DataFrame. Pour plus d’informations sur les visualisations, consultez Visualisations dans les notebooks Databricks et l’éditeur SQL.
display(df_order)
Pour effectuer des visualisations supplémentaires, Databricks recommande d’utiliser l’API Pandas pour Spark. La fonction .pandas_api() vous permet de convertir vers l'API Pandas correspondante pour un DataFrame Spark. Pour plus d’informations, consultez API Pandas sur Spark.
Enregistrer vos données
Une fois que vous avez transformé vos données, vous pouvez les enregistrer à l’aide des méthodes DataFrameWriter. Vous trouverez la liste complète de ces méthodes dans DataFrameWriter. Les sections suivantes montrent comment enregistrer votre DataFrame en tant que table et sous forme de collection de fichiers de données.
Enregistrer votre DataFrame en tant que table
Pour enregistrer votre DataFrame sous forme de tableau dans le catalogue Unity, utilisez la write.saveAsTable méthode et spécifiez le chemin d’accès au format <catalog-name>.<schema-name>.<table-name>.
df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")
Écrire votre DataFrame au format CSV
Pour écrire votre DataFrame au format *.csv, utilisez la méthode write.csv en veillant à spécifier le format et les options. Par défaut, si des données existent à l'emplacement spécifié, l'opération d'écriture échoue. Pour effectuer une autre action, vous pouvez spécifier l’un des modes suivants :
-
overwriteremplace toutes les données existantes dans le chemin cible par le contenu du DataFrame. -
appendajoute le contenu du DataFrame aux données du chemin cible. -
ignorerejette de manière silencieuse l’écriture si des données existent dans le chemin cible.
L'exemple suivant montre comment remplacer des données avec le contenu d'un DataFrame dans des fichiers CSV :
# Assign this variable your file path
file_path = ""
(df_joined.write
.format("csv")
.mode("overwrite")
.write(file_path)
)
Étapes suivantes
Pour profiter d’autres fonctionnalités Spark sur Databricks, consultez les articles suivants :