Partager via


Entraînement distribué avec TorchDistributor

Cet article décrit comment effectuer un entraînement distribué sur des modèles ML PyTorch à l'aide de TorchDistributor.

TorchDistributor est un module open source de PySpark qui aide les utilisateurs à effectuer des entraînements distribués avec PyTorch sur leurs clusters Spark. Il vous permet ainsi de lancer des tâches d'entraînement PyTorch en tant que tâches Spark. En arrière-plan, il initialise l'environnement et les canaux de communication entre les workers et utilise la commande CLI torch.distributed.run pour exécuter l'entraînement distribué sur les nœuds Worker.

L'API TorchDistributor prend en charge les méthodes présentées dans le tableau suivant.

Méthode et signature Descriptif
init(self, num_processes, local_mode, use_gpu) Créez une instance de TorchDistributor.
run(self, main, *args) Exécute l'entraînement distribué en appelant main(**kwargs) si main est une fonction et exécute la commande CLI torchrun main *args si main est un chemin de fichier.

Spécifications

  • Spark 3.4
  • Databricks Runtime 13.0 ML ou ultérieur

Workflow de développement pour les notebooks

Si le processus de création et d'entraînement du modèle se déroule entièrement à partir d'un notebook sur votre ordinateur local ou d'un notebook Databricks, vous n'avez qu'à apporter quelques modifications mineures pour que votre code soit prêt pour l'entraînement distribué.

  1. Préparer le code à nœud unique : préparez et testez le code à nœud unique avec PyTorch, PyTorch Lightning ou d'autres frameworks basés sur PyTorch/PyTorch Lightning, tels que l'API HuggingFace Trainer.

  2. Préparer le code pour l'entraînement distribué standard : vous devez convertir votre entraînement à processus unique en entraînement distribué. Intégrez tout ce code distribué dans une seule fonction d'entraînement que vous pouvez utiliser avec TorchDistributor.

  3. Déplacer des importations dans la fonction d'entraînement : ajoutez les importations nécessaires, telles que import torch, dans la fonction d'entraînement. Cela vous permet d'éviter les erreurs courantes liées au pickling. De plus, le device_id auquel les modèles et les données sont liés est déterminé par :

    device_id = int(os.environ["LOCAL_RANK"])
    
  4. Lancer l'entraînement distribué : instanciez le TorchDistributor avec les paramètres souhaités et appelez .run(*args) pour lancer l'entraînement.

Voici un exemple de code d'entraînement :

from pyspark.ml.torch.distributor import TorchDistributor

def train(learning_rate, use_gpu):
  import torch
  import torch.distributed as dist
  import torch.nn.parallel.DistributedDataParallel as DDP
  from torch.utils.data import DistributedSampler, DataLoader

  backend = "nccl" if use_gpu else "gloo"
  dist.init_process_group(backend)
  device = int(os.environ["LOCAL_RANK"]) if use_gpu  else "cpu"
  model = DDP(createModel(), **kwargs)
  sampler = DistributedSampler(dataset)
  loader = DataLoader(dataset, sampler=sampler)

  output = train(model, loader, learning_rate)
  dist.cleanup()
  return output

distributor = TorchDistributor(num_processes=2, local_mode=False, use_gpu=True)
distributor.run(train, 1e-3, True)

Migrer l'entraînement à partir de référentiels externes

Si vous disposez d'une procédure d'entraînement distribué existante stockée dans un référentiel externe, vous pouvez facilement migrer vers Azure Databricks en procédant comme suit :

  1. Importer le référentiel : importez le référentiel externe en tant que dossier Git Databricks.
  2. Créer un nouveau notebook Initialisez un nouveau notebook Azure Databricks dans le référentiel.
  3. Lancer l'entraînement distribué Dans une cellule de notebook, appelez TorchDistributor comme suit :
from pyspark.ml.torch.distributor import TorchDistributor

train_file = "/path/to/train.py"
args = ["--learning_rate=0.001", "--batch_size=16"]
distributor = TorchDistributor(num_processes=2, local_mode=False, use_gpu=True)
distributor.run(train_file, *args)

Résolution des problèmes

Une erreur courante dans le workflow de notebook est que les objets ne peuvent pas être trouvés ou conservés lors de l'exécution d'un entraînement distribué. Cela peut se produire lorsque les instructions d'importation de bibliothèque ne sont pas distribuées à d'autres exécuteurs.

Pour éviter ce problème, incluez toutes les déclarations d'importation (par exemple, import torch) à la fois en haut de la fonction d'entraînement appelée avec TorchDistributor(...).run(<func>) et à l'intérieur de toute autre fonction définie par l'utilisateur appelée dans la méthode d'entraînement.

Échec NCCL : ncclInternalError: Internal check failed.

Lorsque cette erreur survient pendant l'entraînement sur plusieurs nœuds, cela indique généralement un problème de communication réseau entre les GPU. Ce problème survient lorsque la bibliothèque NCCL (NVIDIA Collective Communications Library) ne peut pas utiliser certaines interfaces réseau pour la communication GPU.

Pour résoudre cette erreur, ajoutez l'extrait de code suivant dans votre code d'entraînement pour utiliser l'interface réseau principale.

import os
os.environ["NCCL_SOCKET_IFNAME"] = "eth0"

Échec Gloo : RuntimeError: Connection refused

Vous pouvez potentiellement rencontrer cette erreur lors de l’utilisation de Gloo pour l’entraînement distribué sur les instances de processeur. Pour résoudre cette erreur, ajoutez l’extrait de code suivant dans votre code d’entraînement :

import os
os.environ["GLOO_SOCKET_IFNAME"] = "eth0"

Exemples de notebooks

Les exemples de notebooks suivants montrent comment effectuer un entraînement distribué avec PyTorch.

Entraînement distribué de bout en bout sur un notebook Databricks

Obtenir le notebook

Optimisation distribuée d'un notebook de modèle Hugging Face

Obtenir le notebook

Entraînement distribué sur un notebook de fichier PyTorch

Obtenir le notebook

Entraînement distribué à l'aide d'un notebook PyTorch Lightning

Obtenir le notebook