Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Este artigo mostra como executar uma tarefa de classificação específica com dois métodos. Um método usa simples pysparke um método usa a synapseml biblioteca. Embora os métodos produzam o mesmo desempenho, eles realçam a simplicidade em synapseml comparação com pyspark.
A tarefa descrita neste artigo prevê se uma avaliação específica de um cliente de um livro vendido na Amazon é boa (classificação >= 3) ou ruim, com base no texto da avaliação. Para criar a tarefa, treine os aprendizes de LogisticRegression com hiperparâmetros diferentes e escolha o melhor modelo.
Pré-requisitos
Anexe seu notebook a um lakehouse. No lado esquerdo, você pode selecionar Adicionar para adicionar uma lakehouse existente ou criar uma nova lakehouse.
Configuração
Importe as bibliotecas necessárias do Python e obtenha uma sessão do Spark:
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
Ler os dados
Baixe e leia os dados:
rawData = spark.read.parquet(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)
Extrair características e processar dados
Os dados reais têm mais complexidade em comparação com o conjunto de dados que baixamos anteriormente. Um conjunto de dados geralmente tem recursos de vários tipos – por exemplo, texto, numérico e categórico. Para mostrar as dificuldades de trabalhar com esses conjuntos de dados, adicione dois recursos numéricos ao conjunto de dados: a contagem de palavras da revisão e o comprimento médio da palavra:
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def wordCount(s):
return len(s.split())
def wordLength(s):
import numpy as np
ss = [len(w) for w in s.split()]
return round(float(np.mean(ss)), 2)
wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer
wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline
data = (
Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
.fit(rawData)
.transform(rawData)
.withColumn("label", rawData["rating"] > 3)
.drop("rating")
)
data.show(5)
Classificar usando pyspark
Para escolher o melhor classificador logisticRegression usando a pyspark biblioteca, você deve executar explicitamente estas etapas:
- Processar os recursos
- Tokenizar a coluna de texto
- Hash da coluna tokenizada em um vetor usando hash
- Mesclar as características numéricas com o vetor
- Para processar a coluna de rótulo, converta essa coluna no tipo adequado
- Treinar vários algoritmos de LogisticRegression no
trainconjunto de dados, com hiperparâmetros diferentes - Compute a área na curva ROC para cada um dos modelos treinados e selecione o modelo com a métrica mais alta, conforme computado no
testconjunto de dados - Avaliar o melhor modelo no conjunto
validation
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler
# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)
# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)
# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
"label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []
# Select the best model
for learner in logisticRegressions:
model = learner.fit(train)
models.append(model)
scoredData = model.transform(test)
metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]
# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))
Classificar usando SynapseML
A synapseml opção envolve etapas mais simples:
O Avaliador
TrainClassifierapresenta os dados internamente, desde que as colunas selecionadas no conjunto de dadostrain,test,validationrepresentem os recursosO
FindBestModelAvaliador localiza o melhor modelo de um pool de modelos treinados. Para fazer isso, ele localiza o modelo que tem o melhor desempenho notestconjunto de dados dada a métrica especificadaO
ComputeModelStatisticsTransformador calcula ao mesmo tempo as diferentes métricas em um conjunto de dados avaliado (em nosso caso, o conjunto de dadosvalidation)
from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel
# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
for lrm in logisticRegressions
]
# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)
# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
"Best model's AUC on validation set = "
+ "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)