Signal API (Experimental)

This experimental API allows tasks and actors to generate signals that can be received by other tasks and actors. In addition, task failures and actor method failures generate error signals. These error signals enable applications to detect failures and potentially recover from them.

ray.experimental.signal.send(signal)

Send signal.

The signal has a unique identifier that is computed from (1) the id of the actor or task sending this signal (i.e., the actor or task calling this function), and (2) an index that is incremented every time this source sends a signal. This index starts from 1.

Parameters: signal – Signal to be sent.
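To make the numbering concrete, here is a minimal sketch of the identifier scheme described above. It is a toy model, not Ray's implementation: it assumes the identifier can be represented as a (source ID, index) pair, and the class name SignalIdGenerator and the source ID "task-abc" are hypothetical.

```python
import itertools


class SignalIdGenerator:
    """Toy model: one counter per source, with the index starting from 1."""

    def __init__(self, source_id):
        self.source_id = source_id
        # itertools.count(1) yields 1, 2, 3, ... on successive next() calls.
        self._counter = itertools.count(1)

    def next_id(self):
        # Assumption: the identifier is modeled as a (source ID, index) pair.
        return (self.source_id, next(self._counter))


gen = SignalIdGenerator("task-abc")  # hypothetical source ID
first = gen.next_id()
second = gen.next_id()
print(first, second)  # prints: ('task-abc', 1) ('task-abc', 2)
```

Because the index only ever increases for a given source, two signals from the same source never share an identifier in this model.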

Here is a simple example of a remote function that sends a user-defined signal.

import ray
import ray.experimental.signal as signal

# Define an application level signal.
class UserSignal(signal.Signal):
    def __init__(self, value):
        self.value = value

    def get_value(self):
        return self.value

# Define a remote function that sends a user-defined signal.
@ray.remote
def send_signal(value):
    signal.send(UserSignal(value))

ray.experimental.signal.receive(sources, timeout=None)

Get all outstanding signals from sources.

A source can be either (1) an object ID returned by the task from which we want to receive signals, or (2) an actor handle.

When invoked by the same entity E (where E can be an actor, task or driver), for each source S in sources, this function returns all signals generated by S since the last receive() was invoked by E on S. If this is the first call on S, this function returns all past signals generated by S so far. Note that different actors, tasks or drivers that call receive() on the same source S will get independent copies of the signals generated by S.

Parameters:
  • sources – List of sources from which the caller waits for signals. A source is either an object ID returned by a task (in this case the object ID is used to identify that task), or an actor handle. If the user passes the IDs of multiple objects returned by the same task, this function returns a copy of the signals generated by that task for each object ID.
  • timeout – Maximum time (in seconds) this function waits to get a signal from a source in sources. If None, the timeout is infinite.
Returns: A list of pairs (S, sig), where S is a source in the sources argument and sig is a signal generated by S since the last time receive() was called on S. Thus, for each S in sources, the returned list can contain zero or more entries.
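The delivery semantics above can be pictured with a small in-memory model. This is only a sketch under stated assumptions, not how Ray stores signals: the SignalLog class and the string IDs are hypothetical, the receiver is passed explicitly, and neither the timeout nor the case of several object IDs from the same task is modeled.

```python
from collections import defaultdict


class SignalLog:
    """Toy stand-in for the signal store (hypothetical, not Ray's code)."""

    def __init__(self):
        self._signals = defaultdict(list)  # source -> signals in send order
        self._cursors = defaultdict(int)   # (receiver, source) -> number delivered

    def send(self, source, sig):
        self._signals[source].append(sig)

    def receive(self, receiver, sources):
        """Return (source, sig) pairs this receiver has not seen yet."""
        results = []
        for source in sources:
            start = self._cursors[(receiver, source)]
            pending = self._signals[source][start:]
            results.extend((source, sig) for sig in pending)
            # Remember how far this receiver has read from this source.
            self._cursors[(receiver, source)] = start + len(pending)
        return results


log = SignalLog()
log.send("S", "sig1")
log.send("S", "sig2")
first_call = log.receive("E1", ["S"])   # first call: all past signals
log.send("S", "sig3")
second_call = log.receive("E1", ["S"])  # only signals sent since the last call
other_entity = log.receive("E2", ["S"]) # a different receiver gets its own copy
```

Keeping one cursor per (receiver, source) pair is what gives each entity an independent view of the same source in this model.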

Here is a simple example of how to receive signals from an actor or task identified by a. Note that an actor is identified by its handle, and a task by one of its object ID return values.

import ray.experimental.signal as signal

# This returns a possibly empty list of all signals that have been sent by 'a'
# since the last invocation of signal.receive from within this process. If 'a'
# did not send any signals, then this will wait for up to 10 seconds to receive
# a signal from 'a'.
signal_list = signal.receive([a], timeout=10)

ray.experimental.signal.reset()

Reset the worker state associated with any signals that this worker has received so far.

If the worker calls receive() on a source next, it will get all the signals generated by that source starting with index = 1.
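As an illustration of what resetting means for subsequent receive() calls, here is a toy model of the per-worker state. It assumes the worker tracks, for each source, the index of the next signal to read; the ReceiverState class and its fields are hypothetical and do not reflect Ray's internals.

```python
class ReceiverState:
    """Toy model of the per-worker receive state (hypothetical)."""

    def __init__(self):
        # source -> index of the next signal to read (indices are 1-based).
        self._next_index = {}

    def receive(self, source, sent):
        """Return the signals in `sent` that this worker has not read yet."""
        start = self._next_index.get(source, 1)
        new_signals = sent[start - 1:]
        self._next_index[source] = start + len(new_signals)
        return new_signals

    def reset(self):
        # Forget all progress, so the next receive starts again at index 1.
        self._next_index.clear()


sent_by_s = ["sig1", "sig2"]  # signals a source S has generated so far
worker = ReceiverState()
before = worker.receive("S", sent_by_s)  # both signals
again = worker.receive("S", sent_by_s)   # nothing new
worker.reset()
after = worker.receive("S", sent_by_s)   # both signals again
```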

Example: sending a user signal

The code below shows a simple example in which a task, called send_signal(), sends a user signal and the driver gets it by invoking signal.receive().

import ray
import ray.experimental.signal as signal

# Define a user signal.
class UserSignal(signal.Signal):
    def __init__(self, value):
        self.value = value

    def get_value(self):
        return self.value

@ray.remote
def send_signal(value):
    signal.send(UserSignal(value))
    return

signal_value = 'simple signal'
object_id = send_signal.remote(signal_value)
# Wait up to 10 seconds to receive a signal from the task. Note that the task
# is identified by the object_id it returns.
result_list = signal.receive([object_id], timeout=10)
# Print the signal value. This should print "simple signal".
# Note that result_list[0] is the entry we expect from the task. Each entry
# is a tuple whose first element is the first object ID returned by the task
# and whose second element is the signal object.
print(result_list[0][1].get_value())

Example: Getting an error signal

This is a simple example in which a driver gets an error signal caused by the failure of task().

@ray.remote
def task():
    raise Exception('exception message')

object_id = task.remote()
try:
    ray.get(object_id)
except Exception:
    # The failure is also reported as an error signal, which is read below.
    pass
finally:
    result_list = signal.receive([object_id], timeout=10)
    # The expected signal is an 'ErrorSignal'.
    assert isinstance(result_list[0][1], signal.ErrorSignal)
    # Print the error.
    print(result_list[0][1].get_error())
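When a receive() call covers several sources, an application that wants to recover from failures may find it handy to separate error signals from ordinary ones. The helper below is one possible sketch. split_errors is a hypothetical name, and the Signal and ErrorSignal classes are local stand-ins so the snippet runs without Ray; with Ray, one would presumably pass signal.ErrorSignal as error_type.

```python
class Signal:
    """Local stand-in for signal.Signal (hypothetical)."""


class ErrorSignal(Signal):
    """Local stand-in for signal.ErrorSignal (hypothetical)."""

    def __init__(self, error):
        self.error = error

    def get_error(self):
        return self.error


def split_errors(result_list, error_type=ErrorSignal):
    """Split (source, sig) pairs into error entries and all other entries."""
    errors, others = [], []
    for source, sig in result_list:
        if isinstance(sig, error_type):
            errors.append((source, sig))
        else:
            others.append((source, sig))
    return errors, others


# Made-up result list in the (source, signal) shape that receive() returns.
results = [("task-1", Signal()), ("task-2", ErrorSignal("exception message"))]
errors, others = split_errors(results)
failed_sources = [source for source, _ in errors]
print(failed_sources)  # prints: ['task-2']
```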

Example: Sending signals between multiple actors

This is a more involved example in which two actors, a1 and a2, each generate five signals, and another actor, b, waits to receive all the signals generated by a1 and a2. Note that b recursively calls its own method get_signals() until it has received all the signals it expects.

@ray.remote
class ActorSendSignals(object):
    def send_signals(self, value, count):
        for i in range(count):
            signal.send(UserSignal(value + str(i)))

@ray.remote
class ActorGetAllSignals(object):
    def __init__(self, num_expected_signals, *source_ids):
        self.received_signals = []
        self.num_expected_signals = num_expected_signals
        self.source_ids = source_ids

    def register_handle(self, handle):
        self.this_actor = handle

    def get_signals(self):
        new_signals = signal.receive(self.source_ids, timeout=10)
        self.received_signals.extend(new_signals)
        if len(self.received_signals) < self.num_expected_signals:
            self.this_actor.get_signals.remote()

    def get_count(self):
        return len(self.received_signals)

# Create two actors to send signals.
a1 = ActorSendSignals.remote()
a2 = ActorSendSignals.remote()
signal_value = 'simple signal'
count = 5
# Each actor sends five signals.
a1.send_signals.remote(signal_value, count)
a2.send_signals.remote(signal_value, count)

# Create an actor that waits for all five signals sent by each actor.
b = ActorGetAllSignals.remote(2 * count, *[a1, a2])
# Provide the actor with its own handle, so it can recursively call itself
# to get all signals from a1 and a2. This enables the actor to execute
# other methods if needed.
ray.get(b.register_handle.remote(b))
b.get_signals.remote()
# Print total number of signals. This should be 2*count = 10.
print(ray.get(b.get_count.remote()))

Note

A failed actor (e.g., an actor that crashed) generates an error message only when another actor or task invokes one of its methods.

Please let us know about any issues you encounter.