Signal API (Experimental)
This experimental API allows tasks and actors to generate signals which can be received by other tasks and actors. In addition, task failures and actor method failures generate error signals. The error signals enable applications to detect failures and potentially recover from failures.
ray.experimental.signal.send(signal)
Send a signal. The signal has a unique identifier that is computed from (1) the ID of the actor or task sending this signal (i.e., the actor or task calling this function), and (2) an index that is incremented every time this source sends a signal. This index starts from 1.
Parameters: signal – Signal to be sent.
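The identifier scheme described above can be pictured with a minimal sketch. It runs without Ray, and everything in it is an assumption made for illustration: the SignalSource class, the byte-string source ID, and the use of SHA-1 to combine the source ID with the index are not taken from Ray's implementation.

```python
import hashlib


class SignalSource:
    """Hypothetical stand-in for a task or actor that sends signals."""

    def __init__(self, source_id: bytes):
        self.source_id = source_id
        self.next_index = 1  # the index starts from 1, as described above

    def next_signal_id(self) -> str:
        # Assumption: hash (source id, index) with SHA-1. Ray's real scheme may differ.
        index = self.next_index
        self.next_index += 1
        payload = self.source_id + index.to_bytes(8, "big")
        return hashlib.sha1(payload).hexdigest()


a = SignalSource(b"actor-a")
ids = [a.next_signal_id() for _ in range(3)]  # three distinct identifiers
```

Because the identifier depends only on the source and a per-source counter, two different sources never need to coordinate to produce unique signal identifiers.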
Here is a simple example of a remote function that sends a user-defined signal.
    import ray
    import ray.experimental.signal as signal

    # Define an application level signal.
    class UserSignal(signal.Signal):
        def __init__(self, value):
            self.value = value

        def get_value(self):
            return self.value

    # Define a remote function that sends a user-defined signal.
    @ray.remote
    def send_signal(value):
        signal.send(UserSignal(value))
ray.experimental.signal.receive(sources, timeout=None)
Get all outstanding signals from sources.
A source can be either (1) an object ID returned by the task we want to receive signals from, or (2) an actor handle.
When invoked by the same entity E (where E can be an actor, task or driver), for each source S in sources, this function returns all signals generated by S since the last receive() was invoked by E on S. If this is the first call on S, this function returns all past signals generated by S so far. Note that different actors, tasks or drivers that call receive() on the same source S will get independent copies of the signals generated by S.
Parameters:
- sources – List of sources from which the caller waits for signals. A source is either an object ID returned by a task (in this case the object ID is used to identify that task), or an actor handle. If the user passes the IDs of multiple objects returned by the same task, this function returns a copy of the signals generated by that task for each object ID.
- timeout – Maximum time (in seconds) this function waits to get a signal from a source in sources. If None, the timeout is infinite.
Returns: A list of pairs (S, sig), where S is a source in the sources argument, and sig is a signal generated by S since the last time receive() was called on S. Thus, for each S in sources, the return list can contain zero or multiple entries.
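The per-caller bookkeeping described above can be modelled in a few lines of plain Python. This is a sketch under stated assumptions, not Ray's implementation: SignalBoard and its explicit receiver argument are hypothetical, sources and receivers are plain strings, and blocking with a timeout is left out entirely.

```python
from collections import defaultdict


class SignalBoard:
    """Hypothetical in-memory model of send/receive semantics."""

    def __init__(self):
        self.log = defaultdict(list)    # source -> signals in send order
        self.cursor = defaultdict(int)  # (receiver, source) -> signals already returned

    def send(self, source, sig):
        self.log[source].append(sig)

    def receive(self, receiver, sources):
        pairs = []
        for s in sources:
            start = self.cursor[(receiver, s)]
            new = self.log[s][start:]
            self.cursor[(receiver, s)] = start + len(new)
            pairs.extend((s, sig) for sig in new)
        return pairs


board = SignalBoard()
board.send("a", "s1")
board.send("a", "s2")
first = board.receive("driver", ["a"])   # first call: all past signals
board.send("a", "s3")
second = board.receive("driver", ["a"])  # only signals sent since the last call
other = board.receive("worker", ["a"])   # a different receiver gets its own copy
```

Keeping one cursor per (receiver, source) pair is what makes the copies independent: the driver advancing its cursor has no effect on what the worker sees.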
Here is a simple example of how to receive signals from an actor or task identified by a. Note that an actor is identified by its handle, and a task by one of its object ID return values.
    import ray.experimental.signal as signal

    # This returns a possibly empty list of all signals that have been sent by 'a'
    # since the last invocation of signal.receive from within this process. If 'a'
    # did not send any signals, then this will wait for up to 10 seconds to receive
    # a signal from 'a'.
    signal_list = signal.receive([a], timeout=10)
ray.experimental.signal.reset()
Reset the worker state associated with any signals that this worker has received so far.
If the worker calls receive() on a source next, it will get all the signals generated by that source starting with index = 1.
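The effect of a reset can be illustrated with a small model of one worker's receive state. The Receiver class below is hypothetical and runs without Ray; it only assumes that a worker remembers, per source, how many signals it has already returned, and that a reset forgets those counts.

```python
class Receiver:
    """Hypothetical model of one worker's receive state."""

    def __init__(self, logs):
        self.logs = logs  # source -> list of signals in send order
        self.seen = {}    # source -> number of signals already returned

    def receive(self, sources):
        pairs = []
        for s in sources:
            signals = self.logs.get(s, [])
            start = self.seen.get(s, 0)
            pairs.extend((s, sig) for sig in signals[start:])
            self.seen[s] = len(signals)
        return pairs

    def reset(self):
        # Forget everything received so far; the next receive starts at index 1 again.
        self.seen.clear()


worker = Receiver({"a": ["s1", "s2"]})
before = worker.receive(["a"])   # both signals
drained = worker.receive(["a"])  # nothing new
worker.reset()
after = worker.receive(["a"])    # both signals again, starting from the first
```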
Example: Sending a user signal
The code below shows a simple example in which a task, called send_signal(), sends a user signal and the driver gets it by invoking signal.receive().
    import ray
    import ray.experimental.signal as signal

    ray.init()

    # Define a user signal.
    class UserSignal(signal.Signal):
        def __init__(self, value):
            self.value = value

        def get_value(self):
            return self.value

    @ray.remote
    def send_signal(value):
        signal.send(UserSignal(value))
        return

    signal_value = 'simple signal'
    object_id = send_signal.remote(signal_value)
    # Wait up to 10sec to receive a signal from the task. Note the task is
    # identified by the object_id it returns.
    result_list = signal.receive([object_id], timeout=10)
    # Print signal values. This should print "simple signal".
    # Note that result_list[0] is the signal we expect from the task.
    # The signal is a tuple where the first element is the first object ID
    # returned by the task and the second element is the signal object.
    print(result_list[0][1].get_value())
Example: Getting an error signal
This is a simple example in which a driver gets an error signal caused by the failure of task().
    import ray
    import ray.experimental.signal as signal

    @ray.remote
    def task():
        raise Exception('exception message')

    object_id = task.remote()
    try:
        ray.get(object_id)
    except Exception as e:
        pass
    finally:
        result_list = signal.receive([object_id], timeout=10)
        # Expected signal is 'ErrorSignal'. Each entry of result_list is a
        # (source, signal) pair, so the signal is the second element.
        assert type(result_list[0][1]) == signal.ErrorSignal
        # Print the error.
        print(result_list[0][1].get_error())
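When a receive call covers several sources, the returned list can mix user signals and error signals. One possible way to separate them is sketched below. To keep the sketch runnable without Ray, Signal, ErrorSignal and UserSignal are local stand-ins (an assumption), the string source names are made up, and split_errors is a hypothetical helper rather than part of the API.

```python
class Signal:
    """Stand-in for signal.Signal so the sketch runs without Ray."""


class ErrorSignal(Signal):
    """Stand-in for signal.ErrorSignal."""

    def __init__(self, error):
        self.error = error

    def get_error(self):
        return self.error


class UserSignal(Signal):
    def __init__(self, value):
        self.value = value

    def get_value(self):
        return self.value


def split_errors(result_list):
    """Partition (source, signal) pairs into error pairs and all other pairs."""
    errors, others = [], []
    for source, sig in result_list:
        (errors if isinstance(sig, ErrorSignal) else others).append((source, sig))
    return errors, others


result_list = [("task-1", UserSignal("ok")),
               ("task-2", ErrorSignal("exception message"))]
errors, others = split_errors(result_list)
for source, sig in errors:
    print(source, "failed:", sig.get_error())
```

With the real API, the same isinstance check would be made against signal.ErrorSignal, and each source would be an object ID or actor handle.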
Example: Sending signals between multiple actors
This is a more involved example in which two actors, a1 and a2, each generate five signals, and another actor b waits to receive all signals generated by a1 and a2, respectively. Note that b recursively calls its own method get_signals() until it gets all signals it expects.
    import ray
    import ray.experimental.signal as signal

    # UserSignal is the signal class defined in the first example.

    @ray.remote
    class ActorSendSignals(object):
        def send_signals(self, value, count):
            for i in range(count):
                signal.send(UserSignal(value + str(i)))

    @ray.remote
    class ActorGetAllSignals(object):
        def __init__(self, num_expected_signals, *source_ids):
            self.received_signals = []
            self.num_expected_signals = num_expected_signals
            self.source_ids = source_ids

        def register_handle(self, handle):
            self.this_actor = handle

        def get_signals(self):
            new_signals = signal.receive(self.source_ids, timeout=10)
            self.received_signals.extend(new_signals)
            if len(self.received_signals) < self.num_expected_signals:
                self.this_actor.get_signals.remote()

        def get_count(self):
            return len(self.received_signals)

    # Create two actors to send signals.
    a1 = ActorSendSignals.remote()
    a2 = ActorSendSignals.remote()

    signal_value = 'simple signal'
    count = 5
    # Each actor sends five signals.
    a1.send_signals.remote(signal_value, count)
    a2.send_signals.remote(signal_value, count)

    # Create an actor that waits for all five signals sent by each actor.
    b = ActorGetAllSignals.remote(2 * count, *[a1, a2])
    # Provide the actor with its own handle, so it can recursively call itself
    # to get all signals from a1 and a2, respectively. This enables the actor
    # to execute other methods if needed.
    ray.get(b.register_handle.remote(b))
    b.get_signals.remote()
    # Print total number of signals. This should be 2*count = 10.
    print(ray.get(b.get_count.remote()))
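Where the receiver does not need to interleave other method calls, the same "keep receiving until everything has arrived" logic could be written as a plain loop instead of a recursive actor call. The sketch below is an assumption-laden illustration: collect_signals, deadline_s and poll_timeout_s are hypothetical names, and the receive function is passed in as an argument so the example runs without Ray (fake_receive stands in for signal.receive).

```python
import time


def collect_signals(receive, sources, expected, deadline_s=30.0, poll_timeout_s=1.0):
    """Call receive(sources, timeout=...) until `expected` pairs arrive or the deadline passes."""
    collected = []
    deadline = time.monotonic() + deadline_s
    while len(collected) < expected and time.monotonic() < deadline:
        collected.extend(receive(sources, timeout=poll_timeout_s))
    return collected


# Fake receive function that hands out pre-recorded batches, one per call.
batches = [[("a1", "s0")], [], [("a2", "s0"), ("a1", "s1")]]


def fake_receive(sources, timeout=None):
    return batches.pop(0) if batches else []


got = collect_signals(fake_receive, ["a1", "a2"], expected=3)
```

The trade-off is that a loop like this occupies the caller until it finishes, whereas the recursive version above returns after each receive() and lets the actor run other methods in between.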