AsyncIO / Concurrency for Actors

Since Python 3.5, it is possible to write concurrent code using the async/await syntax. Ray natively integrates with asyncio. You can use ray alongside with popular async frameworks like aiohttp, aioredis, etc.

You can try it about by running the following snippet in ipython or a shell that supports top level await:

import ray
import asyncio
ray.init()

@ray.remote
class AsyncActor:
    # multiple invocation of this method can be running in
    # the event loop at the same time
    async def run_concurrent(self):
        print("started")
        await asyncio.sleep(2) # concurrent workload here
        print("finished")

actor = AsyncActor.remote()

# regular ray.get
ray.get([actor.run_concurrent.remote() for _ in range(4)])

# async ray.get
await actor.run_concurrent.remote()

ObjectIDs as asyncio.Futures

ObjectIDs can be translated to asyncio.Future. This feature make it possible to await on ray futures in existing concurrent applications.

Instead of:

@ray.remote
def some_task():
    return 1

ray.get(some_task.remote())
ray.wait([some_task.remote()])

you can do:

@ray.remote
def some_task():
    return 1

await some_task.remote()
await asyncio.wait([some_task.remote()])

Please refer to asyncio doc for more asyncio patterns including timeouts and asyncio.gather.

Async Actor

Ray also supports concurrent multitasking by executing many actor tasks at once. To do so, you can define an actor with async methods:

import asyncio

@ray.remote
class AsyncActor:
    async def run_task(self):
        print("started")
        await asyncio.sleep(1) # Network, I/O task here
        print("ended")

actor = AsyncActor.remote()
# All 50 tasks should start at once. After 1 second they should all finish.
# they should finish at the same time
ray.get([actor.run_task.remote() for _ in range(50)])

Under the hood, Ray runs all of the methods inside a single python event loop. Please note that running blocking ray.get or ray.wait inside async actor method is not allowed, because ray.get will block the execution of the event loop.

You can limit the number of concurrent task running at once using the max_concurrency flag. By default, 1000 tasks can be running concurrently.

import asyncio

@ray.remote
class AsyncActor:
    async def run_task(self):
        print("started")
        await asyncio.sleep(1) # Network, I/O task here
        print("ended")

actor = AsyncActor.options(max_concurreny=10).remote()

# Only 10 tasks will be running concurrently. Once 10 finish, the next 10 should run.
ray.get([actor.run_task.remote() for _ in range(50)])

Known Issues

Async API support is experimental, and we are working to improve it. Please let us know any issues you encounter.