Delen via


Classificatietaken met SynapseML

In dit artikel wordt beschreven hoe u een specifieke classificatietaak met twee methoden uitvoert. Eén methode maakt gebruik van gewone pysparkgegevens en één methode maakt gebruik van de synapseml bibliotheek. Hoewel de methoden dezelfde prestaties opleveren, benadrukken ze de eenvoud van synapseml in vergelijking met pyspark.

De taak die in dit artikel wordt beschreven, voorspelt of een specifieke klantbeoordeling van het boek dat op Amazon is verkocht, goed is (beoordeling > 3) of slecht, op basis van de beoordelingstekst. Als u de taak wilt bouwen, traint u LogisticRegression-cursisten met verschillende hyperparameters en kiest u vervolgens het beste model.

Vereiste voorwaarden

Koppel uw notitieblok aan een lakehouse. Aan de linkerkant kunt u Toevoegen selecteren om een bestaand lakehouse toe te voegen of u kunt een nieuw lakehouse maken.

Configuratie

Importeer de benodigde Python-bibliotheken en haal een Spark-sessie op:

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

De gegevens lezen

Download en lees de gegevens:

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

Functies extraheren en gegevens verwerken

Echte gegevens hebben meer complexiteit, vergeleken met de gegevensset die we eerder hebben gedownload. Een gegevensset heeft vaak functies van meerdere typen, bijvoorbeeld tekst, numeriek en categorisch. Als u de problemen met het werken met deze gegevenssets wilt weergeven, voegt u twee numerieke functies toe aan de gegevensset: het aantal woorden van de beoordeling en de gemiddelde woordlengte:

from pyspark.sql.functions import udf
from pyspark.sql.types import *


def wordCount(s):
    return len(s.split())


def wordLength(s):
    import numpy as np

    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer

wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)
data.show(5)

Classificeren met behulp van pyspark

Als u de beste LogisticRegression-classificatie wilt kiezen met behulp van de pyspark bibliotheek, moet u deze stappen expliciet uitvoeren:

  1. De functies verwerken
    • De tekstkolom tokeniseren
    • Hash de getokeniseerde kolom in een vector.
    • De numerieke kenmerken samenvoegen met de vector
  2. Om de labelkolom te verwerken, moet u die kolom naar het juiste type casten.
  3. Meerdere LogisticRegression-algoritmen trainen op de train gegevensset, met verschillende hyperparameters
  4. Bereken het gebied onder de ROC-curve voor elk van de getrainde modellen en selecteer het model met de hoogste metrische waarde zoals berekend op de test gegevensset
  5. Het beste model op de validation set evalueren
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler

# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Select the best model
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))

Classificeren met SynapseML

De synapseml optie omvat eenvoudigere stappen:

  1. De TrainClassifier estimator bevat intern de gegevens, zolang de kolommen die zijn geselecteerd in de traingegevensset testvalidation de functies vertegenwoordigen

  2. De FindBestModel Estimator vindt het beste model uit een groep getrainde modellen. Hiervoor wordt het model gevonden dat het beste presteert op de test gegevensset op basis van de opgegeven metrische waarde

  3. De ComputeModelStatistics transformator berekent de verschillende metrische gegevens op een gescoorde gegevensset (in ons geval de validation gegevensset) tegelijkertijd

from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel

# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)


# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)