本文整理自字节跳动根底架构工程师曹帝胄在 Flink Forward Asia 2022 核心技术专场的同享。Flink OLAP 作业 QPS 和资源隔离是 Flink OLAP 核算面临的最大难题,也是字节跳动内部事务运用 Flink 履行 OLAP 核算需求处理的最大痛点。本次同享将围绕 Flink OLAP 难点和瓶颈剖析、作业调度、Runtime 履行、收益以及未来规划五个方面打开介绍。
Flink OLAP in ByteDance
针对内部许多混合核算的需求场景,字节跳动提出了整合 AP 和 TP 核算的 ByteHTAP 体系,一起将 Flink OLAP 作为ByteHTAP 的 AP 核算引擎。在字节跳动一年多的发展中, Flink OLAP 现已布置支撑了 20+的 ByteHTAP 线上集群,集群规划到达 16000+Cores,每天承当 50w Query 的AP流量。
上图是 Flink OLAP 在字节跳动的服务架构,Flink OLAP 经过 SQL Gateway 供给 Restfull 接口,用户能够经过 Client 向 SQL Gateway 集群提交 Query,SQL Gateway 担任 SQL 解析并生成履行计划后提交给 Flink 集群。Flink 集群接纳到恳求后,由 Dispatcher 创立 JobMaster,依据集群内的 TM 按照一定的调度规则将 Task 布置到对应的 TaskManager 上,终究 Task 将成果推回 Dispatcher,而且终究由 Dispatcher 推给 Client。
应战
Flink OLAP 在发展期间也遇到了许多应战。不同于流式核算使命,OLAP 使命大部分都是秒级、毫秒级的小作业,具有 QPS 高、时延小的特色。以内部事务为例,事务方要求在高峰期支撑大于 200 的 QPS,而且 Lantency p99 < 2s,而优化前的 Flink 调度功用还不能满意事务方需求,因而咱们针对 Flink 的调度功用全链路进行了瓶颈剖析。
首要经过设计针对调度功用的一系列 Benchmark,从事务动身依据杂乱度构建 3 组测验作业。每个 Source 节点只会发生一条数据,数据量能够忽略不计。测验环境运用 了5 台物理机启动了一个 Flink Serssion 集群,总共约 500 Cores CPU,大约 1.25w 个 Slot,完结了一个 Benchamrk 的 Client 能够依据不同的并发度批量提交作业。咱们在benchmark成果中核算了 10min 内完结的作业数量,并核算作业完结的均匀 Latency。
为了更好的剖析 Flink 调度阶段的功用瓶颈,将调度阶段分成了三个阶段。第一个阶段是集群 Dispatcher 收到作业恳求后直接完结作业并回来成果;第二阶段是作业在 JobMaster 中恳求完资源后直接完结并回来成果。第三个阶段是 JobMaster 将 Task 布置到 TaskManager 后,TaskManager 不履行逻辑直接将 Task 置为完结并回来,jobMaster接纳一切Task完结的音讯后,将作业置为完毕。在实践中发现从资源恳求到作业布置的进程中 QPS 功用下降显着。
在 E2E 的测验场景中,能够看到在 WordCount 作业中 Client 并发度从 16 进步到 32 后 Latency 上升显着,Join 作业更是在 4 并发到 16 并发时 Latency 显着上涨。
经过上面的 Benchmark 和 Flink 布置的全流程剖析能够发现首要有两个问题,一块是作业在资源办理和布置上的瓶颈,一块是使命在运转时推迟瓶颈。
针对OLAP场景,在作业资源办理和布置方面,现在 Flink 资源办理流程和布置交互流程过于杂乱。在运转方面,Flink 的作业拉取成果流程存在较多限制,另外大量的小查询会导致资源频频的创立毁掉。针对上面这些问题,咱们分别从作业调度和运转时两个大方向进行优化。
作业调度
资源办理流程优化
现在字节 OLAP 的改造是基于 Flink-1.11 版别,因而先介绍下 Flink-1.11 的集群资源恳求和开释流程。首要TaskManager 在布置完结后向 ResourceMananger 注册,JobMaster 向 RM 进行资源恳求,RM 依据恳求的 Slot 对 TM 进行布置。TM 收到布置恳求后与 JobMaster 树立衔接并供给 Slot 资源。终究由 JobMaster 对 Slot 的资源进行分配并向 TM 进行布置。资源开释流程相同,在使命完毕后,JobMaster 会开释对应的 Slot 的资源,并开释 TaskManager 衔接,TM 也会告诉 RM Slot 资源开释。
从以上流程能够看到作业恳求和开释 Slot 资源分别需求 JobMaster、ResourceManager 以及 TaskManager 进行 2 次网络交互。这些耗时在单查询情况下虽然或许只添加了几十到几百毫秒,但在多查询情况下,会将这部分的耗时放大,甚至使查询作业的资源恳求耗时添加为秒级。
一起在这个流程中能够看到 ResourceManager、JobMaster 以及 TaskManager 三个核心功用模块在资源恳求和开释上的功用区分不行清晰,ResourceManager 办理核算资源存活,另一方面又办理作业的资源分配,形成查询资源恳求的单点问题;另一方面,TaskManager 不只履行核算使命,一起还参与核算资源的恳求和分配流程,恳求和开释流程过长。
此外资源分配中 SlotPool 处理 Slot 恳求和分配比较杂乱,每个 Task 需求获取上游 Task 的分配方位,一起 Share Group 分配资源有多次排序和遍历,添加了 Slot 分配的耗时,这个跟着作业杂乱度上升,耗时也会添加。
在原流程中 ResourceManager 分配 Slot 时需求确保 TaskManager 中指定的 Slot 是闲暇可用的,这会添加恳求和开释流程的杂乱度。一起 TaskManager 经过在资源恳求流程中依据 Slot 初始化对应的 TaskSlot 以及 MemoryManager,确保每个 Slot 只被一个作业的多个核算使命运用。经过剖析能够发现,多个核算使命在同享 Slot 进程中,首要是同享 MemoryManager 办理 Batch 算子的 Aggregate、Join、Sort 等算子的暂时状况以及流核算使命中的 Rocksdb 堆外内存恳求和开释,这部分内存同享的完结跟作业没有强绑定关系,所以多个作业的多个核算使命也能够同享 MemoryManager。因而在为了简化资源恳求流程,及作业的资源同享上,经过去除 Slot 的感念,在 TaskMananger 中运用大局同享的 MemoryManager。
在优化后的流程中,TaskManager 启动后会向 ResourceManager 进行注册,ResourceManager 向 Dispatcher 同步 TM 信息。这儿的 Dispatcher 会一起保护一份集群 TM 的列表,并在作业提交时供给给 Jobmasger。JobMaster 依据集群的 TM 依据指定的布置战略选择布置的 TM 并向 TM 发送布置恳求。优化后的各组件分工如下:
- ResourceManager:办理核算节点的存活以及节点的资源信息汇总,不再履行 OLAP 类型查询核算的资源分配;
- TaskManager:支撑核算使命的履行,在将资源信息汇报给 ResourceManager 后,不参与作业资源恳求和开释流程。
- JobMaster:支撑和完结查询核算的资源分配。
在资源流程改造中,由于去除了 Slot 限制,因而在作业布置上能够以 TM 为粒度批量进行作业布置,经过对布置恳求进行打包,大大减少了 JobMaster 与 TaskManager 之间的恳求次数。
使命布置结构优化
在完结上面的优化后,经过对 Source、WordCount 以及 Join 等三类不同杂乱度的作业核算使命布置功用测验,咱们发现不同作业杂乱度对于核算使命布置的功用影响非常大。
- Source 作业,只包含 Source 节点,共 1 个节点,128 并发,共 128 个核算使命,按照 TM 布置战略会运用 10 个 TM,每个 TM 布置 13 个核算使命。
- WordCount 作业,包含 Source 节点和 Aggregate 节点,共 2 个节点,128 并发,共 256 个核算使命,运用 10 个 TM,每个 TM 布置 26 核算使命。
- Join 作业,包含 3 个 Source 节点,2 个 Join 节点以及 1 个 Aggregate 节点,共 6 个节点,128 并发,共 768 个核算使命,运用 25 个 TM,每个 TM 布置 30 核算使命。
从上面的数据能够看到,跟着使命杂乱度的进步,序列化的总耗时添加显着,WordCount 的序列化总耗时约 122s,而 Source 作业的耗时在 5s 左右。Join 作业的序列化耗时更是在 200s 以上。针对这一现象,能够从两个维度进行优化:
- 数据量巨细:经过剖析作业的布置结构发现每个 Task 的布置结构包含作业信息、作业装备等信息,一起包含该 Task 的信息,包含 Task 名称,上下游信息,上下游的方位信息等。这其间同一个作业不同 Task 的作业维度信息都是相同的,一起假如作业是 All To All 的衔接方式,他们的上下游信息也是能够同享。因而能够对布置结构的冗余信息进行提取,比方将作业维度信息、相同 Task 信息、上下游方位信息等。
- 序列化流程:原有的序列化是由 TM 的 Actor 担任的,高并发下存在单线程瓶颈,所以将布置恳求抽出为单独的链路,经过装备多 Netty Thread,并发处理,再将序列完的结构交由 TM 的 Acotr 处理。
经过以上两个维度的优化,Source 作业的序列化巨细由 63kb 降为 5.6kb,耗时由 5462kb 降为 644kb,杂乱作业的优化更为显着,WordCount 作业的序列化巨细由 317.3kb 降为 11.1kb,耗时由 122546ms 降为 940ms,Join 作业的序列化巨细由 557.5ms 降为 28.3ms,耗时由 219189ms 降为 2830ms。
模块交互优化
在完结资源流程恳求和开释优化后,剩下的模块交互首要是 JobMaster 和 TaskManager 的交互。一部分首要是在 JobMaster 初始化完结后,会与一切的 TM 树立网络衔接,一起在作业运转时两者之间会维持心跳衔接;另一部分首要的交互是 TM 会上报 Task 的运转状况,包含布置完结进入 running 的状况以及使命完毕的终态上报。这儿的交互在高并发情况下规划也是比较可观的,以 Task 数为 128 的测验作业在 QPS 100、TM 100 的环境下为例,JM 每秒创立 10000 左右的链接,收到 Task 的更新恳求 256w 次。
在之前的资源办理优化中,Dispatcher 现已保护了集群内 TaskManager 的一切节点信息,因而能够在 TaskMananger 初始化完结后与 Dispatcher 树立衔接和心跳。一切的 JobMaster 复用该衔接,而不单独保护衔接及心跳。一起之前的心跳衔接中 TM 需求上报 Slot 运用快照等信息,这一部分在资源办理优化后也不需求。这儿有个问题是之前 JobMaster 需求经过心跳感知 TM 的状况,而改造后由 Dispatcher 担任保护与 TM 的心跳,因而当 Dispatcher 感知到 TM 反常 后,会告诉相关的 JobMaster 进行 Failover 处理。
在 Task 使命更新恳求的优化中,在 OLAP 的使命场景下默许采用 Pipeline 形式,在这种形式下,一切 Task 会一起开端调度,因而其实并不关怀单个 Task 粒度的状况变化,一起 Task 直接也没有状况的相互依赖,所以咱们能够将状况更新恳求进行打包,在 Task 布置完结后 JobMaster 直接将状况更新为 Running 不进行额定的交互恳求。只在作业完毕时 TM 以使命粒度进行更新。一起针对一些 Block 衔接的场景,比方自 Join,TM 会对这类 Task 进行单独状况更新来避免死锁。
Runtime 履行
Pull VS Push
Flink 核算成果链路基于 Pull 机制,从 Gateway 向 JobManager 建议 Pull 恳求,JobManager 再向 TaskManager 节点 Pull 成果数据。Gateway 到 JobManager 之间存在 Pull 轮询恳求,存在固定的轮询间隔时间,添加了查询的 Latency,很难满意 OLAP 事务对 Letancy 要求比较高的场景。一起为了支撑和完结 Pull 机制,会创立一些暂时的网络、线程等资源,例如在 Sink 节点会创立 Socket Server,在 Gateway 节点会创立轮询线程等,浪费了核算节点和 Gateway 节点的资源。此外,Dispatcher 节点是一个 Akka Actor 单点,Pull 数据流程会经过 Dispatcher 节点处理和转发,加大了 Dispatcher 处理音讯担负。
因而咱们考虑将获取成果链改为 Push 机制,一方面能够处理轮询 Latency 的问题,一起将成果自动 Push 向 Client,也能够避免 Dispatcher 接受恳求的单线程瓶颈,Gateway 也不需求创立轮询线程进行轮询。但 Push 机制也存在新的应战:
- Push 形式下数据回来进行流量控制,避免 Client 端数据堆积发生 OOM;
- 在原先的机制下,Client 会经过 Dispatcher 获取当时使命状况,在 Push 形式下则需求自动将作业状况回来。
作业成果 Push 架构
为确保 Flink 现有功用架构的稳定性,经过在 JobManager 中新增一个独立的 NettyServer 供给 Socket 服务,SocketServer 接纳 Gateway 创立衔接和提交作业,然后将接纳到的 JobGraph 对象经过 DispatcherGateway 提交到 Dispatcher,再由 Dispatcher 将 Task 布置到 TaskMananger 中。其间 Sink 算子会与 JM 的 NettyServer 创立单独的 Channel 衔接 Push 成果。JM 的 NettyServer 收到成果后会将成果推给 Gateway。
整个 Push 进程利用 Netty 的 Watermark 机制进行流量控制,发送节点前判别 Channel 是否可写,不可写则进行阻塞操作,形成反压。只要在接纳节点消费数据完结而且 Netty Watermark 康复到正常值后,发送节点才会康复成果推送。一起 Sink 节点发送的数据会带上当时的成果的状况,用于判别该 Sink 节点是否完结。之后会在 NettyServer 中汇总这些 Task 的成果状况,当一切数据发送完结后,NettyServer 向 Gateway 发送使命完毕的状况信息。告诉 Gateway 数据现已悉数发送完结,该 Query 现已完毕。在 50 TM Client 128 并发下空数据测验下,Push 形式将 QPS 从 Pull 形式的 850 进步到 4096,进步了 5 倍左右。
线程优化
经过对 TM CPU 运用的剖析发现在高并发情况下,线程的创立能占用 30% 以上的 CPU。因而经过池化线程池减少线程的创立毁掉,可是池化线程操作会带来的问题是由于原来的 Cancel 线程只需关怀履行线程是否存活,但线程池中线程资源是复用的,因而需求对履行线程进行封装,保护线程当时履行的 Task,Interrupt 线程在进行 Interrupt 操作时也会判别中止的 Task 是否正确,在线程完毕后更新状况用于其他 Cancel 线程判别当时履行线程的状况。
锁优化
在对 Thread 的 Profier 剖析中发现存在高并发下 Task 频频的抢锁操作。其间 NetworkBufferPool 用于供给数据传输的内存,MemoryPool 为算子核算时供给堆外内存。以 NetworkBufferPool 为例,Task 初始化完结后,LocalBufferPool 为空,一切的网络内存都需求向 NetworkBufferPool 恳求,而 NetworkBufferPool 是整个 TM 仅有的,为了确保一致性,一切的内存恳求和开释都需求恳求锁。以 100 并发, Task 100,TM 为 1 为例,每秒会发生 1w 次的锁恳求。而在 NetworkBufferPool 内存足够的情况下,能够经过将内存打包进行恳求以减少内存恳求的次数然后减少锁抢占的开销。
Task 在向内存池恳求内存会以 BatchSegment 为单位恳求,一个 BatchSegment 封装了 10 个 Segment。经过批量内存恳求将锁抢占的次数减少为原来的十分之一。在 10 并发,Task 100,TM 为 1,BatchSegment size 为 10 的情况下,恳求 Segment 的锁占用恳求由 1000/s 变为 100/s,模拟恳求 10w 个 Segment 的总耗时由 4353ms,缩短为 793 ms。
内存优化
另外在高 QPS 测验中发现当 JM 运转的作业数量比较多时,JM 会频频的触发 Fullgc,导致作业 Latency 上涨。这种情况经过对 JM 的内存剖析能够发现大部分的内存都是用在保存作业的 Metric 信息中,包含作业维度以及 Task 维度的信息。而在 OLAP 的场景下字节内部有单独的 MetricrePorter,许多 Metric 信息并不需求同步到 JM 端。因而经过 TM 端的 Metric 中加入白名单对 Metric 上报进行过滤 ,只保存需求运用的 Metric,比方TM资源信息、Task io 信息等。在对 Metric 进行过滤后,JM 的 Full GC 频率降低了 88%,对作业 Latency 的影响显着减少。
收益
完结作业调度和履行优化后,经过对优化后的 Flink 集群进行 Benchmark 测验。测验成果显现,QPS 进步显着。单节点作业 QPS,从原先的 17 进步到现在的 33。WordCount 两节点作业最高 QPS,从 7.5 进步到 20 左右。Join 作业的 QPS 从 2 进步至 11,Latency 在 32 个线程下,下降显着。单节点作业的 Latency,从 1.8 秒降低到 200 毫秒左右。WordCount 两节点作业从 4 秒降低到 1 秒多。Join 作业的 QPS 从原先的 15 秒降为 2.5 秒左右。
在事务收益方面,ByteHTAP 受到了公司核心事务的广泛认可。现在,现已接入 User Growth、电商、美好里、飞书等 12 个核心事务。线上汲取能支撑事务杂乱查询高峰期 200 左右的 QPS,Latency P99 控制在 5 秒以下。
未来规划
在公司内部,需求支撑多租户的事务场景,该事务场景包含了少量重要租户,冷租户多,租户数量大的特色,存在 Noisy Neghbor,资源利用率低的问题。
针对这类事务会首要在下面几个维度进步 Serverless 能力:
- 经过支撑 TaskManager 资源隔离和单节点维度支撑核算使命优先级调度处理 Noisy Neghbor 的问题;
- 经过支撑弹性扩缩容处理资源利用率问题。
在功用进步方面,首要分为三个部分:
- 集群负载功用:JobManager 支撑水平扩展、集群资源利用率优化、Task 调度功用优化;
- Flink OLAP 运转时优化:运转时网络音讯优化、资源恳求流程优化;
- 冷启动优化:Gateway 冷启动优化、网络初始化优化、内存恳求流程优化。