Actor Task Execution Order
Abstract
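In Ray, an actor can receive tasks from multiple submitters, including:
- the driver
- workers
The order in which these tasks execute depends on the actor type (synchronous, asynchronous, or threaded) and on which submitter each task comes from.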
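| Actor type | Order guaranteed? | Notes |
|---|---|---|
| Synchronous actor (single-threaded) | ✅ (same submitter) | Tasks run in submission order by default |
| Synchronous actor (multiple submitters) | ❌ | Tasks may run out of order |
| Async actor | ❌ | Calls run as interleaved coroutines; no ordering guarantee |
| Threaded actor | ❌ | Tasks run concurrently on multiple threads |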
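Synchronous Single-Threaded Actor
Tasks from the Same Submitter
For a synchronous (non-async) actor:
- tasks run in the order they were submitted
- a later task does not start until the previously submitted task has finished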
```python
import ray

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def add(self, addition):
        self.value += addition
        return self.value

counter = Counter.remote()

# For tasks from the same submitter,
# they are executed according to submission order.
value0 = counter.add.remote(1)
value1 = counter.add.remote(2)

# Output: 1. The first submitted task is executed first.
print(ray.get(value0))
# Output: 3. The later submitted task is executed later.
print(ray.get(value1))
```
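The "one task at a time, in submission order" behavior can be made concrete without a Ray cluster. This is a rough standard-library analogy, not Ray's implementation. A `ThreadPoolExecutor` with a single worker thread stands in for the actor, and the returned `Future` plays the role of an `ObjectRef`. The class name `SerialCounter` and its `log` list are hypothetical and exist only for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Analogy for a synchronous, single-threaded actor: one worker thread takes
# tasks from a FIFO queue, so tasks from one submitter run strictly in order.
class SerialCounter:
    def __init__(self):
        self.value = 0
        self.log = []  # (event, task_id) pairs, in the order they happen
        self._executor = ThreadPoolExecutor(max_workers=1)

    def _add(self, task_id, addition):
        self.log.append(("start", task_id))
        time.sleep(0.05)  # pretend the method does some work
        self.value += addition
        self.log.append(("end", task_id))
        return self.value

    def add(self, task_id, addition):
        # Returns a Future, loosely analogous to an ObjectRef.
        return self._executor.submit(self._add, task_id, addition)

    def shutdown(self):
        self._executor.shutdown(wait=True)

counter = SerialCounter()
f0 = counter.add("t0", 1)
f1 = counter.add("t1", 2)
print(f0.result())  # 1
print(f1.result())  # 3
counter.shutdown()
# t1 starts only after t0 has ended:
print(counter.log)  # [('start', 't0'), ('end', 't0'), ('start', 't1'), ('end', 't1')]
```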
Warning
Execution order is not guaranteed in the following cases:
- `allow_out_of_order_execution` is set
- `max_task_retries > 0` and a task is actually retried
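A toy model helps picture why a retry can break ordering. This is a hypothetical sketch, not how Ray schedules retries internally. A single-threaded "actor" drains a FIFO queue. When a task fails and still has retries left, it is put back at the end of the queue, so a task submitted after it finishes first. `run_with_retries` and the `(name, fail_times)` task format are made up for illustration.

```python
from collections import deque

def run_with_retries(tasks, max_task_retries):
    """Toy single-threaded actor with retries (hypothetical model).

    tasks: list of (name, fail_times) in submission order, where fail_times
    is how many attempts of that task fail before one succeeds.
    Returns task names in the order the actor finished them.
    """
    # Each queue entry: (name, fail_times, attempt number).
    queue = deque((name, fail_times, 0) for name, fail_times in tasks)
    finished = []
    while queue:
        name, fail_times, attempt = queue.popleft()
        if attempt < fail_times:  # this attempt fails
            if attempt < max_task_retries:
                # The retry goes to the back of the queue, behind later tasks.
                queue.append((name, fail_times, attempt + 1))
            else:
                finished.append(name + ":failed")  # out of retries
            continue
        finished.append(name)
    return finished

# A fails once; B was submitted after A and never fails.
print(run_with_retries([("A", 1), ("B", 0)], max_task_retries=1))  # ['B', 'A']
```

Calling it with `max_task_retries=0` instead returns `['A:failed', 'B']`. Without a retry, the original queue order is preserved.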
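Tasks from Different Submitters
For a synchronous (non-async) actor:
- execution order is not guaranteed across different submitters
- a task submitted later may run first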
```python
import time

import ray

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def add(self, addition):
        self.value += addition
        return self.value

counter = Counter.remote()

# Submit task from a worker
@ray.remote
def submitter(value):
    return ray.get(counter.add.remote(value))

# Simulate delayed result resolution.
@ray.remote
def delayed_resolution(value):
    time.sleep(1)
    return value

# Submit tasks from different workers, with
# the first submitted task waiting for
# dependency resolution.
value0 = submitter.remote(delayed_resolution.remote(1))
value1 = submitter.remote(2)

# Output: 3. The first submitted task is executed later.
print(ray.get(value0))
# Output: 2. The later submitted task is executed first.
print(ray.get(value1))
```
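Why this happens:
- the first task is blocked on a dependency (its argument is not ready yet)
- the second task has no pending dependency, so it reaches the actor and runs first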
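The same effect can be reproduced with the standard library under some simplifying assumptions. A single-worker `ThreadPoolExecutor` acts as the synchronous actor. A two-thread pool plays the two submitting workers, and a third pool produces the slow argument. A task is handed to the "actor" only after its argument `Future` has resolved, which mirrors the reasoning above. The pool names are hypothetical, and this is not Ray's scheduler.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor

class Counter:
    def __init__(self):
        self.value = 0

    def add(self, addition):
        self.value += addition
        return self.value

# One worker thread: the "actor" runs one task at a time, in arrival order.
actor_pool = ThreadPoolExecutor(max_workers=1)
# Stand-ins for the two submitting workers and for the upstream task.
submitter_pool = ThreadPoolExecutor(max_workers=2)
dependency_pool = ThreadPoolExecutor(max_workers=1)
counter = Counter()

def delayed_resolution(value):
    time.sleep(0.2)  # simulate a slow upstream task
    return value

def submitter(arg):
    # The task reaches the actor only once its argument is ready,
    # so a pending Future holds this submission back.
    value = arg.result() if isinstance(arg, Future) else arg
    return actor_pool.submit(counter.add, value).result()

value0 = submitter_pool.submit(submitter, dependency_pool.submit(delayed_resolution, 1))
value1 = submitter_pool.submit(submitter, 2)
print(value0.result())  # 3: submitted first, executed second
print(value1.result())  # 2: submitted second, executed first

for pool in (submitter_pool, dependency_pool, actor_pool):
    pool.shutdown(wait=True)
```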
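Asynchronous / Threaded Actors
For the following kinds of actor:
- async actors (methods defined with `async def`)
- threaded actors (`max_concurrency > 1`)
execution order is not guaranteed, so a task submitted later may run before one submitted earlier.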
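For the threaded case, a rough analogy uses a `ThreadPoolExecutor` whose `max_workers` plays the role of `max_concurrency`. With more than one thread, a short task submitted second can finish before a long task submitted first. `ThreadedRecorder` is a hypothetical helper. The lock is included because methods on a threaded actor can run concurrently, so shared state needs protection.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Analogy for a threaded actor: max_concurrency threads pull tasks at once,
# so completion order need not match submission order.
class ThreadedRecorder:
    def __init__(self, max_concurrency):
        self._lock = threading.Lock()  # guards shared state across threads
        self.finished = []
        self._executor = ThreadPoolExecutor(max_workers=max_concurrency)

    def _work(self, name, seconds):
        time.sleep(seconds)  # simulated work of different lengths
        with self._lock:
            self.finished.append(name)
        return name

    def submit(self, name, seconds):
        return self._executor.submit(self._work, name, seconds)

    def shutdown(self):
        self._executor.shutdown(wait=True)

actor = ThreadedRecorder(max_concurrency=2)
first = actor.submit("first", 0.3)
second = actor.submit("second", 0.05)
actor.shutdown()
print(actor.finished)  # ['second', 'first']
```

With `max_concurrency=1`, the same calls finish in submission order (`['first', 'second']`), which matches the synchronous single-threaded case.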
```python
import time

import ray

@ray.remote
class AsyncCounter:
    def __init__(self):
        self.value = 0

    async def add(self, addition):
        self.value += addition
        return self.value

counter = AsyncCounter.remote()

# Simulate delayed result resolution.
@ray.remote
def delayed_resolution(value):
    time.sleep(1)
    return value

# Submit tasks from the driver, with
# the first submitted task waiting for
# dependency resolution.
value0 = counter.add.remote(delayed_resolution.remote(1))
value1 = counter.add.remote(2)

# Output: 3. The first submitted task is executed later.
print(ray.get(value0))
# Output: 2. The later submitted task is executed first.
print(ray.get(value1))
```
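Underneath the async case is the way an event loop interleaves coroutines. The plain-`asyncio` sketch below is an analogy, not Ray's actor runtime. Each call to `add` runs as a task on one event loop. The first call awaits before it updates the counter, and while it is suspended the later call runs to completion. The `delay` parameter and the `order` list are hypothetical additions for illustration.

```python
import asyncio

class AsyncCounter:
    def __init__(self):
        self.value = 0
        self.order = []  # additions in the order they were applied

    async def add(self, addition, delay=0.0):
        # Any await hands control back to the event loop, so calls
        # submitted later may run while this one is suspended.
        await asyncio.sleep(delay)
        self.value += addition
        self.order.append(addition)
        return self.value

async def main():
    counter = AsyncCounter()
    t0 = asyncio.create_task(counter.add(1, delay=0.1))  # submitted first
    t1 = asyncio.create_task(counter.add(2))             # submitted second
    return await t0, await t1, counter.order

value0, value1, order = asyncio.run(main())
print(value0, value1, order)  # 3 2 [2, 1]
```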