Key Concepts
Dataset & Block
在 Ray Data 中,有两个最重要的概念:Dataset 和 Block。
Dataset & Block 的关系
该图展示了一个 Dataset,由 3 个 Block(数据块)组成,每个 Block 包含 1000 行数据。
在 Ray Data 中:
- Dataset 本身 保存在触发执行的进程中(通常是程序入口,也叫 Driver)
- Block(数据块) 以对象形式存储在 Ray 的共享内存对象存储(object store)中
在内部实现上,每个 Block 可以用以下格式表示:
- Pandas
DataFrame - PyArrow
Table
Dataset
Dataset = 分布式数据集合(用户主要操作的对象)
常见的 Dataset 使用流程包括:
- 从数据源创建 Dataset
- 对数据进行转换
- 输出结果(存储或用于训练)
关键特性: Lazy Execution
Dataset API 是“懒执行”的:操作不会立即执行,只有在这些时候才运行:
.show().materialize().take()等
Lazy Execution 的好处:
- Ray 可以优化执行计划
- 自动做流水线(streaming)处理
Block
Block(数据块) 是 Ray Data 中最基本的处理单元,表示 Dataset 的一个分区(partition),包含若干行数据。Block 通常采用列式格式(如 Apache Arrow)进行存储和处理。
具体来说:
- 每个 Dataset 都会被划分为多个 Block
- 整个 Dataset 的处理是在 Block 级别进行分布式并行执行的
- 各个 Block 通常是相互独立、并行处理的
Operators & Plans
Ray Data 使用两阶段规划(two-phase planning)来高效执行数据处理操作。
Dataset 的执行计划不会立即执行
只有在以下情况才会触发:
.show().materialize().take()等
当你使用 Dataset API 编写程序时:
-
Ray Data 会先构建一个 逻辑计划(logical plan)
👉 用来描述“要做什么操作”
-
当真正开始执行时,会把它转换为 物理计划(physical plan)
👉 用来描述“具体怎么执行这些操作”
这些计划都是由 Operator(算子) 组成的, Operator 是核心构建单元。
-
逻辑计划(Logical Plan): 由 逻辑算子(logical operators) 组成, 用来描述“做什么”
例如:
dataset = ray.data.read_parquet(...)会生成ReadOp去指定数据源和读取方式 -
物理计划(Physical Plan): 由 物理算子(physical operators) 组成, 用来描述“怎么做”
例如:Ray Data 会把
ReadOp转换为TaskPoolMapOperator物理算子,启动多个 Ray Task 并行读取数据
示例:Ray Data 如何构建逻辑计划(logical plan)
当你链式调用操作时,Ray Data 会在后台自动构建逻辑计划:
dataset = ray.data.range(100)
dataset = dataset.add_column("test", lambda x: x["id"] + 1)
dataset = dataset.select_columns("test")
Project
+- MapBatches(add_column)
+- Dataset(schema={...})
当真正开始执行时:
- Ray Data 会先优化逻辑计划(logical plan)
- 然后将其转换为物理计划(physical plan)
👉 物理计划是由一系列算子组成的,用于真正执行数据转换
转换过程中的特点
-
一个逻辑算子 → 多个物理算子
例如:
ReadOp->InputDataBuffer+TaskPoolMapOperator -
逻辑计划和物理计划都会进行多轮优化
例如:
OperatorFusionRule会尝试把多个 map 合并,减少序列化开销(提升性能)
物理算子执行流程
- 接收一批 block 引用(stream)
-
执行操作:
- 用 Ray Task / Actor 做计算
- 或处理数据引用
-
输出新的 block 流
Streaming execution model
Ray Data 可以通过算子流水线(pipeline)对数据进行流式处理(streaming),从而高效处理大规模数据集。
这意味着:
- 不同的算子(operators)可以独立扩展
- 同时并发执行(concurrently)
- 实现更灵活、细粒度的资源分配
例如,两个 map 操作(一个需要 CPU,一个需要 GPU),Streaming 模型可以让它们:
- 同时运行
- 各自使用不同资源
- 保持高性能
Streaming 模型主要适用于非 shuffle 操作(例如 map、filter 等)
Shuffle 操作(例如 sort、groupby)会中断 streaming:
- 因为它们需要全量数据(必须先收集再计算)
- 会触发 materialize(阻塞流水线)
示例:展示了 Streaming 模型如何工作:
import ray
# Create a dataset with 1K rows
ds = ray.data.read_parquet(...)
# Define a pipeline of operations
ds = ds.map(cpu_function, num_cpus=2)
ds = ds.map(GPUClass, num_gpus=1)
ds = ds.map(cpu_function2, num_cpus=4)
ds = ds.filter(filter_func)
# Data starts flowing when you call a method like show()
ds.show(5)
产生的对应逻辑计划如下:
Filter(filter_func)
+- Map(cpu_function2)
+- Map(GPUClass)
+- Map(cpu_function)
+- Dataset(schema={...})
对应的 Streaming 拓扑如下:
在流式执行模型(streaming execution model)中:
- 各个算子(operators)通过 流水线(pipeline)连接
- 每个算子的输出队列(output queue)
- 会直接作为下游算子的输入队列(input queue)
👉 从而形成一个高效的数据流动过程
这种设计可以:
- 让多个阶段同时执行(concurrently)
- 提高整体性能
- 提高资源利用率