Delen via


ONNX-inference op Spark

In dit voorbeeld traint u een LightGBM-model en converteert u dat model naar de ONNX-indeling . Nadat het model is geconverteerd, gebruikt u het model om enkele testgegevens in Spark af te leiden.

In dit voorbeeld worden deze Python-pakketten en -versies gebruikt:

  • onnxmltools==1.7.0
  • lightgbm==3.2.1

Vereiste voorwaarden

  • Koppel uw notitieblok aan een lakehouse. Selecteer aan de linkerkant Toevoegen om een bestaand lakehouse toe te voegen of een nieuw lakehouse te creĆ«ren.
  • Mogelijk moet u installeren onnxmltools. U doet dit door !pip install onnxmltools==1.7.0 in een notebookcodecel toe te voegen en die cel vervolgens uit te voeren.
  • Mogelijk moet u installeren lightgbm. Om dit te doen, voegt u !pip install lightgbm==3.2.1 toe in een notebookcodecel en voert u die cel vervolgens uit.

De voorbeeldgegevens laden

Als u de voorbeeldgegevens wilt laden, voegt u deze codevoorbeelden toe aan cellen in uw notebook en voert u deze cellen uit:

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

from synapse.ml.core.platform import *
df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(
        "wasbs://publicwasb@mmlspark.blob.core.windows.net/company_bankruptcy_prediction_data.csv"
    )
)

display(df)

De uitvoer moet er ongeveer uitzien als in de volgende tabel. De specifieke kolommen die worden weergegeven, het aantal rijen en de werkelijke waarden in de tabel kunnen verschillen:

Rentedekkingsverhouding Vlag netto-inkomsten Eigen vermogen ten opzichte van verplichtingen
0.5641 1.0 0.0165
0.5702 1.0 0.0208
0.5673 1.0 0.0165

LightGBM gebruiken om een model te trainen

from pyspark.ml.feature import VectorAssembler
from synapse.ml.lightgbm import LightGBMClassifier

feature_cols = df.columns[1:]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

train_data = featurizer.transform(df)["Bankrupt?", "features"]

model = (
    LightGBMClassifier(featuresCol="features", labelCol="Bankrupt?", dataTransferMode="bulk")
    .setEarlyStoppingRound(300)
    .setLambdaL1(0.5)
    .setNumIterations(1000)
    .setNumThreads(-1)
    .setMaxDeltaStep(0.5)
    .setNumLeaves(31)
    .setMaxDepth(-1)
    .setBaggingFraction(0.7)
    .setFeatureFraction(0.7)
    .setBaggingFreq(2)
    .setObjective("binary")
    .setIsUnbalance(True)
    .setMinSumHessianInLeaf(20)
    .setMinGainToSplit(0.01)
)

model = model.fit(train_data)

Het model converteren naar ONNX-indeling

Met de volgende code wordt het getrainde model geƫxporteerd naar een LightGBM Booster en wordt het model vervolgens geconverteerd naar de ONNX-indeling:

import lightgbm as lgb
from lightgbm import Booster, LGBMClassifier


def convertModel(lgbm_model: LGBMClassifier or Booster, input_size: int) -> bytes:
    from onnxmltools.convert import convert_lightgbm
    from onnxconverter_common.data_types import FloatTensorType

    initial_types = [("input", FloatTensorType([-1, input_size]))]
    onnx_model = convert_lightgbm(
        lgbm_model, initial_types=initial_types, target_opset=9
    )
    return onnx_model.SerializeToString()


booster_model_str = model.getLightGBMBooster().modelStr().get()
booster = lgb.Booster(model_str=booster_model_str)
model_payload_ml = convertModel(booster, len(feature_cols))

Laad na de conversie de ONNX-payload in een ONNXModel en controleer de invoer en uitvoer van het model.

from synapse.ml.onnx import ONNXModel

onnx_ml = ONNXModel().setModelPayload(model_payload_ml)

print("Model inputs:" + str(onnx_ml.getModelInputs()))
print("Model outputs:" + str(onnx_ml.getModelOutputs()))

Wijs de modelinvoer toe aan de kolomnaam (FeedDict) van het invoergegevensframe en wijs de kolomnamen van het uitvoergegevensframe toe aan de modeluitvoer (FetchDict):

onnx_ml = (
    onnx_ml.setDeviceType("CPU")
    .setFeedDict({"input": "features"})
    .setFetchDict({"probability": "probabilities", "prediction": "label"})
    .setMiniBatchSize(5000)
)

Het model gebruiken voor deductie

Om inferentie met een model uit te voeren, genereert de volgende code testgegevens en transformeert deze gegevens via het ONNX-model.

from pyspark.ml.feature import VectorAssembler
import pandas as pd
import numpy as np

n = 1000 * 1000
m = 95
test = np.random.rand(n, m)
testPdf = pd.DataFrame(test)
cols = list(map(str, testPdf.columns))
testDf = spark.createDataFrame(testPdf)
testDf = testDf.union(testDf).repartition(200)
testDf = (
    VectorAssembler()
    .setInputCols(cols)
    .setOutputCol("features")
    .transform(testDf)
    .drop(*cols)
    .cache()
)

display(onnx_ml.transform(testDf))

De uitvoer moet er ongeveer uitzien als in de volgende tabel, hoewel de waarden en het aantal rijen kunnen verschillen:

Index Functies Voorspelling Kans
1 "{"type":1,"values":[0.105... 0 "{"0":0.835...
2 "{"type":1,"values":[0.814... 0 "{"0":0.658...