Distributed SGD (Experimental)

Ray includes an implementation of synchronous distributed stochastic gradient descent (SGD), which is competitive in performance with implementations in Horovod and Distributed TensorFlow.

Ray SGD is built on top of the Ray task and actor abstractions to provide seamless integration into existing Ray applications.

Interface

To use Ray SGD, define a model class:

class ray.experimental.sgd.Model

Your class must implement this interface to be used with Ray SGD.

This supports any form of input pipeline: it is up to you to define it using TensorFlow. For an example implementation, see tfbench/test_model.py.
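A minimal sketch of such a class is shown below. The accessor names used here (get_loss, get_optimizer, get_feed_dict, get_metrics) are assumptions about the interface made for illustration; consult tfbench/test_model.py for the authoritative method set.

import numpy as np
import tensorflow as tf

from ray.experimental.sgd import Model


class YourModelClass(Model):
    """Illustrative Model subclass; the method names below are assumed."""

    def __init__(self):
        # Any TF input pipeline works; here we use simple feed placeholders.
        self.x = tf.placeholder(tf.float32, [None, 1])
        self.y = tf.placeholder(tf.float32, [None, 1])
        w = tf.get_variable("w", [1, 1])
        b = tf.get_variable("b", [1])
        pred = tf.matmul(self.x, w) + b
        self.loss = tf.reduce_mean(tf.square(pred - self.y))
        self.optimizer = tf.train.GradientDescentOptimizer(1e-3)

    def get_loss(self):
        return self.loss

    def get_optimizer(self):
        return self.optimizer

    def get_feed_dict(self):
        # A real model would pull the next batch from its input pipeline.
        data = np.random.rand(32, 1).astype(np.float32)
        return {self.x: data, self.y: 2 * data}

    def get_metrics(self):
        return {}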

Then, pass a model creator function to the ray.experimental.sgd.DistributedSGD class. To drive the distributed training, sgd.step() can be called repeatedly:

from ray.experimental.sgd import DistributedSGD

# The creator is called once per device on each worker; one model replica
# is created per device.
model_creator = lambda worker_idx, device_idx: YourModelClass()

sgd = DistributedSGD(
    model_creator,
    num_workers=2,
    devices_per_worker=4,
    gpu=True,
    strategy="ps")

for i in range(NUM_ITERS):
    sgd.step()

Under the hood, Ray SGD creates replicas of your model on each hardware device (GPU or CPU) allocated to the workers. The number of worker processes is controlled by num_workers, and the number of devices each worker process manages by devices_per_worker. Each model replica is created within its own TF variable scope. The DistributedSGD class coordinates the distributed computation of gradients and their application to the model replicas.
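The worker and device indexes passed to the creator can be used to differentiate replicas, for example to assign each one its own data shard or random seed. A small sketch (the per-replica seeding shown is purely illustrative):

import tensorflow as tf

DEVICES_PER_WORKER = 4  # must match the devices_per_worker argument above

def model_creator(worker_idx, device_idx):
    # Derive a unique global replica id from the two indexes.
    replica_idx = worker_idx * DEVICES_PER_WORKER + device_idx
    # Illustrative use only: seed each replica differently.
    tf.set_random_seed(replica_idx)
    return YourModelClass()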

There are two distributed SGD strategies available for use:
  • strategy="simple": Gradients are averaged centrally on the driver before being applied to each model replica. This is a reference implementation for debugging purposes.
  • strategy="ps": Gradients are computed and averaged within each node. Gradients are then averaged across nodes through a number of parameter server actors. To pipeline the computation of gradients and transmission across the network, we use a custom TensorFlow op that can read and write to the Ray object store directly.

Note that when num_workers=1, only local allreduce will be used and the choice of distributed strategy is irrelevant.
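For example, one might debug with the reference strategy and switch to the parameter-server strategy for larger runs (a sketch; the argument values are illustrative, not recommendations):

# Reference strategy: gradients are averaged centrally on the driver.
sgd_debug = DistributedSGD(
    model_creator, num_workers=2, devices_per_worker=1,
    gpu=False, strategy="simple")

# Parameter-server strategy: per-node averaging, then sharded PS actors.
sgd = DistributedSGD(
    model_creator, num_workers=2, devices_per_worker=4,
    gpu=True, strategy="ps")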

The full documentation for DistributedSGD is as follows:

class ray.experimental.sgd.DistributedSGD(model_creator, num_workers, devices_per_worker, gpu=True, strategy='ps', grad_shard_bytes=10000000, all_reduce_alg='simple')

Experimental distributed SGD implementation in Ray.

This supports two modes:
  • 'simple': centralized gradient aggregation
  • 'ps': sharded parameter-server implementation

To use this class, you’ll have to implement model.py:Model.

Parameters:
  • model_creator (func) – Function that returns a model given worker and device indexes as arguments. Each model replica will be created within its own variable scope.
  • num_workers (int) – Number of Ray actors to use for SGD.
  • devices_per_worker (int) – Number of GPU or CPU devices to use per worker. One model replica will be created per device.
  • gpu (bool) – Whether to use GPU devices.
  • strategy (str) – Strategy to use for distributed gradient aggregation. This only applies if num_workers > 1.
  • grad_shard_bytes (int) – Fuse gradient tensors into chunks of at most this size (if applicable).
  • all_reduce_alg (str) – TensorFlow strategy to use for gradient synchronization within the same worker (if applicable). See modified_allreduce.py for options.
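As a sketch, the aggregation-related parameters can also be set explicitly; the values below simply restate the documented defaults and are not tuning recommendations:

sgd = DistributedSGD(
    model_creator,
    num_workers=2,
    devices_per_worker=4,
    gpu=True,
    strategy="ps",
    grad_shard_bytes=10000000,  # fuse gradient tensors into ~10 MB chunks
    all_reduce_alg="simple")    # intra-worker gradient synchronization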

Examples

>>> # Setup distributed SGD
>>> model_creator = (
...   lambda worker_idx, device_idx: YourModelClass(...))
>>> sgd = DistributedSGD(
...   model_creator, num_workers=2,
...   devices_per_worker=4, gpu=True, strategy="ps")
>>> # To train
>>> for i in range(100):
...   stats = sgd.step(fetch_stats=i % 10 == 0)
>>> # To access or update model state
>>> sgd.foreach_model(lambda model: ...)
>>> # To access or update worker state
>>> sgd.foreach_worker(lambda worker: ...)

Examples

For examples of end-to-end usage, check out the ImageNet synthetic data test and the simple MNIST training example, which includes examples of how to access the model weights and monitor accuracy as training progresses.
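As a sketch of that pattern, using the foreach_model and foreach_worker hooks documented above (get_metrics refers to the illustrative model class earlier; the MNIST example shows the exact accessors it uses):

# Pull a piece of state from every model replica back to the driver.
metrics = sgd.foreach_model(lambda model: model.get_metrics())

# Run an arbitrary function inside every worker process.
results = sgd.foreach_worker(lambda worker: None)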

Performance

When using the new Ray backend (which will be enabled by default in Ray 0.6+), we expect performance competitive with other synchronous SGD implementations on 25Gbps Ethernet.

[Figure: sgd.png — training throughput in images per second]

Images per second reached when distributing the training of a ResNet-101 TensorFlow model (from the official TF benchmark). All experiments were run on p3.16xl instances connected by 25Gbps Ethernet, and workers were allocated 4 GPUs per node, as done in the Horovod benchmark.