RaySGD PyTorch

Warning

This is still an experimental API and is subject to change in the near future.

Tip

Help us make RaySGD better; take this 1-minute User Survey!

Ray’s PyTorchTrainer simplifies distributed model training for PyTorch. PyTorchTrainer is a wrapper around torch.distributed.launch with a Python API, letting you incorporate distributed training into a larger Python application instead of having to launch training from outside of Python.


With Ray:

Wrap your training with this:

ray.init(address=args.address)

trainer1 = PyTorchTrainer(
    model_creator,
    data_creator,
    optimizer_creator,
    loss_creator,
    num_replicas=<NUM_GPUS_YOU_HAVE> * <NUM_NODES>,
    use_gpu=True,
    batch_size=512,
    backend="nccl")

stats = trainer1.train()
print(stats)
trainer1.shutdown()
print("success!")

Then, start a Ray cluster via the autoscaler or manually.

ray up CLUSTER.yaml
python train.py --address="localhost:<PORT>"

Before, with PyTorch:

In your training program, insert the following:

torch.distributed.init_process_group(backend='YOUR BACKEND',
                                     init_method='env://')

model = torch.nn.parallel.DistributedDataParallel(model,
                                                  device_ids=[args.local_rank],
                                                  output_device=args.local_rank)

Then, separately, on each machine:

# Node 1: *(IP: 192.168.1.1, and has a free port: 1234)*
$ python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
           --nnodes=4 --node_rank=0 --master_addr="192.168.1.1"
           --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)
# Node 2:
$ python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
           --nnodes=4 --node_rank=1 --master_addr="192.168.1.1"
           --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)
# Node 3:
$ python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
           --nnodes=4 --node_rank=2 --master_addr="192.168.1.1"
           --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)
# Node 4:
$ python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
           --nnodes=4 --node_rank=3 --master_addr="192.168.1.1"
           --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)

PyTorchTrainer Example

Below is an example of using Ray’s PyTorchTrainer. Under the hood, PyTorchTrainer creates replicas of your model (the number is controlled by num_replicas), each of which is managed by a Ray worker.

import argparse
import numpy as np
import torch
import torch.nn as nn
from torch import distributed
from torch.utils.data.distributed import DistributedSampler

from ray.experimental.sgd.pytorch.pytorch_trainer import PyTorchTrainer


class LinearDataset(torch.utils.data.Dataset):
    """y = a * x + b"""

    def __init__(self, a, b, size=1000):
        x = np.arange(0, 10, 10 / size, dtype=np.float32)
        self.x = torch.from_numpy(x)
        self.y = torch.from_numpy(a * x + b)

    def __getitem__(self, index):
        return self.x[index, None], self.y[index, None]

    def __len__(self):
        return len(self.x)


def model_creator(config):
    return nn.Linear(1, 1)


def optimizer_creator(model, config):
    """Returns optimizer."""
    return torch.optim.SGD(model.parameters(), lr=1e-2)


def data_creator(batch_size, config):
    """Returns training dataloader, validation dataloader."""
    train_dataset = LinearDataset(2, 5)
    validation_dataset = LinearDataset(2, 5, size=400)

    train_sampler = None
    if distributed.is_initialized():
        train_sampler = DistributedSampler(train_dataset)
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=(train_sampler is None),
        sampler=train_sampler)

    validation_sampler = None
    if distributed.is_initialized():
        validation_sampler = DistributedSampler(validation_dataset)
    validation_loader = torch.utils.data.DataLoader(
        validation_dataset,
        batch_size=batch_size,
        shuffle=(validation_sampler is None),
        sampler=validation_sampler)

    return train_loader, validation_loader


def train_example(num_replicas=1, use_gpu=False):
    trainer1 = PyTorchTrainer(
        model_creator,
        data_creator,
        optimizer_creator,
        loss_creator=lambda config: nn.MSELoss(),
        num_replicas=num_replicas,
        use_gpu=use_gpu,
        batch_size=num_replicas * 4,
        backend="gloo")
    for i in range(5):
        stats = trainer1.train()
        print(stats)

    print(trainer1.validate())
    m = trainer1.get_model()
    print("trained weight: % .2f, bias: % .2f" % (
        m.weight.item(), m.bias.item()))
    trainer1.shutdown()
    print("success!")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--address",
        required=False,
        type=str,
        help="the address to use for Ray")
    parser.add_argument(
        "--num-replicas",
        "-n",
        type=int,
        default=1,
        help="Sets number of replicas for training.")
    parser.add_argument(
        "--use-gpu",
        action="store_true",
        default=False,
        help="Enables GPU training")
    parser.add_argument(
        "--tune", action="store_true", default=False, help="Tune training")

    args, _ = parser.parse_known_args()

    import ray

    ray.init(address=args.address)
    train_example(num_replicas=args.num_replicas, use_gpu=args.use_gpu)

Hyperparameter Optimization on Distributed PyTorch

PyTorchTrainer integrates naturally with Tune via the PyTorchTrainable interface. The same arguments that PyTorchTrainer takes should be passed as the config to tune.run, as shown below.

import numpy as np
import torch
import torch.nn as nn
from torch import distributed
from torch.utils.data.distributed import DistributedSampler

import ray
from ray import tune
from ray.experimental.sgd.pytorch.pytorch_trainer import PyTorchTrainable


class LinearDataset(torch.utils.data.Dataset):
    """y = a * x + b"""

    def __init__(self, a, b, size=1000):
        x = np.arange(0, 10, 10 / size, dtype=np.float32)
        self.x = torch.from_numpy(x)
        self.y = torch.from_numpy(a * x + b)

    def __getitem__(self, index):
        return self.x[index, None], self.y[index, None]

    def __len__(self):
        return len(self.x)


def model_creator(config):
    return nn.Linear(1, 1)


def optimizer_creator(model, config):
    """Returns optimizer."""
    return torch.optim.SGD(model.parameters(), lr=config.get("lr", 1e-4))


def data_creator(batch_size, config):
    """Returns training dataloader, validation dataloader."""
    train_dataset = LinearDataset(2, 5)
    validation_dataset = LinearDataset(2, 5, size=400)

    train_sampler = None
    if distributed.is_initialized():
        train_sampler = DistributedSampler(train_dataset)
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=(train_sampler is None),
        sampler=train_sampler)

    validation_sampler = None
    if distributed.is_initialized():
        validation_sampler = DistributedSampler(validation_dataset)
    validation_loader = torch.utils.data.DataLoader(
        validation_dataset,
        batch_size=batch_size,
        shuffle=(validation_sampler is None),
        sampler=validation_sampler)
    return train_loader, validation_loader


def tune_example(num_replicas=1, use_gpu=False):
    config = {
        "model_creator": tune.function(model_creator),
        "data_creator": tune.function(data_creator),
        "optimizer_creator": tune.function(optimizer_creator),
        "loss_creator": tune.function(lambda config: nn.MSELoss()),
        "num_replicas": num_replicas,
        "use_gpu": use_gpu,
        "batch_size": 512,
        "backend": "gloo"
    }

    analysis = tune.run(
        PyTorchTrainable,
        num_samples=12,
        config=config,
        stop={"training_iteration": 2},
        verbose=1)

    return analysis.get_best_config(metric="validation_loss", mode="min")


if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--address",
        type=str,
        help="the address to use for Ray")
    parser.add_argument(
        "--num-replicas",
        "-n",
        type=int,
        default=1,
        help="Sets number of replicas for training.")
    parser.add_argument(
        "--use-gpu",
        action="store_true",
        default=False,
        help="Enables GPU training")
    parser.add_argument(
        "--tune", action="store_true", default=False, help="Tune training")

    args, _ = parser.parse_known_args()

    ray.init(address=args.address)
    tune_example(num_replicas=args.num_replicas, use_gpu=args.use_gpu)

Package Reference

class ray.experimental.sgd.pytorch.PyTorchTrainer(model_creator, data_creator, optimizer_creator, loss_creator, train_function=None, validation_function=None, initialization_hook=None, config=None, num_replicas=1, use_gpu=False, batch_size=16, backend='auto')[source]

Train a PyTorch model using distributed PyTorch.

Launches a set of actors which connect via distributed PyTorch and coordinate gradient updates to train the provided model.

__init__(model_creator, data_creator, optimizer_creator, loss_creator, train_function=None, validation_function=None, initialization_hook=None, config=None, num_replicas=1, use_gpu=False, batch_size=16, backend='auto')[source]

Sets up the PyTorch trainer.

Parameters:
  • model_creator (dict -> torch.nn.Module) – Creates the model using the config.
  • data_creator (int, dict -> DataLoader, DataLoader) – Function that takes in (batch_size, config) and returns two Torch DataLoader objects.
  • optimizer_creator (torch.nn.Module, dict -> optimizer) – Creates the optimizer using the model and the config.
  • loss_creator (dict -> loss) – Creates the loss function/criterion using the config.
  • train_function – Trains a model for an epoch. This takes in (model, train_dataloader, criterion, optimizer, config) and returns a dict of training stats (see the sketch after this list).
  • validation_function – Runs validation. This takes in (model, val_dataloader, criterion, config) and returns a dict of validation stats.
  • config (dict) – Configuration passed to “model_creator”, “data_creator”, “optimizer_creator”, and “loss_creator”.
  • num_replicas (int) – The number of workers used in distributed training.
  • use_gpu (bool) – Sets resource allocation for each worker to 1 GPU if true.
  • batch_size (int) – Batch size for an update.
  • backend (string) – Backend used by distributed PyTorch.
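
For illustration, custom training and validation loops can be supplied through train_function and validation_function. The sketch below assumes only the signatures documented above; the stat keys and the reuse of the creator functions from the example earlier on this page are assumptions, not part of the API.

import torch
import torch.nn as nn

from ray.experimental.sgd.pytorch.pytorch_trainer import PyTorchTrainer


def my_train_function(model, train_dataloader, criterion, optimizer, config):
    """Sketch of a custom epoch loop matching the documented signature."""
    model.train()
    total_loss = 0.0
    for features, targets in train_dataloader:
        optimizer.zero_grad()
        loss = criterion(model(features), targets)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    # The stat key below is illustrative; any dict of training stats works.
    return {"train_loss": total_loss / len(train_dataloader)}


def my_validation_function(model, val_dataloader, criterion, config):
    """Sketch of a custom validation pass matching the documented signature."""
    model.eval()
    total_loss = 0.0
    with torch.no_grad():
        for features, targets in val_dataloader:
            total_loss += criterion(model(features), targets).item()
    return {"validation_loss": total_loss / len(val_dataloader)}


# model_creator, data_creator, and optimizer_creator are assumed to be the
# ones defined in the example earlier on this page.
trainer = PyTorchTrainer(
    model_creator,
    data_creator,
    optimizer_creator,
    loss_creator=lambda config: nn.MSELoss(),
    train_function=my_train_function,
    validation_function=my_validation_function,
    num_replicas=2,
    batch_size=64,
    backend="gloo")
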
train(max_retries=10, checkpoint='auto')[source]

Runs a training epoch.

The returned stats are an average over the values reported by all workers. Set max_retries to enable fault handling in case of instance preemption (see the usage sketch after the parameter list).

Parameters:
  • max_retries (int) – Must be non-negative. If set to N, will kill all current workers, query the Ray global state for total available resources, and re-launch up to the available resources. Behavior is not well-defined in case of shared cluster usage.
  • checkpoint (str) – Path to checkpoint to restore from if retrying. If max_retries is set and checkpoint == “auto”, PyTorchTrainer will save a checkpoint before starting to train.
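
For instance, assuming a trainer1 constructed as in the example at the top of this page, a call with fault handling enabled might look like the sketch below.

# Sketch only: with max_retries set and checkpoint="auto", PyTorchTrainer
# saves a checkpoint before training so that workers can be re-launched and
# restored after an instance preemption.
stats = trainer1.train(max_retries=3, checkpoint="auto")
print(stats)
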
validate()[source]

Evaluates the model on the validation data set.

get_model()[source]

Returns the learned model(s).

save(checkpoint)[source]

Saves the model(s) to the provided checkpoint.

Parameters:
  • checkpoint (str) – Path to target checkpoint file.
restore(checkpoint)[source]

Restores the model from the provided checkpoint.

Parameters:
  • checkpoint (str) – Path to target checkpoint file.
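
A minimal sketch of saving and later restoring trained weights, again assuming a trainer1 as constructed earlier; the checkpoint path is purely illustrative.

# Sketch only: the path below is hypothetical.
trainer1.save("/tmp/pytorch_trainer.pth")
# ... later, with an equivalently constructed trainer:
trainer1.restore("/tmp/pytorch_trainer.pth")
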
shutdown(force=False)[source]

Shuts down workers and releases resources.

class ray.experimental.sgd.pytorch.PyTorchTrainable(config=None, logger_creator=None)[source]
classmethod default_resource_request(config)[source]

Returns the resource requirement for the given configuration.

This can be overridden by subclasses to set the correct trial resource allocation, so the user does not need to.

Example

>>> def default_resource_request(cls, config):
>>>     return Resources(
>>>         cpu=0,
>>>         gpu=0,
>>>         extra_cpu=config["workers"],
>>>         extra_gpu=int(config["use_gpu"]) * config["workers"])