跳转至

工具类

适用场景

Ray 提供了一些实用工具类,用于简化 Actor 调度与任务间通信。

工具 主要用途 解决的核心问题
ActorPool 批量任务调度、实现负载均衡 谁来执行任务
Queue 多任务 / Actor 之间的数据传递 任务之间如何通信

Actor Pool

ray.util.ActorPool 是一个类似 multiprocessing.Pool 的工具类,用于在一组固定的 Actor 上调度任务。

import ray
from ray.util import ActorPool


@ray.remote
class Actor:
    def double(self, n):
        return n * 2


a1, a2 = Actor.remote(), Actor.remote()
pool = ActorPool([a1, a2])

# pool.map(..) returns a Python generator object ActorPool.map
gen = pool.map(lambda a, v: a.double.remote(v), [1, 2, 3, 4])
print(list(gen))
# [2, 4, 6, 8]

Note

  • 在固定 Actor 池上进行任务分发
  • 自动负载均衡(哪个 Actor 空闲就用哪个)
  • 接口风格类似 multiprocessing.Pool

使用 Ray Queue 进行消息传递

在更复杂的场景中,仅靠简单信号同步是不够的。如果需要在多个任务或 Actor 之间传递数据,可以使用 ray.util.queue.Queue

import ray
from ray.util.queue import Queue, Empty

ray.init()
# 可以在多个 Task / Actor 之间共享这个队列对象
queue = Queue(maxsize=100)


@ray.remote
def consumer(id, queue):
    try:
        while True:
            next_item = queue.get(block=True, timeout=1)
            print(f"consumer {id} got work {next_item}")
    except Empty:
        pass


# 生产数据
[queue.put(i) for i in range(10)]
print("Put work 1 - 10 to queue...")

# 启动多个消费者
consumers = [consumer.remote(id, queue) for id in range(2)]
ray.get(consumers)

import time
time.sleep(1)

Note

  • 支持多生产者 / 多消费者模型
  • 可在 Actor 和 Task 之间共享
  • API 风格类似:
    • asyncio.Queue:用于异步编程(需 await
    • queue.Queue:用于多线程同步(阻塞调用,需 block=True