跳转至

并发组

Abstract

除了为 Actor 设置整体并发度(max_concurrency),Ray 还支持通过 Concurrency Groups(并发组) 对不同方法进行细粒度的并发控制。

总结:

  • max_concurrency:控制 Actor 整体并发上限
  • concurrency_groups:控制方法级并发隔离
  • 默认组始终存在,可通过 max_concurrency 调整
  • 支持静态定义(concurrency_group) + 动态切换(.options(concurrency_group="...")

你可以将方法划分到不同的并发组中,每个组拥有独立的并发配额(线程 / 协程)。

使用场景:

  • 为不同类型的方法设置不同并发策略
  • 轻量任务(如 health-check)与重任务(如请求处理)隔离
  • 避免某类方法“占满”全部并发资源

Tip

Concurrency Groups 同时适用于:

  • Async Actor
  • Threaded Actor

使用方式完全一致。


定义 Concurrency Groups

你可以使用 concurrency_group 装饰器参数来定义并发组:

示例:定义 2 个并发组

import ray

@ray.remote(concurrency_groups={"io": 2, "compute": 4})
class AsyncIOActor:
    def __init__(self):
        pass

    @ray.method(concurrency_group="io")
    async def f1(self):
        pass

    @ray.method(concurrency_group="io")
    async def f2(self):
        pass

    @ray.method(concurrency_group="compute")
    async def f3(self):
        pass

    @ray.method(concurrency_group="compute")
    async def f4(self):
        pass

    async def f5(self):
        pass

a = AsyncIOActor.remote()
a.f1.remote()  # executed in the "io" group.
a.f2.remote()  # executed in the "io" group.
a.f3.remote()  # executed in the "compute" group.
a.f4.remote()  # executed in the "compute" group.
a.f5.remote()  # executed in the default group.

默认并发组

Note

每个 Actor 都存在一个默认并发组(default group):

  • Async Actor:默认并发度为 1000
  • Threaded / 普通 Actor:默认并发度为 1

可以通过 max_concurrency 修改默认组:

示例:覆盖默认组的最大并发数

@ray.remote(concurrency_groups={"io": 2})
class AsyncIOActor:
    async def f1(self):
        pass

actor = AsyncIOActor.options(max_concurrency=10).remote()

运行时指定并发组

除了在定义时绑定方法,你还可以在调用时动态指定并发组:

# 使用 Actor 定义的并发组配置
a.f2.options().remote()

# 强制指定 "compute" 并发组
a.f2.options(concurrency_group="compute").remote()