Tasks
Abstract
Ray 允许在独立的工作进程上异步执行任意函数。这类函数称为 Ray Remote Functions,它们的异步调用称为 Ray Tasks。
Reference: Ray Tasks
Ray Tasks Example
import ray
import time
# A regular Python function.
def normal_function():
return 1
# By adding the `@ray.remote` decorator, a regular Python function
# becomes a Ray remote function.
@ray.remote
def my_function():
return 1
# To invoke this remote function, use the `remote` method.
# This will immediately return an object ref (a future) and then create
# a task that will be executed on a worker process.
obj_ref = my_function.remote()
# The result can be retrieved with ``ray.get``.
assert ray.get(obj_ref) == 1
@ray.remote
def slow_function():
time.sleep(10)
return 1
# Ray tasks are executed in parallel.
# All computation is performed in the background, driven by Ray's internal event loop.
for _ in range(4):
# This doesn't block.
slow_function.remote()
可以使用 State API 中的 ray summary tasks 查看正在运行和已完成的任务以及数量:
ray summary tasks
======== Tasks Summary: 2023-05-26 11:09:32.092546 ========
Stats:
------------------------------------
total_actor_scheduled: 0
total_actor_tasks: 0
total_tasks: 5
Table (group by func_name):
------------------------------------
FUNC_OR_CLASS_NAME STATE_COUNTS TYPE
0 slow_function RUNNING: 4 NORMAL_TASK
1 my_function FINISHED: 1 NORMAL_TASK
声明所需资源
Ray 允许指定 Task 或 Actor 的逻辑资源需求(例如 CPU、GPU 和自定义资源)。Task 或 Actor 只有在有足够的逻辑资源可用时才会运行在节点上。
# Specify required resources.
@ray.remote(num_cpus=4, num_gpus=2)
def my_function():
return 1
# Override the default resource requirements.
my_function.options(num_cpus=3).remote()
向 Task 传递 Object Refs
除了传递具体的值,对象引用 (Object refs) 也可以作为参数传递给远程函数。当该任务执行时,在函数体内部,该参数将被解析为底层的实际值。
@ray.remote
def function_with_an_argument(value):
return value + 1
obj_ref1 = my_function.remote()
assert ray.get(obj_ref1) == 1
# You can pass an object ref as an argument to another Ray task.
obj_ref2 = function_with_an_argument.remote(obj_ref1)
assert ray.get(obj_ref2) == 2
请注意以下行为
- 任务依赖性:由于第二个任务依赖于第一个任务的输出,Ray 在第一个任务完成之前不会执行第二个任务。
- 跨机器调度:如果这两个任务被调度在不同的机器上,第一个任务的输出(即对应
obj_ref1/objRef1的值)将通过网络传输到调度第二个任务的机器上。
等待部分结果
在 Ray 任务结果上调用 ray.get 会导致程序阻塞,直到该任务执行完毕。当你启动了大量任务后,你可能希望知道哪些任务已经执行完成,而不必为了等待所有任务而陷入阻塞状态。
通过使用 ray.wait() 可以实现这一目标。该函数的工作方式如下:
object_refs = [slow_function.remote() for _ in range(2)]
# Return as soon as one of the tasks finished execution.
ready_refs, remaining_refs = ray.wait(object_refs, num_returns=1, timeout=None)
生成器 (Generators)
Ray 与 Python 的生成器语法兼容。在分布式编程中,使用生成器(尤其是 yield 关键字)非常强大。这允许远程任务流式返回多个结果,而不是一次性返回一个巨大的列表。
import ray
import time
# Takes 25 seconds to finish.
@ray.remote
def f():
for i in range(5):
time.sleep(5)
yield i
for obj_ref in f.remote():
# Prints every 5 seconds and stops after 25 seconds.
print(ray.get(obj_ref))
yield 关键字的优势
- 降低内存占用:不需要在内存中积压所有结果。
- 减少延迟:下游任务可以在第一个结果产生时就立即开始处理,实现更高效的流水线。
多重返回值 (Multiple returns)
默认情况下,一个 Ray 任务只返回一个 对象引用 (Object Ref)。但是,你可以通过设置 num_returns 选项,配置 Ray 任务返回多个对象引用。
# By default, a Ray task only returns a single Object Ref.
@ray.remote
def return_single():
return 0, 1, 2
object_ref = return_single.remote()
assert ray.get(object_ref) == (0, 1, 2)
# However, you can configure Ray tasks to return multiple Object Refs.
@ray.remote(num_returns=3)
def return_multiple():
return 0, 1, 2
object_ref0, object_ref1, object_ref2 = return_multiple.remote()
assert ray.get(object_ref0) == 0
assert ray.get(object_ref1) == 1
assert ray.get(object_ref2) == 2
远程生成器 (Remote Generators)
对于需要返回多个 object refs 的任务,Ray 还支持远程生成器 (Remote Generators),允许任务一次返回一个 object ref,从而降低 Worker 节点的内存占用。此外,Ray 还支持动态设置返回值数量的选项,这在调用者无法预知会产生多少个返回值时非常有用。
@ray.remote(num_returns=3)
def return_multiple_as_generator():
for i in range(3):
yield i
# NOTE: Similar to normal functions, these objects will not be available
# until the full task is complete and all returns have been generated.
a, b, c = return_multiple_as_generator.remote()
取消任务 (Cancelling Tasks)
Ray 提供了 ray.cancel 函数,可以取消正在执行的 Task。
@ray.remote
def blocking_operation():
time.sleep(10e6)
obj_ref = blocking_operation.remote()
ray.cancel(obj_ref)
try:
ray.get(obj_ref)
except ray.exceptions.TaskCancelledError:
print("Object reference was cancelled.")
调度 (Scheduling)
对于每一个 Task,Ray 都会选择一个 Worker Node 来运行它。调度决策 (Scheduling decision) 主要基于以下几个因素:
- 任务的资源需求 (Resource requirements):例如 CPU、GPU 或自定义资源的需求量。
- 指定的调度策略 (Scheduling strategy):例如是否要求节点打散或集中(Spread vs. Pack)。
- 任务参数的位置 (Locations of task arguments):为了减少网络传输,Ray 会倾向于在数据所在的节点上运行任务。
容错机制 (Fault Tolerance)
Ray 默认会在任务因为 系统故障(System failures) 或 应用层异常(Application-level failures) 而失败时自动进行重试。
可以通过在 @ray.remote() 装饰器或 .options() 方法中设置 max_retries(最大重试次数)和 retry_exceptions(是否对异常进行重试)参数,灵活控制任务的重试策略。
Task 事件 (Task Events)
默认情况下,Ray 会追踪任务的执行过程,并报告任务状态事件(Status events)和性能分析事件(Profiling events)。这些数据会被 Ray Dashboard 和 State API 调用。
你可以通过在 @ray.remote() 或 .options() 中设置 enable_task_events 选项来禁用任务事件。这样做可以减少任务执行的开销,并降低任务发送给 Ray Dashboard 的数据量。
注意
嵌套任务(Nested tasks)不会继承父任务的任务事件设置,你需要为每个任务单独设置任务事件配置。
嵌套 remote 函数
远程函数可以调用其他远程函数,从而形成嵌套任务(nested tasks)。
import ray
@ray.remote
def f():
return 1
@ray.remote
def g():
# Call f 4 times and return the resulting object refs.
return [f.remote() for _ in range(4)]
@ray.remote
def h():
# Call f 4 times, block until those 4 tasks finish,
# retrieve the results, and return the values.
return ray.get([f.remote() for _ in range(4)])
调用 g 和 h 时会产生如下行为:
>>> ray.get(g.remote())
[ObjectRef(b1457ba0911ae84989aae86f89409e953dd9a80e),
ObjectRef(7c14a1d13a56d8dc01e800761a66f09201104275),
ObjectRef(99763728ffc1a2c0766a2000ebabded52514e9a6),
ObjectRef(9c2f372e1933b04b2936bb6f58161285829b9914)]
>>> ray.get(h.remote())
[1, 1, 1, 1]
限制: 被调用的远程函数(如 f)必须在引用它的远程函数(如 g 和 h)之前定义。
这是因为 g 和 h 在被注册为远程函数时,会被 pickle(序列化)并发送到 worker 节点。如果此时 f 尚未定义,则 g 或 h 中对 f 的引用会失败,导致序列化出的函数体不完整、运行报错。
阻塞时释放资源
当 Ray 任务因为阻塞(如调用 ray.get() 等)而等待结果时,Ray 会自动释放任务所占用的 CPU 资源。这一机制避免了如下的资源死锁问题:比如,嵌套任务在等待子任务完成时,如果一直占用着 CPU 资源,可能导致集群中的其他任务迟迟无法分配到资源,进而产生死锁。
来看下面的远程函数示例:
@ray.remote(num_cpus=1, num_gpus=1)
def g():
return ray.get(f.remote())
当 g 执行到 ray.get() 并进入阻塞状态时,Ray 会自动让出(释放)该任务占用的 CPU 资源。这样,其他任务可以使用这些 CPU,提升整体资源利用率。当 ray.get() 返回、阻塞解除后,g 会重新获取到所需 CPU 资源,继续运行后续代码。
注意
需要注意的是:GPU 资源不会像 CPU 那样被自动释放。整个任务生命周期中,申请到的 GPU 会始终处于占用状态,因为 GPU 内存通常需要在任务执行期间持续保持可用。