Objects
Abstract
在 Ray 中,Task 和 Actor 创建并计算的数据统称为 对象(Object),我们将它们称为 远程对象(Remote Object),因为它们可以存储在 Ray 集群中的任何位置,同时我们使用 对象引用(Object Ref) 来引用它们。
Remote Object 被缓存在 Ray 的 分布式共享内存对象存储(Distributed Shared-Memory Object Store) 中,集群中的每个节点都拥有一个 Object Store。
Objects & ObjectRef
在集群环境下,一个 Remote Object 可以存在于一个或多个节点上,且与持有对应 Object Ref 的节点无关。
-
Remote Object
- 由 Task / Actor 产生
- 存储在分布式 object store 中
- 可以位于集群任意节点(甚至多个节点)
-
ObjectRef
- 对 Object 的引用(类似指针 / ID)
- 不包含数据本身
- 类似 Future
Object ref 可以通过两种方式创建:
- 由 远程函数调用 返回
-
由
ray.put()返回import ray # Put an object in Ray's object store. y = 1 object_ref = ray.put(y)
不可变性(Immutability)
Remote Object 一旦创建 → 不可修改。
-
为什么这样设计?
- 支持多副本复制(replication)
- 无需同步不同节点的数据
- 避免一致性问题
-
带来的好处
- 更简单的分布式模型
- 更高的读取性能(可多节点缓存)
- 无锁 / 无同步开销
获取 Object 数据
你可以使用 ray.get() 方法通过 Object Ref 获取 Remote Object 的结果。
如果当前节点的 Object Store 中不包含该对象,则会从其他节点下载该对象。
import ray
import time
# Get the value of one object ref.
obj_ref = ray.put(1)
assert ray.get(obj_ref) == 1
# Get the values of multiple object refs in parallel.
assert ray.get([ray.put(i) for i in range(3)]) == [0, 1, 2]
# You can also set a timeout to return early from a ``get``
# that's blocking for too long.
from ray.exceptions import GetTimeoutError
# ``GetTimeoutError`` is a subclass of ``TimeoutError``.
@ray.remote
def long_running_function():
time.sleep(8)
obj_ref = long_running_function.remote()
try:
ray.get(obj_ref, timeout=4)
except GetTimeoutError: # You can capture the standard "TimeoutError" instead
print("`get` timed out.")
ray.get():对象类型与性能特性
| 对象类型 | 获取方式 | 性能特性 |
|---|---|---|
| NumPy 数组/集合 | 零拷贝(zero-copy) | 直接共享 Object Store 内存 |
| 其他 Python 对象 | 需反序列化 | 反序列化为新 Python 对象 |
传递 Object 参数
Ray 的 Object Ref 可以在整个 Ray 应用中自由传递。这意味着它们可以作为参数传递给 Task / Actor 方法,甚至可以存储在其他对象中。 Object 通过分布式引用计数进行跟踪,当所有对该 Object 的引用被删除后,其数据会被自动释放。
将 Object 传递给 Task / Actor 方法有两种不同方式。根据传递方式的不同,Ray 会决定在任务执行前是否对 Object 进行解引用(de-reference)。
作为顶层参数传递
当一个 Object Ref 作为顶层参数直接传递给 Task / Actor 方法时,Ray 会对其进行解引用。 这意味着 Ray 会获取所有顶层 Object Ref 参数的实际数据,并在数据完全可用之前不会执行 Task / Actor 方法。
import ray
@ray.remote
def echo(a: int, b: int, c: int):
print(a, b, c)
# 直接传值
echo.remote(1, 2, 3)
# -> prints "1 2 3"
# 使用 object ref
a, b, c = ray.put(1), ray.put(2), ray.put(3)
# 顶层传参会自动解引用
echo.remote(a, b, c)
# -> prints "1 2 3"
作为嵌套参数传递
当对象被包含在一个嵌套结构中(例如 list),Ray 不会对其进行解引用。
这意味着任务需要显式调用 ray.get() 来获取具体值。
如果任务没有调用 ray.get(),那么该对象的数据就不会被传输到任务所在的机器上。
import ray
@ray.remote
def echo_and_get(x_list):
print("args:", x_list)
print("values:", ray.get(x_list))
a, b, c = ray.put(1), ray.put(2), ray.put(3)
# 嵌套传参不会自动解引用
echo_and_get.remote([a, b, c])
# -> args: [ObjectRef(...), ...]
# -> values: [1, 2, 3]
Actor 中的行为一致
同样的规则也适用于:
- Actor 构造函数
- Actor 方法调用
@ray.remote
class Actor:
def __init__(self, arg):
pass
def method(self, arg):
pass
obj = ray.put(2)
# Actor 构造
Actor.remote(obj) # 按值(解引用)
Actor.remote([obj]) # 按引用
# Actor 方法
actor.method.remote(obj) # 按值
actor.method.remote([obj]) # 按引用
闭包捕获 Object
你也可以通过闭包捕获(closure-capture)的方式将 Object 传递给 Task / Actor 方法。 当你有一个较大的 Object,希望在多个 Task / Actor 之间共享,并且不想反复作为参数传递时,这种方式会很方便。
注意
如果定义的 Task / Actor 方法闭包中捕获了 Object Ref,那么该 Object 会被引用计数机制“固定(pin)”住,因此在整个 job 结束之前,该 Object 不会被驱逐(evicted)。
import ray
# 将值 (1, 2, 3) 放入 Ray 的 Object Store 中。
a, b, c = ray.put(1), ray.put(2), ray.put(3)
@ray.remote
def print_via_capture():
# 打印 (a, b, c) 的值
print(ray.get([a, b, c]))
# 通过闭包捕获 Object Ref
# 在 `print_via_capture` 函数内部,可以获取并打印全局 Object Ref (a, b, c)。
print_via_capture.remote()
# -> prints [1, 2, 3]
嵌套 Object
Ray 支持在 Task / Actor 方法中嵌套 Object Ref,内部 Object Ref 会通过引用计数机制保持活跃,直到所有外部 Object Ref 都被删除。
object_ref_2 = ray.put([object_ref])
容错
Ray 可以通过血缘重建(lineage reconstruction) 自动恢复因数据丢失的 Object,但无法恢复 owner 失败。