Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Dans cet article, vous allez apprendre à utiliser Apache Spark MLlib pour créer une application Machine Learning qui effectue une analyse prédictive simple sur un jeu de données ouvert Azure. Spark fournit des bibliothèques de Machine Learning intégrées. Cet exemple utilise la classification par le biais de la régression logistique.
SparkML et MLlib sont des bibliothèques Spark de base qui fournissent de nombreux utilitaires utiles pour les tâches de Machine Learning, notamment les utilitaires appropriés pour :
- Classification
- Regression
- Clustering
- Modélisation des rubriques
- Décomposition de valeurs singulières (SVD) et analyse des composants principaux (PCA)
- Tests d’hypothèses et calcul des exemples de statistiques
Comprendre la classification et la régression logistique
La classification, une tâche de Machine Learning populaire, est le processus de tri des données d’entrée en catégories. Il s’agit du travail d’un algorithme de classification pour déterminer comment affecter des étiquettes aux données d’entrée que vous fournissez. Par exemple, vous pouvez considérer un algorithme De Machine Learning qui accepte les informations de stock comme entrée et divisez le stock en deux catégories : les actions que vous devez vendre et les actions que vous devez conserver.
La régression logistique est un algorithme que vous pouvez utiliser pour la classification. L’API de régression logistique de Spark est utile pour la classification binaire ou la classification des données d’entrée dans l’un des deux groupes. Pour plus d’informations sur la régression logistique, consultez Wikipédia.
En résumé, le processus de régression logistique produit une fonction logistique que vous pouvez utiliser pour prédire la probabilité qu’un vecteur d’entrée appartient à un groupe ou à l’autre.
Exemple d’analyse prédictive sur les données de taxi nyC
Dans cet exemple, vous utilisez Spark pour effectuer une analyse prédictive sur les données de pourboire des trajets en taxi de New York. Les données sont disponibles via Azure Open Datasets. Ce sous-ensemble du jeu de données contient des informations sur les courses de taxi jaunes, notamment des informations sur chaque voyage, l’heure de début et l’heure de fin, le coût et d’autres attributs intéressants.
Important
Il peut y avoir des frais supplémentaires pour extraire ces données à partir de son emplacement de stockage.
Dans les étapes suivantes, vous développez un modèle pour prédire si un voyage particulier inclut un pourboire ou non.
Créer un modèle Machine Learning Apache Spark
Créez un notebook à l’aide du noyau PySpark. Pour obtenir des instructions, consultez Créer un bloc-notes.
Importez les types requis pour cette application. Copiez et collez le code suivant dans une cellule vide, puis appuyez sur Maj+Entrée. Vous pouvez aussi exécuter la cellule à l’aide de l’icône de lecture bleue située à gauche du code.
import matplotlib.pyplot as plt from datetime import datetime from dateutil import parser from pyspark.sql.functions import unix_timestamp, date_format, col, when from pyspark.ml import Pipeline from pyspark.ml import PipelineModel from pyspark.ml.feature import RFormula from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer from pyspark.ml.classification import LogisticRegression from pyspark.mllib.evaluation import BinaryClassificationMetrics from pyspark.ml.evaluation import BinaryClassificationEvaluatorEn raison du noyau PySpark, vous n’avez pas besoin de créer de contextes explicitement. Le contexte Spark est automatiquement créé pour vous lorsque vous exécutez la première cellule de code.
Construire le DataFrame d’entrée
Comme les données brutes sont au format Parquet, vous pouvez utiliser le contexte Spark pour extraire le fichier en mémoire directement en tant que DataFrame. Bien que le code des étapes suivantes utilise les options par défaut, il est possible de forcer le mappage des types de données et d’autres attributs de schéma si nécessaire.
Exécutez les lignes suivantes pour créer un DataFrame Spark en collant le code dans une nouvelle cellule. Cette étape récupère les données via l’API Open Datasets. L’extraction de toutes ces données génère environ 1,5 milliard de lignes.
Selon la taille de votre pool Apache Spark serverless, les données brutes peuvent être trop volumineuses ou prendre trop de temps pour fonctionner. Vous pouvez filtrer ces données jusqu’à quelque chose de plus petit. L’exemple de code suivant utilise
start_dateetend_dateapplique un filtre qui retourne un mois de données unique.from azureml.opendatasets import NycTlcYellow from datetime import datetime from dateutil import parser end_date = parser.parse('2018-05-08 00:00:00') start_date = parser.parse('2018-05-01 00:00:00') nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date) filtered_df = spark.createDataFrame(nyc_tlc.to_pandas_dataframe())L’inconvénient du filtrage simple est que, du point de vue statistique, il peut introduire un biais dans les données. Une autre approche consiste à utiliser l’échantillonnage intégré à Spark.
Le code suivant réduit le jeu de données à environ 2 000 lignes, s’il est appliqué après le code précédent. Vous pouvez utiliser cette étape d’échantillonnage au lieu du filtre simple ou conjointement avec le filtre simple.
# To make development easier, faster, and less expensive, downsample for now sampled_taxi_df = filtered_df.sample(True, 0.001, seed=1234)Il est désormais possible d’examiner les données pour voir ce qui a été lu. Il est normalement préférable d’examiner les données avec un sous-ensemble plutôt que le jeu complet, en fonction de la taille du jeu de données.
Le code suivant offre deux façons d’afficher les données. La première façon est de base. La deuxième façon offre une expérience de grille beaucoup plus riche, ainsi que la possibilité de visualiser les données graphiquement.
#sampled_taxi_df.show(5) display(sampled_taxi_df)En fonction de la taille du jeu de données généré et de votre besoin d’expérimenter ou d’exécuter le notebook plusieurs fois, vous pouvez mettre en cache le jeu de données localement dans l’espace de travail. Il existe trois façons d’effectuer une mise en cache explicite :
- Enregistrez le DataFrame localement en tant que fichier.
- Enregistrez le DataFrame en tant que table ou vue temporaire.
- Enregistrez le DataFrame en tant que table permanente.
Les deux premières approches sont incluses dans les exemples de code suivants.
La création d’une table ou d’une vue temporaire fournit différents chemins d’accès aux données, mais elle dure uniquement pendant la durée de la session d’instance Spark.
sampled_taxi_df.createOrReplaceTempView("nytaxi")
Préparer les données
Les données de sa forme brute ne conviennent souvent pas à passer directement à un modèle. Vous devez effectuer une série d’actions sur les données pour l’obtenir dans un état où le modèle peut l’utiliser.
Dans le code suivant, vous effectuez quatre classes d’opérations :
- Suppression de valeurs hors norme ou incorrectes par le biais du filtrage.
- Suppression des colonnes, qui ne sont pas nécessaires.
- Création de nouvelles colonnes dérivées des données brutes pour rendre le modèle plus efficace. Cette opération est parfois appelée caractérisation.
- Étiquetage. Étant donné que vous effectuez une classification binaire (sera-t-il un pourboire ou non sur un voyage donné), il est nécessaire de convertir le montant du pourboire en une valeur de 0 ou 1.
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
, 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
, date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
, date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
, (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
, (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
)\
.filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
& (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
& (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
& (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
& (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
& (sampled_taxi_df.rateCodeId <= 5)
& (sampled_taxi_df.paymentType.isin({"1", "2"}))
)
Vous effectuez ensuite un deuxième passage sur les données pour ajouter les fonctionnalités finales.
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
, 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
, when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
.when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
.when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
.when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
.otherwise(0).alias('trafficTimeBins')
)\
.filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
Créer un modèle de régression logistique
La tâche finale consiste à convertir les données étiquetées dans un format qui peut être analysé par régression logistique. L’entrée d’un algorithme de régression logistique doit être un ensemble de paires d’étiquettes/vecteurs de caractéristiques, où le vecteur de caractéristique est un vecteur de nombres qui représentent le point d’entrée.
Vous devez donc convertir les colonnes catégorielles en nombres. Plus précisément, vous devez convertir les colonnes trafficTimeBins et weekdayString en représentations entières. Il existe plusieurs approches pour effectuer la conversion. L’exemple suivant utilise l’approche OneHotEncoder , qui est courante.
# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")
# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
Cette action entraîne un nouveau DataFrame avec toutes les colonnes au format approprié pour entraîner un modèle.
Entraîner un modèle de régression logistique
La première tâche consiste à fractionner le jeu de données en un jeu d’entraînement et un jeu de test ou de validation. Le fractionnement ici est arbitraire. Expérimentez avec différents paramètres de fractionnement pour voir s’ils affectent le modèle.
# Decide on the split between training and testing data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234
# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
Maintenant qu’il existe deux DataFrames, la tâche suivante consiste à créer la formule de modèle et à l’exécuter sur le DataFrame d’apprentissage. Vous pouvez ensuite valider par rapport au DataFrame de test. Expérimentez avec différentes versions de la formule de modèle pour voir l’impact de différentes combinaisons.
Note
Pour enregistrer le modèle, affectez le rôle Contributeur de données d’objet blob de stockage à l’étendue de la ressource serveur d’Azure SQL Database. Pour connaître les étapes détaillées, consultez Attribuer des rôles Azure à l’aide du portail Azure. Seuls les membres disposant de privilèges de propriétaire peuvent effectuer cette étape.
## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')
## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")
## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
## Saving the model is optional, but it's another form of inter-session cache
datestamp = datetime.now().strftime('%m-%d-%Y-%s')
fileName = "lrModel_" + datestamp
logRegDirfilename = fileName
lrModel.save(logRegDirfilename)
## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)
La sortie de cette cellule est la suivante :
Area under ROC = 0.9779470729751403
Créer une représentation visuelle de la prédiction
Vous pouvez maintenant construire une visualisation finale pour vous aider à raisonner sur les résultats de ce test. Une courbe ROC est une façon de passer en revue le résultat.
## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()
Arrêter l’instance Spark
Une fois l’application exécutée, arrêtez le bloc-notes pour libérer les ressources en fermant l’onglet. Vous pouvez également sélectionner Terminer la session dans le volet d’état en bas du bloc-notes.