带外通信
通常情况下,Ray 中的 Actor 通信通过以下方式完成:
- Actor 方法调用
- 分布式对象存储(Object Store)传递数据
但在某些场景下,使用带外通信(Out-of-band Communication)会更加高效或灵活。
封装通信库
某些分布式库已经具备成熟且高性能的通信机制,此时 Ray 主要作为调度器(scheduler)使用。
- Actor 之间的实际通信通过外部通信栈完成
- Ray 仅负责任务调度与资源管理
示例:
- Horovod-on-Ray:使用 NCCL / MPI 做集合通信
- RayDP:使用 Spark 内部的 RPC / object manager
更多细节参考 Blog: Ray Distributed Library Patterns
Ray Collective
Ray 提供了 ray.util.collective 和 Collective Communication Library 用于高效的带外通信:
- 支持
collective communication(集合通信) - 支持
point-to-point(点对点通信) - 可用于 CPU / GPU 分布式计算
HTTP / gRPC 服务
可以在 Actor 内启动一个服务(如 HTTP / gRPC), 让 Ray 集群外的客户端直接与 Actor 通信。
import ray
import asyncio
import requests
from aiohttp import web
@ray.remote
class Counter:
async def __init__(self):
self.counter = 0
asyncio.get_running_loop().create_task(self.run_http_server())
async def run_http_server(self):
app = web.Application()
app.add_routes([web.get("/", self.get)])
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "127.0.0.1", 25001)
await site.start()
async def get(self, request):
return web.Response(text=str(self.counter))
async def increment(self):
self.counter = self.counter + 1
ray.init()
counter = Counter.remote()
[ray.get(counter.increment.remote()) for i in range(5)]
r = requests.get("http://127.0.0.1:25001/")
assert r.text == "5"
Tip
除 HTTP 外,还可以在 Actor 中运行 gRPC 服务。
- gRPC 服务
- WebSocket 服务
- 自定义 TCP 服务
限制与注意事项
Warning
使用带外通信时,需要注意:
- Ray 不会管理 Actor 之间的通信过程
- 某些功能将失效,例如:
- 分布式引用计数(distributed reference counting)
- 不要通过带外通道传递
ObjectRef