Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
En este artículo se muestra cómo realizar una tarea de clasificación específica con dos métodos. Un método usa sin formato pyspark, y un método usa la biblioteca synapseml. Aunque los métodos producen el mismo rendimiento, resaltan la simplicidad de synapseml en comparación con pyspark.
La tarea descrita en este artículo predice si una opinión específica del cliente de libros vendidos en Amazon es buena (calificación > 3) o incorrecta, en función del texto de la revisión. Para compilar la tarea, entrene los aprendices de LogisticRegression con diferentes hiperparámetros y a continuación, elija el mejor modelo.
Prerrequisitos
Adjunte su bloc de notas a un lago de datos. En el lado izquierdo, puede seleccionar Agregar para agregar una instancia de Lakehouse existente, o bien puede crear una nueva instancia de LakeHouse.
Configuración
Importe las bibliotecas de Python necesarias y obtenga una sesión de Spark:
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
Lee los datos
Descargue y lea los datos:
rawData = spark.read.parquet(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)
Extracción de características y datos de proceso
Los datos reales tienen más complejidad, en comparación con el conjunto de datos que descargó anteriormente. Un conjunto de datos suele tener características de varios tipos, por ejemplo, texto, numérico y categórico. Para mostrar las dificultades de trabajar con estos conjuntos de datos, agregue dos características numéricas al conjunto de datos: el recuento de palabras de la revisión y la longitud media de la palabra:
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def wordCount(s):
return len(s.split())
def wordLength(s):
import numpy as np
ss = [len(w) for w in s.split()]
return round(float(np.mean(ss)), 2)
wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer
wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline
data = (
Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
.fit(rawData)
.transform(rawData)
.withColumn("label", rawData["rating"] > 3)
.drop("rating")
)
data.show(5)
Clasificación mediante pyspark
Para elegir el mejor clasificador LogisticRegression mediante la pyspark biblioteca, debe realizar explícitamente estos pasos:
- Procesar las características
- Tokenizar la columna de texto
- Hashea la columna tokenizada en un vector utilizando hash
- Combinar las características numéricas con el vector
- Para procesar la columna de etiqueta, convierta esa columna en el tipo adecuado.
- Entrena varios algoritmos de regresión logística en el
trainconjunto de datos, con diferentes hiperparámetros. - Calcule el área bajo la curva ROC para cada uno de los modelos entrenados y seleccione el modelo con la métrica más alta calculada en el
testconjunto de datos. - Evaluar el mejor modelo en el conjunto
validation
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler
# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)
# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)
# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
"label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []
# Select the best model
for learner in logisticRegressions:
model = learner.fit(train)
models.append(model)
scoredData = model.transform(test)
metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]
# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))
Clasificación mediante SynapseML
La synapseml opción implica pasos más sencillos:
El estimador de
TrainClassifierincluye internamente los datos, siempre y cuando las columnas seleccionadas en el conjunto de datostrain,test,validationrepresenten las característicasEl
FindBestModelestimador busca el mejor modelo de un grupo de modelos entrenados. Para ello, busca el modelo que funciona mejor en eltestconjunto de datos según la métrica especificada.ComputeModelStatisticsEl transformador calcula las distintas métricas en un conjunto de datos puntuado (en nuestro caso, elvalidationconjunto de datos) al mismo tiempo.
from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel
# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
for lrm in logisticRegressions
]
# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)
# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
"Best model's AUC on validation set = "
+ "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)