背景
众所周知,Flink 中的 State 保存了算子核算进程的中心成果。当使命出现反常时,能够经过查询使命快照中的 State 获取有效头绪。
但现在关于 Flink SQL 使命来说,当咱们想要查询作业 State 时,通常会由于无法获悉 State 的界说方法和详细类型等信息,而导致查询 State 的本钱过高。
为了处理这个问题,字节跳动流式核算团队在内部提出了 State Query on Flink SQL 的处理计划——用户经过写 SQL 的方法就能够简略地查询 State。本文将主要介绍字节跳动在 Flink 状况查询这方面所进行的相关作业。
State Processor API 介绍
提到状况查询,咱们自然会联想到 Flink 在 1.9 版本提出的特性 — State Processor API。运用 State Processor API,咱们能够将作业发生的 Savepoint 转换成 DataSet,然后运用 DataSet API 完结对 State 的查询、修正和初始化等操作。
下面简略介绍一下怎么运用 State Processor API 来完结 State 的查询:
- 首要创立 ExistingSavepoint 用来表明一个 Savepoint。初始化 ExistingSavepoint 时需求供给 Savepoint 途径和 StateBackend 等信息;
- 然后实现 ReaderFunction 用于从头注册所需求查询的 State 以及界说处理 State 的方法。查询状况的进程中会遍历所有的 Key 并依照咱们界说的方法去操作 State;
- 最终,调用 Savepoint.readKeyedState 并传入算子的 uid 和 ReaderFunction,就能够完结 State的查询。
接下来为大家简述一下 State 查询背面的原理。
在 Savepoint 目录中包括两种文件,一种是状况数据文件,比方上图中的 opA-1-state ,这个文件里面保存着算子 A 在第一个 SubTask 状况的明细数据;还有一种元数据文件,对应上图中的 _metadata,元数据文件中保存了每个算子和状况文件的映射关系。
当咱们在进行状况查询的时分。首要在 Client 端会根据 Savepoint 途径去解析 metadata 文件。经过算子ID,能够获取需求查询的状况所对应的文件的句柄。当状况查询真正执行时,担任读取状况的 Task 会创立一个新的 StateBackend ,然后将状况文件中的数据康复到 Statebackend 中。等到状况康复完结之后就会遍历悉数的 Key 并把对应的状况交给 ReaderFunction 处理。
有些同学可能会问,既然社区已经供给了查询 State 的功用,咱们为什么还要去做相同的作业呢?主要是由于咱们在运用 State Processor API 的进程中发现一些问题:
- 每次查询 State 咱们都需求独立开发一个 Flink Batch 使命,对用户来说具有一定的开发本钱;
- 实现 ReaderFunction 的时分需求比较清晰地了解使命状况的界说方法,包括 State 的称号、类型以及 State Descriptor 等信息,对用户来说运用门槛高较高;
- 运用 State Processor API 时,只能查询单个算子状况,无法一起查询多个算子的状况;
- 无法直接查询使命状况的元信息,比方查询使命运用了哪些状况,或许查询某个状况的类型。
整体来说,咱们的方针有两个,一是下降用户的运用本钱;二是增强状况查询的功用。咱们期望用户在查询 State 时能用最简略的方法;一起也不需求知道任何信息。
此外,咱们还期望用户能一起查询多个算子的 State ,也能够直接查询作业运用了哪些 State,每个 State 的类型是什么。
因而,咱们提出了 State Query on Flink SQL 的处理计划。 简略来说是把 State 当成数据库相同,让用户经过写 SQL 的方法就能够很简略地查询 State。
在这个计划中,咱们需求处理两个问题:
- 怎么对用户屏蔽 State 的信息:参阅 State Processor API 咱们能够知道,查询 State 需求供给非常多的信息,比方 Savepoint 途径、 StateBacked 类型、算子 id 、State Descriptor 等等。经过 SQL 语句明显难以完整地表述这些杂乱的信息,那么查询状况究竟需求哪些内容,咱们又怎么对用户屏蔽 State 里杂乱的细节呢?这是咱们面对的第一个难点。
- 怎么用 SQL 表达 State:State 在 Flink 中的存储方法并不像 Database 相同,咱们怎么去用 SQL 来表达状况的查询进程呢?这是咱们要处理的另一个难点。
StateMeta Snapshot 机制
首要咱们来答复第一个问题,查询一个 State 需求哪些信息呢 ?
能够参阅上文中 State Processor API 的示例,当咱们创立 ExistingSavepoint 和 ReaderFunction 的时分,咱们需求供给的信息有 Savepoint 途径、Backend 类型、OperatorID、算子 key 的类型、State 称号以及 Serializer 等等,咱们能够将这些一致称为状况的元信息。
关于 Flink SQL 使命来说,要清楚地了解这些信息,对用户来说门槛是非常高的。咱们的主意是让用户只需求供给最简略的信息,即 Savepoint ID ,然后由 Flink 框架把其他的元信息都存在 Savepoint 中,这样就能够对用户屏蔽 State 那些杂乱的细节,完结状况的查询。因而,咱们引入了 StateMeta Snapshot 机制。
StateMeta Snapshot 简略来说便是把状况的元信息添加到 Savepoint Metadata 的进程,详细步骤如下:
- 首要在 State 注册的时分,Task 会把 operatorName\ID\KeySerializer\StateDescriptors 等元信息都保存在 Task 的内存中;
- 触发 Savepoint 时,Task 会在制作快照的一起,对状况的元信息也相同进行快照。快照完结之后将状况的元信息 (StateMeta) 和状况文件的句柄 (StateHandle) 一起上报给 JobManager;
- JobManager 在收到所有 Task 上报的 StateMeta 信息之后 ,将这些状况元信息进行兼并,最终会把兼并之后的状况元信息保存到 Savepoint 目录里名为 stateInfo 的文件中。
之后在状况查询时就只需解析 Savepoint 中的 stateInfo 文件,而不再需求用户经过代码去输入这些 State 的元信息。经过这样的方法能够很大程度地下降用户查询状况的本钱。
State as Database
接下来咱们来答复第二个问题,咱们怎么用 SQL 来表达 State。其实社区在规划 State Processor API 的时分就提出了一些处理思路,也便是 State As Database。
在传统的数据库中,通常用 Catalog、Database、Table 这个三个元从来表明一个 Table,其实咱们也能够将用样的逻辑到映射到 Flink State 上。咱们能够把 Flink 的 State 当作一种特殊的数据源,作业每次发生的 Savepoint 都当作一个独立 DB 。在这个 DB 中,咱们将 State 元信息、State 的明细数据,都笼统成不同的 Table 暴露给用户,用户直接查询这些 Table 就能够获取使命的状况信息。
首要咱们来看怎么把 State 表明为 Table。咱们都知道在 Flink 中,常用的 State 有 两种类型 ,分别是 KeyedState 和 OperatorState 。
- 关于 OperatorState 来说,它只有 Value 这一个属性,用来表明这个 State 详细的值。因而咱们能够把 OperatorState 表明为只包括一个 Value 字段的表结构。
- 关于 KeyedState 来说,每个 State 在不同的 Key 和 Namespace 下的值可能都不相同, 因而咱们能够将 KeyedState 表明为一个包括 Key、Namespace、Value 这三个字段的表结构。
当咱们笼统出了单个 State 之后,想要表明多个 State 就比较简单了。能够看到在上图的例子中,这个算子包括 3 个 State,分别是两个 KeyedState 和一个 OperatorState,咱们只需求将这些 Table 简略的 union 起来,再经过 state_name 字段去区分不同的 State,就能够表明这个算子中所有的 State。
最终还有一个问题,咱们怎么知道一个使命究竟用了哪些 State 或许这些 State 的详细类型 呢?
为了处理这个问题,咱们界说了一种特殊表 — StateMeta ,用来表明一个 Flink 使命中所有 State 的元信息。StateMeta 中包括一个使命中每个 State 的称号、State 地址的算子 ID 、算子称号 、Key 的类型和 Value 的类型等等,这样用户直接查询 StateMeta 这个表就能获取使命中所有状况的元信息。
运用 Flink Batch SQL 查询使命状况
以上便是状况查询计划的整体介绍。那咱们究竟怎么去查询一个 State 呢,咱们以一个 Word Count 使命 为例来说明。
首要,咱们需求创立一个 Flink SQL 使命并启动。经过 web-ui 能够看到这个使命中包括三个算子,分别是 Source,Aggregate 还有 Sink。然后,咱们能够触发 Savepoint,当 Savepoint 制作成功之后获取对应的 SavepointID。咱们能够经过 SavepointID 去完结作业状况的查询。
假设咱们现在对 Flink SQL 使命中状况的运用一窍不通,那么首要咱们需求查询的便是这个 Flink 使命中包括哪些 State 以及这些 State 的类型。咱们能够从 StateMeta 表获取这些信息。如上图中场景一所示,经过查询 StateMeta 表,能够看到这个使命包括一个 ListState 和一个 ValueState,分别存在于 Source 算子和 Aggregate 算子中。
此外,有些对 Flink 比较了解的同学知道,KafkaSource 中的 State 是用于记载当前消费的 Offset 信息。 如场景二所示,咱们能够经过查询 Source 算子的状况,获取到使命中消费 Kafka Topic 的 Partition 和 Offset 信息。
还有一种比较常见的场景,比方下游的事务同学发现某个 key(比方 key_662)的成果反常。咱们在定位问题的时分能够直接去查询作业中 aggregate 算子中的状况,一起去指定 key 等于 key_662 作为查询条件。 如上图场景三所示,经过查询的成果能够看到,当 key 为 662 时对应的聚合成果是 11290。用户运用这样的方法就能够比较方便地验证状况是否正确。
未来展望
未来,咱们计划进一步丰富 State 的功用,现在咱们支持了运用 SQL 查询 State 的功用 ,其实社区还供给了 State 修正和初始化的才能。在一些场景下,这些才能也比较重要。比方,咱们已知状况中的部分 key 核算错误,期望将状况中这部分的数据进行批改;或许使命逻辑发生变更以后和之前的状况不能完全兼容, 这个时分咱们期望能够经过状况修正和初始化的才能去生成一个新的 Savepoint。相同,在运用方法上咱们也期望用户能直接运用 SQL 中 insert 和 update 语法来完结状况的修正和初始化操作。
其次,咱们会进一步加强 State 的可用性。咱们运用 DAG 修正的计划处理了作业拓扑发生变化时发生的状况不兼容问题,可是当 Flink SQL 使命修正字段时 State Serializer 可能会变化,相同导致状况无法兼容。针对这种情况咱们规划了完整的 Flink SQL State Schema Evolution 计划,能够极大的增强 Flink SQL 使命发生变化之后状况的康复才能,现在计划正在落地中。此外,咱们还供给了完善的状况康复事前检查才能,能够做到在使命上线之前就检查出状况是否兼容并告知用户,避免状况不兼容引起的作业启动失败对线上造成影响。
加入咱们
字节跳动实时核算团队担任公司内部实时核算场景, 支撑了数仓/机器学习/推荐/搜索/广告/流媒体/安全和风控等很多核心事务,咱们面对的应战是超大单体作业和超大集群规模的使用场景,在 SQL, State, Runtime 上都有深度优化。公司仍处于高速发展阶段,欢迎有才能、有主意的同学来这儿一起建造实时核算引擎。作业地址:北京、杭州,感兴趣的同学欢迎投递:job.toutiao.com/s/YfA8XnA