并发组
Abstract
除了为 Actor 设置整体并发度(max_concurrency),Ray 还支持通过 Concurrency Groups(并发组) 对不同方法进行细粒度的并发控制。
总结:
max_concurrency:控制 Actor 整体并发上限concurrency_groups:控制方法级并发隔离- 默认组始终存在,可通过
max_concurrency调整 - 支持静态定义(
concurrency_group) + 动态切换(.options(concurrency_group="..."))
你可以将方法划分到不同的并发组中,每个组拥有独立的并发配额(线程 / 协程)。
使用场景:
- 为不同类型的方法设置不同并发策略
- 将轻量任务(如 health-check)与重任务(如请求处理)隔离
- 避免某类方法“占满”全部并发资源
Tip
Concurrency Groups 同时适用于:
- Async Actor
- Threaded Actor
使用方式完全一致。
定义 Concurrency Groups
你可以使用 concurrency_group 装饰器参数来定义并发组:
示例:定义 2 个并发组
import ray
@ray.remote(concurrency_groups={"io": 2, "compute": 4})
class AsyncIOActor:
def __init__(self):
pass
@ray.method(concurrency_group="io")
async def f1(self):
pass
@ray.method(concurrency_group="io")
async def f2(self):
pass
@ray.method(concurrency_group="compute")
async def f3(self):
pass
@ray.method(concurrency_group="compute")
async def f4(self):
pass
async def f5(self):
pass
a = AsyncIOActor.remote()
a.f1.remote() # executed in the "io" group.
a.f2.remote() # executed in the "io" group.
a.f3.remote() # executed in the "compute" group.
a.f4.remote() # executed in the "compute" group.
a.f5.remote() # executed in the default group.
默认并发组
Note
每个 Actor 都存在一个默认并发组(default group):
- Async Actor:默认并发度为 1000
- Threaded / 普通 Actor:默认并发度为 1
可以通过 max_concurrency 修改默认组:
示例:覆盖默认组的最大并发数
@ray.remote(concurrency_groups={"io": 2})
class AsyncIOActor:
async def f1(self):
pass
actor = AsyncIOActor.options(max_concurrency=10).remote()
运行时指定并发组
除了在定义时绑定方法,你还可以在调用时动态指定并发组:
# 使用 Actor 定义的并发组配置
a.f2.options().remote()
# 强制指定 "compute" 并发组
a.f2.options(concurrency_group="compute").remote()