Distributed Training (Experimental)

Ray’s PyTorchTrainer simplifies distributed model training for PyTorch. The PyTorchTrainer is a wrapper around torch.distributed.launch that exposes a Python API, so distributed training can be incorporated into a larger Python application instead of being launched from shell scripts outside of Python.


With Ray:

Wrap your training with this:

ray.init(redis_address=args.address)
trainer1 = PyTorchTrainer(
    model_creator,
    data_creator,
    optimizer_creator,
    num_replicas=<NUM_GPUS_YOU_HAVE> * <NUM_NODES>,
    use_gpu=True,
    batch_size=512,
    backend="gloo")

trainer1.train()

Then, start a Ray cluster with the autoscaler or manually:

ray up CLUSTER.yaml
python train.py --address="localhost:<PORT>"

Before, with PyTorch:

In your training program, insert the following:

torch.distributed.init_process_group(backend='YOUR BACKEND',
                                     init_method='env://')

model = torch.nn.parallel.DistributedDataParallel(model,
                                                  device_ids=[args.local_rank],
                                                  output_device=args.local_rank)

Then, separately, on each machine:

# Node 1: (IP: 192.168.1.1, with a free port 1234)
$ python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
           --nnodes=4 --node_rank=0 --master_addr="192.168.1.1"
           --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)
# Node 2:
$ python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
           --nnodes=4 --node_rank=1 --master_addr="192.168.1.1"
           --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)
# Node 3:
$ python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
           --nnodes=4 --node_rank=2 --master_addr="192.168.1.1"
           --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)
# Node 4:
$ python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
           --nnodes=4 --node_rank=3 --master_addr="192.168.1.1"
           --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)

PyTorchTrainer Example

Below is an example of using Ray’s PyTorchTrainer. Under the hood, PyTorchTrainer creates replicas of your model (the number is controlled by num_replicas), each of which is managed by a Ray worker.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import numpy as np
import torch
import torch.nn as nn

from ray.experimental.sgd.pytorch.pytorch_trainer import PyTorchTrainer


class LinearDataset(torch.utils.data.Dataset):
    """y = a * x + b"""

    def __init__(self, a, b, size=1000):
        x = np.arange(0, 10, 10 / size, dtype=np.float32)
        self.x = torch.from_numpy(x)
        self.y = torch.from_numpy(a * x + b)

    def __getitem__(self, index):
        return self.x[index, None], self.y[index, None]

    def __len__(self):
        return len(self.x)


def model_creator(config):
    return nn.Linear(1, 1)


def optimizer_creator(model, config):
    """Returns criterion, optimizer"""
    criterion = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-4)
    return criterion, optimizer


def data_creator(config):
    """Returns training set, validation set"""
    return LinearDataset(2, 5), LinearDataset(2, 5, size=400)


def train_example(num_replicas=1, use_gpu=False):
    trainer1 = PyTorchTrainer(
        model_creator,
        data_creator,
        optimizer_creator,
        num_replicas=num_replicas,
        use_gpu=use_gpu,
        batch_size=512,
        backend="gloo")
    trainer1.train()
    trainer1.shutdown()
    print("success!")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--redis-address",
        required=False,
        type=str,
        help="the address to use for Redis")
    parser.add_argument(
        "--num-replicas",
        "-n",
        type=int,
        default=1,
        help="Sets number of replicas for training.")
    parser.add_argument(
        "--use-gpu",
        action="store_true",
        default=False,
        help="Enables GPU training")

    args, _ = parser.parse_known_args()

    import ray

    ray.init(redis_address=args.redis_address)
    train_example(num_replicas=args.num_replicas, use_gpu=args.use_gpu)

Hyperparameter Optimization on Distributed PyTorch

PyTorchTrainer integrates naturally with Tune via the PyTorchTrainable interface. The same arguments you would pass to PyTorchTrainer should be placed in the config dict passed to tune.run(config=...), as shown below.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import torch
import torch.nn as nn

import ray
from ray import tune
from ray.experimental.sgd.pytorch.pytorch_trainer import PyTorchTrainable


class LinearDataset(torch.utils.data.Dataset):
    """y = a * x + b"""

    def __init__(self, a, b, size=1000):
        x = np.arange(0, 10, 10 / size, dtype=np.float32)
        self.x = torch.from_numpy(x)
        self.y = torch.from_numpy(a * x + b)

    def __getitem__(self, index):
        return self.x[index, None], self.y[index, None]

    def __len__(self):
        return len(self.x)


def model_creator(config):
    return nn.Linear(1, 1)


def optimizer_creator(model, config):
    """Returns criterion, optimizer"""
    criterion = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=config.get("lr", 1e-4))
    return criterion, optimizer


def data_creator(config):
    """Returns training set, validation set"""
    return LinearDataset(2, 5), LinearDataset(2, 5, size=400)


def tune_example(num_replicas=1, use_gpu=False):
    config = {
        "model_creator": tune.function(model_creator),
        "data_creator": tune.function(data_creator),
        "optimizer_creator": tune.function(optimizer_creator),
        "num_replicas": num_replicas,
        "use_gpu": use_gpu,
        "batch_size": 512,
        "backend": "gloo"
    }

    analysis = tune.run(
        PyTorchTrainable,
        num_samples=12,
        config=config,
        stop={"training_iteration": 2},
        verbose=1)

    return analysis.get_best_config(metric="validation_loss", mode="min")


if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--redis-address",
        type=str,
        help="the address to use for Redis")
    parser.add_argument(
        "--num-replicas",
        "-n",
        type=int,
        default=1,
        help="Sets number of replicas for training.")
    parser.add_argument(
        "--use-gpu",
        action="store_true",
        default=False,
        help="Enables GPU training")

    args, _ = parser.parse_known_args()

    ray.init(redis_address=args.redis_address)
    tune_example(num_replicas=args.num_replicas, use_gpu=args.use_gpu)
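
The example above samples the same configuration 12 times. To search over an actual hyperparameter such as the learning rate, a search space can be added to the nested config dictionary that the creator functions receive. The snippet below is a minimal sketch, assuming PyTorchTrainable forwards that nested "config" entry to PyTorchTrainer as its config argument (which is where optimizer_creator above reads "lr" from); it would replace the config built inside tune_example.

config = {
    "model_creator": tune.function(model_creator),
    "data_creator": tune.function(data_creator),
    "optimizer_creator": tune.function(optimizer_creator),
    "num_replicas": num_replicas,
    "use_gpu": use_gpu,
    "batch_size": 512,
    "backend": "gloo",
    # Assumption: this nested dict reaches optimizer_creator as `config`,
    # so Tune can search over the learning rate read via config.get("lr").
    "config": {
        "lr": tune.grid_search([1e-5, 1e-4, 1e-3])
    }
}

analysis = tune.run(
    PyTorchTrainable,
    config=config,
    stop={"training_iteration": 2},
    verbose=1)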

Package Reference

class ray.experimental.sgd.pytorch.PyTorchTrainer(model_creator, data_creator, optimizer_creator=<function sgd_mse_optimizer>, config=None, num_replicas=1, use_gpu=False, batch_size=16, backend='auto')

Train a PyTorch model using distributed PyTorch.

Launches a set of actors which connect via distributed PyTorch and coordinate gradient updates to train the provided model.

__init__(model_creator, data_creator, optimizer_creator=<function sgd_mse_optimizer>, config=None, num_replicas=1, use_gpu=False, batch_size=16, backend='auto')

Sets up the PyTorch trainer.

Parameters:
  • model_creator (dict -> torch.nn.Module) – Creates the model using the config.
  • data_creator (dict -> Dataset, Dataset) – Creates the training and validation data sets using the config.
  • optimizer_creator ((torch.nn.Module, dict) -> loss, optimizer) – Creates the loss and optimizer using the model and the config.
  • config (dict) – Configuration dict passed to model_creator, data_creator, and optimizer_creator (see the sketch below).
  • num_replicas (int) – The number of workers used in distributed training.
  • use_gpu (bool) – Sets resource allocation for workers to 1 GPU if true.
  • batch_size (int) – Batch size for an update.
  • backend (string) – Backend used by distributed PyTorch.
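
As a rough illustration of how the config dictionary is threaded through, the sketch below reuses the creator functions from the tuning example above, where optimizer_creator reads a learning rate via config.get("lr", 1e-4). The particular keys and values are illustrative, not required by the API.

# Sketch: the same `config` dict is handed to model_creator, data_creator,
# and optimizer_creator, so it can carry hyperparameters such as "lr".
trainer = PyTorchTrainer(
    model_creator,
    data_creator,
    optimizer_creator,
    config={"lr": 1e-3},  # illustrative; read by optimizer_creator above
    num_replicas=2,
    batch_size=128,
    backend="gloo")
trainer.train()
trainer.shutdown()
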
train()

Runs a training epoch.

validate()

Evaluates the model on the validation data set.

get_model()

Returns the learned model.
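
A typical loop over these methods might look like the following sketch. It assumes a trainer constructed as in the examples earlier, and that validate() returns a dict of validation statistics; both the number of epochs and the handling of the stats are illustrative.

# Sketch: alternate training and validation for a few epochs, then pull
# out the trained model.
for epoch in range(3):
    trainer.train()                 # one distributed training epoch
    stats = trainer.validate()      # evaluate on the validation set
    print("epoch", epoch, stats)

model = trainer.get_model()         # the learned model as a torch.nn.Module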

save(checkpoint)

Saves the model at the provided checkpoint.

Parameters: checkpoint (str) – Path to target checkpoint file.

restore(checkpoint)

Restores the model from the provided checkpoint.

Parameters: checkpoint (str) – Path to target checkpoint file.
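
A checkpointing round trip might look like the sketch below; the checkpoint path is hypothetical, and the second trainer simply reuses the creator functions from the examples above.

# Sketch: persist the trained weights, then load them into a new trainer.
trainer.save("/tmp/linear.ckpt")

trainer2 = PyTorchTrainer(
    model_creator, data_creator, optimizer_creator, num_replicas=2)
trainer2.restore("/tmp/linear.ckpt")
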
shutdown()

Shuts down workers and releases resources.

class ray.experimental.sgd.pytorch.PyTorchTrainable(config=None, logger_creator=None)

classmethod default_resource_request(config)

Returns the resource requirement for the given configuration.

This can be overridden by subclasses to set the correct trial resource allocation, so the user does not need to.

Example

>>> def default_resource_request(cls, config):
...     return Resources(
...         cpu=0,
...         gpu=0,
...         extra_cpu=config["workers"],
...         extra_gpu=int(config["use_gpu"]) * config["workers"])