KaiwuDBkaiwudb logo

KaiwuDB 技术博客专区

深度解读 KaiwuDB 的排序操作

2023-06-02

标签: 排序操作





一、单节点执行





在单节点环境执行一条简单的 SQL 语句 SELECT * FROM NATION ORDER BY N_NAME。NATION 是一张小表,只有 25 条记录;对第 2 列 N_NAME 进行升序排列。


1. 抽象语法树


上述示例中的 SQL 语句经过分析器解析后得到 AST,如下图所示:


1.png


2. 逻辑计划


将 AST 转换成一个树状结构的 Plan,称之为逻辑查询计划。抽象语法树中的每一个语法元素都被转换成一个查询逻辑单元,例如 scanNode, sortNode, joinNode 等。


逻辑计划可以通过一系列规则进行优化,称之为 RBO(Rule Base Optimization)。


举一个简单的例子,SQL 语句 SELECT * FROM t WHERE a + 1 > 4 通过规则改写可以转换为 SELECT * FROM t WHERE a > 3 。从数据库的计划角度,两者有很大差别。


前者只能扫描全表,每次读取一条记录并计算表达式判断是否符合过滤条件;后者可以利用 a 列索引信息减少扫描范围,即使没有索引也不需要每次进行表达式计算。


例子中的逻辑计划很简单,就是扫描节点 Scan 和排序节点 Sort。命令 Explain SELECT * FROM NATION ORDER BY N_NAME 显示如下:


2.jpg


3. 物理计划


(DistSQLPlanner).PlanAndRun 方法把逻辑计划转换为物理计划,其中递归调用 createPlanForNode 方法生成各个物理算子,交给执行器具体执行。


生成物理计划也是一个优化过程,称之为 CBO(Cost Base Optimization)。例如逻辑计划的表连接,在具体实现时可以交换顺序,也可以有不同的实现方法(Nest Loop Join,Sort Merge Join和Hash Join)。


此时就需要通过代价模型(Cost Model)在巨大的搜索空间中寻找一个合理的物理执行计划。这是一个 NP 问题,所以一般只能找到一个相对最优解。


此外,KaiwuDB 根据底层 KV 数据的分布和预估返回数据集的大小,决定是否需要生成分布式执行计划;这个例子是一个本地执行的物理计划。


逻辑计划节点和物理计划节点并不是一一对应关系,但是这个例子中,逻辑计划中的 Scan 和 Sort 分别对应物理计划中的 TableReader 算子和 Sorter 算子。


4. 执行引擎


最后调用(DistSQLPlanner).Run 方法执行物理计划。执行引擎采用火山模型(Volcano),每一层执行算子通过调用下一层的 Next 方法获取一条记录。执行伪代码如下:


4.png


5.排序分析


下面具体分析排序算子操作,首先了解一下数据在各个阶段的存储格式。


(1)数据格式


5.png


在 KV 存储引擎中,数据的存储格式如下:


6.png

*注:Column ID Diff 表示跟前一个列 ID 的差值


TableReader 通过 KV 存储接口,读取一条数据后返回一个 EncDatumRow 对象,里面添加了 KaiwuDB 的编码信息。


7.png


Sorter 算子的 fill 方法将 EncDatumRow 对象中的数据解码后加入 MemRowContainer 对象的 chunks 里,用于后续的排序。


chunks 是一个二维数组[][]Datum,Datum 是一个接口,代表 SQL 中的值,具体的实现类包括 DBool, DInt, DString 等。KaiwuDB 中尽量使用原生数据类型,比如 DBool/DInt/DString 分别被定义成 bool/int64/string,并实现了 Datum 接口方法;而 DDecimal 就是一个自定义结构体。

func (s *sortAllProcessor) fill() (ok bool, _ error) {
    ctx := s.EvalCtx.Ctx()    
    for { 
        //input是一个RowChannel对象,Next获取下一条数据
        row, meta := s.input.Next()  
        ......    
        
        // rows是一个MemRowContainer对象
        if err := s.rows.AddRow(ctx, row); err != nil { 
            return false, err 
        }
    } 
    s.rows.Sort(ctx)
    
    s.i = s.rows.NewFinalIterator(ctx)
    s.i.Rewind() 
    return true, nil
}


直觉上 []Datum 应该对应表里的一行数据,但是为了减少 golang 里切片扩容带来的影响,KaiwuDB 将 64 行数据打包放在了一个[]Datum 里。Nation 表的数据在 chunks 中打印显示如下:


8.png


内存中的 chunks 结构:


9.png

UML 类图


10.png


(2)排序方法


最常用的排序执行算子叫做 sortAllProcessorSorter,将全部待排序结果读入后(内存或磁盘),进行一次排序输出最终结果。调用时序图如下:


11.png


MemRowContainer 的 Sort 方法实现了内存中对 chunks 数组的排序:

 · 数组长度小于等于 12 时,采用插入排序;

 · 快速排序,递归最大深度 2*ceil(lg(n+1));

 · 递归到了最大深度,采用堆排序;


// At accesses a row at a specific index.
func (c *RowContainer) At(i int) tree.Datums {
    // This is a hot-path: do not add additional checks here.    
    chunk, pos := c.getChunkAndPos(i)        
    return c.chunks[chunk][pos : pos+c.numCols : pos+c.numCols]
}

// Sort sorts data.
// It makes one call to data.Len to determine n, and O(n*log(n)) calls to
// data.Less and data.Swap. The sort is not guaranteed to be stable.
func Sort(data sort.Interface, cancelChecker *CancelChecker) {
   n := data.Len()
   quickSort(data, 0, n, maxDepth(n), cancelChecker)
}

// Sort is part of the SortableRowContainer interface.
func (mc *MemRowContainer) Sort(ctx context.Context) {
   mc.invertSorting = false        
   cancelChecker := sqlbase.NewCancelChecker(ctx)        
   sqlbase.Sort(mc, cancelChecker)}

func quickSort(data sort.Interface, a, b, maxDepth int, cancelChecker *CancelChecker) {
   for b-a > 12 { // Use ShellSort for slices <= 12 elements
       if maxDepth == 0 {
            heapSort(data, a, b, cancelChecker)
            return                
        }                
        maxDepth--                
        // Short-circuit sort if necessary.                
        if cancelChecker.Check() != nil {                        
            return                
        }              
        mlo, mhi := doPivot(data, a, b)                
        // Avoiding recursion on the larger subproblem guarantees
        // a stack depth of at most lg(b-a).                
        if mlo-a < b-mhi {                        
            quickSort(data, a, mlo, maxDepth, cancelChecker)
            a = mhi // i.e., quickSort(data, mhi, b)
        } else {
            quickSort(data, mhi, b, maxDepth, cancelChecker)
            b = mlo // i.e., quickSort(data, a, mlo)
        }      
     }        
     if b-a > 1 {                
         // Do ShellSort pass with gap 6                
         // It could be written in this simplified form cause b-a <= 12
         for i := a + 6; i < b; i++ {                        
             if data.Less(i, i-6) {                                
                 data.Swap(i, i-6)                        
             }                
         }                
         insertionSort(data, a, b)        
      }
   }


如果需要排序的数据超出了阈值(WorkMemBytes,默认 64MB),会调用 spillToDisk 方法将 MemRowContainer 中的数据写入 DiskRowContainer 中,后者将 OrderBy 列的信息作为 Key 值写入 KV 存储引擎。


此外为了提高 KV 写入速度,DiskRowContainer 不会每次只写一条数据,而是有一个 buffer 负责累积一批键值对,然后一起写入。


其他的两种排序执行算子包括:

 · sortTopkProcessor: Limit 下推到 Sorter 时,算子只分配 N 行的排序空间,然后进行堆排序;


 · sortChunksProcessor: 多列排序时,如果前 i 列 (0 < i < N) 已经有序,算子逐行读入输入数据,直到前 i 列出现不同值;重复对读入的批次进行排序。处理完数据集后前 i+1 列已经有序,迭代前面的步骤对后续列进行排序,直到结果集多列排序完成。



二、多节点执行





在三节点环境上执行 SQL 语句 SELECT * FROM LINEITEM ORDER BY L_SHIPDATE。LINEITEM 是一张大表,有约 6000 万条数据;对第 11 列 L_SHIPDATE 进行升序排列。


1. 逻辑计划


逻辑计划和单节点环境的计划相似。


12.jpg


2. 物理计划


KaiwuDB 在多节点环境中将数据分片放在不同的节点中,每个分片数据有多个备份。示意图如下:


13.png


分布式执行计划根据 Span 信息分为多个 tableReader 算子在多个节点上执行;AddNoGroupingStage 方法将 sorter 算子加入到所有的 tableReader 算子之后,对各个节点上的分片数据进行排序。


最后 FinalizePlan 方法会增加一个 No-op 算子,归并汇总最终的结果集。生成的分布式物理计划如下:


14.png

免费体验 KaiwuDB 全新功能

立即体验

关于我们
联系我们

KaiwuDB B站

KaiwuDB
B站

KaiwuDB 微信公众号

KaiwuDB
微信公众号

© 上海沄熹科技有限公司 Shanghai Yunxi Technology Co., Ltd.    沪ICP备2023002175号-1
400-624-5688-7
1V1 方案咨询
marketing@kaiwudb.org.cn