Partager via


Inférence ONNX sur Spark

Dans cet exemple, vous entraînez un modèle LightGBM et convertissez ce modèle au format ONNX . Une fois converti, vous utilisez le modèle pour déduire certaines données de test sur Spark.

Cet exemple utilise ces packages et versions Python :

  • onnxmltools==1.7.0
  • lightgbm==3.2.1

Conditions préalables

  • Fixez votre notebook à un lakehouse. Sur le côté gauche, sélectionnez Ajouter pour ajouter un lac existant ou créer un lac.
  • Vous devrez peut-être installer onnxmltools. Pour ce faire, ajoutez !pip install onnxmltools==1.7.0 dans une cellule de code dans un bloc-notes, puis exécutez cette cellule.
  • Vous devrez peut-être installer lightgbm. Pour ce faire, ajoutez !pip install lightgbm==3.2.1 dans une cellule de code de bloc-notes, puis exécutez cette cellule.

Charger les exemples de données

Pour charger les exemples de données, ajoutez ces exemples de code aux cellules de votre notebook, puis exécutez ces cellules :

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

from synapse.ml.core.platform import *
df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(
        "wasbs://publicwasb@mmlspark.blob.core.windows.net/company_bankruptcy_prediction_data.csv"
    )
)

display(df)

La sortie doit ressembler au tableau suivant. Les colonnes spécifiques affichées, le nombre de lignes et les valeurs réelles de la table peuvent différer :

Taux de couverture des intérêts Indicateur de revenu net Capitaux propres et passif
0.5641 1.0 0.0165
0.5702 1.0 0.0208
0.5673 1.0 0.0165

Utiliser LightGBM pour entraîner un modèle

from pyspark.ml.feature import VectorAssembler
from synapse.ml.lightgbm import LightGBMClassifier

feature_cols = df.columns[1:]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

train_data = featurizer.transform(df)["Bankrupt?", "features"]

model = (
    LightGBMClassifier(featuresCol="features", labelCol="Bankrupt?", dataTransferMode="bulk")
    .setEarlyStoppingRound(300)
    .setLambdaL1(0.5)
    .setNumIterations(1000)
    .setNumThreads(-1)
    .setMaxDeltaStep(0.5)
    .setNumLeaves(31)
    .setMaxDepth(-1)
    .setBaggingFraction(0.7)
    .setFeatureFraction(0.7)
    .setBaggingFreq(2)
    .setObjective("binary")
    .setIsUnbalance(True)
    .setMinSumHessianInLeaf(20)
    .setMinGainToSplit(0.01)
)

model = model.fit(train_data)

Convertir le modèle au format ONNX

Le code suivant exporte le modèle entraîné vers un booster LightGBM, puis convertit le modèle au format ONNX :

import lightgbm as lgb
from lightgbm import Booster, LGBMClassifier


def convertModel(lgbm_model: LGBMClassifier or Booster, input_size: int) -> bytes:
    from onnxmltools.convert import convert_lightgbm
    from onnxconverter_common.data_types import FloatTensorType

    initial_types = [("input", FloatTensorType([-1, input_size]))]
    onnx_model = convert_lightgbm(
        lgbm_model, initial_types=initial_types, target_opset=9
    )
    return onnx_model.SerializeToString()


booster_model_str = model.getLightGBMBooster().modelStr().get()
booster = lgb.Booster(model_str=booster_model_str)
model_payload_ml = convertModel(booster, len(feature_cols))

Après la conversion, chargez la charge utile ONNX dans un ONNXModelmodèle et inspectez les entrées et sorties du modèle :

from synapse.ml.onnx import ONNXModel

onnx_ml = ONNXModel().setModelPayload(model_payload_ml)

print("Model inputs:" + str(onnx_ml.getModelInputs()))
print("Model outputs:" + str(onnx_ml.getModelOutputs()))

Mappez l’entrée du modèle au nom de colonne (FeedDict) du dataframe d’entrée et mappez les noms des colonnes du dataframe de sortie aux sorties du modèle (FetchDict) :

onnx_ml = (
    onnx_ml.setDeviceType("CPU")
    .setFeedDict({"input": "features"})
    .setFetchDict({"probability": "probabilities", "prediction": "label"})
    .setMiniBatchSize(5000)
)

Utiliser le modèle pour l’inférence

Pour effectuer une inférence avec le modèle, le code suivant crée des données de test et transforme les données via le modèle ONNX :

from pyspark.ml.feature import VectorAssembler
import pandas as pd
import numpy as np

n = 1000 * 1000
m = 95
test = np.random.rand(n, m)
testPdf = pd.DataFrame(test)
cols = list(map(str, testPdf.columns))
testDf = spark.createDataFrame(testPdf)
testDf = testDf.union(testDf).repartition(200)
testDf = (
    VectorAssembler()
    .setInputCols(cols)
    .setOutputCol("features")
    .transform(testDf)
    .drop(*cols)
    .cache()
)

display(onnx_ml.transform(testDf))

La sortie doit ressembler au tableau suivant, même si les valeurs et le nombre de lignes peuvent différer :

Index Fonctionnalités Prédiction Probabilité
1 "{"type":1,"values":[0.105... 0 "{"0":0.835...
2 "{"type":1,"values":[0.814... 0 "{"0":0.658...