Pipeline 处理器与调度器
本文讨论 ClickHouse 的核心技术:Processor(处理器)和 DAG 调度器。
Pipeline 问题
在传统数据库系统中,一个查询处理流程大致如下:
graph LR
A[Parse] --> B[Plan]
B[Plan] --> C[Execute]
C[Execute] --> D[Storage]
在 Plan 阶段,通常会构建一个 Pipeline(每个 transformer 表示一个数据处理步骤):
graph LR
A[Sourcer] -- output0 --> B[Transformer1]
B -- output1 --> C[Transformer2]
C -- output2 --> D[Transformer3]
D -- output3 --> E[Sinker]
在传统的 pipeline 模型中,所有 transformer(数据处理节点)会被依次连接形成一条流水线,由执行引擎按照顺序串行地处理数据,每个 transformer 负责处理输入并产生输出,最终输出到 sink(终点)。
该模型实现简单、工程复杂度低,但缺点也很突出:由于各个阶段串行执行,难以发挥现代多核 CPU 的并行处理能力,本质上属于 volcano 模型。对于对延迟敏感、操作及时的小型 OLTP 查询尚可满足,但在高吞吐、高并发的 OLAP(分析型)场景下,现有 CPU 资源无法被充分利用,成为性能瓶颈。
如果 transformer1 和 transformer2 之间没有依赖关系,它们其实可以并行执行
flowchart LR
S[Sourcer]
T1[Transformer1]
T2[Transformer2]
T3[Transformer3]
K[Sinker]
S --> T1
S --> T2
T1 --> T3
T2 --> T3
T3 --> K
style S stroke:#ff7f00
style T1 stroke:#ff7f00
style T2 stroke:#ff7f00
style T3 stroke:#ff7f00
style K stroke:#ff7f00
关键问题
- 如何灵活编排 transformer?
- 如何在 transformer 之间同步数据?
- 如何实现并行调度?
Processor 与 DAG 调度器
Transformer 编排
ClickHouse 实现了一系列基础 transformer 模块,例如:
- FilterTransform —— WHERE 过滤
- SortingTransform —— ORDER BY 排序
- LimitByTransform —— LIMIT 限制
示例
SELECT * FROM t1 WHERE id=1 ORDER BY time DESC LIMIT 10
QueryPipeline 的构建过程类似:
QueryPipeline::addSimpleTransform(Source)
QueryPipeline::addSimpleTransform(FilterTransform)
QueryPipeline::addSimpleTransform(SortingTransform)
QueryPipeline::addSimpleTransform(LimitByTransform)
QueryPipeline::addSimpleTransform(Sinker)
Transformer 数据同步
在编排 transformer 的同时,还需要构建底层 DAG 连接关系:
connect(Source.OutPort, FilterTransform.InPort)
connect(FilterTransform.OutPort, SortingTransform.InPort)
connect(SortingTransform.OutPort, LimitByTransform.InPort)
connect(LimitByTransform.OutPort, Sinker.InPort)
这就建立了数据流关系:
- 一个 transformer 的 OutPort 连接到另一个的 InPort
- 类似现实中的管道系统
- 支持一对一、甚至多对多连接
Transformer 执行调度
Pipeline 已经构建完成,但数据如何“流动”?
ClickHouse 为 transformer 定义了一组状态,调度器基于这些状态进行调度:
enum class Status
{
NeedData, // 等待输入数据
PortFull, // 输出端被阻塞
Finished, // 完成
Ready, // 可以执行 work()
Async, // 异步执行
Wait, // 等待异步完成
ExpandPipeline, // 扩展 pipeline(并行拆分)
};
执行逻辑示例:
- Source 产生数据 -> 状态变为
PortFull - FilterTransform 检测到
NeedData-> 拉取数据 - 状态变为
Ready-> 执行work()进行过滤 - 状态不断流转,直到
Finished
特别重要的是:
👉 ExpandPipeline 状态可以将一个 transformer 拆成多个并行执行,实现“爆炸式并行”
示例
SELECT number + 1 FROM t1;
组件:
- Source:生成 {0,1,2,3,4}
- AddTransformer:每个数 +1
- Sink:输出结果
流程:
- Source 生成数据
- AddTransformer 处理数据
- Sink 输出
class MySource : public ISource
{
public:
String getName() const override { return "MySource"; }
MySource(UInt64 end_)
: ISource(Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})), end(end_)
{
}
private:
UInt64 end;
bool done = false;
Chunk generate() override
{
if (done)
{
return Chunk();
}
MutableColumns columns;
columns.emplace_back(ColumnUInt64::create());
for (auto i = 0U; i < end; i++)
columns[0]->insert(i);
done = true;
return Chunk(std::move(columns), end);
}
};
class MyAddTransformer : public IProcessor
{
public:
String getName() const override { return "MyAddTransformer"; }
MyAddTransformer()
: IProcessor(
{Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})},
{Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})})
, input(inputs.front())
, output(outputs.front())
{
}
Status prepare() override
{
if (output.isFinished())
{
input.close();
return Status::Finished;
}
if (!output.canPush())
{
input.setNotNeeded();
return Status::PortFull;
}
if (has_process_data)
{
output.push(std::move(current_chunk));
has_process_data = false;
}
if (input.isFinished())
{
output.finish();
return Status::Finished;
}
if (!input.hasData())
{
input.setNeeded();
return Status::NeedData;
}
current_chunk = input.pull(false);
return Status::Ready;
}
void work() override
{
auto num_rows = current_chunk.getNumRows();
auto result_columns = current_chunk.cloneEmptyColumns();
auto columns = current_chunk.detachColumns();
for (auto i = 0U; i < num_rows; i++)
{
auto val = columns[0]->getUInt(i);
result_columns[0]->insert(val+1);
}
current_chunk.setColumns(std::move(result_columns), num_rows);
has_process_data = true;
}
InputPort & getInputPort() { return input; }
OutputPort & getOutputPort() { return output; }
protected:
bool has_input = false;
bool has_process_data = false;
Chunk current_chunk;
InputPort & input;
OutputPort & output;
};
class MySink : public ISink
{
public:
String getName() const override { return "MySinker"; }
MySink() : ISink(Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})) { }
private:
WriteBufferFromFileDescriptor out{STDOUT_FILENO};
FormatSettings settings;
void consume(Chunk chunk) override
{
size_t rows = chunk.getNumRows();
size_t columns = chunk.getNumColumns();
for (size_t row_num = 0; row_num < rows; ++row_num)
{
writeString("prefix-", out);
for (size_t column_num = 0; column_num < columns; ++column_num)
{
if (column_num != 0)
writeChar('\t', out);
getPort()
.getHeader()
.getByPosition(column_num)
.type->serializeAsText(*chunk.getColumns()[column_num], row_num, out, settings);
}
writeChar('\n', out);
}
out.next();
}
};
int main(int, char **)
{
auto source0 = std::make_shared<MySource>(5);
auto add0 = std::make_shared<MyAddTransformer>();
auto sinker0 = std::make_shared<MySink>();
/// Connect.
connect(source0->getPort(), add0->getInputPort());
connect(add0->getOutputPort(), sinker0->getPort());
std::vector<ProcessorPtr> processors = {source0, add0, sinker0};
PipelineExecutor executor(processors);
executor.execute(1);
}
总结
核心要点:
- 数据处理单位是 Chunk
- 数据通过 OutPort → InPort 流动
- 基于 DAG 的 pipeline 支持并行执行
- CPU 利用率最大化