调度
Abstract
概述了 Ray 如何决定将 Task 和 Actor 调度到哪个节点。
标签(Labels)
标签提供了一种简化的方式,用于控制 task、actor 和 placement group bundle 的调度,可以使用默认标签或自定义标签。
标签目前是 beta 功能
随着该功能逐渐稳定,Ray 建议使用标签替代以下模式:
NodeAffinitySchedulingStrategy(当soft=false时),改用默认标签ray.io/node-idaccelerator_type选项,改用默认标签ray.io/accelerator-type
注意
过去推荐使用自定义资源来实现基于标签的调度,现在建议仅在需要基于数值管理调度时使用自定义资源。
资源(Resources)
每个 Task 或 Actor 都会声明其所需的资源。根据资源匹配情况,节点的调度状态可以划分为以下几类:
Feasible(可行):节点具备运行任务所需资源Available(可用):资源当前空闲Unavailable(不可用):资源存在但正在被使用
Infeasible(不可行):节点不具备所需资源(例如 GPU 任务调度到 CPU 节点)
资源需求属于硬约束,只有满足需求的节点才能被调度。
调度流程简要如下
- 若存在可行节点:
- Ray 会优先选择一个当前资源是
Available的节点运行任务; - 若全部节点是
Unavailable,则等待其中某个节点资源释放后调度;
- Ray 会优先选择一个当前资源是
- 若所有节点均是
Infeasible:- 任务将处于排队等待,只有当新的
Feasible节点加入集群后才会被调度执行。
- 任务将处于排队等待,只有当新的
调度策略(Scheduling Strategies)
Task 和 Actor 都可以通过 scheduling_strategy option 指定调度策略。
DEFAULT
DEFAULT 是 Ray 使用的默认调度策略,Ray 会将 Task 或 Actor 调度到一组前 k 个节点中。
具体来说,节点会按照以下优先级排序:
- 优先选择已经有 Task 或 Actor 在运行的节点(以提高数据局部性)
- 然后优先选择资源利用率较低的节点(以实现负载均衡)
在前 k 个节点中,Ray 会随机选择一个节点进行调度,以进一步改善负载均衡,并减少大规模集群中的冷启动延迟。
Ray 的 DEFAULT 调度策略的实现逻辑如下
-
资源利用率评分
- 对每个节点,Ray 根据逻辑资源利用率计算分数。
- 若利用率低于阈值(环境变量
RAY_scheduler_spread_threshold,默认 0.5),分数为 0; - 否则,分数等于当前资源利用率(比如完全占用时分数为 1)。
-
节点筛选与选择
- Ray 会从分数最低的前 k 个节点中,随机挑选一个用于调度任务。
-
k 的确定规则
- k 取下列两者最大值:
- 集群节点数 ×
RAY_scheduler_top_k_fraction RAY_scheduler_top_k_absolute
- 集群节点数 ×
- 默认情况下,k 为节点总数的 20%。
- k 取下列两者最大值:
这样的设计能兼顾数据局部性、资源负载均衡和调度效率。
不需要任何资源 Actor 的特殊处理
目前,Ray 对不需要任何资源的 Actor(即 num_cpus=0 且没有其他资源需求)有特殊处理:
- 会随机选择一个节点(不考虑资源利用率)
- 由于随机分布,这类 Actor 实际上会在整个集群中分散(
SPREAD)
@ray.remote
def func():
return 1
@ray.remote(num_cpus=1)
class Actor:
pass
# 如果未指定,则使用 "DEFAULT" 调度策略。
func.remote()
actor = Actor.remote()
# 显式设置调度策略为 "DEFAULT"。
func.options(scheduling_strategy="DEFAULT").remote()
actor = Actor.options(scheduling_strategy="DEFAULT").remote()
# 不需要任何资源的 Actor 会被随机分配到节点。
actor = Actor.options(num_cpus=0).remote()
SPREAD
SPREAD 策略会尝试将 Task 或 Actor 分散到可用节点上。
@ray.remote(scheduling_strategy="SPREAD")
def spread_func():
return 2
@ray.remote(num_cpus=1)
class SpreadActor:
pass
# 分散调度 Task 到集群中的所有节点
[spread_func.remote() for _ in range(10)]
# 分散调度 Actor 到集群中的所有节点
actors = [SpreadActor.options(scheduling_strategy="SPREAD").remote() for _ in range(10)]
Placement Group
PlacementGroupSchedulingStrategy 会将 Task 或 Actor 调度到 placement group 所在的位置。这对于 Actor 的组调度(gang scheduling)非常有用。
Node Affinity
NodeAffinitySchedulingStrategy 是一种低层级调度策略,它允许将 Task 或 Actor 调度到指定 node id 的特定节点上。
soft 标志的作用
soft 用于指定:当目标节点不存在(例如节点宕机)或不可行(没有足够资源)时,是否允许 Task 或 Actor 在其他节点上运行。
soft=True:当目标节点不可用或不可行时,Task 或 Actor 会被调度到其他可行节点soft=False:Task 或 Actor 会直接失败,并抛出TaskUnschedulableError或ActorUnschedulableError
只要指定的节点仍然存活且可行,Task 或 Actor 就只会在该节点上运行,与 soft 的取值无关。这意味着,如果该节点当前没有可用资源,Task 或 Actor 会等待资源释放后再执行。
已知限制
NodeAffinitySchedulingStrategy 仅应在其他高层调度策略(例如 placement group)无法满足需求时使用。其已知限制如下:
- 这是一个低层策略,会阻碍调度器进行优化
- 无法充分利用自动扩缩容集群(因为创建任务时必须提前知道 node id)
- 在多租户集群中很难做出最优的静态调度决策(例如应用无法知道其他任务在同一节点上的调度情况)
@ray.remote
def node_affinity_func():
return ray.get_runtime_context().get_node_id()
@ray.remote(num_cpus=1)
class NodeAffinityActor:
pass
# 仅在本地节点上运行 Task
node_affinity_func.options(
scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
node_id=ray.get_runtime_context().get_node_id(),
soft=False,
)
).remote()
# 如果可能,在同一个节点上运行两个 node_affinity_func Task
node_affinity_func.options(
scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
node_id=ray.get(node_affinity_func.remote()),
soft=True,
)
).remote()
# 仅在本地节点上运行 Actor
actor = NodeAffinityActor.options(
scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
node_id=ray.get_runtime_context().get_node_id(),
soft=False,
)
).remote()
数据局部性感知调度(Locality-Aware Scheduling)
默认情况下,Ray 会优先将任务调度到拥有最多本地输入数据的可用节点,从而减少网络数据传输、提升执行效率。若任务存在多个体积较大的参数,Ray 会优先选择那些本地存有最多相关对象数据(object bytes)的节点来运行任务。
该策略的优先级高于 DEFAULT 调度策略,这意味着 Ray 会优先尝试在具有数据局部性的节点上运行任务,而不考虑节点的资源利用率。
但是,如果该节点当前不可用,Ray 可能会将任务调度到其他节点。当指定了其他调度策略时,这些策略具有更高优先级,此时将不再考虑数据局部性。
Warning
数据局部性感知调度仅适用于 Task,不适用于 Actor。
@ray.remote
def large_object_func():
# 大型对象存储在本地对象存储中,并可在分布式内存中访问,而不是直接返回给调用者。
return [1] * (1024 * 1024)
@ray.remote
def small_object_func():
# 小型对象直接返回给调用者,而不是存储在分布式内存中。
return [1]
@ray.remote
def consume_func(data):
return len(data)
large_object = large_object_func.remote()
small_object = small_object_func.remote()
# Ray 会尝试在同一个节点上运行 consume_func,而不是在 large_object_func 所在的节点上运行。
consume_func.remote(large_object)
# Ray 会尝试将 consume_func 分散调度到整个集群,而不是只在 large_object_func 所在的节点上运行。
[
consume_func.options(scheduling_strategy="SPREAD").remote(large_object)
for i in range(10)
]
# Ray 不会考虑数据局部性来调度 consume_func,因为参数较小,会被直接发送到 worker 节点。
consume_func.remote(small_object)