跳转至

Actors

Abstract

Ray 将 API 从函数(Tasks)扩展到了类(Classes)。Actor 本质上是一个有状态的 Worker(或服务)。当你实例化一个新的 Actor 时,Ray 会创建一个新的 Worker 进程,并将该 Actor 的方法调度到这个专属的 Worker 上执行。Actor 的方法可以访问和修改该 Worker 的状态。

Reference: Ray Actors

Ray Actors Example

@ray.remote 装饰器表示 Counter 类的实例是 Actor。每个 Actor 运行在独立的 Python 进程中。

import ray

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

    def get_counter(self):
        return self.value

# Create an actor from this class.
counter = Counter.remote()
可以使用 State API 中的 ray list actors 查看 Actor 的状态
# 需要安装 ray[default]:pip install "ray[default]"
ray list actors
======== List: 2023-05-25 10:10:50.095099 ========
Stats:
------------------------------
Total: 1

Table:
------------------------------
    ACTOR_ID                          CLASS_NAME    STATE    JOB_ID
 0  9e783840250840f87328c9f201000000  Counter       ALIVE    01000000

声明所需资源

与 Tasks 类似,可以在 @ray.remote 中为 Actor 指定资源需求。

@ray.remote(num_cpus=2, num_gpus=0.5)
class Actor:
    pass

调用 Actor

通过 .remote() 调用 Actor 的方法,返回的是一个 Object Ref,再用 ray.get() 获取实际值。

obj_ref = counter.increment.remote()
print(ray.get(obj_ref))
# 1

Actor 方法的执行规则

  • 不同 Actor 上的方法调用是并行执行的。
  • 同一个 Actor 上的方法调用按照调用顺序串行执行,且共享内部状态。
# 创建 10 个 Counter Actor
counters = [Counter.remote() for _ in range(10)]

# 对每个 Counter 调用一次 increment,这些任务并行执行
results = ray.get([c.increment.remote() for c in counters])
print(results)
# [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

# 对第一个 Counter 连续调用 5 次,这些任务串行执行并共享状态
results = ray.get([counters[0].increment.remote() for _ in range(5)])
print(results)
# [2, 3, 4, 5, 6]

传递 Actor 句柄

Actor 句柄可以作为参数传递给其他 Task 或 Actor 方法,从而实现跨 Task 和 Actor 之间的交互。

import time

@ray.remote
def f(counter):
    for _ in range(10):
        time.sleep(0.1)
        counter.increment.remote()

counter = Counter.remote()

# 启动 3 个任务,它们共享同一个 Counter Actor
[f.remote(counter) for _ in range(3)]

# 定期打印计数器的值
for _ in range(10):
    time.sleep(0.1)
    print(ray.get(counter.get_counter.remote()))
0
3
8
10
15
18
20
25
30
30

核心要点

多个 Task 可以同时持有同一个 Actor 的句柄,并并发地调用该 Actor 的方法。Actor 内部会按顺序处理这些请求,保证状态的一致性。

类型提示与静态类型检查

Ray 支持 Python 类型提示,以获得更好的 IDE 支持和静态类型检查。推荐做法:

  • 使用 ray.remote(MyClass) 代替 @ray.remote 装饰器,以保留原始类型信息。
  • 使用 @ray.method 装饰 Actor 方法。
  • 使用 ActorClassActorProxy 类型注解。
import ray
from ray.actor import ActorClass, ActorProxy

class Counter:
    def __init__(self):
        self.value = 0

    @ray.method
    def increment(self) -> int:
        self.value += 1
        return self.value

CounterActor: ActorClass[Counter] = ray.remote(Counter)
counter: ActorProxy[Counter] = CounterActor.remote()

# 类型检查器和 IDE 能正确推断远程方法的类型
obj_ref: ray.ObjectRef[int] = counter.increment.remote()
print(ray.get(obj_ref))

Generators

Ray 兼容 Python 生成器语法。

取消 Actor 任务

通过对返回的 ObjectRef 调用 ray.cancel() 来取消 Actor 任务。

import ray
import asyncio
import time

@ray.remote
class Actor:
    async def f(self):
        try:
            await asyncio.sleep(5)
        except asyncio.CancelledError:
            print("Actor task canceled.")

actor = Actor.remote()
ref = actor.f.remote()

time.sleep(1)
ray.cancel(ref)

try:
    ray.get(ref)
except ray.exceptions.RayTaskError:
    print("Object reference was cancelled.")

取消行为取决于任务的当前状态

任务状态 取消行为
未调度的任务 (Unscheduled) 如果 Ray 尚未调度该 Actor Task,Ray 会尝试取消调度。取消成功后,调用 ray.get(actor_task_ref) 会抛出 TaskCancelledError
运行中的普通/多线程 Actor 任务 (Regular / Threaded Actor) 对于单线程 Actor 或多线程 Actor 的任务,Ray 会设置一个取消标志(cancellation flag),任务内部可以通过 ray.get_runtime_context().is_canceled() 定期检查该标志,从而实现优雅取消(graceful cancellation)。
运行中的异步 Actor 任务 (Async Actor) Ray 会尝试取消关联的 asyncio.Task,遵循 Python 标准的 asyncio 任务取消机制。注意:如果异步函数内部没有 awaitasyncio.Task 不会在执行过程中被中断。另外,ray.get_runtime_context().is_canceled() 不支持异步 Actor,调用会抛出 RuntimeError

取消保证 (Cancellation Guarantee)

Ray 以 best-effort 方式尝试取消任务,这意味着取消不一定总能成功。例如,如果取消请求没有到达执行器(executor),任务可能不会被取消。可以通过 ray.get(actor_task_ref) 检查任务是否被成功取消。

递归取消 (Recursive Cancellation)

Ray 会追踪所有子任务和 Actor 任务。当传入 recursive=True 参数时,Ray 会同时取消所有子任务和 Actor 任务。

检测运行中 Actor 任务的取消

对于非异步 Actor 任务,可以通过定期调用 ray.get_runtime_context().is_canceled() 来检测取消请求,从而在退出前执行清理操作。

iimport ray
import time


@ray.remote
class SyncActor:
    def __init__(self):
        self.is_canceled = False

    def long_running_method(self):
        """A sync actor method that checks for cancellation periodically."""
        for i in range(100):
            # For sync actor tasks, is_canceled() can be checked in the task body
            if ray.get_runtime_context().is_canceled():
                self.is_canceled = True
                print("Actor task canceled, cleaning up...")
                return "canceled"
            time.sleep(0.1)
        return "completed"

    def get_cancel_status(self):
        return self.is_canceled


# Sync actor task cancellation with periodic checking
actor = SyncActor.remote()
actor_task_ref = actor.long_running_method.remote()

# Wait until task is scheduled.
time.sleep(1)
ray.cancel(actor_task_ref)

# The TaskCancelledError will be raised when calling ray.get
try:
    result = ray.get(actor_task_ref)
except ray.exceptions.TaskCancelledError:
    print("Actor task was cancelled")

# The get_cancel_status will return True after cancellation
cancel_status = ray.get(actor.get_cancel_status.remote())
print(f"Actor detected cancellation: {cancel_status}")

注意

  • 对于非异步 Actor 任务,不支持直接中断,需要通过 is_canceled() 定期检查来检测取消请求。
  • is_canceled() 不支持异步 Actor 任务,调用会抛出 RuntimeError

调度 (Scheduling)

对于每一个 Actor,Ray 会选择一个节点来运行它。调度决策基于以下因素:

  • Actor 的资源需求
  • 指定的调度策略 (Scheduling Strategy)

容错机制 (Fault Tolerance)

默认行为

默认情况下,Actor 崩溃时不会自动重启,Actor 任务也不会自动重试

可以通过在 ray.remote().options() 中设置以下参数来开启容错:

  • max_restarts:Actor 崩溃后的最大重启次数
  • max_task_retries:Actor 任务失败后的最大重试次数

FAQ: Actors、Workers 与 Resources

Worker 和 Actor 有什么区别?

每个 "Ray Worker" 都是一个 Python 进程。Ray 对 Worker 在 Tasks 和 Actors 场景下的使用方式不同:

Tasks 场景:

  • Ray 启动时会自动创建多个 Worker(默认每个 CPU 一个),类似进程池。
  • 同一个 Worker 可以执行多个不同的 Task。
  • 如果执行 8 个 num_cpus=2 的任务,而集群共有 16 个 CPU,那么会有 8 个 Worker 处于空闲状态。

Actors 场景:

  • Actor 在运行时通过 actor_cls.remote() 实例化,每个 Actor 对应一个专属 Worker 进程。
  • 该 Actor 的所有方法都在同一个进程中运行,使用 Actor 定义时指定的资源。
  • 与 Task 不同,Ray 不会复用 Actor 的 Python 进程——当 Actor 被删除时,对应的进程也会被终止。

最佳实践

为了最大化资源利用率,应尽量让 Worker 保持忙碌,并分配足够的集群资源来运行所有 Actor 和 Task。如果你的场景不需要维护状态,使用 Task 会比 Actor 更灵活高效。

Task 事件 (Task Events)

默认情况下,Ray 会追踪 Actor 任务的执行过程,报告任务状态事件和性能分析事件,供 Ray Dashboard 和 State API 使用。

可以通过在 ray.remote().options() 中设置 enable_task_events=False 来禁用任务事件报告,以减少执行开销。

还可以为特定的 Actor 方法单独设置,方法级别的设置会覆盖 Actor 级别的设置:

@ray.remote
class FooActor:
    # Disable task events reporting for this method.
    @ray.method(enable_task_events=False)
    def foo(self):
        pass

foo_actor = FooActor.remote()
ray.get(foo_actor.foo.remote())