Actor Concurrency

Abstract

Within a single actor process, multiple operations can run concurrently (for example, on multiple threads).

Ray offers two kinds of concurrency within an actor:

  • Async execution
  • Threading

Type            Concurrency model         Best suited for
Async Actor     Coroutines (event loop)   I/O-bound work, async code
Threaded Actor  Threads (thread pool)     Blocking calls, compute that releases the GIL

Warning

Remote tasks cannot use async def directly; wrap the coroutine in a synchronous function instead.

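Keep in mind that Python's GIL (Global Interpreter Lock) allows only one thread to execute Python code at a time.

This means:

  • If the work is pure Python code, the GIL prevents true multithreaded parallelism.
  • If the work mostly calls C/C++ extension libraries that release the GIL, such as NumPy, Cython, TensorFlow, or PyTorch, threads can run in parallel and speed up compute-bound or I/O-bound operations.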
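
The second case can be observed without Ray. The sketch below uses only the standard library: four threads each make a blocking call (`time.sleep`, which releases the GIL while it waits), so the waits overlap instead of adding up. The name `blocking_call` and the 0.2-second duration are assumptions chosen for illustration; a pure Python loop in place of the sleep would not be expected to overlap in the same way on a standard CPython build.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_call(seconds: float) -> float:
    # time.sleep releases the GIL while it waits, like most blocking I/O calls.
    time.sleep(seconds)
    return seconds

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(blocking_call, [0.2] * 4))
elapsed = time.perf_counter() - start

# Run one after another, the four calls would take about 0.8 s in total.
print(f"4 blocking calls on 4 threads took {elapsed:.2f}s")
```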

Note

Neither threaded actors nor AsyncIO actors let you bypass Python's GIL.


AsyncIO for Actors

Ray integrates natively with asyncio, so it works with async / await and with popular async frameworks such as aiohttp and aioredis.

import ray
import asyncio

@ray.remote
class AsyncActor:
    def __init__(self, expected_num_tasks: int):
        self._event = asyncio.Event()
        self._curr_num_tasks = 0
        self._expected_num_tasks = expected_num_tasks

    # Multiple invocations of this method can run concurrently on the same event loop.
    async def run_concurrent(self):
        self._curr_num_tasks += 1
        if self._curr_num_tasks == self._expected_num_tasks:
            print("All coroutines are executing concurrently, unblocking.")
            self._event.set()
        else:
            print("Waiting for other coroutines to start.")

        await self._event.wait()
        print("All coroutines ran concurrently.")

actor = AsyncActor.remote(4)
refs = [actor.run_concurrent.remote() for _ in range(4)]

# Fetch results using regular `ray.get`.
ray.get(refs)

# Fetch results using `asyncio` APIs.
async def get_async():
    return await asyncio.gather(*refs)
asyncio.run(get_async())
(AsyncActor pid=9064) Waiting for other coroutines to start.
(AsyncActor pid=9064) Waiting for other coroutines to start.
(AsyncActor pid=9064) Waiting for other coroutines to start.
(AsyncActor pid=9064) All coroutines are executing concurrently, unblocking.
(AsyncActor pid=9064) All coroutines ran concurrently.
(AsyncActor pid=9064) All coroutines ran concurrently.
(AsyncActor pid=9064) All coroutines ran concurrently.
(AsyncActor pid=9064) All coroutines ran concurrently.

ObjectRefs as asyncio.Futures

An ObjectRef can be converted to an asyncio.Future, so an existing async application can await Ray tasks directly.

Synchronous style (traditional)
import ray

@ray.remote
def some_task():
    return 1

ray.get(some_task.remote())
ray.wait([some_task.remote()])

On Python 3.9 and 3.10, you can await an ObjectRef directly and also pass it to asyncio.wait:

Asynchronous style (Python 3.9 / 3.10)
import ray
import asyncio

@ray.remote
def some_task():
    return 1

async def await_obj_ref():
    await some_task.remote()
    await asyncio.wait([some_task.remote()])

asyncio.run(await_obj_ref())

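On Python 3.11 and later, asyncio.wait accepts only tasks and futures, so convert the ObjectRef to an asyncio.Future explicitly (awaiting the ObjectRef itself still works):

Using asyncio.Future (Python 3.11+)
import asyncio
import ray

@ray.remote
def some_task():
    return 1

async def convert_to_asyncio_future():
    ref = some_task.remote()
    # ref.future() returns a concurrent.futures.Future; wrap it for asyncio.
    fut: asyncio.Future = asyncio.wrap_future(ref.future())
    print(await fut)

asyncio.run(convert_to_asyncio_future())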
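
`asyncio.wrap_future` is a standard-library function that adapts a `concurrent.futures.Future` into an awaitable `asyncio.Future`. The sketch below shows the same conversion without Ray, using a `ThreadPoolExecutor` future as a stand-in for the object returned by `ref.future()`. The stand-in and the names `some_work` and `main` are assumptions for illustration only.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def some_work() -> int:
    return 1

async def main() -> int:
    with ThreadPoolExecutor(max_workers=1) as pool:
        # pool.submit returns a concurrent.futures.Future, which is not awaitable.
        cf_future = pool.submit(some_work)
        # wrap_future binds it to the running event loop as an asyncio.Future.
        fut: asyncio.Future = asyncio.wrap_future(cf_future)
        # asyncio.wait accepts asyncio.Future objects.
        done, _pending = await asyncio.wait([fut])
        return done.pop().result()

result = asyncio.run(main())
print(result)
```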

Defining an Async Actor

If an actor defines a method with async def, Ray automatically treats it as an async actor and its methods can run as coroutines.

import ray
import asyncio


@ray.remote
class AsyncActor:
    def __init__(self, expected_num_tasks: int):
        self._event = asyncio.Event()
        self._curr_num_tasks = 0
        self._expected_num_tasks = expected_num_tasks

    async def run_task(self):
        print("Started task")
        self._curr_num_tasks += 1
        if self._curr_num_tasks == self._expected_num_tasks:
            self._event.set()
        else:
            # Yield the event loop for multiple coroutines to run concurrently.
            await self._event.wait()

        print("Finished task")

actor = AsyncActor.remote(5)
# All 5 tasks will start at once and run concurrently.
ray.get([actor.run_task.remote() for _ in range(5)])
(AsyncActor pid=3456) Started task
(AsyncActor pid=3456) Started task
(AsyncActor pid=3456) Started task
(AsyncActor pid=3456) Started task
(AsyncActor pid=3456) Started task
(AsyncActor pid=3456) Finished task
(AsyncActor pid=3456) Finished task
(AsyncActor pid=3456) Finished task
(AsyncActor pid=3456) Finished task
(AsyncActor pid=3456) Finished task

Under the hood, Ray runs all methods of an async actor in a single Python event loop.

Warning

Inside async actor methods:

  • Do not call blocking operations such as ray.get or ray.wait.
  • These calls block the event loop, which stalls every other coroutine running in the actor.
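
The effect of a blocking call on an event loop can be reproduced with plain asyncio. In the sketch below, `ticker` stands in for other actor methods waiting for a turn, `time.sleep` stands in for a blocking call such as `ray.get`, and `await asyncio.sleep` stands in for a non-blocking wait. All names and durations are assumptions for illustration; no Ray API is involved.

```python
import asyncio
import time

async def ticker(ticks: list, stop: asyncio.Event) -> None:
    # Stands in for other coroutines that want a turn on the event loop.
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)

async def ticks_during_wait(blocking: bool) -> int:
    ticks: list = []
    stop = asyncio.Event()
    task = asyncio.create_task(ticker(ticks, stop))
    await asyncio.sleep(0)  # let the ticker record its first tick
    before = len(ticks)
    if blocking:
        time.sleep(0.2)           # blocks the whole event loop
    else:
        await asyncio.sleep(0.2)  # yields, so the ticker keeps running
    during = len(ticks) - before
    stop.set()
    await task
    return during

blocked = asyncio.run(ticks_during_wait(blocking=True))
yielded = asyncio.run(ticks_during_wait(blocking=False))
print(f"ticks while blocked: {blocked}, ticks while awaiting: {yielded}")
```

While the loop is blocked the ticker makes no progress at all. Inside an async actor method, awaiting the ObjectRef is the non-blocking alternative to a blocking fetch.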

Note

  • In an async actor, only one task is executing at any given moment.
  • Multiple tasks can still be multiplexed cooperatively by yielding at await.
  • The whole actor runs on a single thread.
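
A minimal plain-asyncio sketch of these three points follows. Two coroutines take turns at each `await`, and every step is recorded on the same thread. The names `worker` and `log` are assumptions for illustration, and the code models the behavior rather than using Ray itself.

```python
import asyncio
import threading

async def worker(name: str, log: list) -> None:
    for step in range(2):
        log.append((name, step, threading.get_ident()))
        # Yield to the event loop so the other coroutine can take its turn.
        await asyncio.sleep(0)

async def main() -> list:
    log: list = []
    await asyncio.gather(worker("a", log), worker("b", log))
    return log

log = asyncio.run(main())
order = [(name, step) for name, step, _ in log]
thread_ids = {tid for _, _, tid in log}
print(order, len(thread_ids))
```

The recorded order alternates between the two workers, and the set of thread IDs has a single element.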

Setting concurrency in Async Actors

In an async actor, the max_concurrency option controls how many tasks can run concurrently.

By default, up to 1000 tasks can run concurrently.

import asyncio
import ray

@ray.remote
class AsyncActor:
    def __init__(self, batch_size: int):
        self._event = asyncio.Event()
        self._curr_tasks = 0
        self._batch_size = batch_size

    async def run_task(self):
        print("Started task")
        self._curr_tasks += 1
        if self._curr_tasks == self._batch_size:
            self._event.set()
        else:
            await self._event.wait()
            self._event.clear()
            self._curr_tasks = 0

        print("Finished task")

actor = AsyncActor.options(max_concurrency=2).remote(2)

# Only 2 tasks will run concurrently.
# Once 2 finish, the next 2 should run.
ray.get([actor.run_task.remote() for _ in range(8)])
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Finished task
(AsyncActor pid=5859) Finished task
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Finished task
(AsyncActor pid=5859) Finished task
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Finished task
(AsyncActor pid=5859) Finished task
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Finished task
(AsyncActor pid=5859) Finished task
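
The batching seen in this output can be modeled in plain asyncio with a semaphore that caps the number of coroutines in flight. This is only an analogy under the assumption that a cap of 2 is wanted; it does not describe how Ray implements max_concurrency internally, and the names `run_task`, `limit`, and `state` are illustrative.

```python
import asyncio

async def run_task(limit: asyncio.Semaphore, state: dict) -> None:
    async with limit:
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
        await asyncio.sleep(0.01)  # stand-in for awaited work
        state["running"] -= 1

async def main(max_concurrency: int, num_tasks: int) -> int:
    limit = asyncio.Semaphore(max_concurrency)
    state = {"running": 0, "peak": 0}
    await asyncio.gather(*(run_task(limit, state) for _ in range(num_tasks)))
    return state["peak"]

peak = asyncio.run(main(max_concurrency=2, num_tasks=8))
print(f"peak concurrency: {peak}")
```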

Threaded Actors

Warning

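In some cases asyncio is not the ideal choice.

For example:

  • Some methods are compute-intensive (CPU-bound).
  • Some methods never await, so they cannot yield the event loop.
  • Some methods block the event loop.

Any of these severely hurts the performance of an async actor, because an async actor relies on await to switch between tasks and only ever executes one task at a time.

In these cases, set max_concurrency without defining any async def method to get thread-level concurrency, similar to a thread pool.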

Warning

If an actor has at least one async def method, Ray treats it as an async actor, not a threaded actor.

import ray

@ray.remote
class ThreadedActor:
    def task_1(self): print("I'm running in a thread!")
    def task_2(self): print("I'm running in another thread!")

a = ThreadedActor.options(max_concurrency=2).remote()
ray.get([a.task_1.remote(), a.task_2.remote()])
I'm running in a thread!
I'm running in another thread!

Note

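  • Each actor call runs on a thread from a thread pool.
  • max_concurrency sets the size of the thread pool.
  • Calls execute concurrently on separate threads; pure Python code is still serialized by the GIL.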
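
The thread-pool model can be sketched with the standard library alone. Below, a `ThreadPoolExecutor` with two workers plays the role of an actor created with max_concurrency=2, and a barrier proves that both calls are in flight at the same time. The executor is a stand-in chosen for illustration, not Ray's actual implementation, and the names `task` and `barrier` are assumptions.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Both calls must reach the barrier before either returns,
# which is only possible if they run on two threads at the same time.
barrier = threading.Barrier(2, timeout=5)

def task(name: str) -> tuple:
    barrier.wait()
    return name, threading.get_ident()

with ThreadPoolExecutor(max_workers=2) as pool:  # plays the role of max_concurrency=2
    futures = [pool.submit(task, "task_1"), pool.submit(task, "task_2")]
    results = [f.result() for f in futures]

thread_ids = {tid for _, tid in results}
print(f"{len(results)} calls ran on {len(thread_ids)} threads")
```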

AsyncIO for Remote Tasks

Warning

Ray does not support async def for remote tasks directly. The following snippet fails:

@ray.remote
async def f():
    pass

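Instead, wrap the async function in a synchronous wrapper function:

import asyncio
import ray

async def f():
    pass

@ray.remote
def wrapper():
    # Run the coroutine to completion on a fresh event loop in the worker.
    return asyncio.run(f())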
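
When several coroutines need the same treatment, the wrapping step can be factored into a small helper. The sketch below uses only the standard library; `to_sync` and `double` are hypothetical names introduced here, not part of Ray.

```python
import asyncio
import functools
from typing import Any, Callable, Coroutine

def to_sync(async_fn: Callable[..., Coroutine[Any, Any, Any]]) -> Callable[..., Any]:
    """Return a synchronous function that runs `async_fn` to completion."""
    @functools.wraps(async_fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        # asyncio.run creates a fresh event loop, runs the coroutine, and closes the loop.
        return asyncio.run(async_fn(*args, **kwargs))
    return wrapper

async def double(x: int) -> int:
    await asyncio.sleep(0)
    return 2 * x

double_sync = to_sync(double)
result = double_sync(21)
print(result)
```

A function produced this way has the same shape as `wrapper` above: a plain synchronous callable that drives the coroutine with `asyncio.run` and returns its result.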