跳转至

Pipeline 处理器与调度器

本文讨论 ClickHouse 的核心技术:Processor(处理器)和 DAG 调度器。

Pipeline 问题

在传统数据库系统中,一个查询处理流程大致如下:

graph LR
    A[Parse] --> B[Plan]
    B[Plan] --> C[Execute]
    C[Execute] --> D[Storage]

在 Plan 阶段,通常会构建一个 Pipeline(每个 transformer 表示一个数据处理步骤):

graph LR
    A[Sourcer] -- output0 --> B[Transformer1]
    B -- output1 --> C[Transformer2]
    C -- output2 --> D[Transformer3]
    D -- output3 --> E[Sinker]

在传统的 pipeline 模型中,所有 transformer(数据处理节点)会被依次连接形成一条流水线,由执行引擎按照顺序串行地处理数据,每个 transformer 负责处理输入并产生输出,最终输出到 sink(终点)。

该模型实现简单、工程复杂度低,但缺点也很突出:由于各个阶段串行执行,难以发挥现代多核 CPU 的并行处理能力,本质上属于 volcano 模型。对于对延迟敏感、操作及时的小型 OLTP 查询尚可满足,但在高吞吐、高并发的 OLAP(分析型)场景下,现有 CPU 资源无法被充分利用,成为性能瓶颈。

如果 transformer1 和 transformer2 之间没有依赖关系,它们其实可以并行执行

flowchart LR
    S[Sourcer]

    T1[Transformer1]
    T2[Transformer2]
    T3[Transformer3]

    K[Sinker]

    S --> T1
    S --> T2

    T1 --> T3
    T2 --> T3

    T3 --> K

    style S stroke:#ff7f00
    style T1 stroke:#ff7f00
    style T2 stroke:#ff7f00
    style T3 stroke:#ff7f00
    style K stroke:#ff7f00

关键问题

  • 如何灵活编排 transformer?
  • 如何在 transformer 之间同步数据?
  • 如何实现并行调度?

Processor 与 DAG 调度器

Transformer 编排

ClickHouse 实现了一系列基础 transformer 模块,例如:

  • FilterTransform —— WHERE 过滤
  • SortingTransform —— ORDER BY 排序
  • LimitByTransform —— LIMIT 限制

示例

SELECT * FROM t1 WHERE id=1 ORDER BY time DESC LIMIT 10

QueryPipeline 的构建过程类似:

QueryPipeline::addSimpleTransform(Source)
QueryPipeline::addSimpleTransform(FilterTransform)
QueryPipeline::addSimpleTransform(SortingTransform)
QueryPipeline::addSimpleTransform(LimitByTransform)
QueryPipeline::addSimpleTransform(Sinker)

Transformer 数据同步

在编排 transformer 的同时,还需要构建底层 DAG 连接关系:

connect(Source.OutPort, FilterTransform.InPort)
connect(FilterTransform.OutPort, SortingTransform.InPort)
connect(SortingTransform.OutPort, LimitByTransform.InPort)
connect(LimitByTransform.OutPort, Sinker.InPort)

这就建立了数据流关系:

  • 一个 transformer 的 OutPort 连接到另一个的 InPort
  • 类似现实中的管道系统
  • 支持一对一、甚至多对多连接

Transformer 执行调度

Pipeline 已经构建完成,但数据如何“流动”?

ClickHouse 为 transformer 定义了一组状态,调度器基于这些状态进行调度:

enum class Status
{
    NeedData,   // 等待输入数据
    PortFull,   // 输出端被阻塞
    Finished,   // 完成
    Ready,      // 可以执行 work()
    Async,      // 异步执行
    Wait,       // 等待异步完成
    ExpandPipeline, // 扩展 pipeline(并行拆分)
};

执行逻辑示例:

  • Source 产生数据 -> 状态变为 PortFull
  • FilterTransform 检测到 NeedData -> 拉取数据
  • 状态变为 Ready -> 执行 work() 进行过滤
  • 状态不断流转,直到 Finished

特别重要的是:

👉 ExpandPipeline 状态可以将一个 transformer 拆成多个并行执行,实现“爆炸式并行”

示例

SELECT number + 1 FROM t1;

组件:

  • Source:生成 {0,1,2,3,4}
  • AddTransformer:每个数 +1
  • Sink:输出结果

流程:

  • Source 生成数据
  • AddTransformer 处理数据
  • Sink 输出
class MySource : public ISource
{
public:
    String getName() const override { return "MySource"; }

    MySource(UInt64 end_)
        : ISource(Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})), end(end_)
    {
    }

private:
    UInt64 end;
    bool done = false;

    Chunk generate() override
    {
        if (done)
        {
            return Chunk();
        }
        MutableColumns columns;
        columns.emplace_back(ColumnUInt64::create());
        for (auto i = 0U; i < end; i++)
            columns[0]->insert(i);

        done = true;
        return Chunk(std::move(columns), end);
    }
};
class MyAddTransformer : public IProcessor
{
public:
    String getName() const override { return "MyAddTransformer"; }

    MyAddTransformer()
        : IProcessor(
            {Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})},
            {Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})})
        , input(inputs.front())
        , output(outputs.front())
    {
    }

    Status prepare() override
    {
        if (output.isFinished())
        {
            input.close();
            return Status::Finished;
        }

        if (!output.canPush())
        {
            input.setNotNeeded();
            return Status::PortFull;
        }

        if (has_process_data)
        {
            output.push(std::move(current_chunk));
            has_process_data = false;
        }

        if (input.isFinished())
        {
            output.finish();
            return Status::Finished;
        }

        if (!input.hasData())
        {
            input.setNeeded();
            return Status::NeedData;
        }
        current_chunk = input.pull(false);
        return Status::Ready;
    }

    void work() override
    {
        auto num_rows = current_chunk.getNumRows();
        auto result_columns = current_chunk.cloneEmptyColumns();
        auto columns = current_chunk.detachColumns();
        for (auto i = 0U; i < num_rows; i++)
        {
            auto val = columns[0]->getUInt(i);
            result_columns[0]->insert(val+1);
        }
        current_chunk.setColumns(std::move(result_columns), num_rows);
        has_process_data = true;
    }

    InputPort & getInputPort() { return input; }
    OutputPort & getOutputPort() { return output; }

protected:
    bool has_input = false;
    bool has_process_data = false;
    Chunk current_chunk;
    InputPort & input;
    OutputPort & output;
};
class MySink : public ISink
{
public:
    String getName() const override { return "MySinker"; }

    MySink() : ISink(Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})) { }

private:
    WriteBufferFromFileDescriptor out{STDOUT_FILENO};
    FormatSettings settings;

    void consume(Chunk chunk) override
    {
        size_t rows = chunk.getNumRows();
        size_t columns = chunk.getNumColumns();

        for (size_t row_num = 0; row_num < rows; ++row_num)
        {
            writeString("prefix-", out);
            for (size_t column_num = 0; column_num < columns; ++column_num)
            {
                if (column_num != 0)
                    writeChar('\t', out);
                getPort()
                    .getHeader()
                    .getByPosition(column_num)
                    .type->serializeAsText(*chunk.getColumns()[column_num], row_num, out, settings);
            }
            writeChar('\n', out);
        }

        out.next();
    }
};

int main(int, char **)
{
    auto source0 = std::make_shared<MySource>(5);
    auto add0 = std::make_shared<MyAddTransformer>();
    auto sinker0 = std::make_shared<MySink>();

    /// Connect.
    connect(source0->getPort(), add0->getInputPort());
    connect(add0->getOutputPort(), sinker0->getPort());

    std::vector<ProcessorPtr> processors = {source0, add0, sinker0};
    PipelineExecutor executor(processors);
    executor.execute(1);
}

总结

核心要点:

  • 数据处理单位是 Chunk
  • 数据通过 OutPort → InPort 流动
  • 基于 DAG 的 pipeline 支持并行执行
  • CPU 利用率最大化