Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Treinamento distribuído de modelos XGBoost usando o
Importante
Esse recurso está em uma versão prévia.
O pacote xgboost>=1.7 do Python contém um novo módulo xgboost.spark. Este módulo inclui os avaliadores xgboost xgboost.spark.SparkXGBRegressor, xgboost.spark.SparkXGBClassifier e xgboost.spark.SparkXGBRanker do PySpark. Essas novas classes dão suporte à inclusão de avaliadores XGBoost em Pipelines do SparkML. Para saber detalhes da API, confira a documentação da API XGBoost python spark.
Requisitos
Databricks Runtime 12.0 ML e superior.
Parâmetros xgboost.spark
Os avaliadores definidos no módulo xgboost.spark dão suporte à maioria dos mesmos parâmetros e argumentos usados no XGBoost padrão.
- Os parâmetros para o construtor de classe, o método
fite o métodopredictsão praticamente idênticos aos do móduloxgboost.sklearn. - Nomenclatura, valores e padrões são praticamente idênticos aos descritos nos parâmetros do XGBoost.
- Exceções são alguns parâmetros sem suporte (como
gpu_id,nthread,sample_weight,eval_set) e os parâmetros específicos do avaliadorpysparkque foram adicionados (comofeaturesCol,labelCol,use_gpu,validationIndicatorCol). Para obter detalhes, confira a documentação da API XGBoost Python Spark.
Treinamento distribuído
Os avaliadores do PySpark definidos no módulo xgboost.spark dão suporte ao treinamento XGBoost distribuído usando o parâmetro num_workers. Para usar o treinamento distribuído, crie um classificador ou regressor e defina num_workers como o número de tarefas do Spark em execução simultâneas durante o treinamento distribuído. Para usar todos os slots de tarefa do Spark, defina num_workers=sc.defaultParallelism.
Por exemplo:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)
Observação
- Você não pode usar
mlflow.xgboost.autologcom XGBoost distribuído. Para registrar um modelo xgboost do Spark usando o MLflow, usemlflow.spark.log_model(spark_xgb_model, artifact_path). - Você não pode usar o XGBoost distribuído em um cluster que tenha o dimensionamento automático habilitado. Novos nós de trabalho que começam nesse paradigma de dimensionamento elástico não podem receber novos conjuntos de tarefas e permanecer ociosos. Para obter instruções de como desabilitar o dimensionamento automático, consulte Habilitar dimensionamento automático.
Habilitar a otimização para treinamento no conjunto de dados de recursos esparsos
Os Avaliadores do PySpark definidos no módulo xgboost.spark dão suporte à otimização para treinamento em conjuntos de dados com recursos esparsos.
Para habilitar a otimização de conjuntos de recursos esparsos, você precisa fornecer um conjunto de dados para o método fit que contém uma coluna de recursos que consiste em valores do tipo pyspark.ml.linalg.SparseVector e definir o parâmetro do avaliador enable_sparse_data_optim como True. Além disso, você precisa definir o parâmetro missing como 0.0.
Por exemplo:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)
Treinamento de GPU
Os avaliadores do PySpark definidos no módulo xgboost.spark dão suporte ao treinamento em GPUs. Defina o parâmetro use_gpu como True para habilitar o treinamento de GPU.
Observação
Para cada tarefa do Spark usada no treinamento distribuído XGBoost, apenas uma GPU é usada no treinamento quando o argumento use_gpu é definido como True. O Databricks recomenda usar o valor padrão de 1 para a configuração spark.task.resource.gpu.amount do cluster do Spark. Caso contrário, as GPUs adicionais alocadas para essa tarefa do Spark ficarão ociosas.
Por exemplo:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)
Solução de problemas
Durante o treinamento de vários nós, se você encontrar uma mensagem de NCCL failure: remote process exited or there was a network error, geralmente isso indica um problema com a comunicação de rede entre GPUs. Esse problema ocorre quando a NCCL (Biblioteca de Comunicações Coletivas NVIDIA) não pode usar determinados adaptadores de rede para comunicação de GPU.
Para resolver, configure o sparkConf do cluster de forma a definir spark.executorEnv.NCCL_SOCKET_IFNAME para eth. Isso basicamente ajusta a variável de ambiente NCCL_SOCKET_IFNAME para eth em todos os trabalhos em um node.
Caderno de exemplo
Este notebook mostra o uso do pacote xgboost.spark do Python com o Spark MLlib.
Notebook PySpark-XGBoost
Guia de migração para o módulo preterido sparkdl.xgboost
- Substitua
from sparkdl.xgboost import XgboostRegressorporfrom xgboost.spark import SparkXGBRegressore substituafrom sparkdl.xgboost import XgboostClassifierporfrom xgboost.spark import SparkXGBClassifier. - Altere todos os nomes de parâmetro no construtor do avaliador do estilo camelCase para o estilo snake_case. Por exemplo, altere
XgboostRegressor(featuresCol=XXX)paraSparkXGBRegressor(features_col=XXX). - Os parâmetros
use_external_storageeexternal_storage_precisionforam removidos. Os avaliadoresxgboost.sparkusam a API de iteração de dados DMatrix para usar a memória com mais eficiência. Não é mais necessário usar o modo de armazenamento externo ineficiente. Para conjuntos de dados extremamente grandes, o Databricks recomenda que você aumente o parâmetronum_workers, fazendo com que cada tarefa de treinamento particione os dados em partições de dados menores e mais gerenciáveis. Considere a configuraçãonum_workers = sc.defaultParallelism, que definenum_workerscomo o número total de slots de tarefa do Spark no cluster. - Para avaliadores definidos no
xgboost.spark, a configuraçãonum_workers=1executa o treinamento do modelo usando uma única tarefa do Spark. Isso utiliza o número de núcleos de CPU especificados pela definiçãospark.task.cpusda configuração do cluster do Spark, que é 1 por padrão. Para usar mais núcleos de CPU para treinar o modelo, aumentenum_workersouspark.task.cpus. Não é possível definir o parâmetronthreadoun_jobspara avaliadores definidos noxgboost.spark. Esse comportamento é diferente do comportamento anterior dos avaliadores definidos no pacote preteridosparkdl.xgboost.
Converter o modelo sparkdl.xgboost em modelo xgboost.spark
Os modelos sparkdl.xgboost são salvos em um formato diferente dos modelos xgboost.spark e têm configurações de parâmetro diferentes. Use a seguinte função de utilitário para converter o modelo:
def convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls,
sparkdl_xgboost_model,
):
"""
:param xgboost_spark_estimator_cls:
`xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
:param sparkdl_xgboost_model:
`sparkdl.xgboost` model instance e.g. the instance of
`sparkdl.xgboost.XgboostRegressorModel` type.
:return
A `xgboost.spark` model instance
"""
def convert_param_key(key):
from xgboost.spark.core import _inverse_pyspark_param_alias_map
if key == "baseMarginCol":
return "base_margin_col"
if key in _inverse_pyspark_param_alias_map:
return _inverse_pyspark_param_alias_map[key]
if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
return None
return key
xgboost_spark_params_dict = {}
for param in sparkdl_xgboost_model.params:
if param.name == "arbitraryParamsDict":
continue
if sparkdl_xgboost_model.isDefined(param):
xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)
xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))
xgboost_spark_params_dict = {
convert_param_key(k): v
for k, v in xgboost_spark_params_dict.items()
if convert_param_key(k) is not None
}
booster = sparkdl_xgboost_model.get_booster()
booster_bytes = booster.save_raw("json")
booster_config = booster.save_config()
estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))
# Example
from xgboost.spark import SparkXGBRegressor
new_model = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=model,
)
Se você tiver um modelo pyspark.ml.PipelineModel que contém um modelo sparkdl.xgboost como o último estágio, poderá substituir o estágio do modelo sparkdl.xgboost pelo modelo convertido xgboost.spark.
pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=pipeline_model.stages[-1],
)