Actor Concurrency
Abstract
Within a single actor process, operations can execute concurrently (for example, across multiple threads).
Ray provides two forms of concurrency inside an actor:
- async execution
- threading
| Type | Concurrency mechanism | Suited for |
|---|---|---|
| Async Actor | coroutines (event loop) | IO-bound work, async programming |
| Threaded Actor | multithreading (thread pool) | CPU-bound or blocking tasks |
Warning
Remote tasks cannot be defined with `async def` directly; wrap the async function in a synchronous function instead.
Note that Python's GIL (Global Interpreter Lock) allows only one thread to execute Python bytecode at a time. This means:
- If the threads run pure Python code, the GIL prevents true multithreaded parallelism.
- If the code spends most of its time in C/C++ extension libraries that release the GIL (such as NumPy, Cython, TensorFlow, or PyTorch), threads can run in parallel and speed up compute- or IO-intensive work.
Note
Neither Threaded Actors nor AsyncIO-based actors can bypass Python's GIL.
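The GIL's effect can be seen without Ray at all. The sketch below uses plain `threading` with `time.sleep`, which (like IO calls or NumPy kernels) releases the GIL while waiting, so four 0.5-second calls overlap instead of running serially; a pure-Python compute loop would not overlap this way.

```python
import threading
import time


def io_like_work():
    # `time.sleep` releases the GIL while waiting, much like IO calls or
    # GIL-releasing C extension kernels.
    time.sleep(0.5)


start = time.perf_counter()
threads = [threading.Thread(target=io_like_work) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start

# Four 0.5s GIL-releasing calls overlap: total is ~0.5s, not ~2s.
print(f"4 threads, 0.5s each: {elapsed:.2f}s total")
```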
AsyncIO for Actors
Ray integrates natively with asyncio, so it can be used together with `async` / `await` and async frameworks such as aiohttp and aioredis.
```python
import ray
import asyncio


@ray.remote
class AsyncActor:
    def __init__(self, expected_num_tasks: int):
        self._event = asyncio.Event()
        self._curr_num_tasks = 0
        self._expected_num_tasks = expected_num_tasks

    # Multiple invocations of this method can run concurrently on the same event loop.
    async def run_concurrent(self):
        self._curr_num_tasks += 1
        if self._curr_num_tasks == self._expected_num_tasks:
            print("All coroutines are executing concurrently, unblocking.")
            self._event.set()
        else:
            print("Waiting for other coroutines to start.")
            await self._event.wait()
        print("All coroutines ran concurrently.")


actor = AsyncActor.remote(4)
refs = [actor.run_concurrent.remote() for _ in range(4)]

# Fetch results using regular `ray.get`.
ray.get(refs)

# Fetch results using `asyncio` APIs.
async def get_async():
    return await asyncio.gather(*refs)

asyncio.run(get_async())
```
```
(AsyncActor pid=9064) Waiting for other coroutines to start.
(AsyncActor pid=9064) Waiting for other coroutines to start.
(AsyncActor pid=9064) Waiting for other coroutines to start.
(AsyncActor pid=9064) All coroutines are executing concurrently, unblocking.
(AsyncActor pid=9064) All coroutines ran concurrently.
(AsyncActor pid=9064) All coroutines ran concurrently.
(AsyncActor pid=9064) All coroutines ran concurrently.
(AsyncActor pid=9064) All coroutines ran concurrently.
```
ObjectRef as an asyncio.Future
In Ray, an ObjectRef can be converted into an asyncio.Future, so existing async programs can `await` Ray tasks directly.
```python
import ray


@ray.remote
def some_task():
    return 1


ray.get(some_task.remote())
ray.wait([some_task.remote()])
```
An ObjectRef can be awaited directly with the `await` syntax:
```python
import ray
import asyncio


@ray.remote
def some_task():
    return 1


async def await_obj_ref():
    await some_task.remote()
    await asyncio.wait([some_task.remote()])


asyncio.run(await_obj_ref())
```
An ObjectRef can also be explicitly converted into an asyncio.Future via `ref.future()` and `asyncio.wrap_future`:
```python
import asyncio

import ray


@ray.remote
def some_task():
    return 1


async def convert_to_asyncio_future():
    ref = some_task.remote()
    fut: asyncio.Future = asyncio.wrap_future(ref.future())
    print(await fut)


asyncio.run(convert_to_asyncio_future())
```
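The bridge here works because `ref.future()` returns a `concurrent.futures.Future`, which `asyncio.wrap_future` adapts into an awaitable. The same mechanism can be demonstrated without Ray, using a thread-pool future as a stand-in:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def blocking_work() -> int:
    # Stand-in for work whose result arrives via a concurrent.futures.Future.
    return 1


async def main() -> int:
    with ThreadPoolExecutor() as pool:
        cf_future = pool.submit(blocking_work)  # a concurrent.futures.Future
        # `asyncio.wrap_future` turns it into an asyncio-awaitable future.
        fut: asyncio.Future = asyncio.wrap_future(cf_future)
        return await fut


result = asyncio.run(main())
print(result)  # 1
```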
Defining an Async Actor
Defining any actor method with `async def` makes Ray automatically treat the actor as an Async Actor, whose method invocations can interleave asynchronously.
```python
import ray
import asyncio


@ray.remote
class AsyncActor:
    def __init__(self, expected_num_tasks: int):
        self._event = asyncio.Event()
        self._curr_num_tasks = 0
        self._expected_num_tasks = expected_num_tasks

    async def run_task(self):
        print("Started task")
        self._curr_num_tasks += 1
        if self._curr_num_tasks == self._expected_num_tasks:
            self._event.set()
        else:
            # Yield the event loop for multiple coroutines to run concurrently.
            await self._event.wait()
        print("Finished task")


actor = AsyncActor.remote(5)
# All 5 tasks will start at once and run concurrently.
ray.get([actor.run_task.remote() for _ in range(5)])
```
```
(AsyncActor pid=3456) Started task
(AsyncActor pid=3456) Started task
(AsyncActor pid=3456) Started task
(AsyncActor pid=3456) Started task
(AsyncActor pid=3456) Started task
(AsyncActor pid=3456) Finished task
(AsyncActor pid=3456) Finished task
(AsyncActor pid=3456) Finished task
(AsyncActor pid=3456) Finished task
(AsyncActor pid=3456) Finished task
```
Under the hood, Ray runs all of an Async Actor's methods on a single Python event loop.
Warning
Inside Async Actor methods:
- Do not call blocking APIs such as `ray.get` or `ray.wait`.
- These calls block the event loop, stalling every other coroutine and defeating the actor's concurrency.
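The cost of blocking the loop can be illustrated with plain asyncio (no Ray): five coroutines that `await` finish in about one sleep interval, while five that call blocking `time.sleep` serialize completely, just as a blocking `ray.get` would inside an Async Actor method.

```python
import asyncio
import time


async def cooperative(n: int) -> int:
    await asyncio.sleep(0.2)  # yields the loop; others run meanwhile
    return n


async def run_cooperative() -> float:
    start = time.perf_counter()
    await asyncio.gather(*(cooperative(i) for i in range(5)))
    return time.perf_counter() - start


async def blocking(n: int) -> int:
    time.sleep(0.2)           # blocks the loop; nothing else can run
    return n


async def run_blocking() -> float:
    start = time.perf_counter()
    await asyncio.gather(*(blocking(i) for i in range(5)))
    return time.perf_counter() - start


t_coop = asyncio.run(run_cooperative())
t_block = asyncio.run(run_blocking())
# ~0.2s when coroutines cooperate, ~1.0s when each call blocks the loop.
print(f"awaiting: {t_coop:.2f}s, blocking: {t_block:.2f}s")
```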
Note
- In an Async Actor, only one task executes at any given moment.
- Multiple tasks achieve cooperative concurrency (multiplexing) by yielding at `await` points.
- The entire actor runs on a single thread.
Setting the Concurrency of an Async Actor
For an Async Actor, the `max_concurrency` option controls how many tasks may run concurrently.
By default, up to 1000 tasks can run concurrently.
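Conceptually, `max_concurrency` behaves like a semaphore guarding the event loop. The plain-asyncio sketch below (no Ray) caps eight eager tasks at two in flight, mirroring `max_concurrency=2`:

```python
import asyncio


async def main() -> int:
    limit = asyncio.Semaphore(2)  # at most 2 holders, like max_concurrency=2
    state = {"running": 0, "peak": 0}

    async def run_task(i: int) -> None:
        async with limit:
            state["running"] += 1
            state["peak"] = max(state["peak"], state["running"])
            await asyncio.sleep(0.05)  # simulate some async work
            state["running"] -= 1

    # Eight tasks submitted at once, but only two ever run concurrently.
    await asyncio.gather(*(run_task(i) for i in range(8)))
    return state["peak"]


peak = asyncio.run(main())
print(f"peak concurrency: {peak}")  # 2
```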
```python
import asyncio
import ray


@ray.remote
class AsyncActor:
    def __init__(self, batch_size: int):
        self._event = asyncio.Event()
        self._curr_tasks = 0
        self._batch_size = batch_size

    async def run_task(self):
        print("Started task")
        self._curr_tasks += 1
        if self._curr_tasks == self._batch_size:
            self._event.set()
        else:
            await self._event.wait()
            self._event.clear()
            self._curr_tasks = 0
        print("Finished task")


actor = AsyncActor.options(max_concurrency=2).remote(2)
# Only 2 tasks will run concurrently.
# Once 2 finish, the next 2 should run.
ray.get([actor.run_task.remote() for _ in range(8)])
```
```
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Finished task
(AsyncActor pid=5859) Finished task
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Finished task
(AsyncActor pid=5859) Finished task
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Finished task
(AsyncActor pid=5859) Finished task
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Finished task
(AsyncActor pid=5859) Finished task
```
Threaded Actors
Warning
asyncio is not always the right choice. For example:
- Some methods are compute-intensive (CPU-bound).
- Some methods never `await`, so they cannot yield the event loop.
- Some methods otherwise block the event loop.
Any of these severely hurts an Async Actor's performance, because it relies on `await` for task switching and can only make progress on one task at a time.
In those cases, set `max_concurrency` on an actor with no `async def` methods to enable thread-level concurrency: the actor then executes calls on a thread pool.
Warning
If an actor defines even one `async def` method, Ray treats it as an Async Actor rather than a Threaded Actor.
```python
import ray


@ray.remote
class ThreadedActor:
    def task_1(self):
        print("I'm running in a thread!")

    def task_2(self):
        print("I'm running in another thread!")


a = ThreadedActor.options(max_concurrency=2).remote()
ray.get([a.task_1.remote(), a.task_2.remote()])
```
```
I'm running in a thread!
I'm running in another thread!
```
Note
- Each actor call executes on a thread from a pool.
- The pool size is controlled by `max_concurrency`.
- This gives genuine multithreaded execution (subject to the GIL, as noted above).
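The execution model above can be sketched without Ray using the standard library's thread pool, with `max_workers` playing the role of `max_concurrency`:

```python
from concurrent.futures import ThreadPoolExecutor


def task_1() -> str:
    return "I'm running in a thread!"


def task_2() -> str:
    return "I'm running in another thread!"


# A fixed-size pool of 2 workers, analogous to max_concurrency=2: submitted
# calls run concurrently on pool threads, and .result() collects each return.
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(task_1), pool.submit(task_2)]
    results = [f.result() for f in futures]

print(results)
```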
AsyncIO for Remote Tasks
Warning
Ray does not support `async def` directly on remote tasks:
```python
# Not supported: Ray rejects remote functions defined with async def.
@ray.remote
async def f():
    pass
```
The correct approach is to wrap the async function in a synchronous wrapper function:
```python
import asyncio

import ray


async def f():
    pass


@ray.remote
def wrapper():
    # Run the coroutine to completion on a fresh event loop inside the task.
    asyncio.run(f())
```
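Stripped of Ray, the wrapper pattern is just `asyncio.run` inside an ordinary function: the sync wrapper starts a fresh event loop, runs the coroutine to completion, and returns its result, which is why it can be decorated as a normal (non-async) remote task.

```python
import asyncio


async def f() -> int:
    await asyncio.sleep(0)  # stands in for any awaitable work
    return 42


def wrapper() -> int:
    # A synchronous entry point: starts an event loop, runs f() to
    # completion, and returns its result to the caller.
    return asyncio.run(f())


result = wrapper()
print(result)  # 42
```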