이 문서에서는 두 가지 방법으로 특정 분류 작업을 수행하는 방법을 보여 줍니다. 한 메서드는 일반 pyspark을 사용하고 한 메서드는 라이브러리를 synapseml 사용합니다. 비록 메서드들이 동일한 성능을 가져오지만, pyspark와 비교했을 때 synapseml의 단순함을 강조합니다.
이 문서에 설명된 작업은 Amazon에서 판매된 책의 특정 고객 리뷰가 리뷰 텍스트에 따라 양호(등급 > 3)인지 나쁜지 예측합니다. 작업을 빌드하려면 여러 하이퍼 매개 변수를 사용하여 LogisticRegression 학습자를 학습한 다음 최상의 모델을 선택합니다.
필수 조건
노트북을 레이크하우스에 연결합니다. 왼쪽에서 추가 를 선택하여 기존 레이크하우스를 추가하거나 새 레이크하우스를 만들 수 있습니다.
설치
필요한 Python 라이브러리를 가져오고 Spark 세션을 가져옵니다.
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
데이터 읽기
데이터를 다운로드하고 읽습니다.
rawData = spark.read.parquet(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)
기능 추출 및 데이터 처리
실제 데이터는 이전에 다운로드한 데이터 세트에 비해 더 복잡합니다. 데이터 세트에는 텍스트, 숫자 및 범주와 같은 여러 형식의 기능이 있는 경우가 많습니다. 이러한 데이터 세트 작업의 어려움을 표시하려면 데이터 세트에 검토 단어 수 와 평균 단어 길이라는 두 가지 숫자 기능을 추가합니다.
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def wordCount(s):
return len(s.split())
def wordLength(s):
import numpy as np
ss = [len(w) for w in s.split()]
return round(float(np.mean(ss)), 2)
wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer
wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline
data = (
Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
.fit(rawData)
.transform(rawData)
.withColumn("label", rawData["rating"] > 3)
.drop("rating")
)
data.show(5)
pyspark를 사용하여 분류
라이브러리를 사용하여 pyspark 최상의 LogisticRegression 분류자를 선택하려면 다음 단계를 명시적으로 수행해야 합니다.
- 기능 처리
- 텍스트 열 토큰화
- 토큰화된 열을 해시 기법을 사용하여 벡터로 변환합니다.
- 숫자 기능을 벡터와 병합
- 레이블 열을 처리하려면 해당 열을 적절한 형식으로 캐스팅합니다.
- 서로 다른 하이퍼 매개 변수를 사용하여
train데이터 세트에서 여러 LogisticRegression 알고리즘 학습 - 학습된 각 모델에 대한 ROC 곡선 아래의 영역을 계산하고 데이터 세트에서 계산된
test메트릭이 가장 높은 모델을 선택합니다. -
validation집합에서 최상의 모델을 평가하기
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler
# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)
# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)
# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
"label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []
# Select the best model
for learner in logisticRegressions:
model = learner.fit(train)
models.append(model)
scoredData = model.transform(test)
metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]
# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))
SynapseML을 사용하여 분류
이 synapseml 옵션에는 더 간단한 단계가 포함됩니다.
내부 예측 도구는 데이터 세트의
train,test,validation열이 특성을 나타낼 경우, 데이터를 특징 형상화합니다.추정기는
FindBestModel학습된 모델 풀에서 최상의 모델을 찾습니다. 이 작업을 수행하기 위해, 지정된 메트릭을 기준으로test데이터 세트에서 가장 성능이 우수한 모델을 찾습니다.ComputeModelStatistics변환기는 점수가 매기된 데이터 세트(이 경우validation데이터 세트)에서 다른 메트릭을 동시에 계산합니다.
from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel
# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
for lrm in logisticRegressions
]
# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)
# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
"Best model's AUC on validation set = "
+ "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)