แชร์ผ่าน


การอนุมาน ONNX บน Spark

ในตัวอย่างนี้ คุณฝึกแบบจําลอง LightGBM และแปลงแบบจําลองนั้นเป็นรูปแบบ ONNX เมื่อแปลงแล้ว คุณจะใช้แบบจําลองเพื่ออนุมานข้อมูลการทดสอบบางอย่างบน Spark

ตัวอย่างนี้ใช้แพคเกจ Python และเวอร์ชันเหล่านี้:

  • onnxmltools==1.7.0
  • lightgbm==3.2.1

ข้อกําหนดเบื้องต้น

  • แนบสมุดบันทึกของคุณเข้ากับเลคเฮ้าส์ ทางด้านซ้าย เลือก เพิ่ม เพื่อเพิ่มเลคเฮ้าส์ที่มีอยู่แล้ว หรือสร้างเลคเฮ้าส์
  • คุณอาจต้องติดตั้งonnxmltools เมื่อต้องการทําเช่นนี้ ให้เพิ่ม !pip install onnxmltools==1.7.0 ในเซลล์ของรหัสสมุดบันทึก แล้วเรียกใช้เซลล์นั้น
  • คุณอาจต้องติดตั้งlightgbm เมื่อต้องการทําเช่นนี้ ให้เพิ่ม !pip install lightgbm==3.2.1 ในเซลล์ของรหัสสมุดบันทึก แล้วเรียกใช้เซลล์นั้น

โหลดข้อมูลตัวอย่าง

เมื่อต้องการโหลดข้อมูลตัวอย่าง ให้เพิ่มตัวอย่างโค้ดเหล่านี้ลงในเซลล์ในสมุดบันทึกของคุณ แล้วเรียกใช้เซลล์เหล่านั้น:

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

from synapse.ml.core.platform import *
df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(
        "wasbs://publicwasb@mmlspark.blob.core.windows.net/company_bankruptcy_prediction_data.csv"
    )
)

display(df)

ผลลัพธ์ควรมีลักษณะคล้ายกับตารางต่อไปนี้ คอลัมน์เฉพาะที่แสดง จํานวนแถว และค่าจริงในตารางอาจแตกต่างกัน:

อัตราส่วนความครอบคลุมดอกเบี้ย ธงรายได้สุทธิ ส่วนของผู้ถือหุ้นที่มีภาระความรับผิด
0.5641 1.0 0.0165
0.5702 1.0 0.0208
0.5673 1.0 0.0165

ใช้ LightGBM เพื่อฝึกแบบจําลอง

from pyspark.ml.feature import VectorAssembler
from synapse.ml.lightgbm import LightGBMClassifier

feature_cols = df.columns[1:]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

train_data = featurizer.transform(df)["Bankrupt?", "features"]

model = (
    LightGBMClassifier(featuresCol="features", labelCol="Bankrupt?", dataTransferMode="bulk")
    .setEarlyStoppingRound(300)
    .setLambdaL1(0.5)
    .setNumIterations(1000)
    .setNumThreads(-1)
    .setMaxDeltaStep(0.5)
    .setNumLeaves(31)
    .setMaxDepth(-1)
    .setBaggingFraction(0.7)
    .setFeatureFraction(0.7)
    .setBaggingFreq(2)
    .setObjective("binary")
    .setIsUnbalance(True)
    .setMinSumHessianInLeaf(20)
    .setMinGainToSplit(0.01)
)

model = model.fit(train_data)

แปลงแบบจําลองเป็นรูปแบบ ONNX

โค้ดต่อไปนี้ส่งออกแบบจําลองที่ได้รับการฝึกไปยังโปรแกรมเสริม LightGBM จากนั้นแปลงแบบจําลองเป็นรูปแบบ ONNX:

import lightgbm as lgb
from lightgbm import Booster, LGBMClassifier


def convertModel(lgbm_model: LGBMClassifier or Booster, input_size: int) -> bytes:
    from onnxmltools.convert import convert_lightgbm
    from onnxconverter_common.data_types import FloatTensorType

    initial_types = [("input", FloatTensorType([-1, input_size]))]
    onnx_model = convert_lightgbm(
        lgbm_model, initial_types=initial_types, target_opset=9
    )
    return onnx_model.SerializeToString()


booster_model_str = model.getLightGBMBooster().modelStr().get()
booster = lgb.Booster(model_str=booster_model_str)
model_payload_ml = convertModel(booster, len(feature_cols))

หลังจากการแปลงข้อมูล ให้โหลดส่วนข้อมูล ONNX ลงใน ONNXModelและตรวจสอบอินพุตและเอาต์พุตของแบบจําลอง:

from synapse.ml.onnx import ONNXModel

onnx_ml = ONNXModel().setModelPayload(model_payload_ml)

print("Model inputs:" + str(onnx_ml.getModelInputs()))
print("Model outputs:" + str(onnx_ml.getModelOutputs()))

แมปข้อมูลป้อนเข้าแบบจําลองไปยังชื่อคอลัมน์ (FeedDict) ของกรอบข้อมูลป้อนเข้า และแมปชื่อคอลัมน์ของดาต้าเฟรมผลลัพธ์ไปยังการแสดงผลแบบจําลอง (FetchDict):

onnx_ml = (
    onnx_ml.setDeviceType("CPU")
    .setFeedDict({"input": "features"})
    .setFetchDict({"probability": "probabilities", "prediction": "label"})
    .setMiniBatchSize(5000)
)

ใช้แบบจําลองสําหรับการอนุมาน

หากต้องการดําเนินการอนุมานกับแบบจําลอง โค้ดต่อไปนี้จะสร้างข้อมูลทดสอบและแปลงข้อมูลผ่านแบบจําลอง ONNX:

from pyspark.ml.feature import VectorAssembler
import pandas as pd
import numpy as np

n = 1000 * 1000
m = 95
test = np.random.rand(n, m)
testPdf = pd.DataFrame(test)
cols = list(map(str, testPdf.columns))
testDf = spark.createDataFrame(testPdf)
testDf = testDf.union(testDf).repartition(200)
testDf = (
    VectorAssembler()
    .setInputCols(cols)
    .setOutputCol("features")
    .transform(testDf)
    .drop(*cols)
    .cache()
)

display(onnx_ml.transform(testDf))

ผลลัพธ์ควรมีลักษณะคล้ายกับตารางต่อไปนี้ แต่ค่าและจํานวนแถวอาจแตกต่างกัน:

Index คุณลักษณะ การคาดการณ์ ความน่าจะเป็น
1 "{"type":1,"values":[0.105... 0 "{"0":0.835...
2 "{"type":1,"values":[0.814... 0 "{"0":0.658...