跳转至

调度

Abstract

概述了 Ray 如何决定将 Task 和 Actor 调度到哪个节点。

标签(Labels)

标签提供了一种简化的方式,用于控制 task、actor 和 placement group bundle 的调度,可以使用默认标签或自定义标签。

标签目前是 beta 功能

随着该功能逐渐稳定,Ray 建议使用标签替代以下模式:

  • NodeAffinitySchedulingStrategy(当 soft=false 时),改用默认标签 ray.io/node-id
  • accelerator_type 选项,改用默认标签 ray.io/accelerator-type

注意

过去推荐使用自定义资源来实现基于标签的调度,现在建议仅在需要基于数值管理调度时使用自定义资源。


资源(Resources)

每个 Task 或 Actor 都会声明其所需的资源。根据资源匹配情况,节点的调度状态可以划分为以下几类:

  • Feasible(可行):节点具备运行任务所需资源
    • Available(可用):资源当前空闲
    • Unavailable(不可用):资源存在但正在被使用
  • Infeasible(不可行):节点不具备所需资源(例如 GPU 任务调度到 CPU 节点)

资源需求属于硬约束,只有满足需求的节点才能被调度。

调度流程简要如下

  • 若存在可行节点:
    • Ray 会优先选择一个当前资源是 Available 的节点运行任务;
    • 若全部节点是 Unavailable ,则等待其中某个节点资源释放后调度;
  • 若所有节点均是 Infeasible
    • 任务将处于排队等待,只有当新的 Feasible 节点加入集群后才会被调度执行。

调度策略(Scheduling Strategies)

Task 和 Actor 都可以通过 scheduling_strategy option 指定调度策略。


DEFAULT

DEFAULT 是 Ray 使用的默认调度策略,Ray 会将 Task 或 Actor 调度到一组前 k 个节点中。

具体来说,节点会按照以下优先级排序:

  • 优先选择已经有 Task 或 Actor 在运行的节点(以提高数据局部性)
  • 然后优先选择资源利用率较低的节点(以实现负载均衡)

在前 k 个节点中,Ray 会随机选择一个节点进行调度,以进一步改善负载均衡,并减少大规模集群中的冷启动延迟。

Ray 的 DEFAULT 调度策略的实现逻辑如下

  1. 资源利用率评分

    • 对每个节点,Ray 根据逻辑资源利用率计算分数。
    • 若利用率低于阈值(环境变量 RAY_scheduler_spread_threshold,默认 0.5),分数为 0;
    • 否则,分数等于当前资源利用率(比如完全占用时分数为 1)。
  2. 节点筛选与选择

    • Ray 会从分数最低的前 k 个节点中,随机挑选一个用于调度任务。
  3. k 的确定规则

    • k 取下列两者最大值:
      • 集群节点数 × RAY_scheduler_top_k_fraction
      • RAY_scheduler_top_k_absolute
    • 默认情况下,k 为节点总数的 20%。

这样的设计能兼顾数据局部性资源负载均衡调度效率

不需要任何资源 Actor 的特殊处理

目前,Ray 对不需要任何资源的 Actor(即 num_cpus=0 且没有其他资源需求)有特殊处理:

  • 随机选择一个节点(不考虑资源利用率)
  • 由于随机分布,这类 Actor 实际上会在整个集群中分散(SPREAD
@ray.remote
def func():
    return 1

@ray.remote(num_cpus=1)
class Actor:
    pass

# 如果未指定,则使用 "DEFAULT" 调度策略。
func.remote()
actor = Actor.remote()

# 显式设置调度策略为 "DEFAULT"。
func.options(scheduling_strategy="DEFAULT").remote()
actor = Actor.options(scheduling_strategy="DEFAULT").remote()

# 不需要任何资源的 Actor 会被随机分配到节点。
actor = Actor.options(num_cpus=0).remote()

SPREAD

SPREAD 策略会尝试将 Task 或 Actor 分散到可用节点上。

@ray.remote(scheduling_strategy="SPREAD")
def spread_func():
    return 2

@ray.remote(num_cpus=1)
class SpreadActor:
    pass

# 分散调度 Task 到集群中的所有节点
[spread_func.remote() for _ in range(10)]
# 分散调度 Actor 到集群中的所有节点
actors = [SpreadActor.options(scheduling_strategy="SPREAD").remote() for _ in range(10)]

Placement Group

PlacementGroupSchedulingStrategy 会将 Task 或 Actor 调度到 placement group 所在的位置。这对于 Actor 的组调度(gang scheduling)非常有用。


Node Affinity

NodeAffinitySchedulingStrategy 是一种低层级调度策略,它允许将 Task 或 Actor 调度到指定 node id 的特定节点上。

soft 标志的作用

soft 用于指定:当目标节点不存在(例如节点宕机)或不可行(没有足够资源)时,是否允许 Task 或 Actor 在其他节点上运行。

  • soft=True:当目标节点不可用或不可行时,Task 或 Actor 会被调度到其他可行节点
  • soft=False:Task 或 Actor 会直接失败,并抛出 TaskUnschedulableErrorActorUnschedulableError

只要指定的节点仍然存活且可行,Task 或 Actor 就只会在该节点上运行,与 soft 的取值无关。这意味着,如果该节点当前没有可用资源,Task 或 Actor 会等待资源释放后再执行。

已知限制

NodeAffinitySchedulingStrategy 仅应在其他高层调度策略(例如 placement group)无法满足需求时使用。其已知限制如下:

  • 这是一个低层策略,会阻碍调度器进行优化
  • 无法充分利用自动扩缩容集群(因为创建任务时必须提前知道 node id)
  • 在多租户集群中很难做出最优的静态调度决策(例如应用无法知道其他任务在同一节点上的调度情况)
@ray.remote
def node_affinity_func():
    return ray.get_runtime_context().get_node_id()


@ray.remote(num_cpus=1)
class NodeAffinityActor:
    pass


# 仅在本地节点上运行 Task
node_affinity_func.options(
    scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
        node_id=ray.get_runtime_context().get_node_id(),
        soft=False,
    )
).remote()

# 如果可能,在同一个节点上运行两个 node_affinity_func Task
node_affinity_func.options(
    scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
        node_id=ray.get(node_affinity_func.remote()),
        soft=True,
    )
).remote()

# 仅在本地节点上运行 Actor
actor = NodeAffinityActor.options(
    scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
        node_id=ray.get_runtime_context().get_node_id(),
        soft=False,
    )
).remote()

数据局部性感知调度(Locality-Aware Scheduling)

默认情况下,Ray 会优先将任务调度到拥有最多本地输入数据的可用节点,从而减少网络数据传输、提升执行效率。若任务存在多个体积较大的参数,Ray 会优先选择那些本地存有最多相关对象数据(object bytes)的节点来运行任务。

该策略的优先级高于 DEFAULT 调度策略,这意味着 Ray 会优先尝试在具有数据局部性的节点上运行任务,而不考虑节点的资源利用率。

但是,如果该节点当前不可用,Ray 可能会将任务调度到其他节点。当指定了其他调度策略时,这些策略具有更高优先级,此时将不再考虑数据局部性。

Warning

数据局部性感知调度仅适用于 Task,不适用于 Actor。

@ray.remote
def large_object_func():
    # 大型对象存储在本地对象存储中,并可在分布式内存中访问,而不是直接返回给调用者。
    return [1] * (1024 * 1024)

@ray.remote
def small_object_func():
    # 小型对象直接返回给调用者,而不是存储在分布式内存中。
    return [1]

@ray.remote
def consume_func(data):
    return len(data)

large_object = large_object_func.remote()
small_object = small_object_func.remote()

# Ray 会尝试在同一个节点上运行 consume_func,而不是在 large_object_func 所在的节点上运行。
consume_func.remote(large_object)

# Ray 会尝试将 consume_func 分散调度到整个集群,而不是只在 large_object_func 所在的节点上运行。
[
    consume_func.options(scheduling_strategy="SPREAD").remote(large_object)
    for i in range(10)
]

# Ray 不会考虑数据局部性来调度 consume_func,因为参数较小,会被直接发送到 worker 节点。
consume_func.remote(small_object)