Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Gedistribueerde training van XGBoost-modellen met behulp van
Belangrijk
Deze functie is beschikbaar als openbare preview.
Het Python-pakket xgboost>=1.7 bevat een nieuwe module xgboost.spark. Deze module bevat de xgboost PySpark-estimators xgboost.spark.SparkXGBRegressor, xgboost.spark.SparkXGBClassifieren xgboost.spark.SparkXGBRanker. Deze nieuwe klassen ondersteunen het opnemen van XGBoost-schattingen in SparkML-pijplijnen. Zie het XGBoost Python Spark API-document voor API-details voor XGBoost.
Vereisten
Databricks Runtime 12.0 ML en hoger.
parameters voor xgboost.spark
De schattingen die zijn gedefinieerd in de xgboost.spark module ondersteunen de meeste van dezelfde parameters en argumenten die worden gebruikt in standaard XGBoost.
- De parameters voor de klasseconstructor,
fitmethode enpredictmethode zijn grotendeels identiek aan de parameters in dexgboost.sklearnmodule. - Naamgeving, waarden en standaardinstellingen zijn grotendeels identiek aan die worden beschreven in XGBoost-parameters.
- Uitzonderingen zijn enkele niet-ondersteunde parameters (zoals
gpu_id,nthread,sample_weight,eval_set) en depyspark-schattingsparameters die zijn toegevoegd (zoalsfeaturesCol,labelCol,use_gpu,validationIndicatorCol). Zie de documentatie voor XGBoost Python Spark API voor meer informatie.
Gedistribueerde training
PySpark-estimators die zijn gedefinieerd in de xgboost.spark module ondersteunen gedistribueerde XGBoost-training met behulp van de num_workers parameter. Als u gedistribueerde training wilt gebruiken, maakt u een classificatie of regressor en stelt u num_workers in op het aantal gelijktijdige actieve Spark-taken tijdens gedistribueerde training. Als u alle Spark-taakslots wilt gebruiken, stelt u num_workers=sc.defaultParallelismals instelling in.
Bijvoorbeeld:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)
Notitie
- U kunt niet gebruiken
mlflow.xgboost.autologmet gedistribueerde XGBoost. Als u een xgboost Spark-model wilt registreren met behulp van MLflow, gebruikt umlflow.spark.log_model(spark_xgb_model, artifact_path). - U kunt gedistribueerde XGBoost niet gebruiken op een cluster waarvoor automatisch schalen is ingeschakeld. Nieuwe werkknooppunten die beginnen in dit paradigma voor elastisch schalen, kunnen geen nieuwe sets taken ontvangen en blijven inactief. Zie Automatisch schalen inschakelen voor instructies voor het uitschakelen van automatisch schalen.
Optimalisatie inschakelen voor training over gegevensset met sparsefuncties
PySpark Estimators die zijn gedefinieerd in xgboost.spark moduleondersteuningsoptimalisatie voor training over gegevenssets met sparse-functies.
Als u de optimalisatie van sparse-functie sets wilt inschakelen, moet u een gegevensset opgeven voor de fit methode die een kenmerkenkolom bevat die bestaat uit waarden van het type pyspark.ml.linalg.SparseVector en de parameter enable_sparse_data_optim van de estimator instellen op True. Daarnaast moet u de parameter missing instellen op 0.0.
Bijvoorbeeld:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)
GPU-training
PySpark-schattingen die zijn gedefinieerd in de xgboost.spark moduleondersteuningstraining over GPU's. Stel de parameter use_gpu in op True om GPU-training in te schakelen.
Notitie
Voor elke Spark-taak die wordt gebruikt in gedistribueerde XGBoost-training, wordt er slechts één GPU gebruikt in de training wanneer het argument use_gpu is ingesteld op True. Databricks raadt aan de standaardwaarde van 1 de Spark-clusterconfiguratie spark.task.resource.gpu.amountte gebruiken. Anders zijn de extra GPU's die aan deze Spark-taak zijn toegewezen, niet actief.
Bijvoorbeeld:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)
Probleemoplossing
Tijdens training met meerdere knooppunten, als u een NCCL failure: remote process exited or there was a network error bericht tegenkomt, duidt dit meestal op een probleem met netwerkcommunicatie tussen GPU's. Dit probleem treedt op wanneer NCCL (NVIDIA Collective Communications Library) bepaalde netwerkinterfaces voor GPU-communicatie niet kan gebruiken.
U kunt dit oplossen door de sparkConf van het cluster in te stellen op spark.executorEnv.NCCL_SOCKET_IFNAMEeth. Hiermee stelt u de omgevingsvariabele NCCL_SOCKET_IFNAMEeth in feite in voor alle werkrollen in een knooppunt.
Voorbeeld van notebook
Dit notebook toont het gebruik van het Python-pakket xgboost.spark met Spark MLlib.
PySpark-XGBoost-notebook
Migratiehandleiding voor de afgeschafte sparkdl.xgboost module
- Vervangen
from sparkdl.xgboost import XgboostRegressordoorfrom xgboost.spark import SparkXGBRegressoren vervangen doorfrom sparkdl.xgboost import XgboostClassifierfrom xgboost.spark import SparkXGBClassifier. - Wijzig alle parameternamen in de estimatorconstructor van de camelCase-stijl in snake_case stijl. Wijzig bijvoorbeeld
XgboostRegressor(featuresCol=XXX)inSparkXGBRegressor(features_col=XXX). - De parameters
use_external_storageenexternal_storage_precisionzijn verwijderd.xgboost.sparkschattingen maken gebruik van de DMatrix-gegevensiteratie-API om efficiënter geheugen te gebruiken. U hoeft de inefficiënte externe opslagmodus niet meer te gebruiken. Voor extreem grote gegevenssets raadt Databricks u aan de parameternum_workerste verhogen, waardoor elke trainingstaak de gegevens partitioneert in kleinere, beter beheerbare gegevenspartities. Overweeg het instellennum_workers = sc.defaultParallelism, waarmee het totale aantal Spark-taaksites in het cluster wordt ingesteldnum_workers. - Voor schattingen die zijn gedefinieerd in
xgboost.spark, voert de instellingnum_workers=1modeltraining uit met één Spark-taak. Dit maakt gebruik van het aantal CPU-kernen dat is opgegeven door de configuratie-instellingspark.task.cpusvan het Spark-cluster. Dit is standaard 1. Als u meer CPU-kernen wilt gebruiken om het model te trainen, verhoogtnum_workersu ofspark.task.cpus. U kunt de parameternthreadofn_jobsniet instellen voor schattingen die zijn gedefinieerd inxgboost.spark. Dit gedrag verschilt van het vorige gedrag van estimators die zijn gedefinieerd in het afgeschaftesparkdl.xgboostpakket.
Model converteren sparkdl.xgboost naar xgboost.spark model
sparkdl.xgboost modellen worden opgeslagen in een andere indeling dan xgboost.spark modellen en hebben verschillende parameterinstellingen. Gebruik de volgende functie om het model te converteren:
def convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls,
sparkdl_xgboost_model,
):
"""
:param xgboost_spark_estimator_cls:
`xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
:param sparkdl_xgboost_model:
`sparkdl.xgboost` model instance e.g. the instance of
`sparkdl.xgboost.XgboostRegressorModel` type.
:return
A `xgboost.spark` model instance
"""
def convert_param_key(key):
from xgboost.spark.core import _inverse_pyspark_param_alias_map
if key == "baseMarginCol":
return "base_margin_col"
if key in _inverse_pyspark_param_alias_map:
return _inverse_pyspark_param_alias_map[key]
if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
return None
return key
xgboost_spark_params_dict = {}
for param in sparkdl_xgboost_model.params:
if param.name == "arbitraryParamsDict":
continue
if sparkdl_xgboost_model.isDefined(param):
xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)
xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))
xgboost_spark_params_dict = {
convert_param_key(k): v
for k, v in xgboost_spark_params_dict.items()
if convert_param_key(k) is not None
}
booster = sparkdl_xgboost_model.get_booster()
booster_bytes = booster.save_raw("json")
booster_config = booster.save_config()
estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))
# Example
from xgboost.spark import SparkXGBRegressor
new_model = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=model,
)
Als u een pyspark.ml.PipelineModel model met een sparkdl.xgboost model als laatste fase hebt, kunt u de fase van sparkdl.xgboost het model vervangen door het geconverteerde xgboost.spark model.
pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=pipeline_model.stages[-1],
)