Quick Start
Abstarct
介绍如何使用 Ray Data 的核心抽象 Dataset(数据集) 来进行分布式数据处理。
主要涵盖四个核心能力:
- 加载数据(Loading data)
- 数据转换(Transforming data)
- 数据消费(Consuming data)
- 数据保存(Saving data)
可以理解为: Ray Data = “像 Pandas 一样写代码 + 自动变成分布式 + 支持超大数据规模”
核心概念:Dataset
在 Ray Data 中,最核心的概念是 Dataset:Dataset = 分布式数据集合
特点:
- 专为机器学习任务设计
- 支持超出单机内存的数据规模
- 自动分布式处理(无需手动拆分数据)
Loading data
你可以从多种来源创建 Dataset:
- 本地文件
- Python 对象
- 云存储(如 S3 / GCS)
Ray Data 基于 Apache Arrow,支持多种文件系统。
从 S3 读取 CSV
import ray
# Load a CSV dataset directly from S3
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
# Preview the first record
ds.show(limit=1)
{
'sepal length (cm)': 5.1,
'sepal width (cm)': 3.5,
'petal length (cm)': 1.4,
'petal width (cm)': 0.2,
'target': 0
}
👉 可以理解为:直接把远程数据加载成分布式表
Transforming data
你可以通过自定义函数(UDF)对数据进行处理。
Ray 会自动:
- 并行执行
- 分布式调度
计算花瓣面积
from typing import Dict
import numpy as np
# Define a transformation to compute a "petal area" attribute
def transform_batch(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
vec_a = batch["petal length (cm)"]
vec_b = batch["petal width (cm)"]
batch["petal area (cm^2)"] = np.round(vec_a * vec_b, 2)
return batch
# Apply the transformation to our dataset
transformed_ds = ds.map_batches(transform_batch)
# View the updated schema with the new column
# .materialize() will execute all the lazy transformations and
# materialize the dataset into object store memory
print(transformed_ds.materialize())
shape: (150, 6)
╭───────────────────┬──────────────────┬───────────────────┬──────────────────┬────────┬───────────────────╮
│ sepal length (cm) ┆ sepal width (cm) ┆ petal length (cm) ┆ petal width (cm) ┆ target ┆ petal area (cm^2) │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ double ┆ double ┆ double ┆ double ┆ int64 ┆ double │
╞═══════════════════╪══════════════════╪═══════════════════╪══════════════════╪════════╪═══════════════════╡
│ 5.1 ┆ 3.5 ┆ 1.4 ┆ 0.2 ┆ 0 ┆ 0.28 │
│ 4.9 ┆ 3.0 ┆ 1.4 ┆ 0.2 ┆ 0 ┆ 0.28 │
│ 4.7 ┆ 3.2 ┆ 1.3 ┆ 0.2 ┆ 0 ┆ 0.26 │
│ 4.6 ┆ 3.1 ┆ 1.5 ┆ 0.2 ┆ 0 ┆ 0.3 │
│ 5.0 ┆ 3.6 ┆ 1.4 ┆ 0.2 ┆ 0 ┆ 0.28 │
│ … ┆ … ┆ … ┆ … ┆ … ┆ … │
│ 6.7 ┆ 3.0 ┆ 5.2 ┆ 2.3 ┆ 2 ┆ 11.96 │
│ 6.3 ┆ 2.5 ┆ 5.0 ┆ 1.9 ┆ 2 ┆ 9.5 │
│ 6.5 ┆ 3.0 ┆ 5.2 ┆ 2.0 ┆ 2 ┆ 10.4 │
│ 6.2 ┆ 3.4 ┆ 5.4 ┆ 2.3 ┆ 2 ┆ 12.42 │
│ 5.9 ┆ 3.0 ┆ 5.1 ┆ 1.8 ┆ 2 ┆ 9.18 │
╰───────────────────┴──────────────────┴───────────────────┴──────────────────┴────────┴───────────────────╯
(Showing 10 of 150 rows)
map_batches = 对每个数据批次执行函数(类似批处理 map 操作)
Lazy Execution
在 Ray Data 中,许多操作都是惰性执行的,这意味着它们不会立即执行,而是会在需要时执行。 这使得 Ray Data 可以高效地处理大规模数据。
.materialize() 的作用
- 触发所有“延迟计算”(lazy execution)
- 将结果真正计算并存入内存
Consuming data
可以用多种方式读取数据, 常见方式:
take_batch():取一小批数据iter_batches():逐批遍历- 传给 Ray 的 Task / Actor 继续处理
👉 可以理解为:把分布式数据“拿出来用”
取一批数据
print(transformed_ds.take_batch(batch_size=3))
{
'sepal length (cm)': array([...]),
'sepal width (cm)': array([...]),
...
}
Saving data
处理后的数据可以导出到多种格式:
- Parquet(
write_parquet()) - CSV(
write_csv()) - 其他存储系统
👉 数据会自动分片(分布式输出)
导出为 Parquet
import os
transformed_ds.write_parquet("/tmp/iris")
print(os.listdir("/tmp/iris"))
['..._000000.parquet', '..._000001.parquet']