本文整理自字节跳动基础架构工程师何润康在 Flink Forward Asia 2022 中心技术专场的同享。Flink OLAP 是数据仓库系统的重要运用,支撑杂乱的剖析型查询,广泛运用于数据剖析、商业决议计划等场景。本次同享将围绕字节 Flink OLAP 全体介绍、查询优化、集群运维和稳定性建造、收益以及未来规划五个方面展开介绍。
一、字节 Flink OLAP 介绍
事务落地状况
字节 Flink OLAP 上线以来接入了包含 User Growth、飞书、电商和幸福里等 12 家以上中心事务方,集群规模到达 1.6 万 Core 以上,每天的查询规模超越 50w 次,单集群支撑了杂乱查询高峰期的 200 QPS,一起 Query Latency P99 控制在 5s 以内,较好的满足了事务的功能需求。
架构
Flink OLAP 的全体架构分为 Flink SQL Gateway 和 Flink Session Cluster 两部分。
首要,用户经过 Client 提交一个 Query,先经过 Gateway 的 SQL 解析和优化进程,生成作业的履行计划,再提交给 Flink Session Cluster 的 JobManager,JobManager 的 Dispatcher 组件会创立一个对应的 JobMaster,并依据特定的调度规则将 Task 布置到对应的 TaskManager 上履行,最终将履行的成果回来给 Client。
Flink OLAP 是作为内部自研的高功能 HTAP 产品 — ByteHTAP 的 AP 引擎,用于支撑内部的中心事务。经过支撑双机房布置进步容灾才能,每个新接入的事务可以在双机房笔直布置两套 AP 集群,在线上集群呈现严峻毛病时,可以经过 Proxy 快速切流到另一个集群,然后进步服务的可用性。
事务落地应战
Flink 在流式场景的运用已经十分成熟,在批式场景的运用也在逐步扩展,可是在 OLAP 场景下的打磨和运用则较少。字节 Flink OLAP 在实在的事务落地进程中遇到了很多问题和应战,首要分为对功能和运维稳定性的应战。
在功能方面的一大应战是 OLAP 事务要求亚秒级的作业 Latency,这和流批有很大的不同,流式和批式首要重视数据的处理速度,而不需求重视 Plan 构建、Task 初始化等阶段的耗时。可是在 OLAP 场景下,优化这些阶段的耗时就变得十分重要。别的,字节 Flink OLAP 根据存算分离架构,有更加强烈的算子下推需求。
另一个应战是,OLAP 事务要求较高的 QPS,所以当 OLAP 集群频频地创立和履行作业,某些状况下会导致集群呈现严峻的功能问题,可是在流式和批式下只需求履行一次一般不会呈现问题。因而,针对以上不同,在 OLAP 场景下进行了很多查询相关的优化,比方 Plan 的构建加快和初始化等相关优化。
在事务的落地进程中,OLAP 和流批场景有很大的不同,运维、监控和稳定性都需求针对 OLAP 场景独自构建。
在运维方面,OLAP 是在线服务,对可用性的要求很高,所以完善测验流程和测验场景是十分必要的,可以削减线上 Bug 的概率。别的在运维晋级时,不同于流批作业的直接重启晋级,OLAP 集群的运维晋级由于不能中止用户运用,所以怎么做到无感知晋级是一个应战。
在监控方面,为了保障在线服务的可用性,线上集群呈现问题后,需求及时进行毛病康复和定位。因而针对 OLAP 下的监控系统就尤为重要。除了流批的集群状况监控外,OLAP 场景下特有的慢查询剖析和监控,是需求额定构建的。
在稳定性方面,第一个应战是建造 OLAP 容灾才能。流批和 OLAP 的毛病康复战略不同,流式作业经过 Failover 来康复,批式作业经过作业重跑或 Failover 来康复。在 OLAP 下,多个作业一起运转在一个在线集群上,单个作业失利可以重试,可是整个集群呈现无法康复的毛病时,假如采用重启康复,分钟等级的耗时关于线上服务是无法接受的。第二个应战是 Full GC 的管理,流批作业对 Full GC 的容忍度相对较高,可是 OLAP 下事务对 Latency 十分灵敏,而且 Full GC 还会导致一起运转的其它作业变慢,严峻影响用户体验。
二、查询优化
Query Optimizer 优化
Plan 缓存
在 OLAP 场景下,Query 有两个典型的特点:事务上重复的 Query 和亚秒级的查询耗时。经过剖析发现,Plan 阶段的耗时为几十到几百毫秒,占比较高。因而支撑了 Plan 缓存,防止相同 Query 的重复 Plan。此外也支撑了 Catalog Cache,加快元信息的访问,还支撑 ExecNode 的并行 Translate,使 TPC-DS Plan 的耗时下降了 10% 左右。
算子下推
在存算分离架构下,算子下推是一类十分重要的优化。中心思路是尽可能的将一些算子下推到存储层进行核算,大幅削减 Scan 的数据量,下降外部的 IO,一起也可以削减 Flink 引擎需求处理的数据量,然后明显进步 Query 的功能。
TopN 下推: 在字节内部的一个典型事务上,大部分 Query 都是取 TopN 的数据。经过支撑 TopN 的下推优化,把 Local SortLimit 算子,也便是 Local 的 TopN 算子,下推到了 Scan 节点,终究在存储层做 TopN 核算,然后大幅下降从存储读取的数据量。经过优化后,读取数据量下降了 99.9%,事务 Query 的 Latency 下降了 90.4%。 除此之外,也支撑了包含 Aggregate、Filter、Limit 等更多的算子下推。
跨 Union All 的常见算子下推: 字节内部某个事务的数据是按照典型的分库分表存放的,在该场景下,用户假如需求查询全量数据,会对多张表进行 Union All 后再进行核算。目前,Flink Planner 缺乏对常用算子跨 Union All 下推的支撑,导致用户查询会从 Source 读取很多的数据,而且处理这些数据也会占用很多的资源,终究导致资源耗费和 E2E Latency 都较高。因而支撑了常用算子跨 Union All 下推的优化,包含 Aggregate,SortLimit 和 Limit 算子。
以 Aggregate 为例,从图中可以看出,在优化之前,Union All 节点的下流是一个 Local Aggregate 节点。由于当时 Flink Planner 不支撑跨 Union All 的算子下推,导致这里的 Local Aggregate 节点无法下推到 Union All 的上游,也无法进一步下推到 Scan 节点,导致从存储读取了很多的数据。优化之后把 Local Aggregate 节点推到了 Union All 的上游,终究下推到了存储做核算。经过优化后,事务查询的 E2E Latency 下降 42%,Flink 集群的 CPU 耗费下降 30%。
Join Filter 传递
在线上事务的查询中,带 Join 的查询是十分多的,其间大部分的查询是 Equal Join,而且带一个 Filter 条件。可是由于 Join 一侧的 Filter 没有传递到 Join 的另一侧,然后导致 Scan 的数据量较大,进而影响查询功能。
因而支撑了 Join Filter 的传递。从上图中可以看出,t1 表的 Filter t1.id > 1,可以经过 Equal 的 Join 条件 t1.id=t2.id,推导出 t2.id>1。因而可以推到 t2 Scan 节点的上游,一起由于支撑了 Filter 传递,终究 t2.id>1 会被下推到存储做核算,那么从 t2 的 Scan 节点读取的数据会大幅削减,然后进步查询功能。
Query Executor 优化
Query Executor 优化
Classloader 复用优化
在线上集群继续运转的进程中,咱们发现了JM / TM 进程频频创立 Classloader,导致 CPU 占用过高的问题。经过火焰图剖析,JVM Dictionary::find 占有了 70% 以上的 CPU,进一步剖析 JVM 源码发现,JVM 在加载了 class 之后,为了加快从 class name 到 Classloader 的查找,会保护一个名叫 SystemDictionary 的哈希表。在 Classloader 数量十分多的时分,哈希表中存在很多的抵触,导致查找进程十分缓慢,一起整个 JM 大部分的 CPU 都耗费在这个过程。
经过定位发现,这些 Classloader 都是 UserCodeClassloader,用于动态加载用户的 Jar 包。从图中看出,新 Job 的 JobMaster 和 TM 上该 Job 的 Task 都会创立新的 UserCodeClassloader,导致 JM 和 TM 上的 Classloader 过多。除此之外,Classloader 过多还会导致 JVM Metaspace 空间不足,进而频频地触发 Metaspace Full GC。
因而支撑了 Classloader 复用的优化,分为两步:首要优化依靠 Jar 包的方法,由于 OLAP 场景下依靠的第三方 Jar 包是相对固定的,可以直接放在 JM 和 TM 发动的 Classpath 下,并不需求每个作业独自提交 Jar 包。其次,关于每个作业在 JobMaster 和 Task 初始化时,直接复用 System Classloader。经过优化后,JM 中 Dictionary::find 所占的 CPU 运用从 76% 下降到 1%,一起,Metaspace Full GC 的频率明显下降。
Codegen 缓存优化
在 OLAP 场景下,Codegen 源代码编译的 TM CPU 占比较高,一起耗时较大。为了防止重复编译,当时的 Codegen 缓存机制会依据 Codegen 源代码的 Class Name 映射到 Task 所用的 Classloader,再映射到编译好的 Class 中,一定程度上缓解了该问题。但在当时缓存机制下,存在两个明显的问题:
- 当时的机制只完成了同一个作业内部,同一个 Task 不同并发的复用,可是关于同一个 Query 的多次履行,依然存在重复编译;
- 每次编译和加载 Class 都会创立一个新的 ByteArrayClassloader,频频创立 Classloader 会导致 Metaspace 碎片严峻,并引发 Metaspace Full GC,形成服务耗时的颤动。
为了防止跨作业代码的重复编译,完成跨作业的 Class 同享,需求优化缓存逻辑,完成相同源代码到编译 Class 的映射。存在以下两个难点:
怎么保证相同逻辑的算子所生成的代码相同?
在 Codegen 代码生成的时分,把类名和变量名中的自增 ID,从大局粒度替换为 local context 粒度,使相同逻辑的算子能生成相同的代码。
怎么设计 cache key 仅有辨认相同的代码?
经过设计根据 Classloader 的 Hash 值 + Class Name + 代码的长度 + 代码的 MD5 值的四元组。并将其作为 cache key 来仅有辨认相同的代码。
Codegen 缓存优化的作用十分明显,TM 侧代码编译的 CPU 运用率 46% -> 0.3%,Query 的 E2E Latency 下降了 29.2%,一起 Metaspace Full GC 的时刻也下降了 71.5%。
反序列优化
在优化 Task 布置功能时,经过火焰图发现,TM Task 初始化阶段的 CPU 占用比较高,进一步剖析发现在做 Task 布置信息的反序列化时,同一个 Task 的多个 Subtask 存在冗余的反序列化。Task 布置信息 TaskInfo 首要包含 Head Operator、Chained Operators 信息,在作业构建时会别离被序列化为 TaskInfo 中的 SerializedUDF 和 ChainedTaskConfig。为了削减冗余的反序列化,有以下两个可优化的方向:
其一是 Chained Operators 的嵌套序列化结构,首要是去掉对 Map 结构不必要的序列化和反序列化,使得同一 Task 的多个 Subtask 可以复用同一个反序列化后的 Map。
其二是 Codegen 算子的优化。在占比较大的 Codegen 算子在初始化时,也存在较高的反序列化开支。经过剖析,该类算子布置信息首要包含 Codegen 源代码,可是一个 TM 上的多个 Subtask 都需求反序列化一遍相同的源代码,存在很多的冗余,因而把 Codegen 源代码拆分出来,独自反序列化一遍后,给所有 Subtask 同享。
以上反序列化优化的作用十分明显,在同一个 Task 的 Subtask 个数等于 3 的时分,TaskInfo 全体的序列化和反序列化 QPS 别离进步了 102% 和 163%。
其他更多优化
- Join Probe 提早输出:Probe / Full Outer Hash Join 支撑在 Probe 阶段,根据 Build 端的 Bloom Filter 提早输出成果,削减 Probe 端数据的落盘,然后进步功能。
- 内存池化:在算子发动的时分,从 Managed Memory 请求内存,并初始化内存分片。在 OLAP 场景下,这部分的时刻和资源耗费占比较大,因而支撑了 Cached Memory Pool,即在 TM 维度内同享内存池,而不需求在算子发动的时分初始化内存。
- 内存运用优化:在并行履行包含很多 Aggregate / Join 算子的 Query 时,发现即使数据量十分小,TM 的Managed Memory 运用也很高。经过排查,关于需求运用 Managed Memory 的算子,每次请求内存的步长是 16 MB,因而这些算子的每个并发都至少需求请求 16 MB 内存,导致内存的实践利用率很低,因而支撑了可配置步长,并设置较小的默认值以节约很多内存。
三、集群运维和稳定性建造
运维系统完善
构建运维发版流程:在进行完善的测验后,运用主动化流水线,对上下流依靠的所有组件统一发版,最终对线上集群进行平滑的晋级。
完善测验方法:支撑 CI、准确性测验、功能测验、长稳和毛病测验。CI 可以及时发现 UT 失利的问题;准确性测验挑选 Query 丰富的 TPC-DS 测验集;功能测验首要包含 TPC-H 功能测验和调度 QPS 功能测验;此外,由于在线服务对稳定性要求比较高,因而支撑了长稳和毛病测验,在服务长期运转,并注入各种毛病场景的状况下,判断集群的状况、测验 Query 的履行成果等是否契合预期。其间毛病测验包含了丰富的毛病场景,包含反常 SQL,JM / TM 退出和网络毛病等,协助发现内存走漏等问题,进步了服务的稳定性。
平滑晋级线上集群:支撑 SQL Gateway 翻滚晋级。具体的完成进程是经过先发动一个新版本的 Flink 集群,再把线上的多个 Gateway 实例逐个翻滚地切流到新的集群,完成无感晋级,使得服务中止时刻从之前的 5 min 下降到挨近为 0。一起在翻滚切流时,会进行小流量验证,在发现问题后可以快速回滚,下降上线风险。
监控系统完善
监控系统的完善进程中,除了流批的集群监控,比方对 CPU 等资源运用的监控、GC 时刻等进程状况的监控外,还增加了细粒度的 CPU 监控,用于明确在短 Query 的状况下,集群是否存在 CPU 瓶颈。与此一起,经过增加查询负载监控,判断事务负载和 Flink 集群的负载是否正常。
在集群监控之外,又增加了 OLAP 下所特有的作业监控,完善了全链路的 Latency,便利快速定位慢查询呈现耗时问题的阶段,比方 Parse、Optimize、Job 履行阶段等。此外,还增加了更多的慢查询和失利查询的监控,以及对依靠的外部 IO 的监控等。
稳定性管理
Flink OLAP 作为在线服务对稳定性要求很高,可是在落地初期,由于服务缺乏容灾、JM / TM FGC 频频等问题,线上稳定性较差。咱们别离从 HA、限流、GC 优化和 JM 稳定性进步四个方面进行管理。
- HA:支撑双机房热备,进步在线服务的可用性。支撑双机房容灾后,可以经过切流快速康复。其次,经过支撑 JM HA,解决 JM 单点的问题,进步线上服务的可用性。
- 限流和熔断:虽然在流式和批式下,没有作业的限流需求,但在 OLAP 场景下,用户会继续提交 Query。为了防止查询高峰集群被打挂,支撑了 SQL Gateway 的 QPS 限流。为了防止多作业一起运转导致的 JM 和 TM 的负载过高、查询过慢的问题,咱们约束了 Flink 集群最大运转的作业数。除了限流之外,还支撑了在 OLAP 下,运用 Failfast 的 Failover 战略,防止失利作业堆积,形成集群雪崩。
- GC 优化:OLAP 场景下,事务对 Latency 十分灵敏,Full GC 会导致耗时颤动。因而优化了 JM 和 TM 的 Full GC。首要移除 Task / Operator 等级的 Metric,使 JM 的 Full GC 频率下降 88%。其次,支撑 Codegen 缓存优化,使 TM 的 Metaspace Full GC 次数下降为挨近 0。
- JM 稳定性进步:在 OLAP 场景下,支撑 JobMaster 去除 ZK 依靠,由于在高 QPS 下,ZK 依靠会导致作业的 Latency 颤动。一起约束 Flink UI 展现的作业数,由于在 OLAP 场景下继续提交很多的作业,会使整个 JM 的内存过大,影响 JM 的稳定性。与此一起,封闭 Flink UI 的主动刷新,防止主动刷新导致 JM 负载上升引起页面的卡顿。
四、收益
Benchmark 收益:经过上述对 Query Optimizer 和 Query Executor 的查询优化,在 TPCH 100G 的Benchmark 中,Query Latency 下降了50.1% 。其次对三类不同杂乱程度的小数据量查询(点查类 Source-Sink、较杂乱的 WordCount 和更加杂乱的三表 Join),进行了 E2E Benchmark,优化作用十分明显,E2E QPS 均匀进步 25 倍,E2E Latency 均匀下降 92%,下降了超越 10倍。
事务收益:功能和稳定性都有明显的进步。功能方面,Job 均匀 Latency 下降了 48.3%,TM 均匀 CPU 下降了27.3%;稳定性方面,JM Full GC 频率下降了 88%,TM Full GC 时刻下降了 71.5%。
五、未来规划
- 产品化完善:包含 History Server 的支撑和慢查询的智能剖析。
- 向量化引擎:充分利用 CPU 的并行化才能,进步核算功能。
- 物化视图:关于大数据量的核算,现查现算的耗时和资源开支都十分大,所以未来考虑引入物化视图加快用户的查询,节约资源运用。
- Optimizer 演进:继续跟进业界和学术界的最新进展,比方根据 Learning-based 完成 SQL Optimization 的 AI4DB 等。