跳转至

Quick Start

Abstarct

介绍如何使用 Ray Data 的核心抽象 Dataset(数据集) 来进行分布式数据处理。

主要涵盖四个核心能力:

  • 加载数据(Loading data)
  • 数据转换(Transforming data)
  • 数据消费(Consuming data)
  • 数据保存(Saving data)

可以理解为: Ray Data = “像 Pandas 一样写代码 + 自动变成分布式 + 支持超大数据规模”

核心概念:Dataset

在 Ray Data 中,最核心的概念是 Dataset:Dataset = 分布式数据集合

特点:

  • 专为机器学习任务设计
  • 支持超出单机内存的数据规模
  • 自动分布式处理(无需手动拆分数据)

Loading data

你可以从多种来源创建 Dataset:

  • 本地文件
  • Python 对象
  • 云存储(如 S3 / GCS)

Ray Data 基于 Apache Arrow,支持多种文件系统。

从 S3 读取 CSV

import ray

# Load a CSV dataset directly from S3
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

# Preview the first record
ds.show(limit=1)
{
'sepal length (cm)': 5.1,
'sepal width (cm)': 3.5,
'petal length (cm)': 1.4,
'petal width (cm)': 0.2,
'target': 0
}

👉 可以理解为:直接把远程数据加载成分布式表


Transforming data

你可以通过自定义函数(UDF)对数据进行处理。

Ray 会自动

  • 并行执行
  • 分布式调度

计算花瓣面积

from typing import Dict
import numpy as np

# Define a transformation to compute a "petal area" attribute
def transform_batch(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    vec_a = batch["petal length (cm)"]
    vec_b = batch["petal width (cm)"]
    batch["petal area (cm^2)"] = np.round(vec_a * vec_b, 2)
    return batch

# Apply the transformation to our dataset
transformed_ds = ds.map_batches(transform_batch)

# View the updated schema with the new column
# .materialize() will execute all the lazy transformations and
# materialize the dataset into object store memory
print(transformed_ds.materialize())
shape: (150, 6)
╭───────────────────┬──────────────────┬───────────────────┬──────────────────┬────────┬───────────────────╮
│ sepal length (cm) ┆ sepal width (cm) ┆ petal length (cm) ┆ petal width (cm) ┆ target ┆ petal area (cm^2) │
│ ---               ┆ ---              ┆ ---               ┆ ---              ┆ ---    ┆ ---               │
│ double            ┆ double           ┆ double            ┆ double           ┆ int64  ┆ double            │
╞═══════════════════╪══════════════════╪═══════════════════╪══════════════════╪════════╪═══════════════════╡
│ 5.1               ┆ 3.5              ┆ 1.4               ┆ 0.2              ┆ 0      ┆ 0.28              │
│ 4.9               ┆ 3.0              ┆ 1.4               ┆ 0.2              ┆ 0      ┆ 0.28              │
│ 4.7               ┆ 3.2              ┆ 1.3               ┆ 0.2              ┆ 0      ┆ 0.26              │
│ 4.6               ┆ 3.1              ┆ 1.5               ┆ 0.2              ┆ 0      ┆ 0.3               │
│ 5.0               ┆ 3.6              ┆ 1.4               ┆ 0.2              ┆ 0      ┆ 0.28              │
│ …                 ┆ …                ┆ …                 ┆ …                ┆ …      ┆ …                 │
│ 6.7               ┆ 3.0              ┆ 5.2               ┆ 2.3              ┆ 2      ┆ 11.96             │
│ 6.3               ┆ 2.5              ┆ 5.0               ┆ 1.9              ┆ 2      ┆ 9.5               │
│ 6.5               ┆ 3.0              ┆ 5.2               ┆ 2.0              ┆ 2      ┆ 10.4              │
│ 6.2               ┆ 3.4              ┆ 5.4               ┆ 2.3              ┆ 2      ┆ 12.42             │
│ 5.9               ┆ 3.0              ┆ 5.1               ┆ 1.8              ┆ 2      ┆ 9.18              │
╰───────────────────┴──────────────────┴───────────────────┴──────────────────┴────────┴───────────────────╯
(Showing 10 of 150 rows)
👉 map_batches = 对每个数据批次执行函数(类似批处理 map 操作)

Lazy Execution

在 Ray Data 中,许多操作都是惰性执行的,这意味着它们不会立即执行,而是会在需要时执行。 这使得 Ray Data 可以高效地处理大规模数据。

.materialize() 的作用

  • 触发所有“延迟计算”(lazy execution)
  • 将结果真正计算并存入内存

Consuming data

可以用多种方式读取数据, 常见方式:

  • take_batch():取一小批数据
  • iter_batches():逐批遍历
  • 传给 Ray 的 Task / Actor 继续处理

👉 可以理解为:把分布式数据“拿出来用”

取一批数据

print(transformed_ds.take_batch(batch_size=3))
{
    'sepal length (cm)': array([...]),
    'sepal width (cm)': array([...]),
    ...
}


Saving data

处理后的数据可以导出到多种格式

👉 数据会自动分片(分布式输出)

导出为 Parquet

import os

transformed_ds.write_parquet("/tmp/iris")

print(os.listdir("/tmp/iris"))
['..._000000.parquet', '..._000001.parquet']