Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Treinamento distribuído de modelos XGBoost usando
Importante
Esta funcionalidade está em Pré-visualização Pública.
O pacote Python xgboost>=1.7 contém um novo módulo xgboost.spark. Este módulo inclui os estimadores xgboost.spark.SparkXGBRegressorxgboost PySpark, xgboost.spark.SparkXGBClassifiere xgboost.spark.SparkXGBRanker. Essas novas classes suportam a inclusão de estimadores XGBoost em pipelines SparkML. Para obter detalhes da API, consulte o documento da API XGBoost python spark.
Requisitos
Databricks Runtime 12.0 ML e superior.
xgboost.spark Parâmetros
Os estimadores definidos no módulo suportam a xgboost.spark maioria dos mesmos parâmetros e argumentos usados no XGBoost padrão.
- Os parâmetros para o construtor de classe,
fitmétodo epredictmétodo são em grande parte idênticos aos doxgboost.sklearnmódulo. - Nomenclatura, valores e padrões são basicamente idênticos aos descritos em parâmetros XGBoost.
- As exceções são alguns parâmetros sem suporte (como
gpu_id, ,nthreadsample_weight,eval_set) e os parâmetros específicos dopysparkestimador que foram adicionados (comofeaturesCol,labelCol,use_gpu,validationIndicatorCol). Para obter detalhes, consulte a documentação da API XGBoost Python Spark.
Preparação distribuída
Os estimadores PySpark definidos no módulo suportam treinamento xgboost.spark XGBoost distribuído usando o num_workers parâmetro. Para usar o treinamento distribuído, crie um classificador ou regressor e defina num_workers o número de tarefas simultâneas do Spark em execução durante o treinamento distribuído. Para usar todos os slots de tarefas do Spark, defina num_workers=sc.defaultParallelism.
Por exemplo:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)
Nota
- Você não pode usar
mlflow.xgboost.autologcom XGBoost distribuído. Para registrar um modelo xgboost Spark usando MLflow, usemlflow.spark.log_model(spark_xgb_model, artifact_path). - Não é possível usar XGBoost distribuído em um cluster que tenha o dimensionamento automático habilitado. Novos nós de trabalho que começam nesse paradigma de dimensionamento elástico não podem receber novos conjuntos de tarefas e permanecem ociosos. Para obter instruções sobre como desativar o dimensionamento automático, consulte Habilitar o dimensionamento automático.
Habilite a otimização para treinamento em conjuntos de dados de recursos esparsos
Os estimadores PySpark definidos no xgboost.spark módulo suportam otimização para treinamento em conjuntos de dados com recursos esparsos.
Para habilitar a otimização de conjuntos de recursos esparsos, você precisa fornecer um conjunto de dados para o fit método que contém uma coluna de recursos que consiste em valores do tipo pyspark.ml.linalg.SparseVector e definir o parâmetro enable_sparse_data_optim do estimador como True. Além disso, você precisa definir o missing parâmetro como 0.0.
Por exemplo:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)
Treinamento de GPU
Os estimadores PySpark definidos no módulo suportam o xgboost.spark treinamento em GPUs. Defina o parâmetro use_gpu para True habilitar o treinamento da GPU.
Nota
Para cada tarefa do Spark usada no treinamento distribuído XGBoost, apenas uma GPU é usada no treinamento quando o use_gpu argumento é definido como True. O Databricks recomenda o uso do valor padrão de para a configuração 1do cluster Sparkspark.task.resource.gpu.amount. Caso contrário, as GPUs adicionais alocadas para essa tarefa do Spark ficarão ociosas.
Por exemplo:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)
Resolução de Problemas
Durante o treinamento de vários nós, se você encontrar uma NCCL failure: remote process exited or there was a network error mensagem, ela geralmente indica um problema com a comunicação de rede entre GPUs. Esse problema surge quando NCCL (NVIDIA Collective Communications Library) não pode usar determinadas interfaces de rede para comunicação GPU.
Para resolver, defina sparkConf do cluster como spark.executorEnv.NCCL_SOCKET_IFNAMEeth. Isso essencialmente define a variável NCCL_SOCKET_IFNAME de ambiente para eth todos os trabalhadores em um nó.
Bloco de notas de exemplo
Este bloco de anotações mostra o uso do pacote xgboost.spark Python com o Spark MLlib.
Notebook PySpark-XGBoost
Guia de migração para o módulo preterido sparkdl.xgboost
- Substitua
from sparkdl.xgboost import XgboostRegressorporfrom xgboost.spark import SparkXGBRegressore substituafrom sparkdl.xgboost import XgboostClassifierporfrom xgboost.spark import SparkXGBClassifier. - Altere todos os nomes de parâmetros no construtor do estimador de camelCase style para snake_case style. Por exemplo, altere
XgboostRegressor(featuresCol=XXX)paraSparkXGBRegressor(features_col=XXX). - Os parâmetros
use_external_storageeexternal_storage_precisionforam removidos.xgboost.sparkos estimadores usam a API de iteração de dados DMatrix para usar a memória de forma mais eficiente. Não há mais necessidade de usar o modo de armazenamento externo ineficiente. Para conjuntos de dados extremamente grandes, o Databricks recomenda que você aumente o parâmetro, onum_workersque faz com que cada tarefa de treinamento particione os dados em partições de dados menores e mais gerenciáveis. Considere a configuraçãonum_workers = sc.defaultParallelism, que definenum_workerso número total de slots de tarefas do Spark no cluster. - Para estimadores definidos no
xgboost.spark, a configuraçãonum_workers=1executa o treinamento do modelo usando uma única tarefa do Spark. Isso utiliza o número de núcleos de CPU especificado pela definiçãospark.task.cpusde configuração do cluster Spark, que é 1 por padrão. Para usar mais núcleos de CPU para treinar o modelo, aumentenum_workersouspark.task.cpus. Não é possível definir onthreadparâmetro oun_jobspara estimadores definidos emxgboost.spark. Esse comportamento é diferente do comportamento anterior dos estimadores definidos no pacote preteridosparkdl.xgboost.
Converter sparkdl.xgboost modelo em xgboost.spark modelo
sparkdl.xgboost Os modelos são salvos em um formato diferente dos xgboost.spark modelos e têm configurações de parâmetros diferentes. Use a seguinte função de utilitário para converter o modelo:
def convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls,
sparkdl_xgboost_model,
):
"""
:param xgboost_spark_estimator_cls:
`xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
:param sparkdl_xgboost_model:
`sparkdl.xgboost` model instance e.g. the instance of
`sparkdl.xgboost.XgboostRegressorModel` type.
:return
A `xgboost.spark` model instance
"""
def convert_param_key(key):
from xgboost.spark.core import _inverse_pyspark_param_alias_map
if key == "baseMarginCol":
return "base_margin_col"
if key in _inverse_pyspark_param_alias_map:
return _inverse_pyspark_param_alias_map[key]
if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
return None
return key
xgboost_spark_params_dict = {}
for param in sparkdl_xgboost_model.params:
if param.name == "arbitraryParamsDict":
continue
if sparkdl_xgboost_model.isDefined(param):
xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)
xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))
xgboost_spark_params_dict = {
convert_param_key(k): v
for k, v in xgboost_spark_params_dict.items()
if convert_param_key(k) is not None
}
booster = sparkdl_xgboost_model.get_booster()
booster_bytes = booster.save_raw("json")
booster_config = booster.save_config()
estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))
# Example
from xgboost.spark import SparkXGBRegressor
new_model = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=model,
)
Se você tiver um pyspark.ml.PipelineModel modelo contendo um sparkdl.xgboost modelo como o último estágio, poderá substituir o estágio do sparkdl.xgboost modelo pelo modelo convertido xgboost.spark .
pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=pipeline_model.stages[-1],
)