使用
重要
這項功能處於公開預覽狀態。
Python 套件 xgboost>=1.7 包含新的模組 xgboost.spark。 本模組包含 xgboost PySpark 估算器 xgboost.spark.SparkXGBRegressor、xgboost.spark.SparkXGBClassifier 和 xgboost.spark.SparkXGBRanker。 這些新類別支援在 SparkML 管線中支援包含 XGBoost 估算器。 如需 API 詳細資料,請參閱 XGBoost Python Spark API 文件 (英文)。
需求
Databricks Runtime 12.0 ML 和更新版本。
xgboost.spark 參數
xgboost.spark 模組中定義的估算器支援標準 XGBoost 中使用的大部分相同參數和引數。
- 類別建構函式、
fit方法和predict方法的參數,大部分會與xgboost.sklearn模組中的參數完全相同。 - 命名、值和預設值大多與 XGBoost 參數中所述的相同。
- 例外狀況是一些不受支援的參數 (例如
gpu_id、nthread、sample_weight、eval_set) 和已新增的pyspark估算器特定參數 (例如featuresCol、labelCol、use_gpu、validationIndicatorCol)。 如需詳細資料,請參閱 XGBoost Python Spark API 文件 (英文)。
分散式訓練
xgboost.spark 模組中定義的 PySpark 估算器支援使用 num_workers 參數的分散式 XGBoost 訓練。 若要使用分散式訓練,請建立分類器或迴歸輸入變數,並將 num_workers 設定為分散式訓練期間並行執行的 Spark 工作數量。 若要使用所有 Spark 工作位置,請設定 num_workers=sc.defaultParallelism。
例如:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)
注意
- 您無法使用
mlflow.xgboost.autolog搭配分散式 XGBoost。 若要使用 MLflow 記錄 xgboost Spark 模型,請使用mlflow.spark.log_model(spark_xgb_model, artifact_path)。 - 您無法在已啟用自動調整功能的叢集上使用分散式 XGBoost。 在此彈性調整範例中啟動的新工作者節點無法接收新的工作集,並維持閒置。 如需停用自動調整的指示,請參閱啟用自動調整 (英文)。
啟用疏鬆功能資料集訓練的最佳化
xgboost.spark 模組中定義的 PySpark 估算器支援針對具有疏鬆功能之資料集的培訓最佳化。
若要啟用疏鬆功能集的最佳化,您必須提供資料集給包含類型值 fit 之功能資料行的資料集給 pyspark.ml.linalg.SparseVector 方法,並將估算器參數 enable_sparse_data_optim 設定為 True。 此外,您必須將 missing 參數設定為 0.0。
例如:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)
GPU 訓練
xgboost.spark 模組中定義的 PySpark 估算器支援 GPU 訓練。 將參數 use_gpu 設定為 True 以啟用 GPU 訓練。
注意
針對 XGBoost 分散式訓練中使用的每個 Spark 工作,use_gpu 引數設定為 True 時,只會在訓練中使用一個 GPU。 Databricks 建議針對 Spark 叢集組態 1 使用預設值 spark.task.resource.gpu.amount。 否則,配置給此 Spark 工作的其他 GPU 會閒置。
例如:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)
疑難排解
在多節點訓練期間,如果您收到 NCCL failure: remote process exited or there was a network error 訊息,通常表示 GPU 之間的網路通訊發生問題。 NCCL (NVIDIA 集體通訊程式庫) 無法使用特定網路介面進行 GPU 通訊時,就會發生此問題。
若要解決,請將 叢集的sparkConf spark.executorEnv.NCCL_SOCKET_IFNAME 設定為 eth。 這基本上會將節點中所有工作者的環境變數 NCCL_SOCKET_IFNAME 設定為 eth。
範例筆記本
此 Notebook 顯示搭配 Spark MLlib 使用 Python 套件 xgboost.spark。
PySpark-XGBoost Notebook
已取代 sparkdl.xgboost 模組的移轉指南
- 將
from sparkdl.xgboost import XgboostRegressor取代為from xgboost.spark import SparkXGBRegressor,並將from sparkdl.xgboost import XgboostClassifier取代為from xgboost.spark import SparkXGBClassifier。 - 將估算器建構函式中的所有參數名稱從 camelCase 樣式變更為 snake_case 樣式。 例如,將
XgboostRegressor(featuresCol=XXX)變更為SparkXGBRegressor(features_col=XXX)。 - 參數
use_external_storage和external_storage_precision已移除。xgboost.spark估算器會使用 DMatrix 資料反覆項目 API,以更有效率地使用記憶體。 不再需要使用效率不佳的外部儲存模式。 對於極大型資料集,Databricks 建議您增加num_workers參數,讓每個訓練工作將資料分割成更小、更容易管理的資料分割區。 請考慮設定num_workers = sc.defaultParallelism,這會將num_workers設定為叢集中 Spark 工作位置的總數。 - 針對
xgboost.spark中定義的估算器,設定num_workers=1會使用單一 Spark 工作執行模型訓練。 這會利用 Spark 叢集組態設定spark.task.cpus所指定的 CPU 核心數目,預設為 1。 若要使用更多 CPU 核心來訓練模型,請增加num_workers或spark.task.cpus。 您無法為nthread中定義的估算器設定n_jobs或xgboost.spark參數。 此行為與已取代之sparkdl.xgboost套件中定義的估算器先前行為不同。
將 sparkdl.xgboost 模型轉換成 xgboost.spark 模型
sparkdl.xgboost 模型會以不同於 xgboost.spark 模型的格式儲存,而且有不同的參數設定。 使用下列公用程式函數來轉換模型:
def convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls,
sparkdl_xgboost_model,
):
"""
:param xgboost_spark_estimator_cls:
`xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
:param sparkdl_xgboost_model:
`sparkdl.xgboost` model instance e.g. the instance of
`sparkdl.xgboost.XgboostRegressorModel` type.
:return
A `xgboost.spark` model instance
"""
def convert_param_key(key):
from xgboost.spark.core import _inverse_pyspark_param_alias_map
if key == "baseMarginCol":
return "base_margin_col"
if key in _inverse_pyspark_param_alias_map:
return _inverse_pyspark_param_alias_map[key]
if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
return None
return key
xgboost_spark_params_dict = {}
for param in sparkdl_xgboost_model.params:
if param.name == "arbitraryParamsDict":
continue
if sparkdl_xgboost_model.isDefined(param):
xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)
xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))
xgboost_spark_params_dict = {
convert_param_key(k): v
for k, v in xgboost_spark_params_dict.items()
if convert_param_key(k) is not None
}
booster = sparkdl_xgboost_model.get_booster()
booster_bytes = booster.save_raw("json")
booster_config = booster.save_config()
estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))
# Example
from xgboost.spark import SparkXGBRegressor
new_model = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=model,
)
如果您有包含 pyspark.ml.PipelineModel 模型作為最後一個階段的 sparkdl.xgboost 模型,就可以將 sparkdl.xgboost 模型的階段取代為已轉換的 xgboost.spark 模型。
pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=pipeline_model.stages[-1],
)