Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
W tym artykule pokazano, jak wykonać określone zadanie klasyfikacji przy użyciu dwóch metod. Jedna metoda używa zwykłego pyspark, a inna metoda korzysta z biblioteki synapseml. Mimo że metody zapewniają taką samą wydajność, podkreślają prostotę synapseml w porównaniu z pyspark.
Zadanie opisane w tym artykule przewiduje, czy konkretny przegląd książki sprzedanej przez klienta na Amazon jest dobry (ocena > 3) lub zły, na podstawie tekstu przeglądu. Aby skompilować zadanie, przeszkolisz uczniów usługi LogisticsRegression przy użyciu różnych hiperparametrów, a następnie wybierz najlepszy model.
Wymagania wstępne
Dołącz swój notebook do lakehouse. Po lewej stronie możesz wybrać opcję Dodaj, aby dodać istniejący lakehouse lub stworzyć nowy lakehouse.
Konfiguracja
Zaimportuj niezbędne biblioteki języka Python i uzyskaj sesję platformy Spark:
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
Odczytywanie danych
Pobierz i odczytaj dane:
rawData = spark.read.parquet(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)
Wyodrębnianie funkcji i przetwarzanie danych
Rzeczywiste dane mają większą złożoność w porównaniu z pobranym wcześniej zestawem danych. Zestaw danych często zawiera funkcje wielu typów — na przykład tekst, liczbowy i kategorialny. Aby pokazać trudności w pracy z tymi zestawami danych, dodaj do zestawu danych dwie cechy liczbowe: liczbę wyrazów przeglądu i średnią długość słowa:
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def wordCount(s):
return len(s.split())
def wordLength(s):
import numpy as np
ss = [len(w) for w in s.split()]
return round(float(np.mean(ss)), 2)
wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer
wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline
data = (
Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
.fit(rawData)
.transform(rawData)
.withColumn("label", rawData["rating"] > 3)
.drop("rating")
)
data.show(5)
Klasyfikowanie przy użyciu narzędzia pyspark
Aby wybrać najlepszy klasyfikator LogisticsRegression przy użyciu pyspark biblioteki, należy jawnie wykonać następujące kroki:
- Przetwarzanie cech
- Tokenizowanie kolumny tekstowej
- Skróć kolumnę tokenizowaną do wektora przy użyciu haszowania
- Scalanie cech liczbowych z wektorem
- Aby przetworzyć kolumnę etykiety, zmień typ tej kolumny na odpowiedni.
- Przećwicz wiele algorytmów regresji logistycznej na zestawie danych
trainprzy użyciu różnych hiperparametrów - Oblicz obszar pod krzywą ROC dla każdego z przeszkolonych modeli i wybierz model z najwyższą metryką obliczoną na podstawie zestawu danych
test. - Oceń najlepszy model na
validationzestawie
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler
# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)
# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)
# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
"label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []
# Select the best model
for learner in logisticRegressions:
model = learner.fit(train)
models.append(model)
scoredData = model.transform(test)
metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]
# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))
Klasyfikowanie przy użyciu usługi SynapseML
Opcja synapseml obejmuje prostsze kroki:
Narzędzie
TrainClassifierdo szacowania wewnętrznie featurizuje dane, o ile kolumny wybrane w zestawie danychtrain,test,validationreprezentują cechyEstimator
FindBestModelznajduje najlepszy model z puli wytrenowanych modeli. W tym celu znajduje model, który najlepiej sprawdza się wtestzestawie danych, biorąc pod uwagę określoną metryęFunkcja
ComputeModelStatisticsTransformer oblicza różne metryki na ocenianym zestawie danych (w naszym przypadkuvalidationzestaw danych) w tym samym czasie
from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel
# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
for lrm in logisticRegressions
]
# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)
# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
"Best model's AUC on validation set = "
+ "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)