本文首要介绍了数据库系统中常用的算子 Join 和 Aggregation 在 TiFlash 中的履行状况,包含查询方案生成、编译阶段与履行阶段,以希望读者对 TiFlash 的算子有初步的了解。
视频
www.bilibili.com/video/BV1tt…
算子概要
在阅览本文之前,推荐阅览本系列的前作:核算层 overview,以对 TiFlash 核算层、MPP 框架有必定了解。
在数据库系统中,算子是履行 SQL 首要逻辑的地方。一条 SQL 会被 parser 解析为一棵算子树(查询方案),然后经过 optimizer 的优化,再交给对应的 executor 履行,如下图所示。
本文的首要内容包含
- TiDB 怎么生成与优化 MPP 算子与查询方案
- Join 算子在 TiFlash 中的编译(编译指的是将 TiDB-server 下发的履行方案片段生成可履行结构的过程,下同)与履行
- Aggregation 算子在 TiFlash 中的编译与履行
构建查询方案
一些背景知识:
- 逻辑方案与物理方案:可以简单理解为逻辑方案是指算子要做什么,物理方案是指算子怎样去做这件事。比方,“将数据从表 a 和表 b 中读取出来,然后做 join”描绘的是逻辑方案;而“在 TiFlash 中做 shuffle hash join” 描绘的是物理方案。更多信息可以参阅:TiDB 源码阅览系列文章
- MPP:大规模并行核算,一般用来描绘节点间可以交流数据的并行核算,在当时版别(6.1.0,下同)的 TiDB 中,MPP 运算都发生在 TiFlash 节点上。推荐观看:源码解读 – TiFlash 核算层 overview。MPP 是物理方案级别的概念。
MPP 方案
在 TiDB 中,可以在 SQL 前加上 explain 来检查这条 SQL 的查询方案,如下图所示,是一棵由物理算子组成的树,可以检查 TiDB 履行方案概览 来对其有更多的了解。
MPP 查询方案的共同之处在于查询方案中多出了用于进行数据交流的 ExchangeSender 和 ExchangeReceiver 算子。
履行方案中会有这样的 pattern,代表将会在此处进行数据传输与交流。
...
|_ExchangeReceiver_xx
|_ ExchangeSender_xx
…
每个 ExchangeSender 都会有一个 ExchangeType,来标识本次数据传输的类别,包含:
- HashPartition,将数据按 Hash 值进行分区之后分发到上游节点。
- Broadcast,将自身数据拷贝若干份,播送到一切上游节点中。
- PassThrough,将自己的数据悉数传给一个指定节点,此刻接纳方可以是 TiFlash 节点(ExchangeReceiver);也可以是 TiDB-server 节点(TableReader),代表 MPP 运算结束,向 TiDB-server 回来数据。
在上面的查询方案图中,一共有三个 ExchangeSender,id 别离是 19, 13 和 17。其间 ExchangeSender_13 和 ExchangeSender_17 都是将读入后的数据按哈希值 shuffle 到一切节点中,以便进行 join,而 ExchangeSender_19 则是将 join 完结后的数据回来到 TiDB-server 节点中。
增加 Exchange
在优化器的方案探索过程中,会有两处为查询方案树刺进 Exchange 算子:
- 一个是 MPP 方案在探索结束后,接入 TiDB 的 tableReader 时。类型为 passThrough type. 源码在函数
func (t *mppTask) convertToRootTaskImpl
中 - 一个是 MPP 方案在探索过程中,发现当时算子的 property(这儿首要指分区特点)不满意上层要求时。例如上层要求需求按 a 列的 hash 值分区,但是基层算子不能满意这个要求,就会刺进一组 Exchange.
func (t *mppTask) enforceExchanger(prop *property.PhysicalProperty) *mppTask {
if !t.needEnforceExchanger(prop) {
return t
}
return t.copy().(*mppTask).enforceExchangerImpl(prop)
}
// t.partTp 表明当时算子已有的 partition type,prop 表明父算子要求的 partition type
func (t *mppTask) needEnforceExchanger(prop *property.PhysicalProperty) bool {
switch prop.MPPPartitionTp {
case property.AnyType:
return false
case property.BroadcastType:
return true
case property.SinglePartitionType:
return t.partTp != property.SinglePartitionType
default:
if t.partTp != property.HashType {
return true
}
if len(prop.MPPPartitionCols) != len(t.hashCols) {
return true
}
for i, col := range prop.MPPPartitionCols {
if !col.Equal(t.hashCols[i]) {
return true
}
}
return false
}
}
Property 关于分区特点的要求(MPPPartitionTp)有以下几种:
- AnyType,对基层算子没有要求,所以并不需求增加 exchange;
- BroadcastType,用于 broadcast join,要求基层节点仿制数据并播送到一切节点中,此刻必定需求增加一个 broadcast exchange;
- SinglePartitionType,要求基层节点将数据汇总到同一台节点中,此刻假如现已在同一台节点上,则不必再进行 exchange。
- HashType,要求基层节点按特定列的哈希值进行分区,假如现已按要求分好区了,则不必再进行 exchange.
在优化器的生成查询方案的探索中,每个算子都会对基层有 property 要求,一起也需求满意上层传下来的 property;当上下两层的 property 无法匹配时,就刺进一个 exchange 算子交流数据。依靠这些 property,可以不重不漏的刺进 exchange 算子。
MPP 算法
是否挑选 MPP 算法是在 TiDB 优化器生成物理方案时决议,即 CBO(Cost-Based Optimization) 阶段。优化器会遍历一切可挑选的方案路径,包含含有 MPP 算法的方案与不含有 MPP 算法的方案,估量它们的价值,并挑选其间总价值最小的一个查询方案。
关于当时的 TiDB repo 代码,有四个位置可以触发 MPP 方案的生成,别离对应于 join、agg、window function、projection 四个算子:
- func (p *LogicalJoin) tryToGetMppHashJoin
- func (la *LogicalAggregation) tryToGetMppHashAggs
- func (lw *LogicalWindow) tryToGetMppWindows
- func (p *LogicalProjection) exhaustPhysicalPlans
这儿只描绘具有代表性的 join 和 agg 算子,其他算子同理。
Join
当时 TiDB 支持两种 MPP Join 算法,别离是:
- Shuffle Hash Join,将两张表的数据各自按 hash key 分区后 shuffle 到各个节点上,然后做 hash join,如上一节中举出的查询方案图所示。
- Broadcast Join,将小表播送到大表所在的每个节点,然后做 hash join,如下图所示。
tryToGetMppHashJoin 函数在构建 join 算子时给出了对子算子的 property 要求:
if useBCJ { // broadcastJoin
…
childrenProps[buildside] = {MPPPartitionTp: BroadcastType}
childrenProps[1-buildside] = {MPPPartitionTp: AnyType}
…
} else { // shuffle hash join
…
childrenProps[0] = {MPPPartitionTp: HashType, key: leftKeys}
childrenProps[1] = {MPPPartitionTp: HashType, key: rightKeys}
…
}
如代码所示,broadcast join 要求 buildside(这儿指要播送的小表)具有一个 BroadcastType 的 property,对大表侧则没有要求。而 shuffle hash join 则要求两边都具有 HashType 的分区特点,分区列别离是 left keys 和 right keys。
Aggregation
当时 tryToGetMppHashAggs 或许生成三种 MPP Aggregation 方案:
1.“一阶段 agg”,要求数据先按 group by key 分区,然后再进行聚合。
2.“两阶段 agg”,首先在本地节点进行第一阶段聚合,然后按 group by key 分区,再进行一次聚合(用 sum 汇总成果)。
3.“scalar agg”,没有分区列的特定状况,在本地节点进行第一阶段聚合,然后汇总到同一台节点上完结第二阶段聚合。
一阶段 agg 和两阶段 agg 的区别是是否先在本地节点做一次预聚合,优化器会依据 SQL 与价值估算来挑选履行哪种方式。关于重复值许多的状况,两阶段 agg 可以在网络传输前减少许多数据量,从而减少大量的网络耗费;而假如重复值很少的状况下,这次预聚合并不会减少许多数据量,反而白白增大了 cpu 与内存耗费,此刻就不如运用一阶段 agg。
这儿留一个小思考题,这三种 agg 各自对下方有什么 property 要求?在聚合做完之后又满意了怎样的 property?
答案是:
一阶段 agg 要求 hash,做完满意 hash;二阶段 agg 无要求,做完满意 hash;scalar agg 无要求,做完满意 singlePartition.
编译与履行
履行方案构建好之后,TiDB-server 会将 dag(履行方案的片段)下发给对应的 TiFlash 节点。在 TiFlash 节点中,需求首先解析这些履行方案,这个过程咱们称作“编译”,编译的成果是 BlockInputStream,它是 TiFlash 中的可履行结构;而最终一步就是在 TiFlash 中履行这些 BlockInputStream.
下图是一个 BlockInputStream DAG 的比方,每个 BlockInputStream 都有三个办法:readPrefix, read 和 readSuffix;类似于其他火山模型调用 open、next 和 close。
下图的来源是 TiFlash 履行器线程模型 – 知乎专栏 (zhihu.com),关于履行模型更多的内容,可以参阅这篇文章或许 TiFlash Overview,这儿不再赘述。
Join 的编译与履行
TiDB-server 节点会将查询方案按 Exchange 会作为分界,将查询切分为不同的方案片段(task),作为 dag 发给 TiFlash 节点。比方关于下图中所示的查询方案,会切分为这三个红框。
TiFlash 节点在编译完结后生成的 BlockInputStream 如下,可以在 debug 日志中看到:
task 1
ExchangeSender
Expression: <final projection>
Expression: <projection after push down filter>
Filter: <push down filter>
DeltaMergeSegmentThread
task 2
ExchangeSender
Expression: <final projection>
Expression: <projection after push down filter>
Filter: <push down filter>
DeltaMergeSegmentThread
task 3
CreatingSets
Union: <for join>
HashJoinBuildBlockInputStream x 20: <join build, build_side_root_executor_id = ExchangeReceiver_15>, join_kind = Inner
Expression: <append join key and join filters for build side>
Expression: <final projection>
Squashing: <squashing after exchange receiver>
TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Nullable(Int32)>, <exchange_receiver_1, Nullable(Int32)>}
Union: <for mpp>
ExchangeSender x 20
Expression: <final projection>
Expression: <remove useless column after join>
HashJoinProbe: <join probe, join_executor_id = HashJoin_34>
Expression: <final projection>
Squashing: <squashing after exchange receiver>
TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Nullable(Int32)>, <exchange_receiver_1, Nullable(Int32)>}
其间 task1 和 task2 是将数据从存储层读出,经过简单的处理之后,发给 ExchangeSender. 在 task3 中,有三个 BlockInpuStream 值得重视,别离是:CreatingSets, HashJoinBuild, HashJoinProbe.
CreatingSetsBlockInputStream
接受一个数据 BlockInputStream 表明 joinProbe,还有若干个代表 JoinBuild 的 Subquery。CreatingSets 会并发发动这些 Subquery, 等待他们履行结束后在开端发动数据 InputStream. 下面两张图别离是 CreatingSets 的 readPrefix 和 read 函数的调用栈。
为什么 CreatingSets 或许一起创立多张哈希表?因为在一个多表 join 中,同一个方案片段或许紧接着做屡次 join porbe,如下图所示:
task:4
CreatingSets
Union x 2: <for join>
HashJoinBuildBlockInputStream x 20: <join build, build_side_root_executor_id = ExchangeReceiver_22>, join_kind = Left
Expression: <append join key and join filters for build side>
Expression: <final projection>
Squashing: <squashing after exchange receiver>
TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Nullable(Int32)>, <exchange_receiver_1, Nullable(Int32)>}
Union: <for mpp>
ExchangeSender x 20
Expression: <final projection>
Expression: <remove useless column after join>
HashJoinProbe: <join probe, join_executor_id = HashJoin_50>
Expression: <final projection>
Expression: <remove useless column after join>
HashJoinProbe: <join probe, join_executor_id = HashJoin_14>
Expression: <final projection>
Squashing: <squashing after exchange receiver>
TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Nullable(Int32)>, <exchange_receiver_1, Nullable(Int32)>}
Join Build
留意,join 在此处仅代表 hash join,现已与网络通信和 MPP 级别的算法无关。
关于 join 的代码都在 dbms/src/Interpreters/Join.cpp 中;咱们以下面两张表进行 join 为例来阐明:
left_table l join right_table r
on l.join_key=r.join_key
where l.b>=r.c
默许右表做 build 端,左表做 probe 端。哈希表的值运用链式存储:
Join Probe
这儿首要描绘的是 JoinBlockImpl 这个函数的流程:
1.block 包含了左表的内容;创立 added_columns, 即要增加到 block 中的右表的列;然后创立相应的过滤器 replicate_offsets:表明当时共匹配了几行,之后可以用于挑选未匹配上的行,或仿制匹配了多行的行。
2.依次查找哈希表,依据查找成果调用相应的 addFound 或 addNotFound 函数,填充 added_columns 和过滤器。
从填充的过程中也可以看到,replicate_offsets 左表表明到当时行为止,一共能匹配上的右表的行数。并且 replicate_offsets[i] – replicate_offsets[i-1] 就表明左表第 i 行匹配到的右表的行数。
3.将 added_column 直接拼接到 block 上,此刻会有短暂的 block 行数不一致。
4.依据过滤器的内容,仿制或过滤掉原先左表中的行。
5.最终在 block 上处理 other condition,则得到了 join 的成果。
上文中描绘的是关于正常的 “all” join 的状况,需求回来左右表的数据。与之相对的则是 “any” join,表明半衔接,无需回来右表,只需回来左表的数据,则无需运用 replicate_offsets 这个辅助数组,读者可以自行阅览代码。 仍然在 dbms/src/intepreters/Join.cpp 中。
Aggregation 的编译与履行
还是以一个查询方案以及对应的 BlockInputStream 为例:
task:1
ExchangeSender
Expression: <final projection>
Expression: <before order and select>
Aggregating
Concat
Expression: <before aggregation>
Expression: <projection>
Expression: <before projection>
Expression: <final projection>
DeltaMergeSegmentThread
task:2
Union: <for mpp>
ExchangeSender x 20
Expression: <final projection>
Expression: <projection>
Expression: <before projection>
Expression: <final projection>
SharedQuery: <restore concurrency>
ParallelAggregating, max_threads: 20, final: true
Expression x 20: <before aggregation>
Squashing: <squashing after exchange receiver>
TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Int64>, <exchange_receiver_1, Nullable(Int64)>}
从查询方案中可以看到这是一个两阶段 agg,第一阶段对应 task1,履行聚合的 BlockInputStream 是 Aggregating。第二阶段对应 task2,履行聚合的 BlockInputStream 是 ParallelAggragating。两个 task 经过 Exchange 进行网络数据传输。
在 aggregation 的编译期,会检查当时 pipeline 可以提供的并行度,假如只要 1,则运用 AggregatingBlockInputStream 单线程履行,假如大于 1 则运用 ParallelAggragating 并行履行。
DAGQueryBlockInterpreter::executeAggregation(){
if (pipeline.streams.size() > 1){
ParallelAggregatingBlockInputStream
}else {
AggregatingBlockInputStream
}
}
AggregatingBlockInputStream 的调用栈如下:
ParallelAggregatingBlockInputStream 内部会分两阶段操作(这儿的两阶段是内部履行中的概念,发生在同一台节点上,和查询方案中的两阶段不是一个概念)。partial 阶段别离在 N 个线程构建 HashTable,merge 阶段则将 N 个 HashTable 合并起来,对外输出一个流。调用栈如下:
假如 result 是空,那么会单独调用一次 executeOnBlock 办法,来生成一个默许数据,类似于 count() 没有输入时,会回来一个 0.
两种履行方式都用到了 Aggregator 的 executeOnBlock 办法和 mergeAndConvertToBlocks 办法,他们的调用栈如图所示。前者是实践履行聚合函数的地方,会调用聚合函数的 add 办法,将数据值加入;后者的首要目的是将 ParallelAggregating 并行生成的哈希表合并。