Actors
Abstract
Ray 将 API 从函数(Tasks)扩展到了类(Classes)。Actor 本质上是一个有状态的 Worker(或服务)。当你实例化一个新的 Actor 时,Ray 会创建一个新的 Worker 进程,并将该 Actor 的方法调度到这个专属的 Worker 上执行。Actor 的方法可以访问和修改该 Worker 的状态。
Reference: Ray Actors
Ray Actors Example
@ray.remote 装饰器表示 Counter 类的实例是 Actor。每个 Actor 运行在独立的 Python 进程中。
import ray
@ray.remote
class Counter:
def __init__(self):
self.value = 0
def increment(self):
self.value += 1
return self.value
def get_counter(self):
return self.value
# Create an actor from this class.
counter = Counter.remote()
可以使用 State API 中的 ray list actors 查看 Actor 的状态
# 需要安装 ray[default]:pip install "ray[default]"
ray list actors
======== List: 2023-05-25 10:10:50.095099 ========
Stats:
------------------------------
Total: 1
Table:
------------------------------
ACTOR_ID CLASS_NAME STATE JOB_ID
0 9e783840250840f87328c9f201000000 Counter ALIVE 01000000
声明所需资源
与 Tasks 类似,可以在 @ray.remote 中为 Actor 指定资源需求。
@ray.remote(num_cpus=2, num_gpus=0.5)
class Actor:
pass
调用 Actor
通过 .remote() 调用 Actor 的方法,返回的是一个 Object Ref,再用 ray.get() 获取实际值。
obj_ref = counter.increment.remote()
print(ray.get(obj_ref))
# 1
Actor 方法的执行规则
- 不同 Actor 上的方法调用是并行执行的。
- 同一个 Actor 上的方法调用按照调用顺序串行执行,且共享内部状态。
# 创建 10 个 Counter Actor
counters = [Counter.remote() for _ in range(10)]
# 对每个 Counter 调用一次 increment,这些任务并行执行
results = ray.get([c.increment.remote() for c in counters])
print(results)
# [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
# 对第一个 Counter 连续调用 5 次,这些任务串行执行并共享状态
results = ray.get([counters[0].increment.remote() for _ in range(5)])
print(results)
# [2, 3, 4, 5, 6]
传递 Actor 句柄
Actor 句柄可以作为参数传递给其他 Task 或 Actor 方法,从而实现跨 Task 和 Actor 之间的交互。
import time
@ray.remote
def f(counter):
for _ in range(10):
time.sleep(0.1)
counter.increment.remote()
counter = Counter.remote()
# 启动 3 个任务,它们共享同一个 Counter Actor
[f.remote(counter) for _ in range(3)]
# 定期打印计数器的值
for _ in range(10):
time.sleep(0.1)
print(ray.get(counter.get_counter.remote()))
0
3
8
10
15
18
20
25
30
30
核心要点
多个 Task 可以同时持有同一个 Actor 的句柄,并并发地调用该 Actor 的方法。Actor 内部会按顺序处理这些请求,保证状态的一致性。
类型提示与静态类型检查
Ray 支持 Python 类型提示,以获得更好的 IDE 支持和静态类型检查。推荐做法:
- 使用
ray.remote(MyClass)代替@ray.remote装饰器,以保留原始类型信息。 - 使用
@ray.method装饰 Actor 方法。 - 使用
ActorClass和ActorProxy类型注解。
import ray
from ray.actor import ActorClass, ActorProxy
class Counter:
def __init__(self):
self.value = 0
@ray.method
def increment(self) -> int:
self.value += 1
return self.value
CounterActor: ActorClass[Counter] = ray.remote(Counter)
counter: ActorProxy[Counter] = CounterActor.remote()
# 类型检查器和 IDE 能正确推断远程方法的类型
obj_ref: ray.ObjectRef[int] = counter.increment.remote()
print(ray.get(obj_ref))
Generators
Ray 兼容 Python 生成器语法。
取消 Actor 任务
通过对返回的 ObjectRef 调用 ray.cancel() 来取消 Actor 任务。
import ray
import asyncio
import time
@ray.remote
class Actor:
async def f(self):
try:
await asyncio.sleep(5)
except asyncio.CancelledError:
print("Actor task canceled.")
actor = Actor.remote()
ref = actor.f.remote()
time.sleep(1)
ray.cancel(ref)
try:
ray.get(ref)
except ray.exceptions.RayTaskError:
print("Object reference was cancelled.")
取消行为取决于任务的当前状态
| 任务状态 | 取消行为 |
|---|---|
| 未调度的任务 (Unscheduled) | 如果 Ray 尚未调度该 Actor Task,Ray 会尝试取消调度。取消成功后,调用 ray.get(actor_task_ref) 会抛出 TaskCancelledError。 |
| 运行中的普通/多线程 Actor 任务 (Regular / Threaded Actor) | 对于单线程 Actor 或多线程 Actor 的任务,Ray 会设置一个取消标志(cancellation flag),任务内部可以通过 ray.get_runtime_context().is_canceled() 定期检查该标志,从而实现优雅取消(graceful cancellation)。 |
| 运行中的异步 Actor 任务 (Async Actor) | Ray 会尝试取消关联的 asyncio.Task,遵循 Python 标准的 asyncio 任务取消机制。注意:如果异步函数内部没有 await,asyncio.Task 不会在执行过程中被中断。另外,ray.get_runtime_context().is_canceled() 不支持异步 Actor,调用会抛出 RuntimeError。 |
取消保证 (Cancellation Guarantee)
Ray 以 best-effort 方式尝试取消任务,这意味着取消不一定总能成功。例如,如果取消请求没有到达执行器(executor),任务可能不会被取消。可以通过 ray.get(actor_task_ref) 检查任务是否被成功取消。
递归取消 (Recursive Cancellation)
Ray 会追踪所有子任务和 Actor 任务。当传入 recursive=True 参数时,Ray 会同时取消所有子任务和 Actor 任务。
检测运行中 Actor 任务的取消
对于非异步 Actor 任务,可以通过定期调用 ray.get_runtime_context().is_canceled() 来检测取消请求,从而在退出前执行清理操作。
iimport ray
import time
@ray.remote
class SyncActor:
def __init__(self):
self.is_canceled = False
def long_running_method(self):
"""A sync actor method that checks for cancellation periodically."""
for i in range(100):
# For sync actor tasks, is_canceled() can be checked in the task body
if ray.get_runtime_context().is_canceled():
self.is_canceled = True
print("Actor task canceled, cleaning up...")
return "canceled"
time.sleep(0.1)
return "completed"
def get_cancel_status(self):
return self.is_canceled
# Sync actor task cancellation with periodic checking
actor = SyncActor.remote()
actor_task_ref = actor.long_running_method.remote()
# Wait until task is scheduled.
time.sleep(1)
ray.cancel(actor_task_ref)
# The TaskCancelledError will be raised when calling ray.get
try:
result = ray.get(actor_task_ref)
except ray.exceptions.TaskCancelledError:
print("Actor task was cancelled")
# The get_cancel_status will return True after cancellation
cancel_status = ray.get(actor.get_cancel_status.remote())
print(f"Actor detected cancellation: {cancel_status}")
注意
- 对于非异步 Actor 任务,不支持直接中断,需要通过
is_canceled()定期检查来检测取消请求。 is_canceled()不支持异步 Actor 任务,调用会抛出RuntimeError。
调度 (Scheduling)
对于每一个 Actor,Ray 会选择一个节点来运行它。调度决策基于以下因素:
- Actor 的资源需求
- 指定的调度策略 (Scheduling Strategy)
容错机制 (Fault Tolerance)
默认行为
默认情况下,Actor 崩溃时不会自动重启,Actor 任务也不会自动重试。
可以通过在 ray.remote() 或 .options() 中设置以下参数来开启容错:
max_restarts:Actor 崩溃后的最大重启次数max_task_retries:Actor 任务失败后的最大重试次数
FAQ: Actors、Workers 与 Resources
Worker 和 Actor 有什么区别?
每个 "Ray Worker" 都是一个 Python 进程。Ray 对 Worker 在 Tasks 和 Actors 场景下的使用方式不同:
Tasks 场景:
- Ray 启动时会自动创建多个 Worker(默认每个 CPU 一个),类似进程池。
- 同一个 Worker 可以执行多个不同的 Task。
- 如果执行 8 个
num_cpus=2的任务,而集群共有 16 个 CPU,那么会有 8 个 Worker 处于空闲状态。
Actors 场景:
- Actor 在运行时通过
actor_cls.remote()实例化,每个 Actor 对应一个专属 Worker 进程。 - 该 Actor 的所有方法都在同一个进程中运行,使用 Actor 定义时指定的资源。
- 与 Task 不同,Ray 不会复用 Actor 的 Python 进程——当 Actor 被删除时,对应的进程也会被终止。
最佳实践
为了最大化资源利用率,应尽量让 Worker 保持忙碌,并分配足够的集群资源来运行所有 Actor 和 Task。如果你的场景不需要维护状态,使用 Task 会比 Actor 更灵活高效。
Task 事件 (Task Events)
默认情况下,Ray 会追踪 Actor 任务的执行过程,报告任务状态事件和性能分析事件,供 Ray Dashboard 和 State API 使用。
可以通过在 ray.remote() 或 .options() 中设置 enable_task_events=False 来禁用任务事件报告,以减少执行开销。
还可以为特定的 Actor 方法单独设置,方法级别的设置会覆盖 Actor 级别的设置:
@ray.remote
class FooActor:
# Disable task events reporting for this method.
@ray.method(enable_task_events=False)
def foo(self):
pass
foo_actor = FooActor.remote()
ray.get(foo_actor.foo.remote())