以三节点集群执行一条查询语句为样例解析物理计划构建和执行流程,结构如下:
SQL
create table a(a1 int, a2 int, a3 bool);
insert into a values(1,1,false),(2,2,false),(3,3,false);
create table b(b1 int, b2 int, b3 bool);
insert into b values(1,2,false),(3,4,true),(5,6,true);
select * from a join b on a.a1=b.b1;
向右滑动查看

整体流程示意图
▪ Node
该物理计划构建首先会进入 Join 算子的物理算子生成函数 (createPlanNodeforJoin),进行左右两个 scanNode 的物理计划构建,scanNode 是处理表键/值对的扫描,并重组为行的算子。
在构建 scanNode 时,会构建表读取算子 TableReader (createTableReaders),它会确认表数据的分布信息,根据数据分布所在节点创建出对应数量的 TableReader 算子,最终左右的物理计划将各生成的 3个 TableReader 算子加入到 PhysicalPlan 当中,如图所示。

MergePlans 将左右计划进行合并,整体变成一个物理计划,该物理计划中有 6 个 TableReader 算子。

findJoinProcessorNodes 函数决定在几个节点做 Join 操作,这里的 Tablereader 算子是 3 节点,所以 Join 算子也会构建 3 个分别对应 3 个节点,之后在每个指定节点上添加 HashJoiner,并将左边和右边的输出进行连接,生成 3 个 HashJoiner 算子,并将它左右算子的 Output 类型改为 OutputRouterSpec_BY_HASH(执行时需要跨节点 Hash 重分布,下一小节会介绍另一种 Hash 分布方式)类型。

每个 HashJoiner 算子都需要有左右两个 Input,在每个节点上执行时都需要获取到两个表的 TableReader 算子的输出,所以需要调用 MergeResultStreams 方法按照节点数量将 a 表和 b 表的各 3 个 TableReader 连接到各个 HashJoiner 的 Input 左右两端。
如图所示,跨节点 Hash 数据重分布方式 (OutputRouterSpec_BY_HASH) 是要将三个节点上每个 HashJoiner 算子的 Input 每个表的数据做到基本平均分配,例:a表在节点 1 有 3000 条数据,节点 2 有 5000 条数据,节点 3 有 1000条 数据,节点 2需要将数据传输一部分到节点 1 和节点 3,最终三个节点的 HashJoiner 算子的 a 表 Input 都有 3000 条数据。
有向箭头表示数据流 (Stream),Stream 起于一个算子的 Output,终于另一个算子的Input,图中的 Output 类型均为上一小节所说的 OutputRouterSpec_BY_HASH。

另外,除本样例中提到跨节点 Hash 重分布 (OutputRouterSpec_BY_HASH) 分布方式,还有一种镜像 Hash 分布方式 (OutputRouterSpec_MIRROR),此方式会将表数据完全广播到每一个节点上,不需要根据节点数进行数据重分布,此 Hash 分布方式的计划如图所示,具体采用哪种分布方式,需要根据表数据量,节点数等进行数据传播和接受的代价计算,两种方式哪个代价更小就使用哪种分布方式。

createPlanForNode 函数完成后,物理计划的基本结构就生成完毕,但还需要一个最终汇总结果返回的算子,FinalizePlan 函数便会在已有的物理计划基础上增加一个 Noop 算子,用于汇总最终的执行结果,还需要设置每个算子的 Input 和 Output 所关联的 StreamID,以及 Stream 的类型(Stream 如果连接相同节点的算子为 streamEndpointSpec_LOCAL,否则为 streamEndpointSpec_REMOTE)为之后物理计划的执行做准备,此阶段物理计划如图所示。

至此,物理计划构建全部完成,而后将进行物理计划的分布式执行,此模块将在后续博客进行介绍。