
Actor Task Execution Order

Abstract

In Ray, an actor can receive tasks from multiple submitters, including:

  • driver
  • worker

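The order in which these tasks execute depends on the actor type (synchronous, asynchronous, or threaded) and on which submitter each task comes from.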

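| Actor type | Order guaranteed? | Notes |
|---|---|---|
| Synchronous actor, single-threaded (same submitter) | ✅ Yes | Tasks run in submission order by default |
| Synchronous actor (multiple submitters) | ❌ No | Tasks from different submitters may run out of order |
| Async actor | ❌ No | Coroutines are interleaved by the event loop |
| Threaded actor | ❌ No | Tasks run concurrently on multiple threads |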


Synchronous, Single-Threaded Actor

Tasks from the same submitter

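For a synchronous (non-async) actor:

  • tasks from the same submitter execute in submission order
  • a task submitted later does not start until the previous task has finished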
import ray

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def add(self, addition):
        self.value += addition
        return self.value

counter = Counter.remote()

# For tasks from the same submitter,
# they are executed according to submission order.
value0 = counter.add.remote(1)
value1 = counter.add.remote(2)

# Output: 1. The first submitted task is executed first.
print(ray.get(value0))
# Output: 3. The later submitted task is executed later.
print(ray.get(value1))
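The guarantee above behaves like a single worker draining a FIFO queue. The sketch below models that in plain Python, without Ray: the hypothetical class `SyncCounterModel` stands in for the actor, a `ThreadPoolExecutor` with one worker stands in for its single execution thread, and the returned `Future` is only loosely analogous to an `ObjectRef`. It illustrates the ordering rule; it is not how Ray implements actors.

```python
from concurrent.futures import ThreadPoolExecutor

# Plain-Python model, NOT Ray code (SyncCounterModel is a hypothetical name).
# A synchronous, single-threaded actor is approximated by an executor with
# exactly one worker thread that takes calls off a FIFO queue one at a time.
class SyncCounterModel:
    def __init__(self):
        self.value = 0
        self._executor = ThreadPoolExecutor(max_workers=1)

    def _add(self, addition):
        self.value += addition
        return self.value

    def add(self, addition):
        # Returns a Future, loosely analogous to the ObjectRef from .remote().
        return self._executor.submit(self._add, addition)

    def shutdown(self):
        self._executor.shutdown(wait=True)


counter = SyncCounterModel()
value0 = counter.add(1)
value1 = counter.add(2)
print(value0.result())  # 1: the first submitted call runs first
print(value1.result())  # 3: the second call waits for the first to finish
counter.shutdown()
```

Because there is only one worker, a later call cannot overtake an earlier one from the same queue, which is the property the Ray example relies on.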

Warning

Execution order is not guaranteed in the following cases:

  • allow_out_of_order_execution is set
  • max_task_retries is set to a value greater than 0 and a task is retried

Tasks from different submitters

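For a synchronous (non-async) actor:

  • the execution order of tasks from different submitters is not guaranteed
  • a task submitted later may execute first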
import time
import ray

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def add(self, addition):
        self.value += addition
        return self.value

counter = Counter.remote()

# Submit task from a worker
@ray.remote
def submitter(value):
    return ray.get(counter.add.remote(value))

# Simulate delayed result resolution.
@ray.remote
def delayed_resolution(value):
    time.sleep(1)
    return value

# Submit tasks from different workers, with
# the first submitted task waiting for
# dependency resolution.
value0 = submitter.remote(delayed_resolution.remote(1))
value1 = submitter.remote(2)

# Output: 3. The first submitted task is executed later.
print(ray.get(value0))
# Output: 2. The later submitted task is executed first.
print(ray.get(value1))

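Why:

  • the first submitter task is blocked on a dependency: its argument is not ready until delayed_resolution finishes (about 1 second), so its add call reaches the actor late
  • the second submitter task has no pending dependency, so its add call reaches the actor and executes first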
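The same effect can be reproduced in plain Python, without Ray, to show that the actor itself still runs one call at a time; what changes is the order in which calls arrive. In this sketch the "actor" is a single-worker `ThreadPoolExecutor`, each submitter is its own thread, and `delayed_resolution` is a hypothetical stand-in for the Ray task of the same name. The 0.3 second delay is an arbitrary assumption chosen to make the ordering reliable.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Plain-Python model, NOT Ray code. The "actor" is a single worker thread,
# so it runs one call at a time, in the order the calls *arrive* at it.
actor = ThreadPoolExecutor(max_workers=1)
state = {"value": 0}

def add(addition):
    state["value"] += addition
    return state["value"]

# Hypothetical stand-in for delayed_resolution: the argument only becomes
# available after a delay.
def delayed_resolution(value, delay=0.3):
    time.sleep(delay)
    return value

results = {}

# Each submitter runs on its own thread and must resolve its argument
# before it can send its call to the actor.
def submitter(name, get_arg):
    arg = get_arg()
    results[name] = actor.submit(add, arg).result()

t0 = threading.Thread(target=submitter, args=("value0", lambda: delayed_resolution(1)))
t1 = threading.Thread(target=submitter, args=("value1", lambda: 2))
t0.start()  # submitted first, but blocked on its argument
t1.start()  # submitted second, argument is ready immediately
t0.join()
t1.join()
actor.shutdown()

print(results["value0"])  # 3: its call reached the actor second
print(results["value1"])  # 2: its call reached the actor first
```

Per-submitter FIFO ordering still holds here; there is simply no ordering relationship between the two submitter threads.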

Asynchronous or Threaded Actors

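For the following actors:

  • async actors (methods defined with async def)
  • threaded actors (max_concurrency > 1)

task execution order is not guaranteed, which means a task submitted later may execute first.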

import time
import ray

@ray.remote
class AsyncCounter:
    def __init__(self):
        self.value = 0

    async def add(self, addition):
        self.value += addition
        return self.value

counter = AsyncCounter.remote()

# Simulate delayed result resolution.
@ray.remote
def delayed_resolution(value):
    time.sleep(1)
    return value

# Submit tasks from the driver, with
# the first submitted task waiting for
# dependency resolution.
value0 = counter.add.remote(delayed_resolution.remote(1))
value1 = counter.add.remote(2)

# Output: 3. The first submitted task is executed later.
print(ray.get(value0))
# Output: 2. The later submitted task is executed first.
print(ray.get(value1))
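To see why neither actor type can promise ordering, the sketch below models both in plain Python, without Ray. `AsyncCounterModel` runs its method as a coroutine on one `asyncio` event loop: when the first call suspends on a slow dependency, the loop runs the second call to completion. `ThreadedCounterModel` uses a two-worker `ThreadPoolExecutor` as a stand-in for `max_concurrency=2`, so a slow early call can finish after a fast later one. All class and helper names, and the delays, are hypothetical choices for illustration; this is not Ray's scheduler.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Plain-Python models, NOT Ray code. All names here are hypothetical.

# 1) Async actor model: methods are coroutines on one event loop. While one
#    call awaits a slow dependency, the loop is free to run other calls.
class AsyncCounterModel:
    def __init__(self):
        self.value = 0

    async def add(self, get_arg):
        addition = await get_arg()  # may suspend; other calls run meanwhile
        self.value += addition
        return self.value

async def delayed(value, delay=0.1):
    await asyncio.sleep(delay)  # suspends, handing control back to the loop
    return value

async def ready(value):
    return value  # completes without suspending

async def run_async_model():
    counter = AsyncCounterModel()
    # Both calls are scheduled in submission order...
    task0 = asyncio.create_task(counter.add(lambda: delayed(1)))
    task1 = asyncio.create_task(counter.add(lambda: ready(2)))
    # ...but task1 finishes while task0 is still waiting on its argument.
    return await task0, await task1

async_results = asyncio.run(run_async_model())
print(async_results)  # (3, 2)

# 2) Threaded actor model: several threads execute methods at once, so shared
#    state needs a lock and completion order is not submission order.
class ThreadedCounterModel:
    def __init__(self, max_concurrency=2):
        self.value = 0
        self._lock = threading.Lock()
        self._executor = ThreadPoolExecutor(max_workers=max_concurrency)

    def _add(self, addition, delay):
        time.sleep(delay)  # simulate slow work before touching shared state
        with self._lock:
            self.value += addition
            return self.value

    def add(self, addition, delay=0.0):
        return self._executor.submit(self._add, addition, delay)

    def shutdown(self):
        self._executor.shutdown(wait=True)

threaded = ThreadedCounterModel(max_concurrency=2)
f0 = threaded.add(1, delay=0.3)  # submitted first, but slow
f1 = threaded.add(2)             # submitted second, runs on the other thread
threaded_results = (f0.result(), f1.result())
threaded.shutdown()
print(threaded_results)  # (3, 2)
```

In both models each individual call still runs correctly; only the relative order across calls is lost. Code that depends on ordering in such an actor should enforce it explicitly, for example by waiting on the first result before submitting the second call.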