Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Este artigo mostra como executar uma tarefa de classificação específica com dois métodos. Um método usa plain pyspark, e um método usa a synapseml biblioteca. Embora os métodos produzam o mesmo desempenho, destacam a simplicidade de synapseml comparado com pyspark.
A tarefa descrita neste artigo prevê se uma avaliação específica de um cliente sobre o livro vendido na Amazon é boa (classificação > 3) ou ruim, com base no texto da avaliação. Para criar a tarefa, treine os alunos do LogisticRegression com diferentes hiperparâmetros e, em seguida, escolha o melhor modelo.
Pré-requisitos
Ligue o seu bloco de notas a uma casa no lago. No lado esquerdo, você pode selecionar Adicionar para adicionar uma casa de lago existente, ou você pode criar uma nova casa de lago.
Configuração
Importe as bibliotecas Python necessárias e obtenha uma sessão do Spark:
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
Leia os dados
Faça o download e leia os dados:
rawData = spark.read.parquet(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)
Extrair características e processar dados
Os dados reais têm mais complexidade, em comparação com o conjunto de dados que baixamos anteriormente. Um conjunto de dados geralmente tem recursos de vários tipos - por exemplo, texto, numérico e categórico. Para mostrar as dificuldades de trabalhar com esses conjuntos de dados, adicione duas características numéricas ao conjunto de dados: a contagem de palavras da revisão e o comprimento médio das palavras:
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def wordCount(s):
return len(s.split())
def wordLength(s):
import numpy as np
ss = [len(w) for w in s.split()]
return round(float(np.mean(ss)), 2)
wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer
wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline
data = (
Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
.fit(rawData)
.transform(rawData)
.withColumn("label", rawData["rating"] > 3)
.drop("rating")
)
data.show(5)
Classificar usando pyspark
Para escolher o melhor classificador LogisticRegression usando a pyspark biblioteca, você deve executar explicitamente estas etapas:
- Processar os recursos
- Tokenizar a coluna de texto
- Transformar a coluna tokenizada num vetor usando hash.
- Fundir as características numéricas com o vetor
- Para processar a coluna do rótulo, converta-a no tipo adequado
- Treinar vários algoritmos de Regressão Logística no conjunto de dados
train, com diferentes hiperparâmetros. - Calcule a área sob a curva ROC para cada um dos modelos treinados e selecione o modelo com a métrica mais alta, conforme calculado no
testconjunto de dados - Avalie o melhor modelo no conjunto
validation
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler
# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)
# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)
# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
"label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []
# Select the best model
for learner in logisticRegressions:
model = learner.fit(train)
models.append(model)
scoredData = model.transform(test)
metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]
# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))
Classificar usando SynapseML
A synapseml opção envolve etapas mais simples:
O
TrainClassifierEstimador featuriza internamente os dados, desde que as colunas selecionadas notrain,test,validationconjunto de dados representem as característicasO
FindBestModelEstimador encontra o melhor modelo a partir de um conjunto de modelos treinados. Para fazer isso, ele encontra o modelo que tem melhor desempenho notestconjunto de dados dada a métrica especificadaO
ComputeModelStatisticsTransformer calcula as diferentes métricas num conjunto de dados avaliado (no nosso caso, o conjunto de dadosvalidation) simultaneamente.
from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel
# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
for lrm in logisticRegressions
]
# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)
# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
"Best model's AUC on validation set = "
+ "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)