跳转至

Objects

Abstract

在 Ray 中,Task 和 Actor 创建并计算的数据统称为 对象(Object),我们将它们称为 远程对象(Remote Object),因为它们可以存储在 Ray 集群中的任何位置,同时我们使用 对象引用(Object Ref) 来引用它们。

Remote Object 被缓存在 Ray 的 分布式共享内存对象存储(Distributed Shared-Memory Object Store) 中,集群中的每个节点都拥有一个 Object Store。

Objects & ObjectRef

在集群环境下,一个 Remote Object 可以存在于一个或多个节点上,且与持有对应 Object Ref 的节点无关。

  • Remote Object


    • 由 Task / Actor 产生
    • 存储在分布式 object store 中
    • 可以位于集群任意节点(甚至多个节点)
  • ObjectRef


    • 对 Object 的引用(类似指针 / ID)
    • 不包含数据本身
    • 类似 Future

Object ref 可以通过两种方式创建:

  • 远程函数调用 返回
  • ray.put() 返回

    import ray
    
    # Put an object in Ray's object store.
    y = 1
    object_ref = ray.put(y)
    

不可变性(Immutability)

Remote Object 一旦创建 → 不可修改。


  • 为什么这样设计?


    • 支持多副本复制(replication)
    • 无需同步不同节点的数据
    • 避免一致性问题
  • 带来的好处


    • 更简单的分布式模型
    • 更高的读取性能(可多节点缓存)
    • 无锁 / 无同步开销

获取 Object 数据

你可以使用 ray.get() 方法通过 Object Ref 获取 Remote Object 的结果。 如果当前节点的 Object Store 中不包含该对象,则会从其他节点下载该对象。

import ray
import time

# Get the value of one object ref.
obj_ref = ray.put(1)
assert ray.get(obj_ref) == 1

# Get the values of multiple object refs in parallel.
assert ray.get([ray.put(i) for i in range(3)]) == [0, 1, 2]

# You can also set a timeout to return early from a ``get``
# that's blocking for too long.
from ray.exceptions import GetTimeoutError
# ``GetTimeoutError`` is a subclass of ``TimeoutError``.

@ray.remote
def long_running_function():
    time.sleep(8)

obj_ref = long_running_function.remote()
try:
    ray.get(obj_ref, timeout=4)
except GetTimeoutError:  # You can capture the standard "TimeoutError" instead
    print("`get` timed out.")

ray.get():对象类型与性能特性

对象类型 获取方式 性能特性
NumPy 数组/集合 零拷贝(zero-copy) 直接共享 Object Store 内存
其他 Python 对象 需反序列化 反序列化为新 Python 对象


传递 Object 参数

Ray 的 Object Ref 可以在整个 Ray 应用中自由传递。这意味着它们可以作为参数传递给 Task / Actor 方法,甚至可以存储在其他对象中。 Object 通过分布式引用计数进行跟踪,当所有对该 Object 的引用被删除后,其数据会被自动释放。

将 Object 传递给 Task / Actor 方法有两种不同方式。根据传递方式的不同,Ray 会决定在任务执行前是否对 Object 进行解引用(de-reference)。

作为顶层参数传递

当一个 Object Ref 作为顶层参数直接传递给 Task / Actor 方法时,Ray 会对其进行解引用。 这意味着 Ray 会获取所有顶层 Object Ref 参数的实际数据,并在数据完全可用之前不会执行 Task / Actor 方法。

import ray

@ray.remote
def echo(a: int, b: int, c: int):
    print(a, b, c)

# 直接传值
echo.remote(1, 2, 3)
# -> prints "1 2 3"

# 使用 object ref
a, b, c = ray.put(1), ray.put(2), ray.put(3)

# 顶层传参会自动解引用
echo.remote(a, b, c)
# -> prints "1 2 3"

作为嵌套参数传递

当对象被包含在一个嵌套结构中(例如 list),Ray 不会对其进行解引用。 这意味着任务需要显式调用 ray.get() 来获取具体值。

如果任务没有调用 ray.get(),那么该对象的数据就不会被传输到任务所在的机器上。

import ray

@ray.remote
def echo_and_get(x_list):
    print("args:", x_list)
    print("values:", ray.get(x_list))

a, b, c = ray.put(1), ray.put(2), ray.put(3)

# 嵌套传参不会自动解引用
echo_and_get.remote([a, b, c])
# -> args: [ObjectRef(...), ...]
# -> values: [1, 2, 3]
Actor 中的行为一致

同样的规则也适用于:

  • Actor 构造函数
  • Actor 方法调用
@ray.remote
class Actor:
    def __init__(self, arg):
        pass

    def method(self, arg):
        pass

obj = ray.put(2)

# Actor 构造
Actor.remote(obj)      # 按值(解引用)
Actor.remote([obj])    # 按引用

# Actor 方法
actor.method.remote(obj)    # 按值
actor.method.remote([obj])  # 按引用

闭包捕获 Object

你也可以通过闭包捕获(closure-capture)的方式将 Object 传递给 Task / Actor 方法。 当你有一个较大的 Object,希望在多个 Task / Actor 之间共享,并且不想反复作为参数传递时,这种方式会很方便。

注意

如果定义的 Task / Actor 方法闭包中捕获了 Object Ref,那么该 Object 会被引用计数机制“固定(pin)”住,因此在整个 job 结束之前,该 Object 不会被驱逐(evicted)。

import ray

# 将值 (1, 2, 3) 放入 Ray 的 Object Store 中。
a, b, c = ray.put(1), ray.put(2), ray.put(3)


@ray.remote
def print_via_capture():
    # 打印 (a, b, c) 的值
    print(ray.get([a, b, c]))


# 通过闭包捕获 Object Ref
# 在 `print_via_capture` 函数内部,可以获取并打印全局 Object Ref (a, b, c)。
print_via_capture.remote()
# -> prints [1, 2, 3]

嵌套 Object

Ray 支持在 Task / Actor 方法中嵌套 Object Ref,内部 Object Ref 会通过引用计数机制保持活跃,直到所有外部 Object Ref 都被删除。

object_ref_2 = ray.put([object_ref])

容错

Ray 可以通过血缘重建(lineage reconstruction) 自动恢复因数据丢失的 Object,但无法恢复 owner 失败。