敞开生长之旅!这是我参与「日新计划 12 月更文挑战」的第4天 /post/716729…
四种Api形式
- 初级API(Stateful Stream Processing):供给了对时间和状况的细粒度操控,简洁性和易用性较差, 首要运用在一些复杂事情处理逻辑上。
- 中心API(DataStream/DataSet API):首要供给了针对流数据和批数据的处理,是对初级API进行了一 些封装,供给了filter、sum、max、min等高档函数,简略易用。
- Table API:一般与DataSet或许DataStream严密关联,能够经过一个DataSet或DataStream创立出 一个Table,然后再运用相似于filter, join,或许 select这种操作。最后还能够将一个Table目标转成 DataSet或DataStream。
- SQL:Flink的SQL底层是根据Apache Calcite,Apache Calcite完成了规范的SQL,运用起来比其他 API更加灵敏,由于能够直接运用SQL句子。Table API和SQL能够很容易地结合在一块运用,由于它 们都回来Table目标。
Api图示
DataStream Api
DataSource
DataSource是程序输入的数据源,Flink自身内置了非常多的数据源,也支撑自定义数据源,现在供给的内置数据源,在企业中开发肯定是足够的
-
根据socket
-
根据Collection
-
第三方数据源(不局限于以下三个)
- Kafka
- RabbitMQ
- NiFi
针对source的这些Connector中,工作中最常用的便是kafka
恢复机制
程序过错,机器故障、网络故障时,Flink有容错机制能够恢复并持续运行。
针对Flink供给的接口,假如敞开了checkpoint,Flink能够供给容错性保证
- Socket:at most once
- Collection:exactly once
- Kafka0.10以上:exactly once
根据collection的source
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecuti
//运用collection调集生成DataStream
DataStreamSource<Integer> text = env.fromCollection(Arrays.asList(1, 2
text.print().setParallelism(1);
env.execute("StreamCollectionSourceJava");
Transformation
transformation是Flink程序的核算算子,担任对数据进行处理,Flink供给了大量的算子,这点上和Spark相似
算子 | 阐明 |
---|---|
map | 输入一个元素进行处理,回来一个元素 |
flatMap | 输入一个元素进行处理,回来多个元素 |
filter | 对数据进行过滤,契合条件留下 |
keyBy | 根据key分组,相同key数据放入同一个分区 |
reduce | 对当时元素和上一次的成果进行聚合操作 |
aggregations | sum(),min(),max()等 |
union | 兼并多个流,多个流数据类型必须共同 |
connect | 只能衔接两个流,两个流的数据类型能够不同 |
split | 把一个流分为多个流 |
shuffle | 对数据进行随机分区 |
rebalance | 对数据进行再平衡,从头分区,消除数据歪斜 |
rescale | 重分区 |
partationCustom | 自定义分区 |
剖析几个要点的算子:
-
union
- 多个流兼并,可是数据类型必须共同
- 处理规矩也必须共同
-
connect
- 两个流被connect后,指示放入同一个流,内部依然坚持各自的数据和方法不改变,彼此独立。
- connect办法会回来connectedStream,在此流中需求适用CoMap,CoFlatMap,相似正常流的map和flatMap
-
split
- 把一个流切分红多个流
- 切分后的流不能再分
- 场景:将一份数据切分红多份,便于对每一份数据进行不同的逻辑处理
- 该办法现已过时,官方不引荐运用,官方推进运用side output办法
-
side output
- 进行切分后的流还能够二次切分
union与connect
union能够衔接多个流,最后汇总成一个流,流里边的数据运用相同的核算规矩
connect值能够衔接2个流,最后汇总成一个流,可是流里边的两份数据彼此还是独立的,每一份数据使
用一个核算规矩
流切分
假如是只需求切分一次的话运用split或许side output都能够
假如想要切分屡次,就不能运用split了,需求运用side output
分区算子
- Random:随机分区
- rebalance:对数据集进行再平衡,从头分区,消除数据歪斜
- rescale:重分区
- custom partition:自定义分区
random:随机分区,它表明将上游数据随机发送到下流算子实例的每个分区,代码层面是调用shuffle办法,shuffle底层关于的是shufflePartitioner类,这个类有selectChannel函数,这个函数会进行核算数据会发送到哪个分区,里边调用的是random.nextInt办法,所以该算子是随机分区的。
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
return random.nextInt(numberOfChannels);
}
rebalance:从头平衡分区(循环分区),它会对数据进行再平衡,未每个分区创立相同的负载,实际上便是经过循环的方法给下流算子的每个分区分配数据,在代码层面调用的是rebalance办法,查看源码,该办法调用的是RebalancePartitioner这个类,里边有一个setup函数和selectChannel函数,setup会根据分区数初始化一个随机值nextChannelToSentTo,然后selectChannel函数会运用该随机值+1和分区数取模,把核算的值赋给该变量,后面以此类推,完成向多个下流算子实例的多个分区循环发送数据,这样每个分区获取的数据根本共同。
public void setup(int numberOfChannels) {
super.setup(numberOfChannels);
nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels)
}
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
return nextChannelToSendTo;
}
rescale:重分区
查看源码,rescale底层对应的是RescalePartitioner这个类里边有一个selectChannel函数,这里边的numberOfChannels是分区数量,其实也能够认为是咱们所说的算子的并行度,由于一个分区是由一个线程担任处理的,它们两个是一一对应的。
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
if (++nextChannelToSendTo >= numberOfChannels) {
nextChannelToSendTo = 0;
}
return nextChannelToSendTo;
}
* The subset of downstream operations to which the upstream operation sends
* elements depends on the degree of parallelism of both the upstream and downs
* For example, if the upstream operation has parallelism 2 and the downstream
* has parallelism 4, then one upstream operation would distribute elements to
* downstream operations while the other upstream operation would distribute to
* two downstream operations. If, on the other hand, the downstream operation h
* 2 while the upstream operation has parallelism 4 then two upstream operation
* distribute to one downstream operation while the other two upstream operatio
* distribute to the other downstream operations.
假如上游操作有2个并发,而下流操作有4个并发,那么上游的1个并发成果循环分配给下流的2个并发操 作,上游的别的1个并发成果循环分配给下流的别的2个并发操作。另一种状况,假如上游有4个并发操作而下流有2个并发操作,那么上游的其中2个并发操作的成果会分配给下流的一个并发操作,而上游的别的2个并发操作的成果则分配给下流的别的1个并发操作。
注意:rescale与rebalance的区别是rebalance会发生全量重分区,而rescale不会。
broadcast:播送分区,将上游算子实例中的数据输出到下流算子实例的每个分区中,合适用于大数据集
查看源码,broadcast底层对应的是BroadcastPartitioner这个类.看这个类中selectChannel函数代码的注释,提示播送分区不支撑挑选Channel,由于会输出数据到下流的每个Channel中,便是发送到下流算子实例的每个分区中
custom partition:自定义分区,能够按照自定义规矩完成自定义分区需求完成Partitioner接口
图解几个分区算子
具体的Api参考官网案例
DataSink
DataSink是 输出组件,担任把核算好的数据输出到其它存储介质中。
一般会输出到音讯队列或许数据库,print办法一般适用于测试
Flink内置Connectors
- Kafka
- Cassandra
- Elasticsearch
- Hdfs
- RabbitMQ
- NiFi
- JDBC
- redis
容错保障
Redis:at least once
Kafka:at least once/exactly once
DataSet Api
DataSet也可分为三块来进行剖析
- DataSource
- Transformation
- Sink
DataSource
针对DataSet批处理而言,运用最多的是读取HDFS数据。
-
根据调集
- fromCollection
-
根据文件
- readTextFile(path),读取hdfs中的数据文件
Transformation
常见算子如下:
算子 | 阐明 |
---|---|
map | 输入一个元素进行处理,回来一个元素 |
mapPartation | 相似map,一次处理一个分区的数据 |
flatMap | 输入一个元素进行处理,能够回来多个元素 |
filter | 对数据进行过滤,契合条件的数据会被留下 |
reduce | 对当时元素和上一次的成果进行聚合操作 |
aggregation | sum(),min(),max()等 |
用法与DataStream相关的算子相似
DataSet一些常用的算子:
算子 | 解释 |
---|---|
distinct | 回来数据集中去重之后的元素 |
join | 内衔接 |
outerjoin | 外衔接 |
cross | 获取两个数据集的笛卡尔积 |
union | 回来多个数据集的总和,数据类型需求共同 |
first-n | 获取调集中的前N个元素 |
-
distinct:去重算子
-
join:内衔接,衔接两份数据,相似sql join
-
outer join:相似SQL左右衔接
-
cross:获取笛卡尔积
-
union:回来两个数据集的总和,数据类型需求共同
-
first-n:获取调集中前N个元素
- 这个比较常用,实际工作中咱们经常会核算TopN的产品信息,或许商户信息等
DataSink
Flink针对DataSet供给了一些现已完成好的数据目的地
其中最常见的是向HDFS中写入数据:
-
writeAsText():将元素以字符串方法逐行写入,这些字符串经过调用每个元素的toString()办法来获取
-
writeAsCsv():将元组以逗号分隔写入文件中,行及字段之间的分隔是可配置的,每个字段的值来自目标
的toString()办法
-
print:打印每个元素的toString()办法的值,测试时用
Table Api & Flink SQL
Table API和SQL的由来:
Flink针对规范的流处理和批处理供给了两种联系型API,Table API和SQL。
- Table API答应用户以一种很直观的方法进行select 、fifilter和join操作。
- Flink SQL根据 Apache Calcite完成规范SQL。
- Flink Table API、SQL和Flink的DataStream API、DataSet API是严密联系在一起的。
- Table API和SQL是一种联系型 API,用户能够像操作 Mysql 数据库表相同的操作数据,而不需求写代码,更不需求手工的对代码进行调优
依靠信息:
flink-table-api-java-bridge_2.12
版别:1.11.0
flink-table-planner-blink_2.12
版别:1.11.0
Table API和SQL经过join API集成在一起,这个join API的中心概念是Table,Table能够作为查询的输入和输出。
运用案例
// 获取TableEnvironment
EnvironmentSettings sSettings = EnvironmentSettings.newInstance().use
TableEnvironment sTableEnv = TableEnvironment.create(sSettings);
// 创立输入表
sTableEnv.executeSql("" +
"create table myTable(\n" +
"id int,\n" +
"name string\n" +
") with (\n" +
"'connector.type' = 'filesystem',\n" +
"'connector.path' = 'D:\data\source',\n" +
"'format.type' = 'csv'\n" +
")");
// 书写sql句子
Table result = sTableEnv.sqlQuery("select id,name from myTable")
// 执行并打印
result.execute().print()
DataStream、DataSet与Table互转
Table API和SQL能够很容易的和DataStream和DataSet程序集成到一块。
经过TableEnvironment ,能够把 DataStream 或 者 DataSet注册为Table。
这样就能够运用Table API 和 SQL查询了。
1.DataStream创立表
- 创立view视图
- 创立table目标
//获取StreamTableEnvironment
StreamExecutionEnvironment ssEnv = StreamExecutionEnvironment.getExec
EnvironmentSettings ssSettings = EnvironmentSettings.newInstance().us
StreamTableEnvironment ssTableEnv = StreamTableEnvironment.create(ssE
//获取DataStream
ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
data.add(new Tuple2<Integer,String>(1,"jack"));
data.add(new Tuple2<Integer,String>(2,"tom"));
data.add(new Tuple2<Integer,String>(3,"mick"));
DataStreamSource<Tuple2<Integer, String>> stream = ssEnv.fromCollecti
//第一种:将DataStream转换为view视图
ssTableEnv.createTemporaryView("myTable",stream,$("id"),$("name"));
ssTableEnv.sqlQuery("select * from myTable where id > 1").execute().print
//第二种:将DataStream转换为table目标
Table table = ssTableEnv.fromDataStream(stream, $("id"), $("name"));
table.select($("id"), $("name"))
.filter($("id").isGreater(1))
.execute()
.print();
2.DataSet创立表
新的Blink引擎不支撑这种操作,老版别支撑,现已不常用了。
将 Table 转换为 DataStream 或许 DataSet 时,你需求指定生成的 DataStream 或许 DataSet 的数据类型
通常最方便的挑选是转换成 Row
- Row: 经过角标映射字段,支撑恣意数量的字段,支撑 null 值,无类型安全(type-safe)查看
- POJO: Java中的实体类,这个实体类中的字段称号需求和Table中的字段称号坚持共同,支撑恣意数量的字段,支撑null值,有类型安全查看。
- Case Class: 经过角标映射字段,不支撑null值,有类型安全查看。
- Tuple: 经过角标映射字段,Scala中约束22个字段,Java中约束25个字段,不支撑null值,有类型安全查看。
- Atomic Type: Table 必须有一个字段,不支撑 null 值,有类型安全查看。
3.将表转换成 DataStream
两种形式
- Append Mode:仅附加,不更新
- Retract Mode:能够始终运用此形式,它运用一个Boolean标识来编码INSERT和DELETE更改