跳转至

Tasks

Abstract

Ray 允许在独立的工作进程上异步执行任意函数。这类函数称为 Ray Remote Functions,它们的异步调用称为 Ray Tasks。

Reference: Ray Tasks

Ray Tasks Example
import ray
import time

# A regular Python function.
def normal_function():
    return 1

# By adding the `@ray.remote` decorator, a regular Python function
# becomes a Ray remote function.
@ray.remote
def my_function():
    return 1

# To invoke this remote function, use the `remote` method.
# This will immediately return an object ref (a future) and then create
# a task that will be executed on a worker process.
obj_ref = my_function.remote()

# The result can be retrieved with ``ray.get``.
assert ray.get(obj_ref) == 1

@ray.remote
def slow_function():
    time.sleep(10)
    return 1

# Ray tasks are executed in parallel.
# All computation is performed in the background, driven by Ray's internal event loop.
for _ in range(4):
    # This doesn't block.
    slow_function.remote()

可以使用 State API 中的 ray summary tasks 查看正在运行和已完成的任务以及数量:

ray summary tasks
======== Tasks Summary: 2023-05-26 11:09:32.092546 ========
Stats:
------------------------------------
total_actor_scheduled: 0
total_actor_tasks: 0
total_tasks: 5


Table (group by func_name):
------------------------------------
    FUNC_OR_CLASS_NAME    STATE_COUNTS    TYPE
0   slow_function         RUNNING: 4      NORMAL_TASK
1   my_function           FINISHED: 1     NORMAL_TASK

声明所需资源

Ray 允许指定 Task 或 Actor 的逻辑资源需求(例如 CPU、GPU 和自定义资源)。Task 或 Actor 只有在有足够的逻辑资源可用时才会运行在节点上。

# Specify required resources.
@ray.remote(num_cpus=4, num_gpus=2)
def my_function():
    return 1


# Override the default resource requirements.
my_function.options(num_cpus=3).remote()

向 Task 传递 Object Refs

除了传递具体的值,对象引用 (Object refs) 也可以作为参数传递给远程函数。当该任务执行时,在函数体内部,该参数将被解析为底层的实际值。

@ray.remote
def function_with_an_argument(value):
    return value + 1


obj_ref1 = my_function.remote()
assert ray.get(obj_ref1) == 1

# You can pass an object ref as an argument to another Ray task.
obj_ref2 = function_with_an_argument.remote(obj_ref1)
assert ray.get(obj_ref2) == 2

请注意以下行为

  • 任务依赖性:由于第二个任务依赖于第一个任务的输出,Ray 在第一个任务完成之前不会执行第二个任务。
  • 跨机器调度:如果这两个任务被调度在不同的机器上,第一个任务的输出(即对应 obj_ref1 / objRef1 的值)将通过网络传输到调度第二个任务的机器上。

等待部分结果

在 Ray 任务结果上调用 ray.get 会导致程序阻塞,直到该任务执行完毕。当你启动了大量任务后,你可能希望知道哪些任务已经执行完成,而不必为了等待所有任务而陷入阻塞状态。

通过使用 ray.wait() 可以实现这一目标。该函数的工作方式如下:

object_refs = [slow_function.remote() for _ in range(2)]
# Return as soon as one of the tasks finished execution.
ready_refs, remaining_refs = ray.wait(object_refs, num_returns=1, timeout=None)

生成器 (Generators)

Ray 与 Python 的生成器语法兼容。在分布式编程中,使用生成器(尤其是 yield 关键字)非常强大。这允许远程任务流式返回多个结果,而不是一次性返回一个巨大的列表。

import ray
import time

# Takes 25 seconds to finish.
@ray.remote
def f():
    for i in range(5):
        time.sleep(5)
        yield i

for obj_ref in f.remote():
    # Prints every 5 seconds and stops after 25 seconds.
    print(ray.get(obj_ref))

yield 关键字的优势

  • 降低内存占用:不需要在内存中积压所有结果。
  • 减少延迟:下游任务可以在第一个结果产生时就立即开始处理,实现更高效的流水线。

多重返回值 (Multiple returns)

默认情况下,一个 Ray 任务只返回一个 对象引用 (Object Ref)。但是,你可以通过设置 num_returns 选项,配置 Ray 任务返回多个对象引用。

# By default, a Ray task only returns a single Object Ref.
@ray.remote
def return_single():
    return 0, 1, 2


object_ref = return_single.remote()
assert ray.get(object_ref) == (0, 1, 2)


# However, you can configure Ray tasks to return multiple Object Refs.
@ray.remote(num_returns=3)
def return_multiple():
    return 0, 1, 2


object_ref0, object_ref1, object_ref2 = return_multiple.remote()
assert ray.get(object_ref0) == 0
assert ray.get(object_ref1) == 1
assert ray.get(object_ref2) == 2

远程生成器 (Remote Generators)

对于需要返回多个 object refs 的任务,Ray 还支持远程生成器 (Remote Generators),允许任务一次返回一个 object ref,从而降低 Worker 节点的内存占用。此外,Ray 还支持动态设置返回值数量的选项,这在调用者无法预知会产生多少个返回值时非常有用。

@ray.remote(num_returns=3)
def return_multiple_as_generator():
    for i in range(3):
        yield i


# NOTE: Similar to normal functions, these objects will not be available
# until the full task is complete and all returns have been generated.
a, b, c = return_multiple_as_generator.remote()

取消任务 (Cancelling Tasks)

Ray 提供了 ray.cancel 函数,可以取消正在执行的 Task。

@ray.remote
def blocking_operation():
    time.sleep(10e6)


obj_ref = blocking_operation.remote()
ray.cancel(obj_ref)

try:
    ray.get(obj_ref)
except ray.exceptions.TaskCancelledError:
    print("Object reference was cancelled.")

调度 (Scheduling)

对于每一个 Task,Ray 都会选择一个 Worker Node 来运行它。调度决策 (Scheduling decision) 主要基于以下几个因素:

  • 任务的资源需求 (Resource requirements):例如 CPU、GPU 或自定义资源的需求量。
  • 指定的调度策略 (Scheduling strategy):例如是否要求节点打散或集中(Spread vs. Pack)。
  • 任务参数的位置 (Locations of task arguments):为了减少网络传输,Ray 会倾向于在数据所在的节点上运行任务。

容错机制 (Fault Tolerance)

Ray 默认会在任务因为 系统故障(System failures)应用层异常(Application-level failures) 而失败时自动进行重试。

可以通过在 @ray.remote() 装饰器或 .options() 方法中设置 max_retries(最大重试次数)和 retry_exceptions(是否对异常进行重试)参数,灵活控制任务的重试策略。

Task 事件 (Task Events)

默认情况下,Ray 会追踪任务的执行过程,并报告任务状态事件(Status events)和性能分析事件(Profiling events)。这些数据会被 Ray Dashboard 和 State API 调用。

你可以通过在 @ray.remote().options() 中设置 enable_task_events 选项来禁用任务事件。这样做可以减少任务执行的开销,并降低任务发送给 Ray Dashboard 的数据量。

注意

嵌套任务(Nested tasks)不会继承父任务的任务事件设置,你需要为每个任务单独设置任务事件配置。


嵌套 remote 函数

远程函数可以调用其他远程函数,从而形成嵌套任务(nested tasks)。

import ray

@ray.remote
def f():
    return 1

@ray.remote
def g():
    # Call f 4 times and return the resulting object refs.
    return [f.remote() for _ in range(4)]

@ray.remote
def h():
    # Call f 4 times, block until those 4 tasks finish,
    # retrieve the results, and return the values.
    return ray.get([f.remote() for _ in range(4)])

调用 g 和 h 时会产生如下行为:

>>> ray.get(g.remote())
[ObjectRef(b1457ba0911ae84989aae86f89409e953dd9a80e),
 ObjectRef(7c14a1d13a56d8dc01e800761a66f09201104275),
 ObjectRef(99763728ffc1a2c0766a2000ebabded52514e9a6),
 ObjectRef(9c2f372e1933b04b2936bb6f58161285829b9914)]

>>> ray.get(h.remote())
[1, 1, 1, 1]

限制: 被调用的远程函数(如 f)必须在引用它的远程函数(如 gh)之前定义。

这是因为 gh 在被注册为远程函数时,会被 pickle(序列化)并发送到 worker 节点。如果此时 f 尚未定义,则 gh 中对 f 的引用会失败,导致序列化出的函数体不完整、运行报错。

阻塞时释放资源

当 Ray 任务因为阻塞(如调用 ray.get() 等)而等待结果时,Ray 会自动释放任务所占用的 CPU 资源。这一机制避免了如下的资源死锁问题:比如,嵌套任务在等待子任务完成时,如果一直占用着 CPU 资源,可能导致集群中的其他任务迟迟无法分配到资源,进而产生死锁。

来看下面的远程函数示例:

@ray.remote(num_cpus=1, num_gpus=1)
def g():
    return ray.get(f.remote())

g 执行到 ray.get() 并进入阻塞状态时,Ray 会自动让出(释放)该任务占用的 CPU 资源。这样,其他任务可以使用这些 CPU,提升整体资源利用率。当 ray.get() 返回、阻塞解除后,g 会重新获取到所需 CPU 资源,继续运行后续代码。

注意

需要注意的是:GPU 资源不会像 CPU 那样被自动释放。整个任务生命周期中,申请到的 GPU 会始终处于占用状态,因为 GPU 内存通常需要在任务执行期间持续保持可用。