前言
Flink 作业需求借助 State 来完结聚合、Join 等有状况的核算使命,而 State 也一直都是作业调优的一个要点。现在 State 和 Checkpoint 已经在字节跳动内部被广泛运用,事务层面上 State 支撑了数据集成、实时数仓、特征核算、样本拼接等典型场景;作业类型上支撑了 Map-Only 类型的通道使命、ETL 使命,窗口聚合核算的目标核算使命,多流 Join 等存储数据明细的数据拼接使命。
以 WordCount 为例,假定咱们需求核算 60 秒窗口内 Word 呈现的次数:
select
word,
TUMBLE_START(eventtime, INTERVAL '60' SECOND) as t,
count(1)
from
words_stream
group by
TUMBLE(eventtime, INTERVAL '60' SECOND), word
每个还未触发的 60s 窗口内,每个 Word 对应的呈现次数就是 Flink State,窗口每收到新的数据就会更新这个状况直到最终输出。为了防止作业失利,状况丢掉,Flink 引入了分布式快照 Checkpoint 的概念,定期将 State 耐久化到 Hdfs 上,假如作业 Failover,会从上一次成功的 checkpoint 康复作业的状况(比方 kafka 的 offset,窗口内的核算数据等)。
在不同的事务场景下,用户往往需求对 State 和 Checkpoint 机制进行调优,来确保使命履行的功用和 Checkpoint 的稳定性。阅览下方内容之前,咱们能够回忆一下,在运用 Flink State 时是否常常会面临以下问题:
- 某个状况算子呈现处理瓶颈时,加资源也无法进步功用,不知该如何排查功用瓶颈
- Checkpoint 常常呈现履行功率慢,barrier 对齐时刻长,频频超时的现象
- 大作业的 Checkpoint 发生过多小文件,对线上 HDFS 发生小文件压力
- RocksDB 的参数过多,运用的时分不知该怎么挑选
- 作业扩缩容康复时,康复时刻过长导致线上断流
State 及 RocksDB 相关概念介绍
State 分类
State 分类 | StateBackend | 存储方式 | 运用场景 |
---|---|---|---|
OperatorState | DefaultOperatorStateBackend | ListState/UnionState 运用 ArrayList 进行存储;BroadcastState 运用 HashMap 进行存储;在内存中进行存储 | Non-Keyed Operator 中运用的状况,首要存储少量元信息数据,一般为 KB 等级:比方 Kafka Connector 中运用 Union State 存储 partition 和 offset |
KeyedState | HeapKeyedStateBackend | 运用 StateTable(类似 HashMap) 的结构在内存中存储状况数据 | Keyed Operator 中运用的状况,状况巨细受限于 TaskManager 内存和 GC 压力,一般为 MB 等级:比方小流量的窗口聚合等核算场景 |
RocksDBKeyedStateBackend | 根据 RocksDB 存储状况数据,数据存储于内存和磁盘 | Keyed Operator 中运用的状况,状况巨细能够远超出 TaskManager 内存,一般为 GB 等级;比方大流量的多流 Join、去重算子等场景 |
因为 OperatorState 背后的 StateBackend 只要 DefaultOperatorStateBackend,所以用户运用时一般指定的 FsStateBackend 和 RocksDBStateBackend 两种,实际上指定的是 KeyedState 对应的 StateBackend 类型:
- FsStateBackend:DefaultOperatorStateBackend 和 HeapKeyedStateBackend 的组合
- RocksDBStateBackend:DefaultOperatorStateBackend 和 RocksDBKeyedStateBackend 的组合
RocksDB 介绍
RocksDB 是嵌入式的 Key-Value 数据库,在 Flink 中被用作 RocksDBStateBackend 的底层存储。如下图所示,RocksDB 耐久化的 SST 文件在本地文件体系上经过多个层级进行安排,不同层级之间会经过异步 Compaction 兼并重复、过期和已删去的数据。在 RocksDB 的写入进程中,数据经过序列化后写入到 WriteBuffer,WriteBuffer 写满后转换为 Immutable Memtable 结构,再经过 RocksDB 的 flush 线程从内存 flush 到磁盘上;读取进程中,会先尝试从 WriteBuffer 和 Immutable Memtable 中读取数据,假如没有找到,则会查询 Block Cache,假如内存中都没有的话,则会按层级查找底层的 SST 文件,并将回来的成果所在的 Data Block 加载到 Block Cache,回来给上层运用。
RocksDBKeyedStateBackend 增量快照介绍
这儿介绍一下大家在大状况场景下常常需求调优的 RocksDBKeyedStateBackend 增量快照。RocksDB 具有 append-only 特性,Flink 利用这一特性将两次 checkpoint 之间 SST 文件列表的差异作为状况增量上传到分布式文件体系上,并经过 JobMaster 中的 SharedStateRegistry 进行状况的注册和过期。
如上图所示,Task 进行了 3 次快照(假定作业设置保存最近 2 次 Checkpoint):
- CP-1:RocksDB 发生 sst-1 和 sst-2 两个文件,Task 将文件上传至 DFS,JM 记录 sst 文件对应的引证计数
- CP-2:RocksDB 中的 sst-1 和 sst-2 经过 compaction 生成了 sst-1,2,而且新生成了 sst-3 文件,Task 将两个新增的文件上传至 DFS,JM 记录 sst 文件对应的引证计数
- CP-3:RocksDB 中新生成 sst-4 文件,Task 将增量的 sst-4 文件上传至 DFS,且在 CP-3 完结后,因为只保存最近 2 次 CP,JobMaster 将 CP-1 过期,同时将 CP-1 中的 sst 文件对应的引证计数减 1,并删去引证计数归 0 的 sst 文件(sst-1 和 sst-2)
增量快照涉及到 Task 多线程上传/下载增量文件,JobMaster 引证计数核算,以及许多与分布式文件体系的交互等进程,相对其他的 StateBackend 要更为杂乱,在 100+GB 乃至 TB 等级状况下,作业比较简单呈现功用和稳定性瓶颈的问题。
State 实践经验
进步 State 操作功用
用户在运用 State 时,会发现操作 State 并不是一件很”简单”的事情,假如运用 FsStateBackend,会常常遇到 GC 问题、频频调参等问题;假如运用 RocksDBStateBackend,涉及到磁盘读写,目标序列化,在缺乏相关 Metrics 的状况下又不是很简单进行功用问题的定位,或许面临 RocksDB 的许多参数不知道如何调整到最优。
现在字节跳动内有 140+ 作业的状况巨细到达了 TB 等级,单作业的最大状况为 60TB,在逐渐支撑大状况作业的实践中,咱们积累了一些 State 的调优经验,也做了一些引擎侧的改造以支撑更好的功用和下降作业调优本钱。
挑选合适的 StateBackend
咱们都知道 FsStateBackend 合适小状况的作业,而 RocksDBStateBackend 合适大状况的作业,但在实际挑选 FsStateBackend 时会遇到以下问题:
- 进行开发之前,对状况巨细无法做一个准确的预估,或许做状况巨细预估的杂乱度较高
- 跟着事务增加,所谓的 “小状况” 很快就变成了 “大状况”,需求人工介入做调整
- 相同的状况巨细,因为状况过期时刻不同,运用 FsStateBackend 发生 GC 压力也不同
针对上面 FsStateBackend 中存在的若干个问题,能够看出 FsStateBackend 的维护本钱仍是相对较高的。在字节内部,咱们暂时只引荐部分作业总状况小于 1GB 的作业运用 FsStateBackend,而关于大流量事务如短视频、直播、电商等,咱们更倾向于引荐用户运用 RocksDBStateBackend 以削减未来的 GC 危险,获得更好的稳定性。
跟着内部硬件的更新迭代,ssd 的推广,久远来看咱们更希望将 StateBackend 收敛到 RocksDBStateBackend 来进步作业稳定性和削减用户运维本钱;功用上期望在小状况场景下,RocksDBStateBackend 能够和 FsStateBackend 做到比较挨近或许打平。
观测功用目标,运用火焰图剖析瓶颈
社区版别的 Flink 运用 RocksDBStateBackend 时,假如遇到功用问题,基本上是很难判别出问题原因,此刻主张翻开相关目标进行排查[1]。别的,在字节跳动内部,形成 RocksDBStateBackend 功用瓶颈的原因较多,咱们构建了一套较为完整的 RocksDB 目标体系,并在 Flink 层面上默许透出了部分关键的 RocksDB 目标,并新增了 State 相关目标,部分目标的示意图如下:
形成 RocksDB 功用瓶颈的常见如下:
- 单条记录的 State Size 过大,因为 RocksDB 的 append-only 的特性,write buffer 很简单打满,形成数据频频刷盘和 Compaction,抢占作业 CPU
- Operator 内部的 RocksDB 容量过大,如 Operator 所在的 RocksDB 实例巨细超越 15GB 咱们就会比较显着地看到 Compaction 愈加频频,而且形成 RocksDB 频频的 Write Stall
- 硬件问题,如磁盘 IO 打满,从 State 操作的 Latency 目标能够看出来,假如长期停留在秒等级,阐明硬件或许机器负载偏高
除了以上目标外,别的一个能够相配合的办法是火焰图,常见办法比方运用阿里的 arthas[2]。火焰图内部会展现 Flink 和 RocksDB 的 CPU 开支,示意图如下:
如上所示,能够看出火焰图中 Compaction 开支是占比十分大的,定位到 Compaction 问题后,咱们能够再根据 Value Size、RocksDB 容量巨细、作业并行度和资源等进行进一步的剖析。
运用合理的 RocksDB 参数
除了 Flink 中提供的 RocksDB 参数[3]之外,RocksDB 还有许多调优参数可供用户运用。用户能够经过自定义 RocksDBOptionsFactory 来做 RocksDB 的调优[4]。经过内部的一些实践,咱们列举两个比较有用的参数:
- 封闭 RocksDB 的 compression(需求自定义 RocksDBOptionsFactory):RocksDB 默许运用 snappy 算法对数据进行紧缩,因为 RocksDB 的读写、Compaction 都存在紧缩的相关操作,所以在对 CPU 敏感的作业中,能够经过
ColumnFamilyOptions.setCompressionType(CompressionType.NO_COMPRESSION)
将紧缩封闭,选用磁盘空间容量换 CPU 的方式来削减 CPU 的损耗 - 开启 RocksDB 的 bloom-filter(需求自定义 RocksDBOptionsFactory):RocksDB 默许不运用 bloom-filter[5],开启 bloom-filter 后能够节省一部分 RocksDB 的读开支
- 其他 cache、writebuffer 和 flush/compaction 线程数的调整,相同能够在不同场景下获得不同的收益,比方在写少多读的场景下,咱们能够经过调大 Cache 来削减磁盘 IO
这儿要留意一点,因为许多参数都以内存或磁盘来换取功用上的进步,所以以上参数的运用需求结合详细的功用瓶颈剖析才干到达最好的效果,比方在上方的 火焰图 中能够显着地看到 snappy 的紧缩占了较大的 CPU 开支,此刻能够尝试 compression 相关的参数。
关注 RocksDBStateBackend 的序列化开支
运用 RocksDB State 的相关 API,Key 和 Value 都是需求经过序列化和反序列化,假如 Java 目标较杂乱,而且用户没有自定义 Serializer,那么它的序列化开支也会相对较大。比方去重操作中常用的 RoaringBitmap,在序列化和反序列化时,MB 等级的目标的序列化开支到达秒等级,这关于作业功用是十分大的损耗。因此关于杂乱目标,咱们主张:
- 事务上尝试在 State 中运用更精简的数据结构,去除不需求存储的字段
- StateDescriptor 中经过自定义 Serializer 来减小序列化开支
- 在 KryoSerializer 显式注册 PB/Thrift Serializer[6]
- 减小 State 的操作次数,比方下方的示例代码,假如是运用 FsStateBackend ,则没有太多功用损耗;但是在 RocksDBStateBackend 上因为两次 State 的操作导致 userKey 发生了额定一次序列化的开支,假如 userKey 自身是个相对杂乱的目标就要留意了
if (mapState.contains(userKey)) {
UV userValue = mapState.get(userKey);
}
更多关于序列化的功用和辅导能够参考社区的调优文档[7]。
构建 RocksDB State 的缓存
上面说到 RocksDB 的序列化开支可能会比较大,字节跳动内部在 StateBackend 和 Operator 中心构建了 StateBackend Cache Layer,担任缓存算子内部的热门数据,而且根据 GC 状况进行动态扩缩容,关于有热门的作业收益显着。
相同,关于用户而言,假如作业热门显着的话,能够尝试在内存中构建一个简单的 Java 目标的缓存,但是需求留意以下几点:
- 控制缓存的阈值,防止缓存目标过多形成 GC 压力过大
- 留意缓存中 State TTL 逻辑处理,防止呈现脏读的状况
下降 Checkpoint 耗时
Checkpoint 持续时刻和许多要素相关,比方作业反压、资源是否足够等,在这儿咱们从 StateBackend 的角度来看看如何进步 Checkpoint 的成功率。一次 Task 等级的快照能够划分为以下几个进程:
- 等候 checkpointLock:Source Task 中,触发 Checkpoint 的 Rpc 线程需求等候 Task 线程完结当时数据处理后,开释 checkpointLock 后才干触发 checkpoint,这一步的耗时首要取决于用户的处理逻辑及每条数据的处理时延
- 搜集 Barrier: 非 Source 的 Task 中,这一步是将上游一切 Task 发送的 checkpoint barrier 搜集齐,这一步的耗时首要在 barrier 在 buffer 队列中的排队时刻
- 同步阶段:履行用户自定义的 snapshot 办法以及 StateBackend 上的元信息快照,比方 FsStateBackend 在同步阶段会对内存中的状况结构做浅拷贝
- 异步阶段:将状况数据或文件上传到 DFS
字节跳动内部,咱们也针对这四个进程构建了相关的监控看板:
生产环境中,「等候 checkpointLock」和「同步阶段」更多是在事务逻辑上的耗时,一般耗时也会相对较短;从 StateBackend 的层面上,咱们能够对「搜集 Barrier」和「异步阶段」这两个阶段进行优化来下降 Checkpoint 的时长。
削减 Barrier 对齐时刻
削减 Barrier 对齐时刻的核心是下降 in-flight 的 Buffer 总巨细,即使是运用社区的 Unaligned Checkpoint 特性,假如 in-flight 的 Buffer 数量过多,会导致最终写入到分布式存储的状况过大,有时分 in-flight 的 Buffer 巨细乃至可能超越 State 自身的巨细,反而会对异步阶段的耗时发生负面影响。
- 下降 channel 中 Buffer 的数量:Flink 1.11 版别支撑在数据倾斜的环境下限制单个 channel 的最大 Buffer 数量,能够经过 taskmanager.network.memory.max-buffers-per-channel 参数进行调整
- 下降单个 Buffer 的巨细:假如单条数据 Size 在 KB 等级以下,咱们能够经过下降 taskmanager.memory.segment-size 来削减单个 Buffer 的巨细,然后削减 Barrier 的排队时刻
结合事务场景下降 DFS 压力
假如在你的集群中,一切 Flink 作业都运用同一个 DFS 集群,那么事务增加到必定量级后,DFS 的 IO 压力和吞吐量会成为「异步阶段」中十分重要的一个参考目标。尤其是在 RocksDBStateBackend 的增量快照中,每个 Operator 发生的状况文件会上传到 DFS中,上传文件的数量和作业并行度、作业状况巨细呈正比。而在 Flink 并行度较高的作业中,因为各个 Task 的快照基本都在同一时刻发生,所以几分钟内,对 DFS 的写恳求数往往能够到达几千乃至上万。
-
合理设置 state.backend.fs.memory-threshold 减小 DFS 文件数量:此参数表明生成 DFS 文件的最小阈值,小于此阈值的状况会以 byte[] 的方式封装在 RPC 恳求内传给 JobMaster 并耐久化在 _metadata 里)。
- 关于 Map-Only 类型的使命,一般状况中存储的是元信息相关的内容(如 Kafka 的消费位移),状况相对较小,咱们能够经过调大此参数防止将这些状况落盘。Flink 1.11 版别之前,state.backend.fs.memory-threshold 默许的 1kb 阈值较小,比较简单地导致每个并行度都需求上传自己的状况文件,上传文件个数和并行度成正比。咱们能够结合事务场景调整此参数,将 DFS 的恳求数从 N(N=并行度) 次优化到 1 次
- 这儿需求留意,假如阈值设置过高(MB等级),可能会导致 _metadata 过大,然后增大 JobMaster 康复 Checkpoint 元信息 和布置 Task 时的 GC 压力,导致 JobMaster 频频 Full GC
-
合理设置 state.backend.rocksdb.checkpoint.transfer.thread.num 线程数削减 DFS 压力:此参数表明制造快照时上传和康复快照时下载 RocksDB 状况文件的线程数。
- 在状况较大的状况下,用户为了进步 Checkpoint 功率,可能会将此线程数设置的比较大,比方超越 10,在这种状况下快照制造和快照康复都会给 DFS 带来十分大的瞬时压力,尤其是对 HDFS NameNode,很有可能瞬间占满 NameNode 的恳求资源,影响其他正在履行的作业
-
调大 state.backend.rocksdb.writebuffer.size:此参数表明 RocksDB flush 到磁盘之前,在内存中存储的数据巨细。
- 假如作业的吞吐比较高,Update 比较频频,形成了 RocksDB 目录下的文件过多,经过调大此参数能够必定程度上经过加大文件巨细来削减上传的文件数量,削减 DFS IO 次数。
兼并 RocksDBKeyedStateBackend 上传的文件(FLINK-11937)
在社区版别的增量快照中,RocksDB 新生成的每个 SST 文件都需求上传到 DFS,以 HDFS 为例,HDFS 的默许 Block 巨细一般在 100+MB(字节跳动内部是 512MB),而 RocksDB 生成的文件一般为 100MB 以下,关于小数据量的使命乃至是 KB 等级的文件巨细,Checkpoint 发生的许多且频频的小文件恳求,关于 HDFS 的元数据管理和 NameNode 访问都会发生比较大的压力。
社区在 FLINK-11937 中提出了将小文件兼并上传的思路,类似的,在字节内部的完结中,咱们将小文件兼并的逻辑笼统成 Strategy,这样咱们能够根据 SST 文件数量、巨细、存活时长等要素完结契合咱们自己事务场景的上传战略。
进步 StateBackend 康复速度
除了 State 功用以及 DFS 瓶颈之外,StateBackend 的康复速度也是实际生产进程中考虑的一个很重要的点,咱们在生产进程中会发现,因为某些参数的设置不合理,改动作业装备和并发度会导致作业在重启时,从快照康复时功用特别差,康复时刻长达十分钟以上。
慎重运用 Union State
Union State 的特点是在作业康复时,每个并行度康复的状况是一切并行度状况的并集,这种特性导致 Union State 在 JobMaster 状况分配和 TaskManager 状况康复上都比较重:
- JobMaster 需求完结一个 NN 的遍历,将每个并行度的状况都赋值成一切并行度状况的并集。(这儿实际上能够运用 HashMap 将遍历优化成 N1 的杂乱度[8])
- TaskManager 需求读取全量 Union State 的状况文件,比方 1000 并行度的作业在康复时,每个并行度中的 Union State 在康复状况时都需求读取 1000 个并行度 Operator 所发生的状况文件,这个操作是十分低效的。(咱们内部的优化是将 Union State 状况在 JobMaster 端聚合成 1 个文件,这样 TaskManager 在康复时只需求读取一个文件即可)
Union State 在实际运用中,除康复速度慢的问题外,假如运用不当,关于 DFS 也会发生许多的压力,所以主张在高并行度的作业中,尽量防止运用 Union State 以下降额定的运维负担。
增量快照 vs 全量快照康复
RocksDBStateBackend 中支撑的增量快照和全量快照(或 Savepoint),这两种快照的差异导致了它们在不同场景下的康复速度也不同。其中增量快照是将 RocksDB 底层的增量 SST 文件上传到 DFS;而全量快照是遍历 RocksDB 实例的 Key-Value 并写入到 DFS。
以是否扩缩容来界定场景,这两种快照下的康复速度如下:
非扩缩容场景 | 扩缩容场景 | |
---|---|---|
增量快照 | 快 | 慢 |
全量快照 / Savepoint | 中等 | 中等 |
-
非扩缩容场景:
- 增量快照的康复只需将 SST 文件拉到本地即可完结 RocksDB 的初始化 (多线程)
- 全量快照的康复需求遍历属于当时 Subtask 的 KeyGroup Range 下的一切键值对,写入到本地磁盘并完结 RocksDB 初始化 (单线程)
-
扩缩容场景:
- 增量快照的康复涉及到多组 RocksDB 的数据兼并,涉及到多组 RocksDB 文件的下载以及写入到同一个 RocksDB 中发生的许多 Compaction,Compaction 进程中会发生严重的写扩大
- 全量快照的康复和上面的非扩缩容场景一致 (单线程)
这儿比较费事的一点是扩缩容康复时比较简单遇到长尾问题,因为单个并行度状况过大而导致全体康复时刻被拉长,现在在社区版别下还没有比较完全的解决办法,咱们也在针对大状况的作业进行康复速度的优化,在这儿根据社区已支撑的功用,在扩缩容场景下给出一些加速康复速度的主张:
- 扩缩容康复时尽量挑选从 Savepoint 进行康复,能够防止增量快照下多组 Task 的 RocksDB 实例兼并发生的 Compaction 开支
- 调整 RocksDB 相关参数,调大 WriteBuffer 巨细和 Flush/Compaction 线程数,增强 RocksDB 批量将数据刷盘的能力
参数 | 含义 | 默许值 | 主张值 |
---|---|---|---|
state.backend.rocksdb.writebuffer.count | RocksDB存储在内存中的memtable数量,进步这个值能够增加RocksDB的读写功用 | 2 | 4,内存足够可设置为6或8 |
state.backend.rocksdb.writebuffer.size | RocksDB存储在内存中的单个memtable巨细,到达装备值后转为immutable memtable。进步这个值能够增加RocksDB的写功用 | 64MB | 128M,内存足够时可设置为256M |
state.backend.rocksdb.flush.thread.num | RocksDB后台用于将immutable memtable flush到磁盘的线程数,当immutable不能及时刷到磁盘时,会下降rocksdb的write速率。进步这个线程数能够进步rocksdb的写入功用 | 1 | writebuffer.count – 1 |
state.backend.rocksdb.compaction.thread.num | RocksDB后台会不断进行compact操作,来对过期、重复的数据进行整理,然后削减磁盘文件的巨细。 | 1 | 2 ~ 4 |
总结
本篇文章中,咱们介绍了 State 和 RocksDB 的相关概念,并针对字节跳动内部在 State 运用上遇到的问题,给出了相关实践的主张,希望大家在阅览本篇文章之后,关于 Flink State 在日常开发工作中的运用,会有愈加深入的知道和了解。
现在,字节跳动流式核算团队同步支撑的火山引擎流式核算 Flink 版正在公测中,支撑云中立模式,支撑公共云、混合云及多云布置,全面贴合企业上云战略,欢迎申请试用: