跳转至

容错

Abstract

Ray 是一个分布式系统,这意味着故障是可能发生的。通常,Ray 将故障分为两类:

  • 应用级故障(application-level failures), 例如用户代码中的 bug 或外部系统故障
  • 系统级故障(system-level failures),例如节点故障、网络故障,或 Ray 本身的 bug

为了处理应用级故障,Ray 提供了以下机制:

  • 捕获错误
  • 重试失败的代码
  • 处理异常行为的代码

Ray 还提供了多种机制,用于自动恢复系统级故障,例如节点故障。特别地,Ray 可以自动恢复分布式 object store 中的部分故障。


编写具备容错能力的 Ray 应用

为了让 Ray 应用具备容错能力,有几个建议:

必要时手动兜底

若 Ray 提供的容错机制不适用,你始终可以捕获由故障引起的异常并手动恢复。

@ray.remote
class Actor:
    def read_only(self):
        import sys
        import random

        rand = random.random()
        if rand < 0.2:
            return 2 / 0
        elif rand < 0.3:
            sys.exit(1)

        return 2


actor = Actor.remote()
# Manually retry the actor task.
while True:
    try:
        print(ray.get(actor.read_only.remote()))
        break
    except ZeroDivisionError:
        pass
    except ray.exceptions.RayActorError:
        # Manually restart the actor
        actor = Actor.remote()

避免 ObjectRef 超过 owner 的生命周期

确保 ObjectRef 的生命周期不要超过其 owner(task 或 actor)的生命周期。

Note

这里的 owner 指的是:最初通过 ray.put()foo.remote() 创建该 ObjectRef 的 task 或 actor。

当仍有引用指向某个 ObjectRef 时,即使创建该对象的 task 或 actor 已经结束,其 owner worker 进程仍会继续存活以维持对象的可用性。

但如果 owner worker 进程发生异常退出,Ray 无法自动恢复该对象,之后任何对该 ObjectRef 的访问都会失败

Example

一个典型的情况是:在 task 中返回由 ray.put() 创建的 ObjectRef:

import ray

# 不具容错能力的版本
@ray.remote
def a():
    x_ref = ray.put(1)
    return x_ref

x_ref = ray.get(a.remote())

# Object x 的生命周期超过了其 owner task a
try:
    print(ray.get(x_ref))
except ray.exceptions.OwnerDiedError:
    pass

在该示例中:

  • Object x 的 owner 是执行 task a 的 worker
  • 一旦该 worker 进程失败
  • 后续 ray.get(x_ref) 将抛出 OwnerDiedError,且无法恢复

推荐直接返回值,而不是返回 ray.put() 产生的 ObjectRef:

# 具容错能力的版本
@ray.remote
def a():
    return 1

# x 的 owner 是 driver
x_ref = a.remote()
print(ray.get(x_ref))

在该示例中:

  • Object x 的 owner 是 driver
  • 在 driver 生命周期内始终可访问
  • 如果数据丢失,Ray 可通过 lineage reconstruction 自动恢复

避免把任务“绑死”到某台机器

避免使用只能由特定节点满足的资源约束。

Warning

如果该节点发生故障,Ray 将无法在其他节点上重试任务或 Actor,从而导致任务失败。

@ray.remote
def b():
    return 1

# 如果 ip 为 127.0.0.3 的节点在 task b 运行时故障,
# Ray 无法在其他节点上重试任务 b
b.options(resources={"node:127.0.0.3": 1}).remote()

这种写法会将任务“硬绑定”到某个节点,一旦该节点不可用,任务将无法恢复。

如果你只是“倾向于”让任务运行在某个节点(而不是强制要求),可以使用 Node Affinity Scheduling Strategy 并设置为软约束:

# 优先在指定 node id 的节点上运行
# 但如果目标节点失败,也可以在其他节点上运行
b.options(
        scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
            node_id=ray.get_runtime_context().get_node_id(),
            soft=True
        )
    ).remote()