AsyncIO / Concurrency for Actors
Since Python 3.5, it is possible to write concurrent code using the
async/await syntax.
Ray natively integrates with asyncio. You can use Ray alongside popular
async frameworks like aiohttp, aioredis, etc.
You can try it out by running the following snippet in
ipython or a shell
that supports top-level await:
```python
import ray
import asyncio

ray.init()

@ray.remote
class AsyncActor:
    # Multiple invocations of this method can be running in
    # the event loop at the same time.
    async def run_concurrent(self):
        print("started")
        await asyncio.sleep(2)  # concurrent workload here
        print("finished")

actor = AsyncActor.remote()

# regular ray.get
ray.get([actor.run_concurrent.remote() for _ in range(4)])

# async ray.get (requires a shell that supports top-level await)
await actor.run_concurrent.remote()
```
ObjectIDs as asyncio.Futures
ObjectIDs can be translated to asyncio.Future. This feature
makes it possible to
await on Ray futures in existing concurrent
applications. Instead of:
```python
@ray.remote
def some_task():
    return 1

ray.get(some_task.remote())
ray.wait([some_task.remote()])
```
you can do:
```python
@ray.remote
def some_task():
    return 1

await some_task.remote()
await asyncio.wait([some_task.remote()])
```
Please refer to the asyncio documentation
for more asyncio patterns, including timeouts and asyncio.gather.
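As a rough illustration of those asyncio patterns, the sketch below uses only the standard library. `fetch_value` is a hypothetical stand-in for any awaitable (such as an awaited Ray future); it is not part of Ray, and the delays are arbitrary.

```python
import asyncio


async def fetch_value(delay, value):
    # Hypothetical stand-in for an awaitable result, e.g. a remote call.
    await asyncio.sleep(delay)
    return value


async def main():
    # gather awaits several awaitables concurrently and returns
    # the results in the order the awaitables were passed in.
    results = await asyncio.gather(
        fetch_value(0.02, "a"),
        fetch_value(0.01, "b"),
    )

    # wait_for raises asyncio.TimeoutError if the awaitable
    # does not finish within the timeout.
    try:
        await asyncio.wait_for(fetch_value(1.0, "slow"), timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True

    return results, timed_out


results, timed_out = asyncio.run(main())
print(results, timed_out)
```

Under these assumptions, `results` keeps the input order even though the second coroutine finishes first, and the slow call is cancelled once the timeout expires.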
Ray also supports concurrent multitasking by executing many actor tasks at once. To do so, you can define an actor with async methods:
```python
import asyncio
import ray

@ray.remote
class AsyncActor:
    async def run_task(self):
        print("started")
        await asyncio.sleep(1)  # Network, I/O task here
        print("ended")

actor = AsyncActor.remote()

# All 50 tasks should start at once. After 1 second they should all
# finish at the same time.
ray.get([actor.run_task.remote() for _ in range(50)])
```
Under the hood, Ray runs all of the methods inside a single Python event loop.
Please note that running a blocking
ray.get or ray.wait inside an async
actor method is not allowed, because
ray.get will block the execution
of the event loop.
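To see why a blocking call is a problem on a single event loop, here is a minimal sketch in plain asyncio, without Ray. `time.sleep` stands in for any blocking call; the worker names and the `log` list are illustrative assumptions, not Ray APIs.

```python
import asyncio
import time


async def blocking_worker(log, name):
    log.append(f"{name} start")
    time.sleep(0.05)  # blocks the event loop thread; nothing else can run
    log.append(f"{name} end")


async def cooperative_worker(log, name):
    log.append(f"{name} start")
    await asyncio.sleep(0.05)  # yields control back to the event loop
    log.append(f"{name} end")


async def run_pair(worker):
    # Run two copies of the worker on the same event loop and record
    # the order in which they start and finish.
    log = []
    await asyncio.gather(worker(log, "a"), worker(log, "b"))
    return log


blocking_log = asyncio.run(run_pair(blocking_worker))
cooperative_log = asyncio.run(run_pair(cooperative_worker))

print(blocking_log)
print(cooperative_log)
```

With the blocking worker, "b" cannot start until "a" has finished, so the two calls run back to back. With the cooperative worker, both start before either finishes, which is the behaviour async actor methods rely on.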
You can limit the number of concurrent tasks running at once using the
max_concurrency flag. By default, 1000 tasks can be running concurrently.
```python
import asyncio
import ray

@ray.remote
class AsyncActor:
    async def run_task(self):
        print("started")
        await asyncio.sleep(1)  # Network, I/O task here
        print("ended")

actor = AsyncActor.options(max_concurrency=10).remote()

# Only 10 tasks will be running concurrently. Once 10 finish, the next 10 should run.
ray.get([actor.run_task.remote() for _ in range(50)])
```
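For intuition about what a concurrency cap means on one event loop, the following plain-asyncio sketch bounds the number of in-flight coroutines with `asyncio.Semaphore`. This is only an analogy and makes no claim about how Ray implements max_concurrency; the `stats` dictionary and the function names are assumptions made for illustration.

```python
import asyncio


async def run_task(semaphore, stats):
    # At most `limit` coroutines can hold the semaphore at once.
    async with semaphore:
        stats["running"] += 1
        stats["peak"] = max(stats["peak"], stats["running"])
        await asyncio.sleep(0.01)  # Network, I/O task here
        stats["running"] -= 1


async def main(num_tasks=50, limit=10):
    semaphore = asyncio.Semaphore(limit)
    stats = {"running": 0, "peak": 0}
    await asyncio.gather(*(run_task(semaphore, stats) for _ in range(num_tasks)))
    return stats


stats = asyncio.run(main())
print(stats["peak"])  # never exceeds the limit of 10
```

All 50 coroutines are scheduled immediately, but the recorded peak shows that no more than 10 are inside the guarded section at any moment.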