Parallel Iterator API (Experimental)

ray.experimental.iter provides a parallel iterator API for simple data ingest and processing. It can be thought of as syntactic sugar around Ray actors and ray.wait loops.

Parallel iterators are lazy and can operate over infinite sequences of items. Iterator transformations are only executed when the user calls next() to fetch the next output item from the iterator.
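
For example, building a pipeline does no work by itself. A minimal sketch (using the APIs described below; the printed value follows from the deterministic sharding and gather ordering described later):

import ray
import ray.experimental.iter as parallel_it

ray.init()

# Building the pipeline only records the transformations;
# nothing is executed yet.
it = parallel_it.from_range(1000000, num_shards=2).for_each(lambda x: x * 2)

# Items are computed on demand as they are pulled.
local_it = it.gather_sync()
print(next(local_it))  # -> 0, the first item, computed lazily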

Concepts

Parallel Iterators: You can create a ParallelIterator object from an existing set of items, range of numbers, set of iterators, or set of worker actors. Ray will create a worker actor that produces the data for each shard of the iterator:

# Create an iterator with 2 worker actors over the list [1, 2, 3, 4].
>>> it = ray.experimental.iter.from_items([1, 2, 3, 4], num_shards=2)
ParallelIterator[from_items[int, 4, shards=2]]

# Create an iterator with 32 worker actors over range(1000000).
>>> it = ray.experimental.iter.from_range(1000000, num_shards=32)
ParallelIterator[from_range[1000000, shards=32]]

# Create an iterator over two range(10) generators.
>>> it = ray.experimental.iter.from_iterators([range(10), range(10)])
ParallelIterator[from_iterators[shards=2]]

# Create an iterator from existing worker actors. These actors must
# implement the ParallelIteratorWorker interface.
>>> it = ray.experimental.iter.from_actors([a1, a2, a3, a4])
ParallelIterator[from_actors[shards=4]]
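
For reference, here is a minimal sketch of how such actors might be defined (the CounterWorker class and its item generator are illustrative, not part of the API):

import ray
from ray.experimental.iter import ParallelIteratorWorker

@ray.remote
class CounterWorker(ParallelIteratorWorker):
    def __init__(self):
        # Pass a callable that produces this shard's items;
        # repeat=False means the shard is exhausted after one pass.
        ParallelIteratorWorker.__init__(self, lambda: range(100), False)

a1, a2, a3, a4 = [CounterWorker.remote() for _ in range(4)]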

Simple transformations can be chained on the iterator, such as mapping, filtering, and batching. These will be executed in parallel on the workers:

# Apply a transformation to each element of the iterator.
>>> it = it.for_each(lambda x: x ** 2)
ParallelIterator[...].for_each()

# Filter out items with odd values.
>>> it = it.filter(lambda x: x % 2 == 0)
ParallelIterator[...].for_each().filter()

# Batch together items into lists of 32 elements.
>>> it = it.batch(32)
ParallelIterator[...].for_each().filter().batch(32)

Local Iterators: To read elements from a parallel iterator, it must first be converted to a LocalIterator by calling gather_sync() or gather_async(). These correspond to ray.get and ray.wait loops over the actors, respectively:

# Gather items synchronously (deterministic round robin across shards):
>>> it = ray.experimental.iter.from_range(1000000, 1)
>>> it = it.gather_sync()
LocalIterator[ParallelIterator[from_range[1000000, shards=1]].gather_sync()]

# Local iterators can be used as any other Python iterator.
>>> it.take(5)
[0, 1, 2, 3, 4]

# They also support chaining of transformations. Unlike transformations
# applied on a ParallelIterator, they will be executed in the current process.
>>> it.filter(lambda x: x % 2 == 0).take(5)
[0, 2, 4, 6, 8]

# Async gather can be used for better performance, but it is non-deterministic.
>>> it = ray.experimental.iter.from_range(1000, 4).gather_async()
>>> it.take(5)
[0, 250, 500, 750, 1]

Passing iterators to remote functions: Both ParallelIterator and LocalIterator are serializable. They can be passed to any Ray remote function. However, note that each shard should only be read by one process at a time:

# Get local iterators representing the shards of this ParallelIterator:
>>> it = ray.experimental.iter.from_range(10000, 3)
>>> [s0, s1, s2] = it.shards()
[LocalIterator[from_range[10000, shards=3].shard[0]],
 LocalIterator[from_range[10000, shards=3].shard[1]],
 LocalIterator[from_range[10000, shards=3].shard[2]]]

# Iterator shards can be passed to remote functions.
>>> @ray.remote
... def do_sum(it):
...     return sum(it)
...
>>> ray.get([do_sum.remote(s) for s in it.shards()])
[5552778, 16661667, 27780555]

Semantic Guarantees

The parallel iterator API guarantees the following semantics:

Fetch ordering: When using it.gather_sync().for_each(fn) or it.gather_async().for_each(fn) (or any other transformation after a gather), fn(x_i) will be called on the element x_i before the next element x_{i+1} is fetched from the source actor. This is useful if you need to update the source actor between iterator steps. Note that for async gather, this ordering only applies per shard.
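
To see how this guarantee can be used, consider the following hedged sketch: the source actors produce gradient updates and additionally expose a hypothetical set_weights() method (not part of the iterator API). Fetch ordering ensures each update is submitted before the next item is requested from the actors:

# `workers` is assumed to be a list of actor handles that implement
# ParallelIteratorWorker and expose a hypothetical set_weights() method.
it = ray.experimental.iter.from_actors(workers)

def apply_update(grads):
    # Fetch ordering guarantees this runs before the next element is
    # fetched, so the update is submitted to the workers first.
    for w in workers:
        w.set_weights.remote(grads)  # hypothetical method
    return grads

for grads in it.gather_sync().for_each(apply_update):
    pass  # e.g., log training progress here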

Operator state: Operator state is preserved for each shard. This means that you can pass a stateful callable to .for_each():

class CumulativeSum:
    def __init__(self):
        self.total = 0

    def __call__(self, x):
        self.total += x
        return (self.total, x)

it = ray.experimental.iter.from_range(5, 1)
for x in it.for_each(CumulativeSum()).gather_sync():
    print(x)

# This prints:
# (0, 0)
# (1, 1)
# (3, 2)
# (6, 3)
# (10, 4)
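
Because the state lives with each shard, the same callable applied over two shards keeps two independent totals. A sketch (the output order follows from the deterministic round-robin ordering of gather_sync):

it = ray.experimental.iter.from_range(4, 2)  # shard 0: [0, 1], shard 1: [2, 3]
for x in it.for_each(CumulativeSum()).gather_sync():
    print(x)

# This prints:
# (0, 0)  <- shard 0, total 0
# (2, 2)  <- shard 1, total 2
# (1, 1)  <- shard 0, total 1
# (5, 3)  <- shard 1, total 5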

Example: Streaming word frequency count

Parallel iterators can be used for simple data processing use cases such as this streaming word frequency count over log files:

import ray
import glob
import gzip
import numpy as np

ray.init()

file_list = glob.glob("/var/log/syslog*.gz")
it = (
    ray.experimental.iter.from_items(file_list, num_shards=4)
       .for_each(lambda f: gzip.open(f).readlines())
       .flatten()
       .for_each(lambda line: line.decode("utf-8"))
       .for_each(lambda line: 1 if "cron" in line else 0)
       .batch(1024)
       .for_each(np.mean)
)

# Show the probability of a log line containing "cron", computed
# over consecutive batches of 1024 lines.
for freq in it.gather_async():
    print(freq)

Example: Passing iterator shards to remote functions

Both parallel iterators and local iterators are fully serializable, so once created you can pass them to Ray tasks and actors. This can be useful for distributed training:

import ray
import numpy as np

ray.init()

@ray.remote
def train(data_shard):
    for batch in data_shard:
        print("train on", batch)  # perform model update with batch

it = (
    ray.experimental.iter.from_range(1000000, num_shards=4, repeat=True)
        .batch(1024)
        .for_each(np.array)
)

work = [train.remote(shard) for shard in it.shards()]
ray.get(work)

API Reference

ray.experimental.iter.from_items(items: List[T], num_shards: int = 2, repeat: bool = False) → ParallelIterator[T]

Create a parallel iterator from an existing set of objects.

The objects will be divided round-robin among the shards.

Parameters:
  • items (list) – The list of items to iterate over.
  • num_shards (int) – The number of worker actors to create.
  • repeat (bool) – Whether to cycle over the items forever.
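
For illustration, a hypothetical session (assuming round-robin assignment starts at shard 0):

>>> it = from_items([1, 2, 3, 4], num_shards=2)
>>> it.get_shard(0).take(2)
... [1, 3]
>>> it.get_shard(1).take(2)
... [2, 4]
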
ray.experimental.iter.from_range(n: int, num_shards: int = 2, repeat: bool = False) → ParallelIterator[int]

Create a parallel iterator over the range [0, n).

The range will be partitioned sequentially among the shards.

Parameters:
  • n (int) – The exclusive end of the range of numbers.
  • num_shards (int) – The number of worker actors to create.
  • repeat (bool) – Whether to cycle over the range forever.
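
For illustration of the sequential partitioning:

>>> it = from_range(10, num_shards=2)
>>> it.get_shard(1).take(3)
... [5, 6, 7]
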
ray.experimental.iter.from_iterators(generators: List[Iterable[T]], repeat: bool = False, name=None) → ParallelIterator[T]

Create a parallel iterator from a set of iterators.

An actor will be created for each iterator.

Examples

>>> # Create using a list of generators.
>>> from_iterators([range(100), range(100)])
>>> # Equivalent to the above.
>>> from_iterators([lambda: range(100), lambda: range(100)])
Parameters:
  • generators (list) – A list of Python generator objects or lambda functions that produce a generator when called. We allow lambda functions since the generator itself might not be serializable, but a lambda that returns it can be.
  • repeat (bool) – Whether to cycle over the iterators forever.
  • name (str) – Optional name to give the iterator.
ray.experimental.iter.from_actors(actors: List[ray.actor.ActorHandle], name=None) → ParallelIterator[T]

Create a parallel iterator from an existing set of actors.

Each actor must subclass the ParallelIteratorWorker interface.

Parameters:
  • actors (list) – List of actors that each implement ParallelIteratorWorker.
  • name (str) – Optional name to give the iterator.
class ray.experimental.iter.ParallelIterator(actor_sets: List[_ActorSet], name: str)

Bases: typing.Generic

A parallel iterator over a set of remote actors.

This can be used to iterate over a fixed set of task results (like an actor pool), or a stream of data (e.g., a fixed range of numbers, an infinite stream of RLlib rollout results).

This class is serializable and can be passed to other remote tasks and actors. However, each shard should be read from at most one process at a time.

Examples

>>> # Applying a function over items in parallel.
>>> it = ray.experimental.iter.from_items([1, 2, 3], num_shards=2)
... <__main__.ParallelIterator object>
>>> it = it.for_each(lambda x: x * 2).gather_sync()
... <__main__.LocalIterator object>
>>> print(list(it))
... [2, 4, 6]
>>> # Creating from generators.
>>> it = ray.experimental.iter.from_iterators([range(3), range(3)])
... <__main__.ParallelIterator object>
>>> print(list(it.gather_sync()))
... [0, 0, 1, 1, 2, 2]
>>> # Accessing the individual shards of an iterator.
>>> it = ray.experimental.iter.from_range(10, num_shards=2)
... <__main__.ParallelIterator object>
>>> it0 = it.get_shard(0)
... <__main__.LocalIterator object>
>>> print(list(it0))
... [0, 1, 2, 3, 4]
>>> it1 = it.get_shard(1)
... <__main__.LocalIterator object>
>>> print(list(it1))
... [5, 6, 7, 8, 9]
>>> # Gathering results from actors synchronously in parallel.
>>> it = ray.experimental.iter.from_actors(workers)
... <__main__.ParallelIterator object>
>>> it = it.batch_across_shards()
... <__main__.LocalIterator object>
>>> print(next(it))
... [worker_1_result_1, worker_2_result_1]
>>> print(next(it))
... [worker_1_result_2, worker_2_result_2]

for_each(fn: Callable[[T], U]) → ParallelIterator[U]

Remotely apply fn to each item in this iterator.

Parameters:
  • fn (func) – Function to apply to each item.

Examples

>>> list(from_range(4).for_each(lambda x: x * 2).gather_sync())
... [0, 2, 4, 6]
filter(fn: Callable[[T], bool]) → ParallelIterator[T]

Remotely filter items from this iterator.

Parameters:
  • fn (func) – Returns False for items to drop from the iterator.

Examples

>>> it = from_items([0, 1, 2]).filter(lambda x: x > 0)
>>> list(it.gather_sync())
... [1, 2]
batch(n: int) → ParallelIterator[List[T]]

Remotely batch together items in this iterator.

Parameters:
  • n (int) – Number of items to batch together.

Examples

>>> next(from_range(10, 1).batch(4).gather_sync())
... [0, 1, 2, 3]
flatten() → ParallelIterator[T[0]]

Flatten batches of items into individual items.

Examples

>>> next(from_range(10, 1).batch(4).flatten().gather_sync())
... 0
combine(fn: Callable[[T], List[U]]) → ParallelIterator[U]

Transform and then combine items horizontally.

This is the equivalent of for_each(fn).flatten() (flat map).
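
Since combine(fn) is just for_each(fn).flatten(), a small illustrative session (single shard, so the output order is deterministic) looks like:

>>> it = from_range(3, 1).combine(lambda x: [x, x])
>>> list(it.gather_sync())
... [0, 0, 1, 1, 2, 2]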

gather_sync() → LocalIterator[T]

Returns a local iterable for synchronous iteration.

New items will be fetched from the shards on-demand as the iterator is stepped through.

This is the equivalent of batch_across_shards().flatten().

Examples

>>> it = from_range(100, 1).gather_sync()
>>> next(it)
... 0
>>> next(it)
... 1
>>> next(it)
... 2
batch_across_shards() → LocalIterator[List[T]]

Iterate over the results of multiple shards in parallel.

Examples

>>> it = from_iterators([range(3), range(3)])
>>> next(it.batch_across_shards())
... [0, 0]
gather_async() → LocalIterator[T]

Returns a local iterable for asynchronous iteration.

New items will be fetched from the shards asynchronously as soon as the previous one is computed. Items arrive in non-deterministic order.

Examples

>>> it = from_range(100, 1).gather_async()
>>> next(it)
... 3
>>> next(it)
... 0
>>> next(it)
... 1
take(n: int) → List[T]

Return up to the first n items from this iterator.

show(n: int = 20)

Print up to the first n items from this iterator.

union(other: ParallelIterator[T]) → ParallelIterator[T]

Return an iterator that is the union of this and the other.

num_shards() → int

Return the number of worker actors backing this iterator.

shards() → List[LocalIterator[T]]

Return the list of all shards.

get_shard(shard_index: int) → LocalIterator[T]

Return a local iterator for the given shard.

The iterator is guaranteed to be serializable and can be passed to remote tasks or actors.

class ray.experimental.iter.LocalIterator(base_iterator: Callable[[], Iterable[T]], local_transforms: List[Callable[[Iterable[T_co]], Any]] = None, timeout: int = None, name=None)

Bases: typing.Generic

An iterator over a single shard of data.

It implements similar transformations as ParallelIterator[T], but the transforms will be applied locally and not remotely in parallel.

This class is serializable and can be passed to other remote tasks and actors. However, it should be read from at most one process at a time.

__init__(base_iterator: Callable[[], Iterable[T]], local_transforms: List[Callable[[Iterable[T_co]], Any]] = None, timeout: int = None, name=None)

Create a local iterator (this is an internal function).

Parameters:
  • base_iterator (func) – A function that produces the base iterator. This is a function so that we can ensure LocalIterator is serializable.
  • local_transforms (list) – A list of transformation functions to be applied on top of the base iterator. When iteration begins, we create the base iterator and apply these functions. This lazy creation ensures LocalIterator is serializable until you start iterating over it.
  • timeout (int) – Optional timeout in seconds for this iterator, after which _NextValueNotReady will be returned. This avoids blocking.
  • name (str) – Optional name for this iterator.

take(n: int) → List[T]

Return up to the first n items from this iterator.

show(n: int = 20)

Print up to the first n items from this iterator.

union(other: LocalIterator[T]) → LocalIterator[T]

Return an iterator that is the union of this and the other.

There are no ordering guarantees between the two iterators. We make a best-effort attempt to return items from both as they become ready, preventing starvation of any particular iterator.
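
An illustrative session (the exact interleaving is non-deterministic):

>>> a = from_range(3, 1).gather_async()
>>> b = from_range(3, 1).for_each(lambda x: x + 100).gather_async()
>>> a.union(b).take(4)
... [0, 100, 1, 101]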

class ray.experimental.iter.ParallelIteratorWorker(item_generator: Any, repeat: bool)

Bases: object

Worker actor for a ParallelIterator.

Actors that are passed to iter.from_actors() must subclass this interface.

__init__(item_generator: Any, repeat: bool)

Create an iterator worker.

Subclasses must call this init function.

Parameters:
  • item_generator (obj) – A Python generator object or a lambda function that produces a generator when called. We allow lambda functions since the generator itself might not be serializable, but a lambda that returns it can be.
  • repeat (bool) – Whether to loop over the iterator forever.
par_iter_init(transforms)

Implements ParallelIterator worker init.

par_iter_next()

Implements ParallelIterator worker item fetch.
