The Ray API

ray.init(redis_address=None, num_cpus=None, num_gpus=None, resources=None, object_store_memory=None, node_ip_address=None, object_id_seed=None, num_workers=None, local_mode=False, driver_mode=None, redirect_worker_output=False, redirect_output=True, ignore_reinit_error=False, num_redis_shards=None, redis_max_clients=None, redis_password=None, plasma_directory=None, huge_pages=False, include_webui=True, driver_id=None, configure_logging=True, logging_level=20, logging_format='%(message)s', plasma_store_socket_name=None, raylet_socket_name=None, temp_dir=None, _internal_config=None, use_raylet=None)

Connect to an existing Ray cluster or start one and connect to it.

This method handles two cases. Either a Ray cluster already exists and we just attach this driver to it, or we start all of the processes associated with a Ray cluster and attach to the newly started cluster.

To start Ray and all of the relevant processes, use this as follows:

ray.init()

To connect to an existing Ray cluster, use this as follows (substituting in the appropriate address):

ray.init(redis_address="123.45.67.89:6379")
Parameters:
  • redis_address (str) – The address of the Redis server to connect to. If this address is not provided, then this command will start Redis, a global scheduler, a local scheduler, a plasma store, a plasma manager, and some workers. It will also kill these processes when Python exits.
  • num_cpus (int) – Number of cpus the user wishes all local schedulers to be configured with.
  • num_gpus (int) – Number of gpus the user wishes all local schedulers to be configured with.
  • resources – A dictionary mapping the name of a resource to the quantity of that resource available.
  • object_store_memory – The amount of memory (in bytes) to start the object store with.
  • node_ip_address (str) – The IP address of the node that we are on.
  • object_id_seed (int) – Used to seed the deterministic generation of object IDs. The same value can be used across multiple runs of the same job in order to generate the object IDs in a consistent manner. However, the same ID should not be used for different jobs.
  • local_mode (bool) – True if the code should be executed serially without Ray. This is useful for debugging.
  • redirect_worker_output – True if the stdout and stderr of worker processes should be redirected to files.
  • redirect_output (bool) – True if stdout and stderr for non-worker processes should be redirected to files and false otherwise.
  • ignore_reinit_error – True if we should suppress errors from calling ray.init() a second time.
  • num_redis_shards – The number of Redis shards to start in addition to the primary Redis shard.
  • redis_max_clients – If provided, attempt to configure Redis with this maxclients number.
  • redis_password (str) – If provided, prevents external clients without the password from connecting to Redis.
  • plasma_directory – A directory where the Plasma memory mapped files will be created.
  • huge_pages – Boolean flag indicating whether to start the Object Store with hugetlbfs support. Requires plasma_directory.
  • include_webui – Boolean flag indicating whether to start the web UI, which is a Jupyter notebook.
  • driver_id – The ID of the driver.
  • configure_logging – True if Ray should configure logging here. Set this to False if you wish to configure logging yourself.
  • logging_level – Logging level; defaults to logging.INFO.
  • logging_format – Logging format; defaults to “%(message)s”, which logs only the message itself.
  • plasma_store_socket_name (str) – If provided, it will specify the socket name used by the plasma store.
  • raylet_socket_name (str) – If provided, it will specify the socket path used by the raylet process.
  • temp_dir (str) – If provided, it will specify the root temporary directory for the Ray process.
  • _internal_config (str) – JSON configuration for overriding RayConfig defaults. For testing purposes ONLY.
Returns:

Address information about the started processes.

Raises:

Exception – An exception is raised if an inappropriate combination of arguments is passed in.

ray.is_initialized()

Check if ray.init has been called yet.

Returns:True if ray.init has already been called and false otherwise.
ray.remote(*args, **kwargs)

Define a remote function or an actor class.

This can be used with no arguments to define a remote function or actor as follows:

@ray.remote
def f():
    return 1

@ray.remote
class Foo(object):
    def method(self):
        return 1

It can also be used with specific keyword arguments:

  • num_return_vals: This is only for remote functions. It specifies the number of object IDs returned by the remote function invocation.
  • num_cpus: The quantity of CPU cores to reserve for this task or for the lifetime of the actor.
  • num_gpus: The quantity of GPUs to reserve for this task or for the lifetime of the actor.
  • resources: The quantity of various custom resources to reserve for this task or for the lifetime of the actor. This is a dictionary mapping strings (resource names) to numbers.
  • max_calls: Only for remote functions. This specifies the maximum number of times that a given worker can execute the given remote function before it must exit (this can be used to address memory leaks in third-party libraries or to reclaim resources that cannot easily be released, e.g., GPU memory that was acquired by TensorFlow). By default this is infinite.

This can be done as follows:

@ray.remote(num_gpus=1, max_calls=1, num_return_vals=2)
def f():
    return 1, 2

@ray.remote(num_cpus=2, resources={"CustomResource": 1})
class Foo(object):
    def method(self):
        return 1
ray.get(object_ids, worker=<ray.worker.Worker object>)

Get a remote object or a list of remote objects from the object store.

This method blocks until the object corresponding to the object ID is available in the local object store. If this object is not in the local object store, it will be shipped from an object store that has it (once the object has been created). If object_ids is a list, then the objects corresponding to each object ID in the list will be returned.

Parameters:object_ids – Object ID of the object to get or a list of object IDs to get.
Returns:A Python object or a list of Python objects.
Raises:Exception – An exception is raised if the task that created the object or that created one of the objects raised an exception.
ray.wait(object_ids, num_returns=1, timeout=None, worker=<ray.worker.Worker object>)

Return a list of IDs that are ready and a list of IDs that are not.

If timeout is set, the function returns either when the requested number of IDs are ready or when the timeout is reached, whichever occurs first. If it is not set, the function simply waits until that number of objects is ready and returns that exact number of object IDs.

This method returns two lists. The first list consists of object IDs that correspond to objects that are available in the object store. The second list corresponds to the rest of the object IDs (which may or may not be ready).

Ordering of the input list of object IDs is preserved. That is, if A precedes B in the input list, and both are in the ready list, then A will precede B in the ready list. This also holds true if A and B are both in the remaining list.

Parameters:
  • object_ids (List[ObjectID]) – List of object IDs for objects that may or may not be ready. Note that these IDs must be unique.
  • num_returns (int) – The number of object IDs that should be returned.
  • timeout (int) – The maximum amount of time in milliseconds to wait before returning.
Returns:

A list of object IDs that are ready and a list of the remaining object IDs.

ray.put(value, worker=<ray.worker.Worker object>)

Store an object in the object store.

Parameters:value – The Python object to be stored.
Returns:The object ID assigned to this value.
ray.get_gpu_ids()

Get the IDs of the GPUs that are available to the worker.

If the CUDA_VISIBLE_DEVICES environment variable was set when the worker started up, then the IDs returned by this method will be a subset of the IDs in CUDA_VISIBLE_DEVICES. If not, the IDs will fall in the range [0, NUM_GPUS - 1], where NUM_GPUS is the number of GPUs that the node has.

Returns:A list of GPU IDs.
ray.get_resource_ids()

Get the IDs of the resources that are available to the worker.

Returns:A dictionary mapping the name of a resource to a list of pairs, where each pair consists of the ID of a resource and the fraction of that resource reserved for this worker.
ray.get_webui_url()

Get the URL to access the web UI.

Note that the URL does not specify which node the web UI is on.

Returns:The URL of the web UI as a string.
ray.shutdown(worker=<ray.worker.Worker object>)

Disconnect the worker, and terminate processes started by ray.init().

This will automatically run at the end when a Python process that uses Ray exits. It is ok to run this twice in a row. The primary use case for this function is to clean up state between tests.

Note that this will clear any remote function definitions, actor definitions, and existing actors, so if you wish to use any previously defined remote functions or actors after calling ray.shutdown(), then you need to redefine them. If they were defined in an imported module, then you will need to reload the module.

ray.register_custom_serializer(cls, use_pickle=False, use_dict=False, serializer=None, deserializer=None, local=False, driver_id=None, class_id=None, worker=<ray.worker.Worker object>)

Enable serialization and deserialization for a particular class.

This method runs a registration function on every worker, which enables Ray to properly serialize and deserialize objects of this class.

Parameters:
  • cls (type) – The class that ray should use this custom serializer for.
  • use_pickle (bool) – If true, then objects of this class will be serialized using pickle.
  • use_dict – If true, then objects of this class will be serialized by turning their __dict__ fields into a dictionary. Must be False if use_pickle is true.
  • serializer – The custom serializer to use. This should be provided if and only if use_pickle and use_dict are False.
  • deserializer – The custom deserializer to use. This should be provided if and only if use_pickle and use_dict are False.
  • local – True if the serializers should only be registered on the current worker. This should usually be False.
  • driver_id – ID of the driver that we want to register the class for.
  • class_id – ID of the class that we are registering. If this is not specified, we will calculate a new one inside the function.
Raises:

Exception – An exception is raised if use_pickle=False and the class cannot be efficiently serialized by Ray. This can also raise an exception if use_dict is true and cls is not pickleable.

ray.profile(event_type, extra_data=None, worker=None)

Profile a span of time so that it appears in the timeline visualization.

Note that this only works in the raylet code path.

This function can be used as follows (either on the driver or within a task):

with ray.profile("custom event", extra_data={'key': 'value'}):
    # Do some computation here.
    pass

Optionally, a dictionary can be passed as the “extra_data” argument, and it can have keys “name” and “cname” if you want to override the default timeline display text and box color. Other values will appear at the bottom of the Chrome tracing GUI when you click on the box corresponding to this profile span.

Parameters:
  • event_type – A string describing the type of the event.
  • extra_data – This must be a dictionary mapping strings to strings. This data will be added to the json objects that are used to populate the timeline, so if you want to set a particular color, you can simply set the “cname” attribute to an appropriate color. Similarly, if you set the “name” attribute, then that will set the text displayed on the box in the timeline.
Returns:

An object that can profile a span of time via a “with” statement.

ray.method(*args, **kwargs)

Annotate an actor method.

@ray.remote
class Foo(object):
    @ray.method(num_return_vals=2)
    def bar(self):
        return 1, 2

f = Foo.remote()

_, _ = f.bar.remote()
Parameters:num_return_vals – The number of object IDs that should be returned by invocations of this actor method.

The Ray Command Line API

ray start

ray start [OPTIONS]

Options

--node-ip-address <node_ip_address>

the IP address of this node

--redis-address <redis_address>

the address to use for connecting to Redis

--redis-port <redis_port>

the port to use for starting Redis

--num-redis-shards <num_redis_shards>

the number of additional Redis shards to use in addition to the primary Redis shard

--redis-max-clients <redis_max_clients>

If provided, attempt to configure Redis with this maximum number of clients.

--redis-password <redis_password>

If provided, secure Redis ports with this password

--redis-shard-ports <redis_shard_ports>

the port to use for the Redis shards other than the primary Redis shard

--object-manager-port <object_manager_port>

the port to use for starting the object manager

--node-manager-port <node_manager_port>

the port to use for starting the node manager

--object-store-memory <object_store_memory>

the maximum amount of memory (in bytes) to allow the object store to use

--num-workers <num_workers>

The initial number of workers to start on this node. Note that the local scheduler may start additional workers. If you wish to control the total number of concurrent tasks, use --resources instead and specify the CPU field.

--num-cpus <num_cpus>

the number of CPUs on this node

--num-gpus <num_gpus>

the number of GPUs on this node

--resources <resources>

a JSON serialized dictionary mapping resource name to resource quantity

--head

provide this argument for the head node

--no-ui

provide this argument if the UI should not be started

--block

provide this argument to block forever in this command

--plasma-directory <plasma_directory>

object store directory for memory mapped files

--huge-pages

enable support for huge pages in the object store

--autoscaling-config <autoscaling_config>

the file that contains the autoscaling config

--no-redirect-worker-output

do not redirect worker stdout and stderr to files

--no-redirect-output

do not redirect non-worker stdout and stderr to files

--plasma-store-socket-name <plasma_store_socket_name>

manually specify the socket name of the plasma store

--raylet-socket-name <raylet_socket_name>

manually specify the socket path of the raylet process

--temp-dir <temp_dir>

manually specify the root temporary dir of the Ray process

--internal-config <internal_config>

Do NOT use this. This is for debugging/development purposes ONLY.

ray stop

ray stop [OPTIONS]

ray up

ray up [OPTIONS] CLUSTER_CONFIG_FILE

Options

--no-restart

Whether to skip restarting Ray services during the update. This avoids interrupting running jobs.

--restart-only

Whether to skip running setup commands and only restart Ray. This cannot be used with --no-restart.

--min-workers <min_workers>

Override the configured min worker node count for the cluster.

--max-workers <max_workers>

Override the configured max worker node count for the cluster.

-n, --cluster-name <cluster_name>

Override the configured cluster name.

-y, --yes

Don’t ask for confirmation.

Arguments

CLUSTER_CONFIG_FILE

Required argument

ray down

ray down [OPTIONS] CLUSTER_CONFIG_FILE

Options

--workers-only

Only destroy the workers.

-y, --yes

Don’t ask for confirmation.

-n, --cluster-name <cluster_name>

Override the configured cluster name.

Arguments

CLUSTER_CONFIG_FILE

Required argument

ray exec

ray exec [OPTIONS] CLUSTER_CONFIG_FILE CMD

Options

--stop

Stop the cluster after the command finishes running.

--start

Start the cluster if needed.

--screen

Run the command in a screen.

--tmux

Run the command in tmux.

-n, --cluster-name <cluster_name>

Override the configured cluster name.

--port-forward <port_forward>

Port to forward.

Arguments

CLUSTER_CONFIG_FILE

Required argument

CMD

Required argument

ray attach

ray attach [OPTIONS] CLUSTER_CONFIG_FILE

Options

--start

Start the cluster if needed.

--tmux

Run the command in tmux.

-n, --cluster-name <cluster_name>

Override the configured cluster name.

-N, --new

Force creation of a new screen.

Arguments

CLUSTER_CONFIG_FILE

Required argument

ray get_head_ip

ray get_head_ip [OPTIONS] CLUSTER_CONFIG_FILE

Options

-n, --cluster-name <cluster_name>

Override the configured cluster name.

Arguments

CLUSTER_CONFIG_FILE

Required argument