跳转至

Serialization

Abstract

由于 Ray 的进程之间不共享内存空间,在 Worker 和节点之间传输数据时需要进行序列化和反序列化。 Ray 使用 Plasma Object Store 来高效地在不同进程和不同节点之间传输对象。

在同一节点上,Object Store 中的 NumPy array 可以在多个 Worker 之间共享(零拷贝反序列化)。

概述

Ray 使用一个定制的 Pickle protocol version 5 来替代原先的 PyArrow 序列化器。 这消除了之前的一些限制(例如无法序列化递归对象)。

目前,Ray 兼容 Pickle protocol version 5,并借助 cloudpickle 支持序列化更广泛的对象类型(例如 lambda 函数、嵌套函数、动态类等)。


Plasma Object Store

Plasma 是一个基于内存的对象存储系统,最初作为 Apache Arrow 的一部分开发。 在 Ray 1.0.0 发布之前,Ray 将 Arrow 中的 Plasma 代码分叉(fork)到自身代码库中,以便更好地适配 Ray 的架构和性能需求。

Plasma 用于在不同进程和节点之间高效传输对象。所有存储在 Plasma Object Store 中的对象都是不可变的,并存储在共享内存中,这使得同一节点上的多个 Worker 可以高效访问这些对象。

数据流行为

每个节点都有自己的 Object Store。当数据被放入 Object Store 时,并不会自动广播到其他节点。数据会保留在本地节点,直到其他节点上的任务或 Actor 请求该数据时才会进行传输。

flowchart LR
    A[put] --> B[本地 object store]
    B -- 其他节点请求 --> C[才会传输]

序列化 ObjectRef

应当将使用 ray.cloudpickle 显式序列化 ObjectRef 作为最后的手段

推荐的方式

推荐的方式是通过 Ray 的 Task / Actor 方法参数和返回值来直接传递 ObjectRef

Ray 的 ObjectRef 可以通过 ray.cloudpickle 进行序列化。随后可以反序列化该 ObjectRef,并使用 ray.get() 获取其值。

注意事项

  • 必须使用 ray.cloudpickle 进行序列化,其他 pickle 工具(如标准库 pickle)不保证兼容。
  • 序列化与反序列化必须在同一个 Ray 集群内进行,否则反序列化的 ObjectRef 无法获取到正确的数据。
  • 被序列化的 ObjectRef 其值会一直固定(pin)在对象存储中,不会自动释放。
  • 需要手动释放:务必调用 ray._private.internal_api.free(obj_ref) 显式释放对象,以避免占用空间。
    • 注意:ray._private.internal_api.free(obj_ref) 是私有 API,未来可能变动。
示例:如何序列化 ObjectRef 到外部存储,再反序列化并释放对象
import ray
from ray import cloudpickle

FILE = "external_store.pickle"

ray.init()

my_dict = {"hello": "world"}

obj_ref = ray.put(my_dict)
with open(FILE, "wb+") as f:
    cloudpickle.dump(obj_ref, f)

# ObjectRef remains pinned in memory because
# it was serialized with ray.cloudpickle.
del obj_ref

with open(FILE, "rb") as f:
    new_obj_ref = cloudpickle.load(f)

# The deserialized ObjectRef works as expected.
assert ray.get(new_obj_ref) == my_dict

# Explicitly free the object.
ray._private.internal_api.free(new_obj_ref)

Numpy Arrays

Ray 针对 Numpy Arrays 进行了优化,使用带有带外数据(out-of-band data)的 Pickle protocol 5。 Numpy Arrays 会被存储为只读 Object,并且同一节点上的所有 Ray worker 都可以在不进行复制的情况下(零拷贝读取)从 Object Store 中读取该 Object。

Worker 进程中的每个 Numpy Array Object 都持有一个指针,指向共享内存中对应的 Array。 如果需要对该只读 Object 进行写操作,则必须先将其复制到本地进程内存中。

避免序列化问题的方式

  • 方法一:使用原生类型


    推荐使用原生类型,直接被 Ray 高效序列化和反序列化,跨进程、跨节点安全可靠。

    • Numpy Arrays
    • 基本类型(int / float / str
    • list / dict(由上述类型组成)
  • 方法二:使用 Actor 持有对象


    对于无法序列化的对象:

    • 不要传递
    • 放在 Actor 内部管理

核心思想:

  • ✅ 传“数据”(简单、可序列化)
  • ❌ 传“复杂对象”(容易失败 / 性能差)

修复 “assignment destination is read-only”

由于 Ray 会将 Numpy Arrays 放入 Object Store,当它们作为远程函数的参数被反序列化时,会变成只读对象。

例如,下面的代码会报错:

为避免这个问题,如果你需要修改 Numpy Array,可以在目标端手动复制该数组(arr = arr.copy())。

需要注意的是,这实际上相当于禁用了 Ray 提供的零拷贝反序列化特性。

import ray
import numpy as np

@ray.remote
def f(arr):
    # arr = arr.copy()  # 添加 copy 可以修复该错误
    arr[0] = 1

try:
    ray.get(f.remote(np.zeros(100)))
except ray.exceptions.RayTaskError as e:
    print(e)
# ray.exceptions.RayTaskError(ValueError): ray::f()
#   File "test.py", line 6, in f
#     arr[0] = 1
# ValueError: assignment destination is read-only

特别说明

  • Ray 当前使用 Pickle protocol version 5。

    Pickle 协议

    • Ray 使用:Pickle v5
    • Python 默认:v3
    • v4 / v5:更适合大对象,更高效
  • 对于非原生对象,即使一个对象在另一个对象中被多次引用,Ray 也只会保留一个副本:

    import ray
    import numpy as np
    
    obj = [np.zeros(42)] * 99
    l = ray.get(ray.put(obj))
    assert l[0] is l[1]  # 没问题!
    
  • 在可能的情况下,建议使用 Numpy Arrays 或由 Numpy Arrays 组成的 Python 集合,以获得最佳性能。

  • Lock Objects 通常是不可序列化的,因为复制一个 Lock 没有意义,并且可能导致严重的并发问题。如果你的 Object 中包含 Lock,你可能需要想办法绕过这个问题。

只读 Tensor 的零拷贝序列化

Ray 为只读的 PyTorch Tensor 提供了可选的零拷贝序列化。 Ray 会将这些 Tensor 转换为 Numpy Array,并利用 pickle5 的零拷贝缓冲区共享机制进行序列化。这避免了底层 Tensor 数据的复制,从而在跨 Task / Actor 传递大型 Tensor 时提升性能。

Warning

PyTorch 本身并不原生支持只读 Tensor,因此该功能需要谨慎使用。

当启用该功能时,Ray 不会复制数据,并允许对共享内存进行写操作。如果两个进程位于同一节点上,其中一个进程在 ray.get() 后修改了 Tensor,另一个进程可能会观察到该变化。

该功能在以下条件下效果最佳:

  • Tensor 的 requires_grad = False(即不参与 autograd)
  • Tensor 在内存中是连续的(tensor.is_contiguous()
  • Tensor 位于 CPU 内存中(性能收益更大)
  • 未使用 Ray Direct Transport

该功能默认关闭。可以通过设置环境变量 RAY_ENABLE_ZERO_COPY_TORCH_TENSORS 来启用。在运行脚本前设置该变量即可:

export RAY_ENABLE_ZERO_COPY_TORCH_TENSORS=1
示例:使用 ray.get() 计算一个 1GiB tensor 的和,并利用零拷贝序列化
import ray
import torch
import time

ray.init(runtime_env={"env_vars": {"RAY_ENABLE_ZERO_COPY_TORCH_TENSORS": "1"}})

@ray.remote
def process(tensor):
    return tensor.sum()

x = torch.ones(1024, 1024, 256)
start_time = time.perf_counter()
result = ray.get(process.remote(x))
elapsed_time = time.perf_counter() - start_time
print(f"Elapsed time: {elapsed_time}s")

assert result == x.sum()

在该示例中,启用零拷贝序列化后,端到端延迟降低了 66.3%:

# Without Zero-Copy Serialization
Elapsed time: 23.53883756196592s
# With Zero-Copy Serialization
Elapsed time: 7.933729998010676s

自定义序列化

有时你可能需要自定义序列化过程,因为 Ray 默认使用的序列化器(pickle5 + cloudpickle)无法满足需求(例如无法序列化某些对象,或对某些对象性能较差等)。

至少有三种方式可以定义自定义序列化:

在类中实现 __reduce__ 方法

如果你可以访问并修改类的代码,可以在类中定义 __reduce__ 方法。这是大多数 Python 库常用的方式。

import ray
import sqlite3

class DBConnection:
    def __init__(self, path):
        self.path = path
        self.conn = sqlite3.connect(path)

    # without '__reduce__', the instance is unserializable.
    def __reduce__(self):
        deserializer = DBConnection
        serialized_data = (self.path,)
        return deserializer, serialized_data

original = DBConnection("/tmp/db")
print(original.conn)

copied = ray.get(ray.put(original))
print(copied.conn)

注册自定义序列化器

如果你无法修改类定义,可以注册序列化器:

import ray
import threading

class A:
    def __init__(self, x):
        self.x = x
        self.lock = threading.Lock()  # could not be serialized!

try:
  ray.get(ray.put(A(1)))  # fail!
except TypeError:
  pass

def custom_serializer(a):
    return a.x

def custom_deserializer(b):
    return A(b)

# Register serializer and deserializer for class A:
ray.util.register_serializer(
  A, serializer=custom_serializer, deserializer=custom_deserializer)
ray.get(ray.put(A(1)))  # success!

# You can deregister the serializer at any time.
ray.util.deregister_serializer(A)
try:
  ray.get(ray.put(A(1)))  # fail!
except TypeError:
  pass

# Nothing happens when deregister an unavailable serializer.
ray.util.deregister_serializer(A)

注意

  • 序列化器是每个 worker 本地管理的
  • 每个 worker 都需要单独注册
  • 重新注册会覆盖旧的序列化器
  • API 是幂等的(重复注册不会有副作用)

仅针对特定对象自定义

可以通过包装对象的方式:

import threading

class A:
    def __init__(self, x):
        self.x = x
        self.lock = threading.Lock()  # could not serialize!

try:
   ray.get(ray.put(A(1)))  # fail!
except TypeError:
   pass

class SerializationHelperForA:
    """A helper class for serialization."""
    def __init__(self, a):
        self.a = a

    def __reduce__(self):
        return A, (self.a.x,)

ray.get(ray.put(SerializationHelperForA(A(1))))  # success!
# the serializer only works for a specific object, not all A
# instances, so we still expect failure here.
try:
   ray.get(ray.put(A(1)))  # still fail!
except TypeError:
   pass

自定义异常序列化

当 Ray 任务抛出的异常无法通过默认的 Pickle 机制进行序列化时,你可以注册自定义序列化器来处理这些异常 (注意:序列化器必须在 driver 和所有 worker 中注册)。

import ray
import threading

class CustomError(Exception):
    def __init__(self, message, data):
        self.message = message
        self.data = data
        self.lock = threading.Lock() # Cannot be serialized

def custom_serializer(exc):
    return {"message": exc.message, "data": str(exc.data)}

def custom_deserializer(state):
    return CustomError(state["message"], state["data"])

# Register in the driver
ray.util.register_serializer(
    CustomError,
    serializer=custom_serializer,
    deserializer=custom_deserializer
)

@ray.remote
def task_that_registers_serializer_and_raises():
    # Register the custom serializer in the worker
    ray.util.register_serializer(
        CustomError,
        serializer=custom_serializer,
        deserializer=custom_deserializer
    )

    # Now raise the custom exception
    raise CustomError("Something went wrong", {"complex": "data"})

# The custom exception will be properly serialized across worker boundaries
try:
    ray.get(task_that_registers_serializer_and_raises.remote())
except ray.exceptions.RayTaskError as e:
    print(f"Caught exception: {e.cause}")  # This will be our CustomError

当 Ray 任务抛出自定义异常时,Ray 会:

  1. 使用你提供的自定义序列化器对异常进行序列化
  2. 将其包装在 RayTaskError 中
  3. 反序列化后的异常可以通过 ray_task_error.cause 获取

当序列化失败时,Ray 会抛出一个 UnserializableException,其中包含原始堆栈信息的字符串表示。


故障排查

可以使用 ray.util.inspect_serializability 来定位棘手的 Pickle 序列化问题。 该函数可以用于追踪任意 Python 对象中的潜在不可序列化对象(无论它是函数、类,还是对象实例)。

示例:使用 inspect_serializability 检查一个包含不可序列化对象(threading.Lock)的函数

from ray.util import inspect_serializability
import threading

lock = threading.Lock()

def test():
    print(lock)

inspect_serializability(test, name="test")
=============================================================
Checking Serializability of <function test at 0x7ff130697e50>
=============================================================
!!! FAIL serialization: cannot pickle '_thread.lock' object
Detected 1 global variables. Checking serializability...
    Serializing 'lock' <unlocked _thread.lock object at 0x7ff1306a9f30>...
    !!! FAIL serialization: cannot pickle '_thread.lock' object
    WARNING: Did not find non-serializable object in <unlocked _thread.lock object at 0x7ff1306a9f30>. This may be an oversight.
=============================================================
Variable:

    FailTuple(lock [obj=<unlocked _thread.lock object at 0x7ff1306a9f30>, parent=<function test at 0x7ff130697e50>])

was found to be non-serializable. There may be multiple other undetected variables that were non-serializable.
Consider either removing the instantiation/imports of these variables or moving the instantiation into the scope of the function/class.
=============================================================
Check https://docs.ray.io/en/master/ray-core/objects/serialization.html#troubleshooting for more information.
If you have any suggestions on how to improve this error message, please reach out to the Ray developers on github.com/ray-project/ray/issues/
=============================================================

如果需要更详细的信息,可以在导入 Ray 之前设置环境变量 RAY_PICKLE_VERBOSE_DEBUG='2'。 这会启用基于 Python 的序列化后端,而不是 C-Pickle,因此你可以在序列化过程中调试 Python 代码。不过,这会使序列化变得更慢。


已知问题

在某些 Python 3.8 和 3.9 版本中,用户可能会遇到内存泄漏问题。 这是由于 Python 的 pickle 模块中的一个 bug 导致的。

该问题已在以下版本中修复:

  • Python 3.8.2rc1
  • Python 3.9.0 alpha 4
  • 或更高版本