最后调用 (DistSQLPlanner).Run 方法执行物理计划。执行引擎采用火山模型(Volcano),每一层执行算子通过调用下一层的 Next 方法获取一条记录。
聚集分为两种情况,具体执行过程如下:
(1)HashAggregater
在 Hash Aggregate 的计算过程中,我们需要维护一个 Hash 表,Hash 表的键为聚合计算的 Group-By 列,若以平均数函数 avg 为例,值为聚合函数的中间结果 sum 和 count。在 Group-By 列 a,求 avg(b) 的例子中,求键为 Group-By 列 b 的值,即 sum(b) 和 count(b)。
计算过程中,只需要根据每行输入数据计算出键,在 Hash 表中找到对应值进行更新即可。
// Next is part of the RowSource interface.
func (ag *hashAggregator) Next() (sqlbase.EncDatumRow, *distsqlpb.ProducerMetadata) {
for ag.State == runbase.StateRunning {
var row sqlbase.EncDatumRow
var meta *distsqlpb.ProducerMetadata
switch ag.runningState {
case aggAccumulating:
ag.runningState, row, meta = ag.accumulateRows()
case aggEmittingRows:
ag.runningState, row, meta = ag.emitRow()
default:
log.Fatalf(ag.Ctx, "unsupported state: %d", ag.runningState)
}
if row == nil && meta == nil {
continue
}
return row, meta
}
return nil, ag.DrainHelper()
}
其中 ag.runningState 为 aggAccumulating:是数据还没有读取完毕,没有算出最终的 agg 结果时的状态,当所有的数据读取完毕后将状态设置为 aggEmittingRows 输出结果。上面的计划就是一个典型的 Hashagg 的例子的逻辑计划。
(2)OrderAggregator 算子
OrderAggregate 的计算需要保证输入数据按照 Group-By 列有序。在计算过程中,每当读到一个新的 Group 的值或所有数据输入完成时,便对前一个 Group 的聚合最终结果进行计算。因为 OrderAggregate 的输入数据需要保证同一个 Group 的数据连续输入,所以 Stream Aggregate 处理完一个 Group 的数据后可以立刻向上返回结果,不用像 HashAggregate 一样需要处理完所有数据后才能正确的对外返回结果。
当上层算子只需要计算部分结果时,比如 Limit,当获取到需要的行数后,可以提前中断 OrderAggregate 后续的无用计算。当 Group-By 列上存在索引时,由索引读入数据可以保证输入数据按照 Group-By 列有序,此时同一个 Group 的数据连续输入 OrderAggregate 算子,可以避免额外的排序操作。如果想要走 Orderagg 则需要将 groupby 列建立索引:create index on nature(b),即可看到计划的转变。

以下是 Hashagg 和 Orderagg 聚集方法类图:
