Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Cet article explique comment effectuer une tâche de classification spécifique avec deux méthodes. Une méthode utilise une méthode simple pysparket une méthode utilise la synapseml bibliothèque. Bien que les méthodes produisent les mêmes performances, elles mettent en évidence la simplicité de synapseml par rapport à pyspark.
La tâche décrite dans cet article prédit si un avis client spécifique du livre vendu sur Amazon est bon (évaluation > 3) ou mauvais, en fonction du texte de révision. Pour créer la tâche, vous entraînez les apprenants LogisticRegression avec différents hyperparamètres, puis choisissez le meilleur modèle.
Conditions préalables
Fixez votre notebook à un lakehouse. Sur le côté gauche, vous pouvez sélectionner Ajouter pour ajouter une lakehouse existante, ou vous pouvez créer une nouvelle lakehouse.
Configuration
Importez les bibliothèques Python nécessaires et obtenez une session Spark :
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
Lire les données
Téléchargez et lisez les données :
rawData = spark.read.parquet(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)
Extraire des fonctionnalités et traiter des données
Les données réelles sont plus complexes, par rapport au jeu de données que nous avons téléchargé précédemment. Un jeu de données a souvent des caractéristiques de plusieurs types , par exemple, du texte, du numérique et des catégories. Pour montrer les difficultés d’utilisation de ces jeux de données, ajoutez deux fonctionnalités numériques au jeu de données : le nombre de mots de la révision et la longueur moyenne du mot :
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def wordCount(s):
return len(s.split())
def wordLength(s):
import numpy as np
ss = [len(w) for w in s.split()]
return round(float(np.mean(ss)), 2)
wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer
wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline
data = (
Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
.fit(rawData)
.transform(rawData)
.withColumn("label", rawData["rating"] > 3)
.drop("rating")
)
data.show(5)
Classifier à l’aide de pyspark
Pour choisir le classifieur LogisticRegression le mieux adapté à l’aide de la pyspark bibliothèque, vous devez effectuer explicitement les étapes suivantes :
- Traiter les fonctionnalités
- Tokeniser la colonne de texte
- Transformer la colonne tokenisée en vecteur en utilisant le hachage
- Fusionner les caractéristiques numériques avec le vecteur
- Pour traiter la colonne d’étiquette, convertissez cette colonne en type approprié
- Entraîner plusieurs algorithmes LogisticRegression sur le
trainjeu de données, avec différents hyperparamètres - Calculez la zone sous la courbe ROC pour chacun des modèles entraînés, puis sélectionnez le modèle avec la métrique la plus élevée calculée sur le
testjeu de données. - Évaluer le meilleur modèle sur l’ensemble
validation
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler
# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)
# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)
# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
"label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []
# Select the best model
for learner in logisticRegressions:
model = learner.fit(train)
models.append(model)
scoredData = model.transform(test)
metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]
# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))
Classifier à l’aide de SynapseML
L’option synapseml implique des étapes plus simples :
L’estimateur
TrainClassifiertransforme les données en caractéristiques en interne, à condition que les colonnes sélectionnées dans le jeu de donnéestrain,test,validationreprésentent les caractéristiques.L’estimateur
FindBestModeltrouve le meilleur modèle à partir d’un pool de modèles entraînés. Pour ce faire, il recherche le modèle qui fonctionne le mieux sur letestjeu de données en fonction de la métrique spécifiéeLe
ComputeModelStatisticstransformateur calcule les différentes métriques sur un jeu de données noté (dans notre cas, levalidationjeu de données) en même temps
from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel
# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
for lrm in logisticRegressions
]
# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)
# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
"Best model's AUC on validation set = "
+ "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)