Walkthrough

This walkthrough gives an overview of the core concepts of Ray:

  1. Using remote functions (tasks) [ray.remote]
  2. Fetching results (object IDs) [ray.put, ray.get, ray.wait]
  3. Using remote classes (actors) [ray.remote]

With Ray, your code will work on a single machine and can be easily scaled to a large cluster. To run this walkthrough, install Ray with pip install -U ray.

import ray

# Start Ray. If you're connecting to an existing cluster, you would use
# ray.init(address=<cluster-address>) instead.
ray.init()

See the Configuration documentation for the various ways to configure Ray. To start a multi-node Ray cluster, see the cluster setup page. You can stop Ray by calling ray.shutdown(). To check whether Ray is initialized, call ray.is_initialized().
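
For example, a minimal sketch of this lifecycle (if you run it, call ray.init() again before continuing with the walkthrough):

# Ray is initialized after the ray.init() call above.
assert ray.is_initialized()

# Stop Ray. Remote calls will not work until ray.init() is called again.
ray.shutdown()
assert not ray.is_initialized()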

Remote functions (Tasks)

Ray enables arbitrary Python functions to be executed asynchronously. These asynchronous Ray functions are called “remote functions”. The standard way to turn a Python function into a remote function is to add the @ray.remote decorator. Here is an example.

# A regular Python function.
def regular_function():
    return 1

# A Ray remote function.
@ray.remote
def remote_function():
    return 1

This causes a few changes in behavior:

  1. Invocation: The regular version is called with regular_function(), whereas the remote version is called with remote_function.remote().
  2. Return values: regular_function immediately executes and returns 1, whereas remote_function immediately returns an object ID (a future) and then creates a task that will be executed on a worker process. The result can be retrieved with ray.get.
assert regular_function() == 1

object_id = remote_function.remote()

# The result of `remote_function` can be retrieved with `ray.get`.
assert ray.get(object_id) == 1
  3. Parallelism: Invocations of regular_function happen serially, for example

    # These happen serially.
    for _ in range(4):
        regular_function()
    

    whereas invocations of remote_function happen in parallel, for example

    # These happen in parallel.
    for _ in range(4):
        remote_function.remote()
    

See the ray.remote package reference page for specific documentation on how to use ray.remote.
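
To block on and retrieve the results of these parallel invocations, pass the list of returned object IDs to ray.get (a minimal sketch):

# Launch four tasks in parallel and fetch all four results.
results = ray.get([remote_function.remote() for _ in range(4)])
assert results == [1, 1, 1, 1]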

Object IDs can also be passed into remote functions. When the function actually gets executed, the argument will be retrieved as a regular Python object. For example, take this function:

@ray.remote
def remote_chain_function(value):
    return value + 1


y1_id = remote_function.remote()
assert ray.get(y1_id) == 1

chained_id = remote_chain_function.remote(y1_id)
assert ray.get(chained_id) == 2

Note the following behaviors:

  • The second task will not be executed until the first task has finished executing because the second task depends on the output of the first task.
  • If the two tasks are scheduled on different machines, the output of the first task (the value corresponding to y1_id) will be sent over the network to the machine where the second task is scheduled.

Oftentimes, you may want to specify a task’s resource requirements (for example, one task may require a GPU). The ray.init() command will automatically detect the available GPUs and CPUs on the machine. However, you can override this default behavior by passing in specific resources, e.g., ray.init(num_cpus=8, num_gpus=4, resources={'Custom': 2}).

To specify a task’s CPU and GPU requirements, pass the num_cpus and num_gpus arguments into the remote decorator. The task will only run on a machine if there are enough CPU and GPU (and other custom) resources available to execute the task. Ray can also handle arbitrary custom resources.

Note

  • If you do not specify any resources in the @ray.remote decorator, the default is 1 CPU resource and no other resources.
  • If specifying CPUs, Ray does not enforce isolation (i.e., your task is expected to honor its request).
  • If specifying GPUs, Ray does provide isolation in forms of visible devices (setting the environment variable CUDA_VISIBLE_DEVICES), but it is the task’s responsibility to actually use the GPUs (e.g., through a deep learning framework like TensorFlow or PyTorch).

@ray.remote(num_cpus=4, num_gpus=2)
def f():
    return 1
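
As a sketch of the GPU isolation described in the note above (ray.get_gpu_ids() returns the GPU IDs assigned to the current worker; actually running the task requires a machine with at least one GPU):

import os

@ray.remote(num_gpus=1)
def gpu_task():
    # Ray sets CUDA_VISIBLE_DEVICES for this worker; ray.get_gpu_ids() returns
    # the IDs of the GPUs assigned to it.
    return ray.get_gpu_ids(), os.environ.get("CUDA_VISIBLE_DEVICES")

# The task is only scheduled once a GPU is available:
# gpu_ids, visible_devices = ray.get(gpu_task.remote())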

The resource requirements of a task have implications for Ray’s scheduling concurrency. In particular, the sum of the resource requirements of all of the concurrently executing tasks on a given node cannot exceed the node’s total resources.
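
For example, here is a minimal sketch of that constraint; the comments assume Ray was started with ray.init(num_cpus=4):

import time

# Each task requests 2 CPUs. On a node with 4 CPUs, at most two of these
# tasks execute concurrently; the remaining invocations queue until CPU
# resources are released.
@ray.remote(num_cpus=2)
def busy():
    time.sleep(1)
    return 1

results = ray.get([busy.remote() for _ in range(4)])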

Below are more examples of resource specifications:

# Ray also supports fractional resource requirements
@ray.remote(num_gpus=0.5)
def h():
    return 1

# Ray supports custom resources too.
@ray.remote(resources={'Custom': 1})
def f():
    return 1

Further, remote functions can return multiple object IDs.

@ray.remote(num_return_vals=3)
def return_multiple():
    return 1, 2, 3

a_id, b_id, c_id = return_multiple.remote()
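
The returned IDs can then be fetched individually or as a list. For example:

assert ray.get([a_id, b_id, c_id]) == [1, 2, 3]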

Objects in Ray

In Ray, we can create and compute on objects. We refer to these objects as remote objects, and we use object IDs to refer to them. Remote objects are stored in shared-memory object stores, and there is one object store per node in the cluster. In the cluster setting, we may not actually know which machine each object lives on.

An object ID is essentially a unique ID that can be used to refer to a remote object. If you’re familiar with futures, our object IDs are conceptually similar.

Object IDs can be created in multiple ways.

  1. They are returned by remote function calls.
  2. They are returned by ray.put.

y = 1
object_id = ray.put(y)

ray.put(value, weakref=False)

Store an object in the object store.

The object may not be evicted while a reference to the returned ID exists. Note that this pinning only applies to the particular object ID returned by put, not object IDs in general.

Parameters:
  • value – The Python object to be stored.
  • weakref – If set, allows the object to be evicted while a reference to the returned ID exists. You might want to set this if you are putting a lot of objects that you might not need in the future.
Returns:

The object ID assigned to this value.
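
As a sketch of the weakref parameter documented above (the array here is just an illustrative value):

import numpy as np

# With weakref=True, Ray may evict this object under memory pressure even
# though we still hold a reference to the returned ID.
weak_id = ray.put(np.zeros(10 ** 6), weakref=True)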

Important

Remote objects are immutable. That is, their values cannot be changed after creation. This allows remote objects to be replicated in multiple object stores without needing to synchronize the copies.
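
For example, mutating a fetched copy does not change the stored object (a minimal sketch):

list_id = ray.put([1, 2, 3])

# ray.get returns a deserialized copy; appending to it leaves the remote
# object untouched.
local_copy = ray.get(list_id)
local_copy.append(4)
assert ray.get(list_id) == [1, 2, 3]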

Fetching Results

The command ray.get(x_id, timeout=None) takes an object ID and creates a Python object from the corresponding remote object. First, if the current node’s object store does not contain the object, the object is downloaded. Then, if the object is a numpy array or a collection of numpy arrays, the get call is zero-copy and returns arrays backed by shared object store memory. Otherwise, we deserialize the object data into a Python object.

y = 1
obj_id = ray.put(y)
assert ray.get(obj_id) == 1
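
Because numpy arrays are returned zero-copy and backed by shared object store memory, the fetched array is read-only (a minimal sketch):

import numpy as np

array_id = ray.put(np.zeros(3))
shared_array = ray.get(array_id)

# The array is backed by the shared object store, so in-place writes fail.
assert not shared_array.flags.writeable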

You can also set a timeout to return early from a get that’s blocking for too long.

import time

from ray.exceptions import RayTimeoutError

@ray.remote
def long_running_function():
    time.sleep(8)

obj_id = long_running_function.remote()
try:
    ray.get(obj_id, timeout=4)
except RayTimeoutError:
    print("`get` timed out.")

ray.get(object_ids, timeout=None)

Get a remote object or a list of remote objects from the object store.

This method blocks until the object corresponding to the object ID is available in the local object store. If this object is not in the local object store, it will be shipped from an object store that has it (once the object has been created). If object_ids is a list, then the objects corresponding to each object in the list will be returned.

Parameters:
  • object_ids – Object ID of the object to get or a list of object IDs to get.
  • timeout (float) – The maximum amount of time in seconds to wait before returning.
Returns:

A Python object or a list of Python objects.

Raises:
  • RayTimeoutError – A RayTimeoutError is raised if a timeout is set and the get takes longer than timeout to return.
  • Exception – An exception is raised if the task that created the object or that created one of the objects raised an exception.

After launching a number of tasks, you may want to know which ones have finished executing. This can be done with ray.wait. The function works as follows.

ready_ids, remaining_ids = ray.wait(object_ids, num_returns=1, timeout=None)

ray.wait(object_ids, num_returns=1, timeout=None)

Return a list of IDs that are ready and a list of IDs that are not.

Warning

The timeout argument used to be in milliseconds (up through ray==0.6.1) and now it is in seconds.

If timeout is set, the function returns either when the requested number of IDs are ready or when the timeout is reached, whichever occurs first. If it is not set, the function simply waits until that number of objects is ready and returns that exact number of object IDs.

This method returns two lists. The first list consists of object IDs that correspond to objects that are available in the object store. The second list corresponds to the rest of the object IDs (which may or may not be ready).

Ordering of the input list of object IDs is preserved. That is, if A precedes B in the input list, and both are in the ready list, then A will precede B in the ready list. This also holds true if A and B are both in the remaining list.

Parameters:
  • object_ids (List[ObjectID]) – List of object IDs for objects that may or may not be ready. Note that these IDs must be unique.
  • num_returns (int) – The number of object IDs that should be returned.
  • timeout (float) – The maximum amount of time in seconds to wait before returning.
Returns:

A list of object IDs that are ready and a list of the remaining object IDs.
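
A common pattern is to process results as they become ready instead of blocking on all of them at once (a minimal sketch, reusing remote_function from above):

remaining_ids = [remote_function.remote() for _ in range(4)]
while remaining_ids:
    # Wait for at least one task to finish, then handle it immediately.
    ready_ids, remaining_ids = ray.wait(remaining_ids, num_returns=1)
    print(ray.get(ready_ids[0]))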

Object Eviction

When the object store gets full, objects will be evicted to make room for new objects. This happens in approximate LRU (least recently used) order. To prevent objects from being evicted, you can call ray.get and store their values instead. Numpy array objects cannot be evicted while they are mapped in any Python process. You can also configure memory limits to control object store usage by actors.

Note

Objects created with ray.put are pinned in memory while a Python reference to the object ID returned by the put exists. This only applies to the specific ID returned by put, not to IDs in general or to copies of that ID.

Remote Classes (Actors)

Actors extend the Ray API from functions (tasks) to classes. The ray.remote decorator indicates that instances of the Counter class will be actors. An actor is essentially a stateful worker. Each actor runs in its own Python process.

@ray.remote
class Counter(object):
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

To create a couple of actors, we can instantiate this class as follows:

a1 = Counter.remote()
a2 = Counter.remote()

When an actor is instantiated, the following events happen.

  1. A worker Python process is started on a node of the cluster.
  2. A Counter object is instantiated on that worker.

You can specify resource requirements in actors as well (see the Actors section for more details).

@ray.remote(num_cpus=2, num_gpus=0.5)
class Actor(object):
    pass

We can interact with the actor by calling its methods with the .remote operator. We can then call ray.get on the object ID to retrieve the actual value.

obj_id = a1.increment.remote()
assert ray.get(obj_id) == 1

Methods called on different actors can execute in parallel, and methods called on the same actor are executed serially in the order that they are called. Methods on the same actor will share state with one another, as shown below.

# Create ten Counter actors.
counters = [Counter.remote() for _ in range(10)]

# Increment each Counter once and get the results. These tasks all happen in
# parallel.
results = ray.get([c.increment.remote() for c in counters])
print(results)  # prints [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

# Increment the first Counter five times. These tasks are executed serially
# and share state.
results = ray.get([counters[0].increment.remote() for _ in range(5)])
print(results)  # prints [2, 3, 4, 5, 6]

To learn more about Ray Actors, see the Actors section.