重要
この機能はパブリック プレビュー段階にあります。
Python パッケージ xgboost>=1.7 には、新しいモジュール xgboost.spark が含まれています。 このモジュールには、xgboost PySpark 推定器 xgboost.spark.SparkXGBRegressor、xgboost.spark.SparkXGBClassifier および xgboost.spark.SparkXGBRanker が含まれています。 これらの新しいクラスは、SparkML パイプラインに XGBoost 推定器を含めることをサポートします。 API の詳細については、XGBoost Python Spark API ドキュメントを参照してください。
必要条件
Databricks Runtime 12.0 ML 以降。
xgboost.spark パラメーター
xgboost.spark モジュールで定義されている推定器は、標準の XGBoost で使用されるのと同じパラメーターと引数のほとんどをサポートしています。
- クラス コンストラクター、
fitメソッド、およびpredictメソッドのパラメーターは、xgboost.sklearnモジュールのものとほとんど同じです。 - 名前付け、値、既定値は、「XGBoost パラメーター」で説明されているものとほぼ同じです。
- 例外は、サポートされていないいくつかのパラメーター (
gpu_id、nthread、sample_weight、eval_setなど) と、追加されたpyspark推定器固有のパラメーター (featuresCol、labelCol、use_gpu、validationIndicatorColなど) です。 詳細については、XGBoost Python Spark API ドキュメントを参照してください。
分散トレーニング
xgboost.spark モジュールで定義されている PySpark 推定器は、num_workers パラメーターを使用した分散 XGBoost トレーニングをサポートします。 分散トレーニングを使用するには、分類子またはリグレッサーを作成し、分散トレーニング中に同時に実行される Spark タスクの数を num_workers に設定します。 すべての Spark タスク スロットを使用するには、num_workers=sc.defaultParallelism を設定します。
次に例を示します。
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)
注意
- 分散 XGBoost では
mlflow.xgboost.autologを使用できません。 MLflow を使用して xgboost Spark モデルをログに記録するには、mlflow.spark.log_model(spark_xgb_model, artifact_path)を使用します。 - 自動スケールが有効になっているクラスターで分散 XGBoost を使用することはできません。 このエラスティック スケーリング パラダイムで開始される新しいワーカー ノードは、新しいタスク セットを受け取ることができず、アイドル状態のままになります。 自動スケールを無効にする手順については、自動スケールを有効にする を参照してください。
スパース特徴データセットのトレーニングの最適化を有効にする
xgboost.spark モジュールで定義されている PySpark 推定器は、スパース特徴を持つデータセットに対するトレーニングの最適化をサポートします。
スパース特徴セットの最適化を有効にするには、型 fit の値で構成される特徴列を含むデータセットを pyspark.ml.linalg.SparseVector メソッドに指定し、推定器パラメーター enable_sparse_data_optim を True に設定する必要があります。 さらに、missing パラメーターを 0.0 に設定する必要があります。
次に例を示します。
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)
GPU トレーニング
xgboost.spark モジュールで定義されている PySpark 推定器は、GPU のトレーニングをサポートします。 GPU トレーニングを有効にするには、パラメーター use_gpu を True に設定します。
注意
XGBoost 分散トレーニングで使用される Spark タスクごとに、use_gpu 引数が True に設定されている場合、トレーニングで使用される GPU は 1 つだけです。 Databricks では、Spark クラスター構成 1 に既定値の spark.task.resource.gpu.amount を使用することを推奨しています。 それ以外の場合、この Spark タスクに割り当てられた追加の GPU はアイドル状態となります。
次に例を示します。
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)
トラブルシューティング
マルチノード トレーニング中に NCCL failure: remote process exited or there was a network error メッセージが表示された場合、通常、GPU 間のネットワーク通信に問題があることを示します。 この問題は、NCCL (NVIDIA Collective Communications Library) が GPU 通信に特定のネットワーク インターフェイスを使用できない場合に発生します。
解決するには、クラスターの sparkConf for spark.executorEnv.NCCL_SOCKET_IFNAME を eth に設定します。 これにより、基本的に環境変数 NCCL_SOCKET_IFNAME がノード内のすべてのワーカーに対して eth に設定されます。
ノートブックの例
このノートブックは、Spark MLlib での Python パッケージ xgboost.spark の使用を示しています。
PySpark-XGBoost ノートブック
非推奨の sparkdl.xgboost モジュールの移行ガイド
-
from sparkdl.xgboost import XgboostRegressorをfrom xgboost.spark import SparkXGBRegressorに置き換え、from sparkdl.xgboost import XgboostClassifierをfrom xgboost.spark import SparkXGBClassifierに置き換えます。 - 推定器コンストラクターのすべてのパラメーター名を camelCase スタイルから snake_case スタイルに変更します。 たとえば、
XgboostRegressor(featuresCol=XXX)をSparkXGBRegressor(features_col=XXX)に変更します。 - パラメーター
use_external_storageとexternal_storage_precisionが削除されました。xgboost.spark推定器では、DMatrix データ イテレーション API を使用してメモリをより効率的に使用することができます。 非効率的な外部ストレージ モードを使用する必要がなくなりました。 非常に大規模なデータセットの場合、Databricks ではnum_workersパラメーターを増やすことを推奨しています。これにより、各トレーニング タスクでデータが小さく、管理しやすいデータ パーティションにパーティション分割されます。 クラスター内の Spark タスク スロットの合計数にnum_workers = sc.defaultParallelismを設定するnum_workersを設定することを検討してください。 -
xgboost.sparkで定義されている推定器に対してnum_workers=1を設定すると、1 つの Spark タスクでモデル トレーニングが実行されます。 これにより、Spark クラスター構成設定spark.task.cpusで指定された CPU コア数 (既定では 1) が使用されます。 より多くの CPU コアを使用してモデルをトレーニングするには、num_workersまたはspark.task.cpusを増やします。nthreadで定義されている推定器に対してn_jobsまたはxgboost.sparkパラメーターを設定することはできません。 この動作は、非推奨のsparkdl.xgboostパッケージで定義されている推定器の以前の動作とは異なります。
sparkdl.xgboost モデルを xgboost.spark モデルに変換する
sparkdl.xgboost モデルは、xgboost.spark モデルとは異なる形式で保存され、パラメーター設定が異なります。 モデルを変換するには、次のユーティリティ関数を使用します。
def convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls,
sparkdl_xgboost_model,
):
"""
:param xgboost_spark_estimator_cls:
`xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
:param sparkdl_xgboost_model:
`sparkdl.xgboost` model instance e.g. the instance of
`sparkdl.xgboost.XgboostRegressorModel` type.
:return
A `xgboost.spark` model instance
"""
def convert_param_key(key):
from xgboost.spark.core import _inverse_pyspark_param_alias_map
if key == "baseMarginCol":
return "base_margin_col"
if key in _inverse_pyspark_param_alias_map:
return _inverse_pyspark_param_alias_map[key]
if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
return None
return key
xgboost_spark_params_dict = {}
for param in sparkdl_xgboost_model.params:
if param.name == "arbitraryParamsDict":
continue
if sparkdl_xgboost_model.isDefined(param):
xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)
xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))
xgboost_spark_params_dict = {
convert_param_key(k): v
for k, v in xgboost_spark_params_dict.items()
if convert_param_key(k) is not None
}
booster = sparkdl_xgboost_model.get_booster()
booster_bytes = booster.save_raw("json")
booster_config = booster.save_config()
estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))
# Example
from xgboost.spark import SparkXGBRegressor
new_model = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=model,
)
pyspark.ml.PipelineModel モデルを最後のステージとして含む sparkdl.xgboost モデルがある場合は、sparkdl.xgboost モデルのステージを変換された xgboost.spark モデルに置き換えることができます。
pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=pipeline_model.stages[-1],
)