Partilhar via


Tarefas de classificação usando SynapseML

Este artigo mostra como executar uma tarefa de classificação específica com dois métodos. Um método usa plain pyspark, e um método usa a synapseml biblioteca. Embora os métodos produzam o mesmo desempenho, destacam a simplicidade de synapseml comparado com pyspark.

A tarefa descrita neste artigo prevê se uma avaliação específica de um cliente sobre o livro vendido na Amazon é boa (classificação > 3) ou ruim, com base no texto da avaliação. Para criar a tarefa, treine os alunos do LogisticRegression com diferentes hiperparâmetros e, em seguida, escolha o melhor modelo.

Pré-requisitos

Ligue o seu bloco de notas a uma casa no lago. No lado esquerdo, você pode selecionar Adicionar para adicionar uma casa de lago existente, ou você pode criar uma nova casa de lago.

Configuração

Importe as bibliotecas Python necessárias e obtenha uma sessão do Spark:

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Leia os dados

Faça o download e leia os dados:

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

Extrair características e processar dados

Os dados reais têm mais complexidade, em comparação com o conjunto de dados que baixamos anteriormente. Um conjunto de dados geralmente tem recursos de vários tipos - por exemplo, texto, numérico e categórico. Para mostrar as dificuldades de trabalhar com esses conjuntos de dados, adicione duas características numéricas ao conjunto de dados: a contagem de palavras da revisão e o comprimento médio das palavras:

from pyspark.sql.functions import udf
from pyspark.sql.types import *


def wordCount(s):
    return len(s.split())


def wordLength(s):
    import numpy as np

    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer

wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)
data.show(5)

Classificar usando pyspark

Para escolher o melhor classificador LogisticRegression usando a pyspark biblioteca, você deve executar explicitamente estas etapas:

  1. Processar os recursos
    • Tokenizar a coluna de texto
    • Transformar a coluna tokenizada num vetor usando hash.
    • Fundir as características numéricas com o vetor
  2. Para processar a coluna do rótulo, converta-a no tipo adequado
  3. Treinar vários algoritmos de Regressão Logística no conjunto de dados train, com diferentes hiperparâmetros.
  4. Calcule a área sob a curva ROC para cada um dos modelos treinados e selecione o modelo com a métrica mais alta, conforme calculado no test conjunto de dados
  5. Avalie o melhor modelo no conjunto validation
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler

# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Select the best model
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))

Classificar usando SynapseML

A synapseml opção envolve etapas mais simples:

  1. O TrainClassifier Estimador featuriza internamente os dados, desde que as colunas selecionadas no train, test, validation conjunto de dados representem as características

  2. O FindBestModel Estimador encontra o melhor modelo a partir de um conjunto de modelos treinados. Para fazer isso, ele encontra o modelo que tem melhor desempenho no test conjunto de dados dada a métrica especificada

  3. O ComputeModelStatistics Transformer calcula as diferentes métricas num conjunto de dados avaliado (no nosso caso, o conjunto de dados validation) simultaneamente.

from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel

# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)


# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)