跳转至

Key Concepts

Dataset & Block

在 Ray Data 中,有两个最重要的概念:DatasetBlock

Dataset & Block 的关系

该图展示了一个 Dataset,由 3 个 Block(数据块)组成,每个 Block 包含 1000 行数据。

Dataset and Block

在 Ray Data 中:

  • Dataset 本身 保存在触发执行的进程中(通常是程序入口,也叫 Driver)
  • Block(数据块) 以对象形式存储在 Ray 的共享内存对象存储(object store)

在内部实现上,每个 Block 可以用以下格式表示:

  • Pandas DataFrame
  • PyArrow Table

Dataset

Dataset = 分布式数据集合(用户主要操作的对象)

常见的 Dataset 使用流程包括:

  • 从数据源创建 Dataset
  • 对数据进行转换
  • 输出结果(存储或用于训练)

关键特性: Lazy Execution

Dataset API 是“懒执行”的:操作不会立即执行,只有在这些时候才运行:

  • .show()
  • .materialize()
  • .take()

Lazy Execution 的好处:

  • Ray 可以优化执行计划
  • 自动做流水线(streaming)处理

Block

Block(数据块) 是 Ray Data 中最基本的处理单元,表示 Dataset 的一个分区(partition),包含若干行数据。Block 通常采用列式格式(如 Apache Arrow)进行存储和处理。

具体来说:

  • 每个 Dataset 都会被划分为多个 Block
  • 整个 Dataset 的处理是在 Block 级别进行分布式并行执行
  • 各个 Block 通常是相互独立、并行处理

Operators & Plans

Ray Data 使用两阶段规划(two-phase planning)来高效执行数据处理操作。

Dataset 的执行计划不会立即执行

只有在以下情况才会触发:

  • .show()
  • .materialize()
  • .take()

当你使用 Dataset API 编写程序时:

  1. Ray Data 会先构建一个 逻辑计划(logical plan)

    👉 用来描述“要做什么操作”

  2. 当真正开始执行时,会把它转换为 物理计划(physical plan)

    👉 用来描述“具体怎么执行这些操作”

Execution Plan

这些计划都是由 Operator(算子) 组成的, Operator 是核心构建单元

  • 逻辑计划(Logical Plan): 由 逻辑算子(logical operators) 组成, 用来描述“做什么”

    例如:dataset = ray.data.read_parquet(...) 会生成 ReadOp 去指定数据源和读取方式

  • 物理计划(Physical Plan): 由 物理算子(physical operators) 组成, 用来描述“怎么做”

    例如:Ray Data 会把 ReadOp 转换为 TaskPoolMapOperator 物理算子,启动多个 Ray Task 并行读取数据

示例:Ray Data 如何构建逻辑计划(logical plan)

当你链式调用操作时,Ray Data 会在后台自动构建逻辑计划:

dataset = ray.data.range(100)
dataset = dataset.add_column("test", lambda x: x["id"] + 1)
dataset = dataset.select_columns("test")
你可以通过打印 Dataset 来查看逻辑计划:
Project
+- MapBatches(add_column)
    +- Dataset(schema={...})

当真正开始执行时:

  • Ray Data 会先优化逻辑计划(logical plan)
  • 然后将其转换为物理计划(physical plan)

👉 物理计划是由一系列算子组成的,用于真正执行数据转换

转换过程中的特点

  • 一个逻辑算子 → 多个物理算子

    例如:ReadOp -> InputDataBuffer + TaskPoolMapOperator

  • 逻辑计划和物理计划都会进行多轮优化

    例如:OperatorFusionRule 会尝试把多个 map 合并,减少序列化开销(提升性能)

物理算子执行流程

  1. 接收一批 block 引用(stream)
  2. 执行操作:

    • 用 Ray Task / Actor 做计算
    • 或处理数据引用
  3. 输出新的 block 流


Streaming execution model

Ray Data 可以通过算子流水线(pipeline)对数据进行流式处理(streaming),从而高效处理大规模数据集。

这意味着:

  • 不同的算子(operators)可以独立扩展
  • 同时并发执行(concurrently)
  • 实现更灵活、细粒度的资源分配

例如,两个 map 操作(一个需要 CPU,一个需要 GPU),Streaming 模型可以让它们:

  • 同时运行
  • 各自使用不同资源
  • 保持高性能

Streaming 模型主要适用于非 shuffle 操作(例如 mapfilter 等)


Shuffle 操作(例如 sortgroupby)会中断 streaming:

  • 因为它们需要全量数据(必须先收集再计算)
  • 会触发 materialize(阻塞流水线)

示例:展示了 Streaming 模型如何工作:

import ray

# Create a dataset with 1K rows
ds = ray.data.read_parquet(...)

# Define a pipeline of operations
ds = ds.map(cpu_function, num_cpus=2)
ds = ds.map(GPUClass, num_gpus=1)
ds = ds.map(cpu_function2, num_cpus=4)
ds = ds.filter(filter_func)

# Data starts flowing when you call a method like show()
ds.show(5)

产生的对应逻辑计划如下:

Filter(filter_func)
+- Map(cpu_function2)
+- Map(GPUClass)
    +- Map(cpu_function)
            +- Dataset(schema={...})

对应的 Streaming 拓扑如下:

Streaming Topology

在流式执行模型(streaming execution model)中:

  • 各个算子(operators)通过 流水线(pipeline)连接
  • 每个算子的输出队列(output queue)
  • 会直接作为下游算子的输入队列(input queue)

👉 从而形成一个高效的数据流动过程

这种设计可以:

  • 让多个阶段同时执行(concurrently)
  • 提高整体性能
  • 提高资源利用率