1. 概述
本文将首要介绍 Spark AQE SkewedJoin 的基本原理以及字节跳动在运用 AQE SkewedJoin 的实践中遇到的一些问题;其次介绍针对遇到的问题所做的相关优化和功用增强,以及相关优化在字节跳动的收益;此外,咱们还将共享 SkewedJoin 的运用经历。
2. 背景
首要对 Spark AQE SkewedJoin 做一个简略的介绍。Spark Adaptive Query Execution, 简称 Spark AQE,整体思维是动态优化和修正 stage 的物理履行计划。利用履行完毕的上游 stage 的核算信息(主要是数据量和记载数),来优化下流 stage 的物理履行计划。
Spark AQE 能够在 stage 提交履行之前,根据上游 stage 的一切 MapTask 的核算信息,核算得到下流每个 ReduceTask 的 shuffle 输入,因而 Spark AQE 能够自动发现产生数据歪斜的 Join,并且做出优化处理,该功用便是 Spark AQE SkewedJoin。
例如 A 表 inner join B 表,并且 A 表中第 0 个 partition(A0)是一个歪斜的 partition,正常状况下,A0 会和 B 表的第 0 个 partition(B0)产生 join,由于此刻 A0 歪斜,task 0 就会成为长尾 task。
SkewedJoin 在履行 A Join B 之前,经过上游 stage 的核算信息,发现 partition A0 明显超过均匀值的数倍,即判别 A Join B 产生了数据歪斜,且歪斜分区为 partition A0。Spark AQE 会将 A0 的数据拆成 N 份,运用 N 个 task 去向理该 partition,每个 task 只读取若干个 MapTask 的 shuffle 输出文件,如下图所示,A0-0 只会读取 Stage0#MapTask0 中归于 A0 的数据。这 N 个 Task 然后都读取 B 表 partition 0 的数据做 join。这 N 个 task 履行的成果和 A 表的 A0 join B0 的成果是等价的。
不难看出,在这样的处理中,B 表的 partition 0 会被读取 N 次,尽管这添加了必定的额定成本,可是经过 N 个任务处理歪斜数据带来的收益依然大于这样的成本。
Spark 从3.0 版本开端支撑了 AQE SkewedJoin 功用,可是咱们在实践中发现了一些问题。
- 不精确的核算数据或许导致 Spark 无法辨认数据歪斜。
- 切分不均匀导致优化处理作用不抱负。
- 不支撑复杂场景例如同一个字段产生接连 join。
我将在【优化增强】中胪陈这些问题以及咱们的优化和解决方案。
3. 优化增强
3.1 进步数据歪斜的辨认才能
由 Spark AQE 处理数据歪斜的原理不难发现,Spark AQE 辨认歪斜以及切分数据歪斜的功用依赖于上游 Stage 的核算数据,核算数据越精确,歪斜的辨认才能和处理才能就越高,直观体现便是歪斜数据被拆分的十分均匀,拆分后的数据巨细简直和中位数一致,将长尾Task的影响降到最低。
MapStage 履行完毕之后,每一个 MapTask 会生成核算成果 MapStatus,并将其发送给 Driver。MapStatus维护了一个 Array[Long],记载了该 MapTask 中归于下流每一个 ReduceTask 的数据巨细。当 Driver 收集到了一切的 MapTask 的MapStatu之后,就能够核算得到每一个 ReduceTask 的输入数据量,以及分归于每一个上游 MapTask 的数据巨细。根据每一个 ReduceTask 的数据巨细,Spark AQE 能够判别出数据歪斜,并根据上游 MapTask 的核算信息,合理切分 Reducetask,尽或许确保切分的均匀性。
如下图描绘,ReduceTask0 的 ShuffleRead(shuffle 过程中读取的数据量) 为 200,明显大于 ReduceTask1 和 ReduceTask2 的 100,产生了数据歪斜。咱们能够将 ReduceTask0 拆成 2 份,ReduceTask0-0 读取 MapTask0 和 MapTask1 的数据,ReduceTask0-1 读取 MapTask2 和 MapTask3 的数据,拆分后的两个 task 的 ShuffleRead 均为 100。
咱们能够看出,核算信息的巨细的空间复杂度是 O(M*R),关于大任务而言,会占有大量的 Driver 内存,所以 Spark 原生做了限制,关于 MapTask,当下流 ReduceTask 个数大于某一阈值(spark.shuffle.minNumPartitionsToHighlyCompress
,默许 2000),就会将MapStatus进行紧缩,一切小于spark.shuffle.accurateBlockThreshold
(默许100M)的值都会被一个均匀值所替代填充。
举个比方,下图是咱们遇到的一个 SkewedJoin 没有收效的作业,从运行 metrics 来看,ShuffleRead 产生了很严重的歪斜,契合 SkewedJoin 收效的场景,但实践运行时并没有收效。
经过阅览日志,能够看到,Spark AQE 在运行时,获取的 join 两侧的 shuffle partitions 的中位数和最大值都是相同的,所以没有辨认到任何的歪斜。这便是由于紧缩后 MapStatus 的核算数据的不精确造成的。
咱们在实践中,遇到许多大作业由于核算数据不精确,无法辨认歪斜。而当咱们尝试进步这一阈值之后,部分大作业由于 Driver 内存运用上涨而失利,为了解决这一问题,咱们做了以下优化:
- Driver 收到详细的 MapStatus之后,先将数据用于更新每个 ReduceTask 的累计输入数据,然后将 MapStatus紧缩,这样就不会占用太多内存。此刻,尽管紧缩后的 MapStatus无法让咱们取得 ReduceTask 精确的上游散布,可是能够取得精确的 ReduceTask 的输入数据总巨细,这样咱们就能够精确的辨认产生歪斜的 ReduceTask。
- 上述优化添加了一次 MapStatus 的解压操作,而 MapStatus 的解压是一个比较耗CPU的操作,关于大作业或许出现 Driver CPU 被打满,无法处理 Executor 心跳导致作业失利的状况。对此,咱们运用缓存确保Driver端在消费 MapStatus 时,每个 MapStatus 只会被解压一次,大大降低了优化带来的 Overhead。
经过上述优化,咱们成功在线上将默许阈值从 2000 调整为 5000,确保了线上 96.6% 的 Spark 作业能够精确的辨认数据歪斜(假如存在)。
3.2 进步歪斜数据的切分均匀程度
由于 HighlyCompressMapStatus 用均匀值填充一切低于 spark.shuffle.accurateBlockThreshold 的值,每个 ReduceTask 经过紧缩后的 MapStatus 累加核算得到的总数据巨细和数据散布,就和实践差距很大。
举个简略的比方:咱们得到 ReduceTask0 的实践总数据是 1G,而中位数是 100M,因而咱们的期望是将 ReduceTask0 拆成 10 份,每一份是 100M。此刻上游的 MapStage 一共有 100 个 MapTask,除了 MapTask0 中归于 ReduceTask0 的数据是 100M,其他 99 个 MapStak 的数据都是 10M。当咱们将一切的 MapStatus 紧缩之后,AQE 获取的 ReduceTask0 的上游散布,便是 MapTask0 有 100M (由于大块数据所以被保存),其他 99 个 MapTask 的数据都是 1M(在紧缩时运用均匀值填充)。这时,Spark AQE 依照 100M 的期望值来切分,只会切分红两个 ReduceTask:ReduceTask0-0(读取MapTask0)和 ReduceTask0-1(读取剩下99个MapTask)。
基于此,咱们改善后的方法是利用精确的 ReduceTask 数据量来反推每个 MapperTask 对应的数据量,得到尽或许精确的数据散布。同样是方才的比方,咱们已知 ReduceTask0 的实践总数据是 1G,MapStatus 紧缩的阈值是 100M,那么能够确定的是,MapTask0 关于 ReduceTask0 的数据 100M 是精确被保存的(由于大于等于阈值),而其他 99 个 MapTask 的数据都是不精确的。此刻 AQE 就不会运用被紧缩的数据,而是经过 1G 的总数据反推得到其他 99 个 MapTask 中归于 ReduceTask0 的数据是 10M,尽管同样是存在误差的均匀值,可是相比紧缩数据,经过精确的总量反推得到的均匀值会愈加精确。这个时分 Spark 依照 100M 的期望值来切分,就会切成 10 个 ReduceTask,契合咱们的预期。
而在实践运用中,利用新方案,AQE SkewedJoin 切分歪斜数据愈加均匀,优化作用有明显的提升。
下图是某个歪斜处理作用不抱负的作业,SkewedJoin 收效后,该 Stage ShuffleReadSize 的中位数和最大值分别为 4M 和 9.9G。
经过咱们的优化后,该 Stage 的 ShuffleReadSize 的中位数和最大值分别为 149M 和 1427M,歪斜分区的切分愈加均匀,该 Stage 的运行时间也由本来的 2h 降为 20m。
3.3 支撑更多的场景
- 场景1:JoinWithAggOrWin
以下图为例,Stage10 尽管只有一个 SortMergeJoin,可是 join 的一边并不是 Sort+Exchange 的组合,而是存在 Aggregate 算子或许 Window 算子,因而不归于社区完结的范围内。
- 场景2:MultipleSkewedJoin
在用户的事务逻辑中,经常出现这样一种场景:一张表的主键需求接连的 join 多张表,这种场景体现在 Spark 的具体履行上,便是接连的 join 存在于同一个 Stage 傍边。如下图所示 Stage21 中存在接连的多个 SortMergeJoin,而这种场景也是社区的完结无法优化的。
- 场景3:JoinWithUnion
Stage 中有 Union 算子,且 Union 的 children 中有 SMJ。
此外,咱们还支撑了 ShuffleHashJoin、 BucketJoin、MultipleJoinWithAggOrWin 等更多场景。
4. 字节的实践
上面介绍的LAS对SparkAQE SkewedJoin 的优化功用在字节跳动内部已运用 1 年左右,截止 2022年8月,优化日均覆盖1.8万+ Spark 作业,优化命中作业均匀性能提升 35% 左右,其间 30% 被优化的 Spark 作业所归于的场景是 LAS 自研支撑的,大家能够经过火山引擎开通 LAS 服务并体会这些优化功用。
5. 用户指南
5.1 哪些场景 AQE SkewedJoin 不支撑
AQE SkewedJoin 功用并不能处理一切产生数据歪斜的 Join,这是由它的完结逻辑所决定的。
榜首,假如歪斜的分区的大部分数据来自于上游的同一个 Mapper,AQE SkewedJoin 无法处理,原因是 Spark 不支撑 Reduce Task 只读取上游 Mapper 的一个 block 的部分数据。
第二,假如 Join 的产生歪斜的一侧存在 Agg 或许 Window 这类有指定 requiredChildDistribution 的算子,那么 SkewedJoin 优化无法处理,由于将分区切分会损坏 RDD 的 outputPartitioning,导致不再满意 requiredChildDistribution。
第三,关于 Outer/Semi Join,AQE SkewedJoin 是无法处理非 Outer/Semi 侧的数据歪斜。比方,关于 LeftOuter Join,SkewedJoin 无法处理右侧的数据歪斜。
第四,AQE 无法处理歪斜的 BroadcastHashJoin。
5.2 AQE SkewedJoin 优化作用不明显时的办法
假如遇到了契合运用场景可是 SkewedJoin 没有收效或许歪斜处理作用不抱负的状况,有以下调优手法:
- 进步
spark.shuffle.minNumPartitionsToHighlyCompress
,确保值大于等于 shuffle 并发(当敞开 AQE 时,即为spark.sql.adaptive.coalescePartitions.initialPartitionNum
)。
- 调小
spark.shuffle.accurateBlockThreshold
,比方 4M。可是需求注意的是,这会添加 Driver 的内存耗费,需求同步添加 Driver 的 cpu 和内存。
- 降低
spark.sql.adaptive.skewJoin.skewedPartitionFactor
,降低界说产生歪斜的阈值。
6. 总结
本文首要简略介绍了 Spark AQE 的基本思维以及 SkewedJoin 功用的原理,接着提出了咱们在运用 SkewedJoin的过程中遇到的一些问题。针对这些问题,咱们介绍了对 AQE SkewedJoin 做的优化和增强——进步核算的精确度;进步歪斜数据的切分均匀程度;支撑了更多的场景。接着,本文介绍了 AQE SkewedJoin 在字节跳动的运用状况,包含日均优化覆盖作业和优化作用,其间30%被优化的 Spark 作业所归于的场景是字节自研支撑的。最后共享了咱们关于 AQE SkewedJoin 的用户指南:哪些场景 AQE SkewedJoin 不支撑;当 AQE SkewedJoin 作用不明显时,能够采取哪些办法。
7. 附录A :本文涉及的关于 AQE SkewedJoin 优化的相关参数配置
参数配置名 | 默许值 | 参数含义 |
---|---|---|
spark.shuffle.minNumPartitionsToHighlyCompress | 2000 | 决定 Mapstatus 运用 HighlyCompressedMapStatus仍是 CompressedMapStatus 的阈值,假如 huffle partition 大于该值,则运用 HighlyCompressedMapStatus。 |
spark.shuffle.accurateBlockThreshold | 100M | HighlyCompressedMapStatus 中记载 shuffle blcok 精确巨细的阈值,当 block 小于该值则用均匀值替代。 |
spark.sql.adaptive.skewJoin.skewedPartitionFactor | 10 | 假如一个 partition 大于该因子乘以分区巨细的中位数,那么它便是歪斜的 partition。 |
8. 关于咱们
火山引擎湖仓一体分析服务 LAS
支撑构建开源Hadoop生态的企业级大数据分析系统,彻底兼容开源,提供 Hadoop、Spark、Hive、Flink集成和办理,帮助用户轻松完结企业大数据平台的构建,降低运维门槛,快速形成大数据分析才能。点击立即体会产品!
欢迎参加字节跳动数据平台官方群,进行数据技术交流、获取更多内容干货。