本文整理自阿里云实时核算团队 Apache Flink Committer 和 PMC Member 任庆盛在 FFA 2023 中心技术专场(二)中的分享,内容主要分为以下四部分:

  1. Source API
  2. Sink API
  3. 将 Connecter 集成至 Table /SQL API
  4. Catalog API

在正式介绍这些 API 的细节之前,要把这些 API 以它们的分层列了出来,下面一层 API 是 DataStream ,针对 Java 开发一般都是根据 DataStream API 直接开发的。再往上一层便是 Table 和 SQL API ,Connector API 分层也是类似的,假如是想完结一个 Connector 的话其实是一个自底向上的进程。首要需求完结 DataStream 层上的 Source 还有 Sink API ,这个是不可或缺的。在完结了 Source、Sink 之后咱们为了支撑 Table 、Catalog ,需求在其上针关于 Table 层关于 Connector 供给的 API 来进行再次的开发。那接下来的介绍也会按照这个次序,从底向上来介绍每一层 API 的基本的逻辑以及开发时需求留意的一些问题。

怎样高效接入 Flink: Connecter / Catalog API 中心规划与社区发展

一、Source API

怎样高效接入 Flink: Connecter / Catalog API 中心规划与社区发展

Source API 现已引进许多版别,从 1.12 开端咱们有了 Source API 的榜首个版别,到 Flink 1.14 开端逐渐到达一个安稳的状况,并标记成 Public。假如了解 Flink 的时刻较长,咱们之前还有 InputFormat 和 SourceFunction。请咱们留意这些 API 在 2.0 都会被弃用掉,假如需求开发一个新的 Connector ,请重视最新的 Source API 。

从全体的规划来看,Source API 运用了主从的结构,和 Flink 的集群结构是类似的,它分为了两个部分,榜首部分便是 SplitEnumerator ,它相当于整个 Source 的大脑,从姓名上来看它的主要功用便是枚举、分片。

分片是 Source 对外部体系的部分笼统,比方 Kafka 一个分片便是一个 Topic 里的 Partition。假如是一个文件体系的 Source ,那一个 Split 便是一个文件或许是一个文件夹。SplitEnumerator 的作业便是在外部体系傍边发现这些分片并把它做成使命分配给终究真实进行干活的,咱们称为 SourceReader 。SplitEnumerator 是作业级别的,也便是每个作业或许说每个 Source 是只要一个的, SourceReader 是 Subtask 级别的,整个 Source 的并发是多少,那咱们 SourceReader 的实例就有多少。SourceReader 和 Enumerator之间是经过 JM 和 TM 的 RPC 进行通讯的,咱们在这之上也封装了一些作业可以让 Enumerator 和 Reader 之间进行交流,咱们称之为 Source Event,这样可以更好地和谐 Enumerator 和 Reader 之间的使命分配和大局办理的作业。

怎样高效接入 Flink: Connecter / Catalog API 中心规划与社区发展

为了进一步简化用户对 Source 的开发进程,正如上图所示,咱们供给 SourceReaderBase 的基类,SourceReader 是一个相当于比较底层的接口,为了简化开发难度,咱们也供给了一个 SourceReaderBase ,它进一步把 Source 和外部体系之间的交流部分和与 Flink 进行协作的部分做了一个拆分,这样做的优点是 Source 开发者可以更加重视和外部体系之间的交互,而不需求过多重视和 Flink 之间的 Checkpoint 处理、会不会影响 Flink 主线程的作业等问题。在 SourceReaderBase 的规划之下,咱们又抽出了一个名为 SplitReader 的 API,它才是真实的从外部体系中拉取数据,依据 Enumerator 分配的分片到外部体系中读取数据的部分。数据在读取来之后,SplitReader 会把它放到 SourceReaderBase 中心的一个 Element 队列,实际上便是做了和外部体系以及和 Flink 之间的阻隔。在 Flink 这一侧,咱们真实作业的是 Flink Task 的主线程,这是一个没有 Box 的模型,它会不断地从中心拉取数据,然后把数据经过 RecordEmitter 的处理后发送到下流。RecordEmitter 的作业主要便是用来做反序列化,将外部体系傍边的数据格式转换成下流傍边的一些数据格式。

怎样高效接入 Flink: Connecter / Catalog API 中心规划与社区发展

在开发 Source 的时分需求留意的问题有哪些,这个是我在检查 Source 的完结和办理傍边总结出来的。榜首点是开发的时分必定要留意把和外部体系交互的部分以及和 Flink 交互的部分区分隔,比方刚刚讲的 SourceReaderBase 为什么中心要插一个队列,这样便是尽量把和外部体系交互的部分和与 Flink 之间交互的部分区分隔。之所以这样做是由于 Flink 的主线程是一个 mailbox 模型 ,包含 Checkpoint 和一些操控信息的传递都是经过这个 mailbox 的线程来做的。假如是咱们用它去和外部体系做 IO 的话,这样有或许会对下流算子以及整个 Task 的运转发生一些影响,包含 Checkpoint 的运转有或许也会受到必定程度的影响,所以在开发的时分必定要留意把做 IO 的线程和 Flink 的线程区分隔。

第二点是咱们 Source 傍边供给了许多东西或许说办法,比方 SplitEnumeratorContext 里边有一个 callAsync 办法,许多人在开发 Enumerator 时没有留意到它,自己去起一个线程池或许去起一个线程,很费劲地去处理各个线程之间的和谐问题。那经过这个 callAsync 咱们现已供给了一个可以给外部体系做 IO 的一个线程的,叫 worker thread。咱们就可以直接利用这个东西,咱们会在 Flink 里边把一些线程之间的阻隔问题处理好。尽量能去复用 SourceReaderBase 和 SplitReader 的时分就尽量去复用它,这样可以大大下降咱们的开发难度,总的来说便是尽量自己少造轮子,可以复用现有的轮子。

怎样高效接入 Flink: Connecter / Catalog API 中心规划与社区发展

在近几个版别中咱们对 Source 的功用做了增强,首要便是 Hybrid Source ,它有一种典型的用户场景,一些线上用户需求首要去读取 HDFS 或许其他文件体系存储里边的一些存量数据,在读取完已有的存量数据之后进行切换,比方切到 Kafka 或许其他的音讯队列来读在线数据,那实际上是需求一个在不同 Source 之间进行切换的才能。Hybrid Source 便是在现有 Source 的根底上,封装这么一层,供给了这样一个多 Source 之间按序切换的才能。两个 Source 之间是有切换次序的,当一个 Source 比方 FileEnumerator 履行完作业之后,会发生一个 Switch context ,也便是说我会把当时的发展或状况的信息经过这个 SwitchContext 供给给接下来要运转的 Source 。比方说刚说到的场景里边,FileEnumerator 就要告知 Kafka 的 Enumerator 我现在读到哪, Kafka Enumerator 会依据当时在 Switchcontext 里边供给的位点信息或许说时刻信息来正确地发动 Kafka Source 的读取,滑润地迁移到存量阶段或许在线数据的读取傍边。从这个架构图中咱们也可以观察到,实际上 Hybrid Source 便是对咱们现有的 Enumerator 还有 Reader 进行了二次封装,并供给了这样一个东西类来帮助它们进行切换。

怎样高效接入 Flink: Connecter / Catalog API 中心规划与社区发展

第二个在 Source 上面咱们支撑的功用便是 Watermark Alignment。

先说一下这个问题的背景,不论咱们的作业傍边有一个 Source 还是多个 Source ,经常会遇到的状况是不同 Source 之间读取的发展差异会很大,比方假如是 Kafka, Source 中的某一个 Partition 由于网络或许其他原因,它的发展远远落后于其他的 Partition。或许有两个 Source ,它们之间由于读取不同的外部体系导致发生不同的发展,这样就会导致一些下流的算子比方说我需求做一些 Join、我需求做 Aggregate,就需求等候一切并发的 Watermark 都前进到同一个方位之后才可以出发核算,只要有一个拖了后腿,那其他人的数据都要在状况里边等,这样就会导致后边某些需求用到的算子状况会越来越大,这实际上便是读取发展的不同所导致的。

针对这个问题,咱们提出了这样一个 Watermark Alignment 的机制,在完结的时分,假如是同一个 Source 相对会简单一点,可以直接在这个 Source 的 Coordinator 、或许说是 Enumerator 里边就把这个作业做了。假如跨 Source 之间要完结这个才能,咱们是在中心引进了一个叫 CoordinatorStore 的一个组件。它可以让不同的 Source 之间来交流一些信息,在这儿边咱们需求交流的便是 Watermark 信息, Source Operator 这边会周期性的给自己的 Coordinator 报告当时处理的发展怎样样,然后 Source Coordinator 会周期性的检查当时发展的最小值,假如发现某些 Operator 读的太快了,有落在后边的并发或许说落在后边的 Source ,会让它先等一等,下降一下读取速度,等咱们都追齐之后再往前读。这便是 Watermark Alignment 完结的一个细节。

二 、Sink API

介绍完 Source 之后,咱们再为咱们介绍一下 Sink API 。Sink API 也是经过了许多版别的迭代,最开端咱们有 OutputFormat 和 SinkFunction,相同还要提示咱们这两个 API 在 2.0 里边是被抛弃了的。在引进 Sink 之后咱们也由于某些需求无法满意,所以推出了两个版别:Sink V1 、Sink V2 ,在这儿主要介绍 Sink V2。Sink API 自身的规划相对来讲没有那么复杂,它不涉及到主从结构或许说不涉及到和谐才能,Sink 自身仅仅一个工厂类,是来构建整个 Sink 的拓扑或许说各个组件的。其中最中心的组件便是 SinkWriter ,由于 Sink 自身需求往外写数据,所以不论是什么 Sink ,SinkWriter 必定是必不可少的,它的功用便是把上游的数据进行序列化,然后对应的写出到外部体系。假如说 Sink想要完结 Exactly-once 或许说第二阶段提交的才能,那在此根底上需求供给一个可选的 SinkCommitter 的组件。它们两个之间和谐的方式便是在每个 Checkpoint 的时分,SinkWriter 会生成一个叫 Committable 的特别的音讯。

一般来讲数据库或许便是一个 Transaction,当 Checkpoint 触发的时分会发生这样一个 Committable,留给下面的 SinkCommitter,当一切的并发的 Checkpoint 都完结之后,咱们会经过 SinkCommitter 将 Committable 提交到外部体系傍边去,从而完结这样一个第二阶段提交的进程。有了这两个组件之后咱们还是发现有一些需求很难满意,比方说像 Iceberg、Hive 这些 Sink ,它或许会涉及到 Checkpoint 之后再做一些小文件合并等额定的逻辑。为了更大程度地丰厚 Sink 可以适用的场景,咱们在此根底上又供给了三个部分,别离是 PreWrite 、 PreCommit 、PostCommit。实际上便是答应 Sink 的开发者在 SinkWriter 和 Committer 之间可以刺进任意的拓扑逻辑。我可以在在里边串联许多的Operator也好或许说我可以给它们规划不同的并发,从而完结我 Sink 里边的特别功用。但其实关于绝大多数的 Sink 来讲,这些功用或许用到的机会很少,可是假如你发现咱们现有的 Writer和Committer 没有办法满意需求的时分,那就可以直接考虑用这三个自界说组件来完结自己的逻辑。

怎样高效接入 Flink: Connecter / Catalog API 中心规划与社区发展

类似于刚刚介绍的 SourceReaderBase, 为了简化 Sink 的开发,咱们供给 Async Sink 的基类,它供给的才能是对一些通用的、异步输出数据逻辑,经过这些场景来供给一个基本的笼统。在这儿边涉及的概念比方 ElementConverter 会将咱们上游的数据转换成可以对外部体系进行的真实的恳求。Async Sink 自身会供给攒批的才能,用户可以经过设置攒批的条件比方当数据到达必定的大小,然后攒批的时刻是多少之后,然后将这一批恳求的批量提交到外部体系傍边去。相同这儿边也供给了内置的异常重试逻辑,假如是某次提交失败了,那么在下一次提交的时分再一次测验把这些数据进行重试提交。根据这些逻辑咱们可以看到实际上它仅仅完结了一个 at-least-once 语义的。但实际上咱们出产傍边,绝大部分的 Sink 都是 at-least-once ,由于完结一个 Exactly-once 的本钱会很大,有一些 Sink 会觉得费了半天劲完结一个 Exactly-once 的 Sink 可是真实运用的人很少,那不如退而求其次,可以把 at-least-once 语义做到极致就可以了。假如你的 Sink 只需求完结 at-least-once 语义,不妨测验 Async Sink ,可以大大下降咱们的开发难度。

三、集成至 Table / SQL API

介绍完下面两层之后,咱们再来说一下怎样把 Source Sink 集成至 Table / SQL API 上面去。

怎样高效接入 Flink: Connecter / Catalog API 中心规划与社区发展

Table SQL API 关于 connector 供给的接口主要是一个层次关系, Source 最根底的接口叫做 DynamicTableSource,它下面有两种集成:ScanTableSource 和 LookupTableSource 。Scan 望文生义便是对原表的扫描, Lookup 便是咱们常说的对维表的典查的逻辑。这儿我列了一个样例的kafka,那 ScanTableSource 或许便是我从 Kafka 中读取数据,再读取完之后经过 Redis 维表供给的LookupTableSource 去 Redis 上面进行点查,终究会写到 Sink 傍边。比方说咱们是 Hive ,会把这个结果经过这个 Sink 对应的接口叫做 DynamicTableSource 来写到外部体系傍边去。实际上这三个接口都是对 Source 和 Sink 或许说咱们下面会介绍的 LookupFunction 的一个工厂或许说是一个构造器,那真实在下面干活的,便是咱们刚刚说的 DataStream API 、Source 和 Sink 。

怎样高效接入 Flink: Connecter / Catalog API 中心规划与社区发展

那么咱们先看 ScanTableSource 的接口长什么样。很简单也很好了解,它有两个办法,榜首个叫 getChangelogMode,由于咱们Flink Source全体是支撑三种数据类型的,比方说像 INSERT / UPDATE / DELETE ,假如你的 Source 是一个三种才能都支撑的 Source ,比方说我是一个满 MySQL CDC Source ,我可以读取原表里的刺进、更新和删去等等,我就需求在 ChangelogMode 里边指定我是支撑三个才能的。假如你是一个音讯队列的 Source ,比方说 Kafka ,那它就只支撑一种 INSERT ,那这儿边返回一个支撑 INSERT 就可以了。这个办法会被 Planner 拿去用来对一些下流算子的校验等等,包含你的一些逻辑,全体写出来之后看 INSERT / UPDATE / DELETE 下流的算子能不能承受。

第二个办法便是咱们怎样真实去从 Table 构建出来 Source 在底层运转的 Source API 的构造器,从 Context 里边咱们是可以拿到用户在 Source 在 CREATE TABLE 句子里边的一切装备的。我会依据这些装备,创立出对应的 Source Provider ,把终究运转在里边的 Source 构建出来。

怎样高效接入 Flink: Connecter / Catalog API 中心规划与社区发展

再介绍一下 LookupTableSource ,这个刚刚说是完结了点查的逻辑,可是在 DataStream API 咱们没有供给一致的笼统的接口,便是可以供给这样一个典查的才能,那么在这儿边咱们在 Table 阶段用的是叫 LookupFunction 。

它有两个版别,一个是 LookupFunction ,一个是 AsyncLookupFunction ,别离对应的是同步的完结和异步的完结。咱们在 1.16 版别里边也为维表供给了一些辅佐的才能,比方说维表的 Cache 可以快速地帮你构建出来一个 LookupFunction 这样一些东西类,这样咱们在完结维表的时分也可以去考虑运用它。LookupTableSource 的接口比 Source 的还要简单,它不需求供给 ChangelogMode,由于它的作业便是接一条数据然后将对应的字段到外部体系查一下就可以了。唯一供给的便是 LookupRuntimeProvider 怎样依据用户的的装备来构建出来在 Runtime 傍边运用的 LookupFunction 。

怎样高效接入 Flink: Connecter / Catalog API 中心规划与社区发展

然后是 Sink ,Sink 和 Source 相似,两个接口我是不是支撑写出 INSERT / UPDATE / DELETE ,给 Planner 做校验,下面便是咱们怎样依据用户的装备构建出来 Sink ,这个跟 Source 基本上是彻底对称的。

怎样高效接入 Flink: Connecter / Catalog API 中心规划与社区发展

除此之外咱们有一些 Source 是支撑高级辅佐才能的,比方说我可以供给给 Planner 这个信息, Planner 可以把 Projection Pushdown 到这个 Source 里边,把 Filter Pushdown 到另一个 Source 里边,咱们在这儿边一致地去界说了一些接口比方说 Supports 后边是才能的姓名。比方说咱们有一个很厉害的 Table Source ,那它是支撑 FilterPushdown 和 ProjectionPushdown 的。很简单,咱们只需求在这个类上面去完结这两个接口就可以了,依据这些接口里边供给的办法来供给对应的信息,比方怎样把真实的 Pushdown 推到 Source 里边。咱们可以在代码里边检查这些一切支撑的才能,挑选自己可以支撑的然后进行对应的完结。

四、Catalog API

在介绍完 Table 层后便是最终一部分 Catalog API 。

怎样高效接入 Flink: Connecter / Catalog API 中心规划与社区发展

在这儿我举个简单的比如,写过 SQL 的人都知道,很头疼的一件事便是去写 CREATE TABLE ,咱们需求给每一个字段去界说它的类型和姓名,比方说我在读取它的上游表,这个表里边界说的字段上百个,这是十分常见的状况,需求逐个把它映射到 Flink 的数据类型里边,然后把它罗列在列界说里。除此之外,咱们还需求为 Table 写 With 参数,指定 Connector ,装备这个 Connector。某些 Connector 的装备十分复杂,比方说衔接一个敞开了 SSL 的 Kafka 集群,或许需求写许多的参数才可以把这个 Table 创立出来,这便是咱们遇到的榜首个问题:CREATE TABLE 句子太冗长了。

第二个问题是装备问题很难复用,比方说我今日为这个集群装备了这个表,写了一堆参数,明日我还需求用这个集群,另一个表又写了一遍参数,这个感觉就很冗余。别的还有一个问题便是刚刚说到的我要给每个字段处理它对应的类型映射,这个也很麻烦。

怎样高效接入 Flink: Connecter / Catalog API 中心规划与社区发展
Catalog 的诞生便是为了解决这个问题。Catalog 实际上是一个可以供给外部体系原信息的一个组件,咱们在 Catalog 这个 API 里边是供给了一个一致的笼统,和 Flink、Source里边的概念相对应。比方说像 Database 、 Table 、 Partition 、 View 、Function 供给这些一致的笼统概念,咱们在开发 Catalog 的时分只需求把外部体系对应的概念和它们进行逐个映射就可以了。这个也不是必须的,比方说我外部体系没有 Partition 、Function 这些的话就可以不完结它。Catalog 的其他才能还有可以对原信息进行一个耐久化的存储,关于 Hive 的话咱们可以对接到 Hive 的 Catalog 里边,把一些表的信息存储到里边进行一个耐久化,方便后边进行复用。

除此之外 Catalog 供给了一个一致的 API 可以对外部体系进行一个一致的办理。当咱们供给了 catalog 之后就可以大幅简化用户的装备本钱。举个比如,外部体系各式各样,或许有各式各样的数据库类型,它们对自己办理数据库的概念又不一样,或许有的有 Schema、有的没有 Schema,有的叫 Database 、有的叫 Namespace,经过咱们一致的 Catalog API 的这层翻译之后,可以把它们对应的概念逐个映射到 Flink 里边的概念傍边,用户在运用的时分接触到的就只要一种概念,咱们在 Flink 里边界说的这些顶层逻辑,直接经过 Catalog 里边挑选这个表就可以把这个表的数据拉出来了。

怎样高效接入 Flink: Connecter / Catalog API 中心规划与社区发展

在这儿举个比如,咱们或许对 MySQL 这些数据库或许都比较好映射,它里边有 Database 、 Table 。那不典型的也不是这种结构化存储的比方说 kafka ,它能不能支撑 Catalog 呢?当然是可以的。在这儿边咱们把一个 Kafka Catalog 映射到一个 Kafka 集群上面,一个 Table 对 kafka 来说便是一个 Topic ,在这儿边 kafka 或许没有那么多层级的概念,或许没有一个 Database,那我就不映射 Database,给一个默认的就可以了,在完结 Kafka Cluster 的时分可以让用户装备这个 Topic 里边读取的数据类型。在这儿边我举个比如,Kafka 里边存储的是一个 JSON 类型,那 Catalog 自身就可以对每一种字段的类型依据它 JOSN 的内容进行一个推导,把每一条数据映射到表里边的每一个行,这样就完结了对 Kafka 一致的笼统,对终端用户来讲假如运用这个 Kafka catalog ,就没有必要重复去装备这个 Kafka 集群的一些信息,想要哪一个 Topic 的数据,一个 SELECT 句子就可以直接拿出来,这样可以大大下降用户的运用门槛。

怎样高效接入 Flink: Connecter / Catalog API 中心规划与社区发展

有了 Catalog 之后,根据 Catalog 可以做一些更丰厚的作业,比方血缘信息办理。在 1.18 和 1.19(还未发布)两个版别傍边 Flink 也会对血缘信息做一些支撑。现在现已完结的部分 FLIP-294、Catalog Modification Listener ,也便是咱们可以在 Catalog 上面注册一个监听器,假如 Catalog 有任何的改变,比方说加表、删表,这些信息都会经过 Listener 报告到对应的外部组件里边。在血缘信息办理傍边,它报告的对象便是一个 MetadataPlatform ,比方说像 Atlas、Datahub 这些原信息办理体系,相对应的假如有建表我会在原信息办理平台上面创立对应的数据节点,删表之后会将其进行移除。

在未来 1.19 版别里边咱们估计要完结的便是这个对作业血缘的监听,刚刚咱们经过对 Katalog 的监听是对 MetadataPlatform 的一些数据节点进行一个创立,怎样把点之间的线衔接起来呢?经过 FLIP-314 之间界说的一些接口它会对作业的发动、停止、暂停等根底的信息进行监听。假如一个作业发动之后,可以经过拿到它的 Source 和 Sink 把两个数据节点之间的线衔接起来,这样就可以完好的取得这个 Flink 集群上面运转的数据血缘或许说节点之间的一个上下流逻辑,方便用户对自己的数据流向有更充沛的办理。

怎样高效接入 Flink: Connecter / Catalog API 中心规划与社区发展

再回到这个图,刚刚的介绍次序是由底向上的,当咱们想要完结自己的 Connector 时,首要需求考虑要接触到的必定是这个 Source 和 Sink ,它们是在 DataStream 层上面的完结,也是上面这些 API 的基石,想完结一个 Connector 必定要完结这两个接口。假如说 Connector 想支撑 SQL、还有 Table 丰厚的生态,咱们需求在它的根底上完结 DynamicTableSource 和 DynamicTableSink , 它们可以了解成下面 Source 和 Sink 的构造器。假如想进一步简化用户运用 Connector 的本钱,不要每次都写一堆很冗长的 Table ,咱们对它进行一个复用,然后就可以去对接到 Catalog API 上面,把自己外部体系的一些概念笼统到 Flink 上面去,这样用户可以直接从你的外部体系 Catalog 里边 Select 数据出来, 不需求重复的去界说字段、界说装备等等,可以下降用户的运用门槛。供给了 Catalog 之后咱们就可以天生的取得一些血缘办理或许说原信息办理的才能。