Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Entraînement distribué de modèles XGBoost à l’aide de
Important
Cette fonctionnalité est disponible en préversion publique.
Le package Python xgboost>=1.7 contient un nouveau module xgboost.spark. Ce module comprend les estimateurs PySpark xgboost xgboost.spark.SparkXGBRegressor, xgboost.spark.SparkXGBClassifier et xgboost.spark.SparkXGBRanker. Ces nouvelles classes prennent en charge l’inclusion des estimateurs XGBoost dans les pipelines SparkML. Pour plus d’informations sur l’API, consultez la documentation de l’API XGBoost Python Spark.
Spécifications
Databricks Runtime 12.0 ML et versions ultérieures.
Paramètres xgboost.spark
Les estimateurs définis dans le module xgboost.spark prennent en charge la plupart des paramètres et arguments utilisés dans le XGBoost standard.
- Les paramètres du constructeur de classe, de la méthode
fitet de la méthodepredictsont en grande partie identiques à ceux du modulexgboost.sklearn. - Les noms, les valeurs et les paramètres par défaut sont pour la plupart identiques à ceux décrits dans Paramètres XGBoost.
- Les exceptions concernent quelques paramètres non pris en charge (par exemple
gpu_id,nthread,sample_weight,eval_set) ainsi que les paramètres spécifiques à l’estimateurpyspark, qui ont été ajoutés (par exemplefeaturesCol,labelCol,use_gpu,validationIndicatorCol). Pour plus d’informations, consultez la documentation de l’API XGBoost Python Spark.
Entraînement distribué
Les estimateurs PySpark définis dans le module xgboost.spark prennent en charge l’entraînement XGBoost distribué à l’aide du paramètre num_workers. Pour utiliser l’entraînement distribué, créez un classifieur ou un régresseur, puis affectez à num_workers le nombre de tâches Spark s’exécutant simultanément durant l’entraînement distribué. Pour utiliser tous les emplacements de tâches Spark, définissez num_workers=sc.defaultParallelism.
Par exemple :
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)
Notes
- Vous ne pouvez pas utiliser
mlflow.xgboost.autologavec XGBoost distribué. Pour journaliser un modèle Spark xgboost à l’aide de MLflow, utilisezmlflow.spark.log_model(spark_xgb_model, artifact_path). - Vous ne pouvez pas utiliser de XGBoost distribué sur un cluster pour lequel la mise à l’échelle automatique est activée. Les nouveaux nœuds Worker qui démarrent dans ce paradigme de mise à l’échelle élastique ne peuvent pas recevoir de nouveaux ensembles de tâches, et restent inactifs. Pour obtenir des instructions sur la désactivation de la mise à l’échelle automatique, consultez Activer la mise à l’échelle automatique.
Activer l’optimisation de l’entraînement sur un jeu de données aux caractéristiques éparses
Les estimateurs PySpark définis dans le module xgboost.spark prennent en charge l’optimisation de l’entraînement sur les jeux de données ayant des caractéristiques éparses.
Pour permettre l’optimisation des jeux de données aux caractéristiques éparses, vous devez fournir un jeu de données à la méthode fit, qui contient une colonne de caractéristiques composée des valeurs de type pyspark.ml.linalg.SparseVector, et affecter au paramètre enable_sparse_data_optim de l’estimateur la valeur True. De plus, vous devez affecter au paramètre missing la valeur 0.0.
Par exemple :
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)
Formation du GPU
Les estimateurs PySpark définis dans le module xgboost.spark prennent en charge l’entraînement sur les GPU. Affectez au paramètre use_gpu la valeur True pour activer l’entraînement basé sur les GPU.
Notes
Pour chaque tâche Spark utilisée dans l’entraînement distribué XGBoost, un seul GPU est utilisé dans l’entraînement quand l’argument use_gpu a la valeur True. Databricks recommande d’utiliser la valeur par défaut 1 pour la configuration de cluster Spark spark.task.resource.gpu.amount. Sinon, les GPU supplémentaires alloués à cette tâche Spark sont inactifs.
Par exemple :
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)
Dépannage
Lors d’un entraînement à plusieurs nœuds, si vous rencontrez un message NCCL failure: remote process exited or there was a network error, cela indique généralement un problème de communication réseau entre les GPU. Ce problème survient lorsque la bibliothèque NCCL (NVIDIA Collective Communications Library) ne peut pas utiliser certaines interfaces réseau pour la communication avec le GPU.
Pour résoudre, définissez la sparkConf du cluster pour spark.executorEnv.NCCL_SOCKET_IFNAME sur eth. Cela définit essentiellement la variable d’environnement NCCL_SOCKET_IFNAME sur eth pour tous les workers d’un nœud.
Exemple de bloc-notes
Ce notebook montre l’utilisation du package Python xgboost.spark avec Spark MLlib.
Notebook PySpark-XGBoost
Guide de migration du module déprécié sparkdl.xgboost
- Remplacez
from sparkdl.xgboost import XgboostRegressorparfrom xgboost.spark import SparkXGBRegressor, puisfrom sparkdl.xgboost import XgboostClassifierparfrom xgboost.spark import SparkXGBClassifier. - Changez tous les noms de paramètres dans le constructeur de l’estimateur en passant de la casse camelCase à la casse snake_case. Par exemple, remplacez
XgboostRegressor(featuresCol=XXX)parSparkXGBRegressor(features_col=XXX). - Les paramètres
use_external_storageetexternal_storage_precisionont été supprimés. Les estimateursxgboost.sparkutilisent l’API d’itération des données DMatrix pour utiliser la mémoire de manière plus efficace. Il n’est plus nécessaire d’utiliser le mode de stockage externe inefficace. Pour les jeux de données extrêmement volumineux, Databricks vous recommande d’augmenter le paramètrenum_workers, ce qui oblige chaque tâche d’entraînement à partitionner les données en partitions de données plus petites, plus faciles à gérer. Envisagez de définirnum_workers = sc.defaultParallelism, qui définitnum_workersau nombre total d’emplacements de tâches Spark dans le cluster. - Pour les estimateurs définis dans
xgboost.spark, le paramètrenum_workers=1permet d’exécuter l’entraînement du modèle à l’aide d’une seule tâche Spark. Cela implique l’utilisation du nombre de cœurs de processeur spécifié par le paramètre de configuration de cluster Sparkspark.task.cpus, dont la valeur est 1 par défaut. Pour utiliser davantage de cœurs d’unité centrale pour entraîner le modèle, augmenteznum_workersouspark.task.cpus. Vous ne pouvez pas définir le paramètrenthreadoun_jobspour les estimateurs définis dansxgboost.spark. Ce comportement est différent du comportement précédent des estimateurs définis dans le packagesparkdl.xgboostdéprécié.
Convertir un modèle sparkdl.xgboost en modèle xgboost.spark
Les modèles sparkdl.xgboost sont enregistrés dans un format différent de celui des modèles xgboost.spark et ont des paramètres différents. Utilisez la fonction utilitaire suivante pour convertir le modèle :
def convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls,
sparkdl_xgboost_model,
):
"""
:param xgboost_spark_estimator_cls:
`xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
:param sparkdl_xgboost_model:
`sparkdl.xgboost` model instance e.g. the instance of
`sparkdl.xgboost.XgboostRegressorModel` type.
:return
A `xgboost.spark` model instance
"""
def convert_param_key(key):
from xgboost.spark.core import _inverse_pyspark_param_alias_map
if key == "baseMarginCol":
return "base_margin_col"
if key in _inverse_pyspark_param_alias_map:
return _inverse_pyspark_param_alias_map[key]
if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
return None
return key
xgboost_spark_params_dict = {}
for param in sparkdl_xgboost_model.params:
if param.name == "arbitraryParamsDict":
continue
if sparkdl_xgboost_model.isDefined(param):
xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)
xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))
xgboost_spark_params_dict = {
convert_param_key(k): v
for k, v in xgboost_spark_params_dict.items()
if convert_param_key(k) is not None
}
booster = sparkdl_xgboost_model.get_booster()
booster_bytes = booster.save_raw("json")
booster_config = booster.save_config()
estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))
# Example
from xgboost.spark import SparkXGBRegressor
new_model = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=model,
)
Si vous avez un modèle pyspark.ml.PipelineModel contenant un modèle sparkdl.xgboost comme dernière étape, vous pouvez remplacer l’étape du modèle sparkdl.xgboost par le modèle converti xgboost.spark.
pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=pipeline_model.stages[-1],
)