Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Obciążenie rozproszone można uruchomić na wielu procesorach GPU — w jednym węźle lub w wielu węzłach — przy użyciu bezserwerowego interfejsu API języka Python procesora GPU. Interfejs API udostępnia prosty, ujednolicony interfejs, który usuwa szczegóły dotyczące dostarczania GPU, konfiguracji środowiska i dystrybucji obciążeń. Dzięki minimalnym zmianom kodu można bezproblemowo przejść z nauczania przy użyciu pojedynczego GPU do dystrybuowanego wykonywania na zdalnych procesorach GPU z tego samego środowiska notebooka.
Szybki start
Bezserwerowy interfejs API GPU do rozproszonego trenowania jest wstępnie zainstalowany w bezserwerowych środowiskach obliczeniowych GPU dla notatników usługi Databricks. Zalecamy środowisko procesora GPU 4 i nowsze. Aby użyć go do trenowania rozproszonego, zaimportuj i użyj dekoratora distributed do dystrybucji funkcji trenowania.
Poniższy fragment kodu przedstawia podstawowe użycie elementu @distributed:
# Import the distributed decorator
from serverless_gpu import distributed
# Decorate your training function with @distributed and specify the number of GPUs, the GPU type,
# and whether or not the GPUs are remote
@distributed(gpus=8, gpu_type='A10', remote=True)
def run_train():
...
Poniżej znajduje się pełny przykład, który trenuje model perceptronu wielowarstwowego (MLP) na 8 węzłach GPU A10 z notesu.
Skonfiguruj model i zdefiniuj funkcje narzędzi.
# Define the model import os import torch import torch.distributed as dist import torch.nn as nn def setup(): dist.init_process_group("nccl") torch.cuda.set_device(int(os.environ["LOCAL_RANK"])) def cleanup(): dist.destroy_process_group() class SimpleMLP(nn.Module): def __init__(self, input_dim=10, hidden_dim=64, output_dim=1): super().__init__() self.net = nn.Sequential( nn.Linear(input_dim, hidden_dim), nn.ReLU(), nn.Dropout(0.2), nn.Linear(hidden_dim, hidden_dim), nn.ReLU(), nn.Dropout(0.2), nn.Linear(hidden_dim, output_dim) ) def forward(self, x): return self.net(x)Zaimportuj bibliotekę serverless_gpu i moduł rozproszony .
import serverless_gpu from serverless_gpu import distributedUmieść kod trenowania modelu w funkcji i udekoruj funkcję dekoratorem
@distributed.@distributed(gpus=8, gpu_type='A10', remote=True) def run_train(num_epochs: int, batch_size: int) -> None: import mlflow import torch.optim as optim from torch.nn.parallel import DistributedDataParallel as DDP from torch.utils.data import DataLoader, DistributedSampler, TensorDataset # 1. Set up multi node environment setup() device = torch.device(f"cuda:{int(os.environ['LOCAL_RANK'])}") # 2. Apply the Torch distributed data parallel (DDP) library for data-parellel training. model = SimpleMLP().to(device) model = DDP(model, device_ids=[device]) # 3. Create and load dataset. x = torch.randn(5000, 10) y = torch.randn(5000, 1) dataset = TensorDataset(x, y) sampler = DistributedSampler(dataset) dataloader = DataLoader(dataset, sampler=sampler, batch_size=batch_size) # 4. Define the training loop. optimizer = optim.Adam(model.parameters(), lr=0.001) loss_fn = nn.MSELoss() for epoch in range(num_epochs): sampler.set_epoch(epoch) model.train() total_loss = 0.0 for step, (xb, yb) in enumerate(dataloader): xb, yb = xb.to(device), yb.to(device) optimizer.zero_grad() loss = loss_fn(model(xb), yb) # Log loss to MLflow metric mlflow.log_metric("loss", loss.item(), step=step) loss.backward() optimizer.step() total_loss += loss.item() * xb.size(0) mlflow.log_metric("total_loss", total_loss) print(f"Total loss for epoch {epoch}: {total_loss}") cleanup()Wykonaj trenowanie rozproszone, wywołując funkcję rozproszoną z argumentami zdefiniowanymi przez użytkownika.
run_train.distributed(num_epochs=3, batch_size=1)Po wykonaniu link do uruchomienia MLflow zostanie wygenerowany w danych wyjściowych komórki notatnika. Kliknij link przebiegu MLflow lub znajdź go w panelu Eksperyment, aby zobaczyć wyniki przebiegu.
Szczegóły wykonania rozproszonego
Bezserwerowy interfejs API procesora GPU składa się z kilku kluczowych składników:
- Menedżer zasobów obliczeniowych: obsługuje alokację zasobów i zarządzanie nimi
- Środowisko uruchomieniowe: zarządza środowiskami i zależnościami języka Python
- Program uruchamiający: orkiestruje wykonywanie i monitorowanie zadań
W przypadku uruchamiania w trybie rozproszonym:
- Funkcja jest serializowana i dystrybuowana w określonej liczbie procesorów GPU
- Każdy procesor GPU uruchamia kopię funkcji z tymi samymi parametrami
- Środowisko jest synchronizowane we wszystkich węzłach
- Wyniki są zbierane i zwracane ze wszystkich procesorów GPU
Jeśli remote jest ustawione na True, obciążenie jest rozdzielane na zdalne procesory GPU. Jeśli remote jest ustawione na False, obciążenie uruchamia się na pojedynczym węźle GPU połączonym z bieżącym notebookiem. Jeśli węzeł ma wiele układów GPU, wszystkie z nich zostaną wykorzystane.
Interfejs API obsługuje popularne biblioteki do trenowania równoległego, takie jak Distributed Data Parallel (DDP), Fully Sharded Data Parallel (FSDP), DeepSpeed i Ray.
Bardziej realistyczne scenariusze trenowania rozproszonego można znaleźć przy użyciu różnych bibliotek w przykładach z notesów.
Uruchamianie za pomocą raya
Bezserwerowy interfejs API GPU obsługuje również uruchamianie trenowania rozproszonego z wykorzystaniem Ray oraz dekoratora @ray_launch, który opiera się na warstwie @distributed.
Każde ray_launch zadanie najpierw inicjuje torch.distributed rendezvous, aby wybrać głównego pracownika Ray i zebrać adresy IP. Rank-zero uruchamia się ray start --head (z eksportem metryk, jeśli włączono), ustawia RAY_ADDRESS i uruchamia ozdobioną funkcję jako sterownik Ray. Inne węzły łączą się za pomocą ray start --address polecenia i czekają, aż sterownik zapisze znacznik ukończenia.
Dodatkowe szczegóły konfiguracji:
- Aby włączyć zbieranie metryk systemowych Ray na każdym węźle, użyj
RayMetricsMonitorzremote=True. - Zdefiniuj opcje środowiska uruchomieniowego Ray (aktory, zestawy danych, grupy umieszczania i planowania) przy użyciu standardowych interfejsów API Ray wewnątrz funkcji dekorowanej.
- Zarządzanie ustawieniami obejmującymi cały klaster (liczba i typ jednostki GPU, tryb zdalny vs lokalny, zachowanie asynchroniczne, oraz zmienne środowiskowe puli Databricks) poza funkcją w argumentach dekoratora lub środowisku notebooka.
W poniższym przykładzie pokazano, jak używać polecenia @ray_launch:
from serverless_gpu.ray import ray_launch
@ray_launch(gpus=16, remote=True, gpu_type='A10')
def foo():
import os
import ray
print(ray.state.available_resources_per_node())
return 1
foo.distributed()
Pełny przykład można znaleźć w tym notatniku, który uruchamia Ray w celu wytrenowania sieci neuronowej ResNet18 na wielu układach GPU A10.
FAQs
Gdzie należy umieścić kod ładowania danych?
Podczas korzystania z Serverless GPU API do rozproszonego trenowania, umieść kod ładowania danych wewnątrz dekoratora @distributed. Rozmiar zestawu danych może przekraczać maksymalny rozmiar dozwolony przez pickle, dlatego zaleca się wygenerować zestaw danych wewnątrz dekoratora, co przedstawiono poniżej.
from serverless_gpu import distributed
# this may cause pickle error
dataset = get_dataset(file_path)
@distributed(gpus=8, remote=True)
def run_train():
# good practice
dataset = get_dataset(file_path)
....
Czy mogę używać zarezerwowanych pul procesorów GPU?
Jeśli zarezerwowana pula GPU jest dostępna (proszę sprawdzić u administratora) w Twoim obszarze roboczym i określisz remote do True w dekoratorze @distributed, obciążenie zostanie domyślnie uruchomione w zarezerwowanej puli GPU. Jeśli chcesz użyć puli procesorów GPU na żądanie, ustaw zmienną środowiskową DATABRICKS_USE_RESERVED_GPU_POOL na False przed wywołaniem funkcji rozproszonej, jak pokazano poniżej:
import os
os.environ['DATABRICKS_USE_RESERVED_GPU_POOL'] = 'False'
@distributed(gpus=8, remote=True)
def run_train():
...
Dowiedz się więcej
Aby zapoznać się z dokumentacją, sprawdź dokumentację bezserwerowego interfejsu API GPU w języku Python.