Udostępnij przez


Zadania klasyfikacji przy użyciu usługi SynapseML

W tym artykule pokazano, jak wykonać określone zadanie klasyfikacji przy użyciu dwóch metod. Jedna metoda używa zwykłego pyspark, a inna metoda korzysta z biblioteki synapseml. Mimo że metody zapewniają taką samą wydajność, podkreślają prostotę synapseml w porównaniu z pyspark.

Zadanie opisane w tym artykule przewiduje, czy konkretny przegląd książki sprzedanej przez klienta na Amazon jest dobry (ocena > 3) lub zły, na podstawie tekstu przeglądu. Aby skompilować zadanie, przeszkolisz uczniów usługi LogisticsRegression przy użyciu różnych hiperparametrów, a następnie wybierz najlepszy model.

Wymagania wstępne

Dołącz swój notebook do lakehouse. Po lewej stronie możesz wybrać opcję Dodaj, aby dodać istniejący lakehouse lub stworzyć nowy lakehouse.

Konfiguracja

Zaimportuj niezbędne biblioteki języka Python i uzyskaj sesję platformy Spark:

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Odczytywanie danych

Pobierz i odczytaj dane:

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

Wyodrębnianie funkcji i przetwarzanie danych

Rzeczywiste dane mają większą złożoność w porównaniu z pobranym wcześniej zestawem danych. Zestaw danych często zawiera funkcje wielu typów — na przykład tekst, liczbowy i kategorialny. Aby pokazać trudności w pracy z tymi zestawami danych, dodaj do zestawu danych dwie cechy liczbowe: liczbę wyrazów przeglądu i średnią długość słowa:

from pyspark.sql.functions import udf
from pyspark.sql.types import *


def wordCount(s):
    return len(s.split())


def wordLength(s):
    import numpy as np

    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer

wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)
data.show(5)

Klasyfikowanie przy użyciu narzędzia pyspark

Aby wybrać najlepszy klasyfikator LogisticsRegression przy użyciu pyspark biblioteki, należy jawnie wykonać następujące kroki:

  1. Przetwarzanie cech
    • Tokenizowanie kolumny tekstowej
    • Skróć kolumnę tokenizowaną do wektora przy użyciu haszowania
    • Scalanie cech liczbowych z wektorem
  2. Aby przetworzyć kolumnę etykiety, zmień typ tej kolumny na odpowiedni.
  3. Przećwicz wiele algorytmów regresji logistycznej na zestawie danych train przy użyciu różnych hiperparametrów
  4. Oblicz obszar pod krzywą ROC dla każdego z przeszkolonych modeli i wybierz model z najwyższą metryką obliczoną na podstawie zestawu danych test.
  5. Oceń najlepszy model na validation zestawie
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler

# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Select the best model
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))

Klasyfikowanie przy użyciu usługi SynapseML

Opcja synapseml obejmuje prostsze kroki:

  1. Narzędzie TrainClassifier do szacowania wewnętrznie featurizuje dane, o ile kolumny wybrane w zestawie danych train, test, validation reprezentują cechy

  2. Estimator FindBestModel znajduje najlepszy model z puli wytrenowanych modeli. W tym celu znajduje model, który najlepiej sprawdza się w test zestawie danych, biorąc pod uwagę określoną metryę

  3. Funkcja ComputeModelStatistics Transformer oblicza różne metryki na ocenianym zestawie danych (w naszym przypadku validation zestaw danych) w tym samym czasie

from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel

# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)


# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)