Ray Generators#

Python generators are functions that behave like iterators, yielding one value per iteration. Ray also supports the generators API.

Any generator function decorated with ray.remote becomes a Ray generator task. Generator tasks stream outputs back to the caller before the task finishes.

+import ray
 import time

 # Takes 25 seconds to finish.
[email protected]
 def f():
     for i in range(5):
         time.sleep(5)
         yield i

-for obj in f():
+for obj_ref in f.remote():
     # Prints every 5 seconds and stops after 25 seconds.
-    print(obj)
+    print(ray.get(obj_ref))

The above Ray generator yields the output every 5 seconds 5 times. With a normal Ray task, you have to wait 25 seconds to access the output. With a Ray generator, the caller can access the object reference before the task f finishes.

The Ray generator is useful when

  • You want to reduce heap memory or object store memory usage by yielding and garbage collecting (GC) the output before the task finishes.

  • You are familiar with the Python generator and want the equivalent programming models.

Ray libraries use the Ray generator to support streaming use cases

  • Ray Serve uses Ray generators to support streaming responses.

  • Ray Data is a streaming data processing library, which uses Ray generators to control and reduce concurrent memory usages.

Ray generator works with existing Ray APIs seamlessly

Getting started#

Define a Python generator function and decorate it with ray.remote to create a Ray generator.

import ray
import time

@ray.remote
def task():
    for i in range(5):
        time.sleep(5)
        yield i

The Ray generator task returns an ObjectRefGenerator object, which is compatible with generator and async generator APIs. You can access the next, __iter__, __anext__, __aiter__ APIs from the class.

Whenever a task invokes yield, a corresponding output is ready and available from a generator as a Ray object reference. You can call next(gen) to obtain an object reference. If next has no more items to generate, it raises StopIteration. If __anext__ has no more items to generate, it raises StopAsyncIteration

The next API blocks the thread until the task generates a next object reference with yield. Since the ObjectRefGenerator is just a Python generator, you can also use a for loop to iterate object references.

If you want to avoid blocking a thread, you can either use asyncio or ray.wait API.

gen = task.remote()
# Blocks for 5 seconds.
ref = next(gen)
# return 0
ray.get(ref)
# Blocks for 5 seconds.
ref = next(gen)
# Return 1
ray.get(ref)

# Returns 2~4 every 5 seconds.
for ref in gen:
    print(ray.get(ref))

Note

For a normal Python generator, a generator function is paused and resumed when next function is called on a generator. Ray eagerly executes a generator task to completion regardless of whether the caller is polling the partial results or not.

Error handling#

If a generator task has a failure (by an application exception or system error such as an unexpected node failure), the next(gen) returns an object reference that contains an exception. When you call ray.get, Ray raises the exception.

@ray.remote
def task():
    for i in range(5):
        time.sleep(1)
        if i == 1:
            raise ValueError
        yield i

gen = task.remote()
# it's okay.
ray.get(next(gen))

# Raises an exception
try:
    ray.get(next(gen))
except ValueError as e:
    print(f"Exception is raised when i == 1 as expected {e}")

In the above example, if the an application fails the task, Ray returns the object reference with an exception in a correct order. For example, if Ray raises the exception after the second yield, the third next(gen) returns an object reference with an exception all the time. If a system error fails the task, (e.g., a node failure or worker process failure), next(gen) returns the object reference that contains the system level exception at any time without an ordering guarantee. It means when you have N yields, the generator can create from 1 to N + 1 object references (N output + ref with a system-level exception) when there failures occur.

Generator from Actor Tasks#

The Ray generator is compatible with all actor execution models. It seamlessly works with regular actors, async actors, and threaded actors.

@ray.remote
class Actor:
    def f(self):
        for i in range(5):
            yield i

@ray.remote
class AsyncActor:
    async def f(self):
        for i in range(5):
            yield i

@ray.remote(max_concurrency=5)
class ThreadedActor:
    def f(self):
        for i in range(5):
            yield i

actor = Actor.remote()
for ref in actor.f.remote():
    print(ray.get(ref))

actor = AsyncActor.remote()
for ref in actor.f.remote():
    print(ray.get(ref))

actor = ThreadedActor.remote()
for ref in actor.f.remote():
    print(ray.get(ref))

Using the Ray generator with asyncio#

The returned ObjectRefGenerator is also compatible with asyncio. You can use __anext__ or async for loops.

import asyncio

@ray.remote
def task():
    for i in range(5):
        time.sleep(1)
        yield i


async def main():
    async for ref in task.remote():
        print(await ref)

asyncio.run(main())

Garbage collection of object references#

The returned ref from next(generator) is just a regular Ray object reference and is distributed ref counted in the same way. If references are not consumed from a generator by the next API, references are garbage collected (GC’ed) when the generator is GC’ed.

@ray.remote
def task():
    for i in range(5):
        time.sleep(1)
        yield i

gen = task.remote()
ref1 = next(gen)
del gen

In the following example, Ray counts ref1 as a normal Ray object reference after Ray returns it. Other references that aren’t consumed with next(gen) are removed when the generator is GC’ed. In this example, garbage collection happens when you call del gen.

Fault tolerance#

Fault tolerance features work with Ray generator tasks and actor tasks. For example;

Cancellation#

The ray.cancel() function works with both Ray generator tasks and actor tasks. Semantic-wise, cancelling a generator task isn’t different from cancelling a regular task. When you cancel a task, next(gen) can return the reference that contains TaskCancelledError without any special ordering guarantee.

How to wait for generator without blocking a thread (compatibility to ray.wait and ray.get)#

When using a generator, next API blocks its thread until a next object reference is available. However, you may not want this behavior all the time. You may want to wait for a generator without blocking a thread. Unblocking wait is possible with the Ray generator in the following ways:

Wait until a generator task completes

ObjectRefGenerator has an API completed. It returns an object reference that is available when a generator task finishes or errors. For example, you can do ray.get(<generator_instance>.compelted()) to wait until a task completes. Note that using ray.get to ObjectRefGenerator isn’t allowed.

Use asyncio and await

ObjectRefGenerator is compatible with asyncio. You can create multiple asyncio tasks that create a generator task and wait for it to avoid blocking a thread.

import asyncio

@ray.remote
def task():
    for i in range(5):
        time.sleep(1)
        yield i


async def async_task():
    async for ref in task.remote():
        print(await ref)

async def main():
    t1 = async_task()
    t2 = async_task()
    await asyncio.gather(t1, t2)

asyncio.run(main())

Use ray.wait

You can pass ObjectRefGenerator as an input to ray.wait. The generator is “ready” if a next item is available. Once Ray finds from a ready list, next(gen) returns the next object reference immediately without blocking. See the example below for more details.

@ray.remote
def task():
    for i in range(5):
        time.sleep(5)
        yield i

gen = task.remote()

# Because it takes 5 seconds to make the first yield,
# with 0 timeout, the generator is unready.
ready, unready = ray.wait([gen], timeout=0)
print("timeout 0, nothing is ready.")
print(ready)
assert len(ready) == 0
assert len(unready) == 1

# Without a timeout argument, ray.wait waits until the given argument
# is ready. When a next item is ready, it returns.
ready, unready = ray.wait([gen])
print("Wait for 5 seconds. The next item is ready.")
assert len(ready) == 1
assert len(unready) == 0
next(gen)

# Because the second yield hasn't happened yet, 
ready, unready = ray.wait([gen], timeout=0)
print("Wait for 0 seconds. The next item is not ready.")
print(ready, unready)
assert len(ready) == 0
assert len(unready) == 1

All the input arguments (such as timeout, num_returns, and fetch_local) from ray.wait works with a generator.

ray.wait can mix regular Ray object references with generators for inputs. In this case, the application should handle all input arguments (such as timeout, num_returns, and fetch_local) from ray.wait work with generators.

from ray._raylet import ObjectRefGenerator

@ray.remote
def generator_task():
    for i in range(5):
        time.sleep(5)
        yield i

@ray.remote
def regular_task():
    for i in range(5):
        time.sleep(5)
    return

gen = [generator_task.remote()]
ref = [regular_task.remote()]
ready, unready = [], [*gen, *ref]
result = []

while unready:
    ready, unready = ray.wait(unready)
    for r in ready:
        if isinstance(r, ObjectRefGenerator):
            try:
                ref = next(r)
                result.append(ray.get(ref))
            except StopIteration:
                pass
            else:
                unready.append(r)
        else:
            result.append(ray.get(r))

Thread safety#

ObjectRefGenerator object is not thread-safe.

Limitation#

Ray generators don’t support these features:

  • throw, send, and close APIs.

  • return statements from generators.

  • Passing ObjectRefGenerator to another task or actor.

  • Ray Client