Distributed Training (Experimental)

Ray’s PyTorchTrainer simplifies distributed model training for PyTorch. The PyTorchTrainer is a wrapper around torch.distributed.launch with a Python API, so you can incorporate distributed training into a larger Python application rather than launching training from outside of Python.


With Ray:

Wrap your training with this:

ray.init(args.address)

trainer1 = PyTorchTrainer(
    model_creator,
    data_creator,
    optimizer_creator,
    loss_creator,
    num_replicas=<NUM_GPUS_YOU_HAVE> * <NUM_NODES>,
    use_gpu=True,
    batch_size=512,
    backend="nccl")

stats = trainer1.train()
print(stats)
trainer1.shutdown()
print("success!")

Then, start a Ray cluster, either via the autoscaler or manually.

ray up CLUSTER.yaml
python train.py --address="localhost:<PORT>"
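The num_replicas value passed to PyTorchTrainer above is the total number of GPUs in the cluster. As a quick check of that arithmetic, here is a minimal sketch that assumes every node has the same number of GPUs. The helper name total_replicas is hypothetical and not part of Ray.

```python
def total_replicas(num_nodes, gpus_per_node):
    """Hypothetical helper: one replica per GPU on a homogeneous cluster."""
    if num_nodes < 1 or gpus_per_node < 1:
        raise ValueError("num_nodes and gpus_per_node must be positive")
    return num_nodes * gpus_per_node


# Example: 4 nodes with 8 GPUs each need 32 replicas.
num_replicas = total_replicas(num_nodes=4, gpus_per_node=8)
print(num_replicas)
```

On a cluster with mixed node sizes, sum the GPU counts per node instead.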

Before, with PyTorch:

In your training program, insert the following:

torch.distributed.init_process_group(backend='YOUR BACKEND',
                                     init_method='env://')

model = torch.nn.parallel.DistributedDataParallel(model,
                                                  device_ids=[args.local_rank],
                                                  output_device=args.local_rank)

Then, separately, on each machine:

# Node 1: *(IP: 192.168.1.1, and has a free port: 1234)*
$ python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
           --nnodes=4 --node_rank=0 --master_addr="192.168.1.1"
           --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)
# Node 2:
$ python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
           --nnodes=4 --node_rank=1 --master_addr="192.168.1.1"
           --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)
# Node 3:
$ python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
           --nnodes=4 --node_rank=2 --master_addr="192.168.1.1"
           --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)
# Node 4:
$ python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
           --nnodes=4 --node_rank=3 --master_addr="192.168.1.1"
           --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)
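The four commands above differ only in --node_rank, which is the bookkeeping that PyTorchTrainer takes over. To make that repetition explicit, the sketch below builds the same command lines in Python. The function name launch_commands is hypothetical; the flags are the ones shown above.

```python
def launch_commands(num_nodes, gpus_per_node, master_addr, master_port, script):
    """Build one torch.distributed.launch command line per node.

    Every node gets the same flags except --node_rank.
    """
    template = (
        "python -m torch.distributed.launch --nproc_per_node={gpus}"
        " --nnodes={nodes} --node_rank={rank}"
        ' --master_addr="{addr}" --master_port={port} {script}')
    return [
        template.format(
            gpus=gpus_per_node,
            nodes=num_nodes,
            rank=node_rank,
            addr=master_addr,
            port=master_port,
            script=script) for node_rank in range(num_nodes)
    ]


# Print the command to run on each of the four nodes.
for cmd in launch_commands(4, 2, "192.168.1.1", 1234,
                           "YOUR_TRAINING_SCRIPT.py"):
    print(cmd)
```

Each printed line still has to be run by hand on its own machine.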

PyTorchTrainer Example

Below is an example of using Ray’s PyTorchTrainer. Under the hood, PyTorchTrainer creates replicas of your model (the number is controlled by num_replicas), each of which is managed by a worker.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import numpy as np
import torch
import torch.nn as nn
from torch import distributed
from torch.utils.data.distributed import DistributedSampler

from ray.experimental.sgd.pytorch.pytorch_trainer import PyTorchTrainer


class LinearDataset(torch.utils.data.Dataset):
    """y = a * x + b"""

    def __init__(self, a, b, size=1000):
        # Evenly spaced inputs in [0, 10).
        x = np.arange(0, 10, 10 / size, dtype=np.float32)
        self.x = torch.from_numpy(x)
        self.y = torch.from_numpy(a * x + b)

    def __getitem__(self, index):
        return self.x[index, None], self.y[index, None]

    def __len__(self):
        return len(self.x)


def model_creator(config):
    return nn.Linear(1, 1)


def optimizer_creator(model, config):
    """Returns optimizer."""
    return torch.optim.SGD(model.parameters(), lr=1e-4)


def data_creator(batch_size, config):
    """Returns training dataloader, validation dataloader."""
    train_dataset = LinearDataset(2, 5)
    validation_dataset = LinearDataset(2, 5, size=400)

    train_sampler = None
    if distributed.is_initialized():
        train_sampler = DistributedSampler(train_dataset)
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=(train_sampler is None),
        sampler=train_sampler)

    validation_sampler = None
    if distributed.is_initialized():
        validation_sampler = DistributedSampler(validation_dataset)
    validation_loader = torch.utils.data.DataLoader(
        validation_dataset,
        batch_size=batch_size,
        shuffle=(validation_sampler is None),
        sampler=validation_sampler)

    return train_loader, validation_loader


def train_example(num_replicas=1, use_gpu=False):
    trainer1 = PyTorchTrainer(
        model_creator,
        data_creator,
        optimizer_creator,
        loss_creator=lambda config: nn.MSELoss(),
        num_replicas=num_replicas,
        use_gpu=use_gpu,
        batch_size=512,
        backend="gloo")
    trainer1.train()
    trainer1.shutdown()
    print("success!")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--redis-address",
        required=False,
        type=str,
        help="the address to use for Redis")
    parser.add_argument(
        "--num-replicas",
        "-n",
        type=int,
        default=1,
        help="Sets number of replicas for training.")
    parser.add_argument(
        "--use-gpu",
        action="store_true",
        default=False,
        help="Enables GPU training")
    parser.add_argument(
        "--tune", action="store_true", default=False, help="Tune training")

    args, _ = parser.parse_known_args()

    import ray

    ray.init(redis_address=args.redis_address)
    train_example(num_replicas=args.num_replicas, use_gpu=args.use_gpu)
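The data_creator above wraps each dataset in a DistributedSampler so that every replica trains on a different slice of the data. The sketch below imitates that partitioning in plain Python to show the idea. It is a simplified illustration, not PyTorch's implementation: it ignores shuffling and assumes the dataset has at least as many items as there are replicas. The name shard_indices is hypothetical.

```python
import math


def shard_indices(dataset_size, num_replicas, rank):
    """Return the dataset indices assigned to one replica (no shuffling).

    The index list is padded by wrapping around so that every replica
    receives the same number of samples.
    """
    num_samples = int(math.ceil(dataset_size / float(num_replicas)))
    total_size = num_samples * num_replicas
    indices = list(range(dataset_size))
    indices += indices[:total_size - dataset_size]  # pad with leading items
    return indices[rank:total_size:num_replicas]


# Split a 10-item dataset across 4 replicas.
for rank in range(4):
    print(rank, shard_indices(10, 4, rank))
```

With 10 items and 4 replicas, each replica gets 3 indices, and items 0 and 1 are reused as padding for the last two replicas.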

Hyperparameter Optimization on Distributed PyTorch

PyTorchTrainer integrates with Tune through the PyTorchTrainable interface. Pass the same arguments you would give to PyTorchTrainer in tune.run(config=...), as shown below.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import torch
import torch.nn as nn
from torch import distributed
from torch.utils.data.distributed import DistributedSampler

import ray
from ray import tune
from ray.experimental.sgd.pytorch.pytorch_trainer import PyTorchTrainable


class LinearDataset(torch.utils.data.Dataset):
    """y = a * x + b"""

    def __init__(self, a, b, size=1000):
        # Evenly spaced inputs in [0, 10).
        x = np.arange(0, 10, 10 / size, dtype=np.float32)
        self.x = torch.from_numpy(x)
        self.y = torch.from_numpy(a * x + b)

    def __getitem__(self, index):
        return self.x[index, None], self.y[index, None]

    def __len__(self):
        return len(self.x)


def model_creator(config):
    return nn.Linear(1, 1)


def optimizer_creator(model, config):
    """Returns optimizer."""
    return torch.optim.SGD(model.parameters(), lr=config.get("lr", 1e-4))


def data_creator(batch_size, config):
    """Returns training dataloader, validation dataloader."""
    train_dataset = LinearDataset(2, 5)
    validation_dataset = LinearDataset(2, 5, size=400)

    train_sampler = None
    if distributed.is_initialized():
        train_sampler = DistributedSampler(train_dataset)
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=(train_sampler is None),
        sampler=train_sampler)

    validation_sampler = None
    if distributed.is_initialized():
        validation_sampler = DistributedSampler(validation_dataset)
    validation_loader = torch.utils.data.DataLoader(
        validation_dataset,
        batch_size=batch_size,
        shuffle=(validation_sampler is None),
        sampler=validation_sampler)
    return train_loader, validation_loader


def tune_example(num_replicas=1, use_gpu=False):
    config = {
        "model_creator": tune.function(model_creator),
        "data_creator": tune.function(data_creator),
        "optimizer_creator": tune.function(optimizer_creator),
        "loss_creator": tune.function(lambda config: nn.MSELoss()),
        "num_replicas": num_replicas,
        "use_gpu": use_gpu,
        "batch_size": 512,
        "backend": "gloo"
    }

    analysis = tune.run(
        PyTorchTrainable,
        num_samples=12,
        config=config,
        stop={"training_iteration": 2},
        verbose=1)

    return analysis.get_best_config(metric="validation_loss", mode="min")


if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--redis-address",
        type=str,
        help="the address to use for Redis")
    parser.add_argument(
        "--num-replicas",
        "-n",
        type=int,
        default=1,
        help="Sets number of replicas for training.")
    parser.add_argument(
        "--use-gpu",
        action="store_true",
        default=False,
        help="Enables GPU training")
    parser.add_argument(
        "--tune", action="store_true", default=False, help="Tune training")

    args, _ = parser.parse_known_args()

    ray.init(redis_address=args.redis_address)
    tune_example(num_replicas=args.num_replicas, use_gpu=args.use_gpu)

Package Reference

class ray.experimental.sgd.pytorch.PyTorchTrainer(model_creator, data_creator, optimizer_creator, loss_creator, train_function=None, validation_function=None, initialization_hook=None, config=None, num_replicas=1, use_gpu=False, batch_size=16, backend='auto')

Train a PyTorch model using distributed PyTorch.

Launches a set of actors which connect via distributed PyTorch and coordinate gradient updates to train the provided model.

__init__(model_creator, data_creator, optimizer_creator, loss_creator, train_function=None, validation_function=None, initialization_hook=None, config=None, num_replicas=1, use_gpu=False, batch_size=16, backend='auto')

Sets up the PyTorch trainer.

Parameters:
  • model_creator (dict -> torch.nn.Module) – creates the model using the config.
  • data_creator (int, dict -> DataLoader, DataLoader) – Function that takes in (batch_size, config) and returns two Torch DataLoader objects.
  • optimizer_creator (torch.nn.Module, dict -> optimizer) – creates the optimizer using the model and the config.
  • loss_creator (dict -> loss) – Creates the loss function/criterion using the config.
  • train_function – Trains a model for an epoch. This takes in (model, train_dataloader, criterion, optimizer, config) and returns a dict of training stats.
  • validation_function – Runs validation. This takes in (model, val_dataloader, criterion, config) and returns a dict of validation stats.
  • config (dict) – configuration passed to “model_creator”, “data_creator”, “optimizer_creator”, and “loss_creator”.
  • num_replicas (int) – the number of workers used in distributed training.
  • use_gpu (bool) – Sets resource allocation for workers to 1 GPU if true.
  • batch_size (int) – batch size for an update.
  • backend (string) – backend used by distributed PyTorch.
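
To make the train_function contract above concrete, here is a minimal sketch of a custom function with the documented signature. It assumes the dataloader yields (features, target) pairs and that training runs on CPU (no device transfer is done). The stats key "train_loss" is an illustrative choice, not a name required by Ray.

```python
def train_function(model, train_dataloader, criterion, optimizer, config):
    """Train for one epoch and return a dict of training stats."""
    model.train()  # switch the model to training mode
    total_loss = 0.0
    num_batches = 0
    for features, target in train_dataloader:
        optimizer.zero_grad()            # clear gradients from the last step
        loss = criterion(model(features), target)
        loss.backward()                  # compute gradients
        optimizer.step()                 # apply the update
        total_loss += loss.item()
        num_batches += 1
    # Mean loss over the epoch; "train_loss" is an assumed key name.
    return {"train_loss": total_loss / max(num_batches, 1)}
```

Such a function would be passed as PyTorchTrainer(..., train_function=train_function). A validation_function follows the same pattern without the optimizer calls.
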

train()

Runs a training epoch.

validate()

Evaluates the model on the validation data set.

get_model()

Returns the learned model.

save(checkpoint)

Saves the model at the provided checkpoint.

Parameters: checkpoint (str) – Path to target checkpoint file.

restore(checkpoint)

Restores the model from the provided checkpoint.

Parameters: checkpoint (str) – Path to target checkpoint file.

shutdown()

Shuts down workers and releases resources.
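
The methods above are typically called in sequence: train and validate once per epoch, save a checkpoint, then shut down. The sketch below shows one way to combine them. The helper name run_epochs is hypothetical, and it only assumes an object that exposes the train(), validate(), save(checkpoint) and shutdown() methods documented here.

```python
def run_epochs(trainer, num_epochs, checkpoint_path):
    """Hypothetical helper: train, validate, checkpoint, and always shut down."""
    history = []
    try:
        for _ in range(num_epochs):
            train_stats = trainer.train()          # runs one training epoch
            validation_stats = trainer.validate()  # evaluates on validation data
            history.append((train_stats, validation_stats))
        trainer.save(checkpoint_path)              # checkpoint after the last epoch
    finally:
        trainer.shutdown()                         # release workers even on error
    return history
```

Putting shutdown() in a finally block releases the workers even if an epoch raises an exception.
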

class ray.experimental.sgd.pytorch.PyTorchTrainable(config=None, logger_creator=None)

classmethod default_resource_request(config)

Returns the resource requirement for the given configuration.

This can be overridden by subclasses to set the correct trial resource allocation, so the user does not need to.

Example

>>> def default_resource_request(cls, config):
...     return Resources(
...         cpu=0,
...         gpu=0,
...         extra_cpu=config["workers"],
...         extra_gpu=int(config["use_gpu"]) * config["workers"])