Serialization
Abstract
由于 Ray 的进程之间不共享内存空间,在 Worker 和节点之间传输数据时需要进行序列化和反序列化。 Ray 使用 Plasma Object Store 来高效地在不同进程和不同节点之间传输对象。
在同一节点上,Object Store 中的 NumPy array 可以在多个 Worker 之间共享(零拷贝反序列化)。
概述
Ray 使用一个定制的 Pickle protocol version 5 来替代原先的 PyArrow 序列化器。 这消除了之前的一些限制(例如无法序列化递归对象)。
目前,Ray 兼容 Pickle protocol version 5,并借助 cloudpickle 支持序列化更广泛的对象类型(例如 lambda 函数、嵌套函数、动态类等)。
Plasma Object Store
Plasma 是一个基于内存的对象存储系统,最初作为 Apache Arrow 的一部分开发。 在 Ray 1.0.0 发布之前,Ray 将 Arrow 中的 Plasma 代码分叉(fork)到自身代码库中,以便更好地适配 Ray 的架构和性能需求。
Plasma 用于在不同进程和节点之间高效传输对象。所有存储在 Plasma Object Store 中的对象都是不可变的,并存储在共享内存中,这使得同一节点上的多个 Worker 可以高效访问这些对象。
数据流行为
每个节点都有自己的 Object Store。当数据被放入 Object Store 时,并不会自动广播到其他节点。数据会保留在本地节点,直到其他节点上的任务或 Actor 请求该数据时才会进行传输。
flowchart LR
A[put] --> B[本地 object store]
B -- 其他节点请求 --> C[才会传输]
序列化 ObjectRef
应当将使用 ray.cloudpickle 显式序列化 ObjectRef 作为最后的手段。
推荐的方式
推荐的方式是通过 Ray 的 Task / Actor 方法参数和返回值来直接传递 ObjectRef。
Ray 的 ObjectRef 可以通过 ray.cloudpickle 进行序列化。随后可以反序列化该 ObjectRef,并使用 ray.get() 获取其值。
注意事项
- 必须使用
ray.cloudpickle进行序列化,其他 pickle 工具(如标准库 pickle)不保证兼容。 - 序列化与反序列化必须在同一个 Ray 集群内进行,否则反序列化的 ObjectRef 无法获取到正确的数据。
- 被序列化的 ObjectRef 其值会一直固定(pin)在对象存储中,不会自动释放。
- 需要手动释放:务必调用
ray._private.internal_api.free(obj_ref)显式释放对象,以避免占用空间。- 注意:
ray._private.internal_api.free(obj_ref)是私有 API,未来可能变动。
- 注意:
import ray
from ray import cloudpickle
FILE = "external_store.pickle"
ray.init()
my_dict = {"hello": "world"}
obj_ref = ray.put(my_dict)
with open(FILE, "wb+") as f:
cloudpickle.dump(obj_ref, f)
# ObjectRef remains pinned in memory because
# it was serialized with ray.cloudpickle.
del obj_ref
with open(FILE, "rb") as f:
new_obj_ref = cloudpickle.load(f)
# The deserialized ObjectRef works as expected.
assert ray.get(new_obj_ref) == my_dict
# Explicitly free the object.
ray._private.internal_api.free(new_obj_ref)
Numpy Arrays
Ray 针对 Numpy Arrays 进行了优化,使用带有带外数据(out-of-band data)的 Pickle protocol 5。 Numpy Arrays 会被存储为只读 Object,并且同一节点上的所有 Ray worker 都可以在不进行复制的情况下(零拷贝读取)从 Object Store 中读取该 Object。
Worker 进程中的每个 Numpy Array Object 都持有一个指针,指向共享内存中对应的 Array。 如果需要对该只读 Object 进行写操作,则必须先将其复制到本地进程内存中。
避免序列化问题的方式
-
方法一:使用原生类型
推荐使用原生类型,直接被 Ray 高效序列化和反序列化,跨进程、跨节点安全可靠。
- Numpy Arrays
- 基本类型(
int/float/str) list/dict(由上述类型组成)
-
方法二:使用 Actor 持有对象
对于无法序列化的对象:
- 不要传递
- 放在 Actor 内部管理
核心思想:
- ✅ 传“数据”(简单、可序列化)
- ❌ 传“复杂对象”(容易失败 / 性能差)
修复 “assignment destination is read-only”
由于 Ray 会将 Numpy Arrays 放入 Object Store,当它们作为远程函数的参数被反序列化时,会变成只读对象。
例如,下面的代码会报错:
为避免这个问题,如果你需要修改 Numpy Array,可以在目标端手动复制该数组(arr = arr.copy())。
需要注意的是,这实际上相当于禁用了 Ray 提供的零拷贝反序列化特性。
import ray
import numpy as np
@ray.remote
def f(arr):
# arr = arr.copy() # 添加 copy 可以修复该错误
arr[0] = 1
try:
ray.get(f.remote(np.zeros(100)))
except ray.exceptions.RayTaskError as e:
print(e)
# ray.exceptions.RayTaskError(ValueError): ray::f()
# File "test.py", line 6, in f
# arr[0] = 1
# ValueError: assignment destination is read-only
特别说明
-
Ray 当前使用 Pickle protocol version 5。
Pickle 协议
- Ray 使用:Pickle v5
- Python 默认:v3
- v4 / v5:更适合大对象,更高效
-
对于非原生对象,即使一个对象在另一个对象中被多次引用,Ray 也只会保留一个副本:
import ray import numpy as np obj = [np.zeros(42)] * 99 l = ray.get(ray.put(obj)) assert l[0] is l[1] # 没问题! -
在可能的情况下,建议使用 Numpy Arrays 或由 Numpy Arrays 组成的 Python 集合,以获得最佳性能。
- Lock Objects 通常是不可序列化的,因为复制一个 Lock 没有意义,并且可能导致严重的并发问题。如果你的 Object 中包含 Lock,你可能需要想办法绕过这个问题。
只读 Tensor 的零拷贝序列化
Ray 为只读的 PyTorch Tensor 提供了可选的零拷贝序列化。 Ray 会将这些 Tensor 转换为 Numpy Array,并利用 pickle5 的零拷贝缓冲区共享机制进行序列化。这避免了底层 Tensor 数据的复制,从而在跨 Task / Actor 传递大型 Tensor 时提升性能。
Warning
PyTorch 本身并不原生支持只读 Tensor,因此该功能需要谨慎使用。
当启用该功能时,Ray 不会复制数据,并允许对共享内存进行写操作。如果两个进程位于同一节点上,其中一个进程在 ray.get() 后修改了 Tensor,另一个进程可能会观察到该变化。
该功能在以下条件下效果最佳:
- Tensor 的
requires_grad = False(即不参与 autograd) - Tensor 在内存中是连续的(
tensor.is_contiguous()) - Tensor 位于 CPU 内存中(性能收益更大)
- 未使用 Ray Direct Transport
该功能默认关闭。可以通过设置环境变量 RAY_ENABLE_ZERO_COPY_TORCH_TENSORS 来启用。在运行脚本前设置该变量即可:
export RAY_ENABLE_ZERO_COPY_TORCH_TENSORS=1
示例:使用 ray.get() 计算一个 1GiB tensor 的和,并利用零拷贝序列化
import ray
import torch
import time
ray.init(runtime_env={"env_vars": {"RAY_ENABLE_ZERO_COPY_TORCH_TENSORS": "1"}})
@ray.remote
def process(tensor):
return tensor.sum()
x = torch.ones(1024, 1024, 256)
start_time = time.perf_counter()
result = ray.get(process.remote(x))
elapsed_time = time.perf_counter() - start_time
print(f"Elapsed time: {elapsed_time}s")
assert result == x.sum()
在该示例中,启用零拷贝序列化后,端到端延迟降低了 66.3%:
# Without Zero-Copy Serialization
Elapsed time: 23.53883756196592s
# With Zero-Copy Serialization
Elapsed time: 7.933729998010676s
自定义序列化
有时你可能需要自定义序列化过程,因为 Ray 默认使用的序列化器(pickle5 + cloudpickle)无法满足需求(例如无法序列化某些对象,或对某些对象性能较差等)。
至少有三种方式可以定义自定义序列化:
在类中实现 __reduce__ 方法
如果你可以访问并修改类的代码,可以在类中定义 __reduce__ 方法。这是大多数 Python 库常用的方式。
import ray
import sqlite3
class DBConnection:
def __init__(self, path):
self.path = path
self.conn = sqlite3.connect(path)
# without '__reduce__', the instance is unserializable.
def __reduce__(self):
deserializer = DBConnection
serialized_data = (self.path,)
return deserializer, serialized_data
original = DBConnection("/tmp/db")
print(original.conn)
copied = ray.get(ray.put(original))
print(copied.conn)
注册自定义序列化器
如果你无法修改类定义,可以注册序列化器:
import ray
import threading
class A:
def __init__(self, x):
self.x = x
self.lock = threading.Lock() # could not be serialized!
try:
ray.get(ray.put(A(1))) # fail!
except TypeError:
pass
def custom_serializer(a):
return a.x
def custom_deserializer(b):
return A(b)
# Register serializer and deserializer for class A:
ray.util.register_serializer(
A, serializer=custom_serializer, deserializer=custom_deserializer)
ray.get(ray.put(A(1))) # success!
# You can deregister the serializer at any time.
ray.util.deregister_serializer(A)
try:
ray.get(ray.put(A(1))) # fail!
except TypeError:
pass
# Nothing happens when deregister an unavailable serializer.
ray.util.deregister_serializer(A)
注意
- 序列化器是每个 worker 本地管理的
- 每个 worker 都需要单独注册
- 重新注册会覆盖旧的序列化器
- API 是幂等的(重复注册不会有副作用)
仅针对特定对象自定义
可以通过包装对象的方式:
import threading
class A:
def __init__(self, x):
self.x = x
self.lock = threading.Lock() # could not serialize!
try:
ray.get(ray.put(A(1))) # fail!
except TypeError:
pass
class SerializationHelperForA:
"""A helper class for serialization."""
def __init__(self, a):
self.a = a
def __reduce__(self):
return A, (self.a.x,)
ray.get(ray.put(SerializationHelperForA(A(1)))) # success!
# the serializer only works for a specific object, not all A
# instances, so we still expect failure here.
try:
ray.get(ray.put(A(1))) # still fail!
except TypeError:
pass
自定义异常序列化
当 Ray 任务抛出的异常无法通过默认的 Pickle 机制进行序列化时,你可以注册自定义序列化器来处理这些异常 (注意:序列化器必须在 driver 和所有 worker 中注册)。
import ray
import threading
class CustomError(Exception):
def __init__(self, message, data):
self.message = message
self.data = data
self.lock = threading.Lock() # Cannot be serialized
def custom_serializer(exc):
return {"message": exc.message, "data": str(exc.data)}
def custom_deserializer(state):
return CustomError(state["message"], state["data"])
# Register in the driver
ray.util.register_serializer(
CustomError,
serializer=custom_serializer,
deserializer=custom_deserializer
)
@ray.remote
def task_that_registers_serializer_and_raises():
# Register the custom serializer in the worker
ray.util.register_serializer(
CustomError,
serializer=custom_serializer,
deserializer=custom_deserializer
)
# Now raise the custom exception
raise CustomError("Something went wrong", {"complex": "data"})
# The custom exception will be properly serialized across worker boundaries
try:
ray.get(task_that_registers_serializer_and_raises.remote())
except ray.exceptions.RayTaskError as e:
print(f"Caught exception: {e.cause}") # This will be our CustomError
当 Ray 任务抛出自定义异常时,Ray 会:
- 使用你提供的自定义序列化器对异常进行序列化
- 将其包装在 RayTaskError 中
- 反序列化后的异常可以通过
ray_task_error.cause获取
当序列化失败时,Ray 会抛出一个 UnserializableException,其中包含原始堆栈信息的字符串表示。
故障排查
可以使用 ray.util.inspect_serializability 来定位棘手的 Pickle 序列化问题。
该函数可以用于追踪任意 Python 对象中的潜在不可序列化对象(无论它是函数、类,还是对象实例)。
示例:使用 inspect_serializability 检查一个包含不可序列化对象(threading.Lock)的函数
from ray.util import inspect_serializability
import threading
lock = threading.Lock()
def test():
print(lock)
inspect_serializability(test, name="test")
=============================================================
Checking Serializability of <function test at 0x7ff130697e50>
=============================================================
!!! FAIL serialization: cannot pickle '_thread.lock' object
Detected 1 global variables. Checking serializability...
Serializing 'lock' <unlocked _thread.lock object at 0x7ff1306a9f30>...
!!! FAIL serialization: cannot pickle '_thread.lock' object
WARNING: Did not find non-serializable object in <unlocked _thread.lock object at 0x7ff1306a9f30>. This may be an oversight.
=============================================================
Variable:
FailTuple(lock [obj=<unlocked _thread.lock object at 0x7ff1306a9f30>, parent=<function test at 0x7ff130697e50>])
was found to be non-serializable. There may be multiple other undetected variables that were non-serializable.
Consider either removing the instantiation/imports of these variables or moving the instantiation into the scope of the function/class.
=============================================================
Check https://docs.ray.io/en/master/ray-core/objects/serialization.html#troubleshooting for more information.
If you have any suggestions on how to improve this error message, please reach out to the Ray developers on github.com/ray-project/ray/issues/
=============================================================
如果需要更详细的信息,可以在导入 Ray 之前设置环境变量 RAY_PICKLE_VERBOSE_DEBUG='2'。
这会启用基于 Python 的序列化后端,而不是 C-Pickle,因此你可以在序列化过程中调试 Python 代码。不过,这会使序列化变得更慢。
已知问题
在某些 Python 3.8 和 3.9 版本中,用户可能会遇到内存泄漏问题。 这是由于 Python 的 pickle 模块中的一个 bug 导致的。
该问题已在以下版本中修复:
- Python 3.8.2rc1
- Python 3.9.0 alpha 4
- 或更高版本