Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
In dit artikel wordt beschreven hoe u een specifieke classificatietaak met twee methoden uitvoert. Eén methode maakt gebruik van gewone pysparkgegevens en één methode maakt gebruik van de synapseml bibliotheek. Hoewel de methoden dezelfde prestaties opleveren, benadrukken ze de eenvoud van synapseml in vergelijking met pyspark.
De taak die in dit artikel wordt beschreven, voorspelt of een specifieke klantbeoordeling van het boek dat op Amazon is verkocht, goed is (beoordeling > 3) of slecht, op basis van de beoordelingstekst. Als u de taak wilt bouwen, traint u LogisticRegression-cursisten met verschillende hyperparameters en kiest u vervolgens het beste model.
Vereiste voorwaarden
Koppel uw notitieblok aan een lakehouse. Aan de linkerkant kunt u Toevoegen selecteren om een bestaand lakehouse toe te voegen of u kunt een nieuw lakehouse maken.
Configuratie
Importeer de benodigde Python-bibliotheken en haal een Spark-sessie op:
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
De gegevens lezen
Download en lees de gegevens:
rawData = spark.read.parquet(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)
Functies extraheren en gegevens verwerken
Echte gegevens hebben meer complexiteit, vergeleken met de gegevensset die we eerder hebben gedownload. Een gegevensset heeft vaak functies van meerdere typen, bijvoorbeeld tekst, numeriek en categorisch. Als u de problemen met het werken met deze gegevenssets wilt weergeven, voegt u twee numerieke functies toe aan de gegevensset: het aantal woorden van de beoordeling en de gemiddelde woordlengte:
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def wordCount(s):
return len(s.split())
def wordLength(s):
import numpy as np
ss = [len(w) for w in s.split()]
return round(float(np.mean(ss)), 2)
wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer
wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline
data = (
Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
.fit(rawData)
.transform(rawData)
.withColumn("label", rawData["rating"] > 3)
.drop("rating")
)
data.show(5)
Classificeren met behulp van pyspark
Als u de beste LogisticRegression-classificatie wilt kiezen met behulp van de pyspark bibliotheek, moet u deze stappen expliciet uitvoeren:
- De functies verwerken
- De tekstkolom tokeniseren
- Hash de getokeniseerde kolom in een vector.
- De numerieke kenmerken samenvoegen met de vector
- Om de labelkolom te verwerken, moet u die kolom naar het juiste type casten.
- Meerdere LogisticRegression-algoritmen trainen op de
traingegevensset, met verschillende hyperparameters - Bereken het gebied onder de ROC-curve voor elk van de getrainde modellen en selecteer het model met de hoogste metrische waarde zoals berekend op de
testgegevensset - Het beste model op de
validationset evalueren
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler
# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)
# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)
# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
"label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []
# Select the best model
for learner in logisticRegressions:
model = learner.fit(train)
models.append(model)
scoredData = model.transform(test)
metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]
# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))
Classificeren met SynapseML
De synapseml optie omvat eenvoudigere stappen:
De
TrainClassifierestimator bevat intern de gegevens, zolang de kolommen die zijn geselecteerd in detraingegevenssettestvalidationde functies vertegenwoordigenDe
FindBestModelEstimator vindt het beste model uit een groep getrainde modellen. Hiervoor wordt het model gevonden dat het beste presteert op detestgegevensset op basis van de opgegeven metrische waardeDe
ComputeModelStatisticstransformator berekent de verschillende metrische gegevens op een gescoorde gegevensset (in ons geval devalidationgegevensset) tegelijkertijd
from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel
# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
for lrm in logisticRegressions
]
# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)
# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
"Best model's AUC on validation set = "
+ "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)