我们好,我是老兵。
Flink依据流编程
模型,内置了许多强壮功用的算子,能够协助咱们快速开发使用程序。
作为Flink开发内行,大多算子的写法和场景想来已是了然于胸,但是运用进程常常会有一些小小的问题:
- 部分算子长时刻未用,忘了用法。。
- 某些场景挑选什么算子?怎么挑选?含糊不清。。
工欲善其事,必先利其器!快速高效的运用适宜的算子开发程序,往往能够达到事半功倍的效果。
想着好记忆不如烂笔头
这个道理,特此收拾一份常见的Flink算子开发手册
!!也作为自己的作业笔记。欢迎我们保藏~
1 DataStream API
Flink DataStream API
让用户灵敏且高效编写Flink流式程序。首要分为DataSource
模块、Transformation
模块以及DataSink
模块。
- Source模块界说
数据接入
功用,包含内置数据源和外部数据源。 - Transformation模块界说DataStream数据流各种
转化
操作。 - Sink模块界说数据
输出
功用,存储成果到外部存储介质中。
履行环境: StreamExecutionEnvironment
体系模块 :
DataSouce、Transformation和DataSink
2 DataSource 输入
DataSource输入模块界说了DataStream API中的数据输入操作,Flink输入数据源分为内置数据源
和第三方数据源
两种类型。
- 内置数据源包含
文件
、Socket网络端口
以及调集
类型数据,不需要引进其他依靠库,在Flink体系内部现已完结。 - 第三方数据源界说了Flink和外部体系数据交互逻辑,例如
Apache Kafka Connector
、Elastic Search Connector
等。 - 一起用户能够自界说数据源。
2.1 readTextFile、readFile算子
支撑读取文本文件到Flink体系,转化成DataStream数据集。
-
readTextFile算子
直接读取体系文本文件(.log|.txt …) -
readFile算子
能够指定InputFormat
读取特定数据类型的文件(包含CSV、JSON或许自界说InputFormat)
// 读取文本文件
val textInputStream = env.readTextFile(
"/data/example.log")
// 指定InputFormat,读取CSV文件
val csvInputStream = env.readFile(
// 能够自界说类型(InputFormat)
new CsvInputFormat[String] (
new Path("/data/example.csv")
) {
override def fillRecord(out: String,
onbjects: Array[AnyRef]: String) = {
return null
}
}, "/data/example.csv"
)
2.2 Socket算子
支撑从Socket端口读取数据,转化成DataStream算子。
-
算子参数
:Ip地址、端口、delimiter字符串切割符、最大重试次数maxRetry -
maxRetry
首要供给使命失利重连机制。当设定为0时,Flink使命直接停止。 - Unix环境下,履行
nc -lk [:port]
启动网络服务
// Flink程序读取Socket端口(9999)数据
val socketDataStream =
env.socketTextStream("localhost", 9999)
2.3 调集算子
支撑操作Flink内置调集类(Collection),转化成DataStream。
- 支撑
Java
、Scala
算子常见调集类 - 实质是将本地调集数据分发到长途履行;适用于
本地测验
,留意数据结构类型的共同性
// fromElements元素调集转化
val elementDataStream =
env.fromElements(
Tuple2('aa', 1L),Tuple2('bb', 2L)
)
// fromCollection数组转化(Java)
String[] collections = new String[] {
"aa", "bb"
};
DataStream<String> collectionDatastream =
env.fromCollection(
Arrays.asList(collections)
);
// List列表转化(Java)
List<String> arrays = new ArrayList<>();
arrays.add("aa")
arrays.add("bb")
DataStream<String> arrayDataStream =
env.fromCollection(arrays)
2.4 外部数据源算子
支撑从第三方数据源体系读取数据,转化成DataStreams算子。
- 常见外部数据源算子: Hadoop FileSystem、ElasticSearch、 Apache Kafka、RabbitMQ等
- 运用时需要在Maven环境中增加jar包依靠(pom)
// Maven配置
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-1.2_2.12</artifactId>
<version>1.9.1</version>
</dependency>
// 读取Kafka数据源(Java)
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", "localhost:9092");
...
DataStream<String> kafkaStream =
env.addSource(
new FlinkKafkaStream010<> (
"topic-1",
new SimpleStringSchema(),
properties
)
)
2.5 自界说数据源算子
支撑完结内置的Function相关接口,自界说数据源。
详细的内置办法包含但不限于:
- SourceFunction接口
- ParallelSourceFunction接口
- RichParallelSourceFunction类
后续再经过env的addSource()办法增加,详细完结不展开。
3 DataStream转化
Flink对若干个DataStream操作生成新的DataStream,该进程被称为Transformation
。
Flink程序中大大都逻辑均在Transformation进程中完结,包含转化
、过滤
、排序
、衔接
、关联
、挑选
和聚合
等操作。
留意和Spark中transformation的区别。
Flink中DataStream转化能够分为几种类型:
-
Single DataStream
: 单个DataStream数据集元素处理逻辑 -
Multi DataStream
: 多个DataStream数据集元素处理逻辑 -
物理分区
:数据集并行度和数据分区处理
3.1 Map算子(#Single)
对数据会集每个元素进行转化操作,生成新DataStream。
- 底层为MapFunction算子。经过调用map函数,对每个元素履行操作。
- 常用于数据清洗、核算和转化等。
val inputStream = env.fromElements(
("aa", 1), ("bb", 2), ("cc", 3)
)
// 第一种写法: map操作,完结每个元素 + 1
val mapStream1 = inputStream.map(
t => (t._1, t.2 + 1)
)
// 第二种写法: 指定MapFunction
val mapStream2 = inputStream.map(
new MapFunction[(String, Int), (String, Int)] {
override def map(t: (String, Int)):
(String, Int) = {
(t._1, t._2 + 1)}
}
)
3.2 FlatMap算子(#Single)
支撑对数据会集一切元素转化成多个元素,生成新DataStream。
val flatDataStream = env.fromCollections()
val resultStream = flatDataStream.flatMap{
line => line.split(",")
}
3.3 filter算子(#Single)
支撑对数据集进行过滤挑选,生成新的DataStream
// 通配符写法
val filterDataStream = dataStream.fliter {
_ % 2 == 0
}
// 指定运算符表达式
val filterDS = dataStream.filter(
x => x % 2 == 0
)
3.4 keyBy算子(#Single)
依据指定Key对DataStream数据集分区,生成新的KeyedStream
- 相同Key值的数据归并到同一分区
- 类似于Spark中的groupByKey
val inputStream = env.fromElements(
("aa", 11), ("aa", 22), ("bb", 33)
)
// 依据第一个字段作为key分区
// 转化为KeyedStream[(String, String), Tuple]
val keyedStream: inputStream.keyBy(0)
3.5 reduce算子(#Single)
支撑对输入KeyedSteam依据reduce()聚合,生成新的DataStream
- 依据key分区聚合构成KeyedStream
- 支撑运算符和自界说reduceFunc函数
val inputStream = env.fromElements(
("aa", 11), ("bb", 33), ("cc", 22), ("aa", 21)
)
// 指定第一个字段分区key
val keyedStream = inputStream.keyBy(0)
// 对第二个字段进行累加求和
val reduceDataStream = keyedStream.reduce {
(t1, t2) => (t1._1, t1._2 + t2._2)
}
自界说Reduce函数,需要完结匿名类。
val reduceDataStream = keyedStream.reeduce(
new ReduceFunction[(String, Int)] {
override def reduce(t1: (String,Int),
t2: (String, Int)): (String, Int) = {
(t1._1, t1._2 + t2._2)
}
}
)
3.6 aggregations算子(#Single)
DataStream基础聚合算子,经过输入KeyedStream进行聚合生成新的DataStream
- 依据指定字段聚合,可自界说聚合逻辑
- 底层封装了sum、min、max等函数
val inputStream = env.fromElements(
(1, 7), (2, 8), (3, 11), (2, 3)
)
// 指定第一个字段分区key
val keyedStream:
[(Int, Int), Tuple] = inputStream.keyBy(0)
// 第二个字段sum核算
val sumStream = keyedStream.sum(1)
// 终究输出成果
sumStream.print()
3.7 Connect兼并算子(#Multi)
兼并多种类型数据集,并保存原数据集的数据类型,生成ConnectedStream
-
同享
状况数据,可相互获取数据集状况 - 某些场景下可代替join算子,变相完结flink
双流join
功用
// 创立不同数据类型数据集
val stream1 = env.fromElements(
("aa", 3), ("bb", 4), ("cc", 11), ("dd", 22)
)
val stream2 = env.fromElements(
(1, 2, 11, 8)
)
// 衔接数据集
// 回来[(String, Int), Int]
// 类似: [("aa", 3),1]
val connectedStream = stream1.connect(stream2)
3.8 Connect算子—CoMap(#Multi)
ConnectedStream数据流的Map功用算子,操作兼并数据集一切元素
- 界说CoMapFunction对象,参数为输入数据类型、输出数据类型和mapFunc
- 子map函数多线程交替履行,生成终究的兼并目标数据集
// 上文Connected操作后构成的数据流
// 参数: 第1个为stream1类型;第2个为stream2类型;第3个为stream3类型
val resultStream = connnectedStream.map(
new CoMapFunction[(String, Int), Int, (Int, String)] {
// 界说第一个数据集处理逻辑,输入值为stream1
override def map1(in1: (String, Int)): (Int, String) = {
(in1._2, in1._1)
}
// 界说第二个数据集处理逻辑,输入值为stream2
override def map2(in2: Int): (Int,String)={
(in2, "default")
}
)
3.9 Connect算子—CoFlatMap(#Multi)
ConnectedStream数据流的flatmap功用算子
在flatmap()办法中指定CoFlatMapFunction,并别离完结flatmap1()和flatmap2()函数。
val resultStream2 = connectedStream.flatMap(
new CoFlatMapFunction[(String, Int), Int, (String, Int, Int)] {
// 举例: 函数中同享变量,完结两个数据调集并
var value = 0
// 界说第1个数据集处理函数
override def flatMap1(in1: (String, Int),
collect: Collector[(String, Int, Int)]): Unit = {
collect.collect((in1._1, in1._2, value))
}
}
// 界说第2个数据集处理函数
override def flatMap2(in2: Int, collect: Collector[(String, Int, Int)]): Unit = {
value = in2
}
)
3.10 Union算子(#Multi)
将两个或许多个数据调集并,生成与输入数据集类型共同的DataStream
- 输入数据集的数据类型要求共同
- 输出数据集的数据类型和输入数据共同
- 留意和connect算子的区别
val stream1 = env.fromElements(
("aa", 3), ("bb", 22), ("cc", 45)
)
val stream2 = env.fromElements(
("dd", 23), ("ff", 21), ("gg", 89)
)
val stream3 = ....
// 兼并数据集
val unionStream = stream1.union(stream2)
val unionStream2 = stream1.union(
stream2, stream3
)
3.11 Split算子(#Multi)
将DataStream数据集按照条件拆分,转化成两个数据集的DataStream算子
- 将接入的数据路由到多个输出数据集,在
split
函数中界说拆分逻辑 - 能够被看作是union的逆向完结
val stream1 = env.fromElements(
("aa", 3), ("bb", 33),
("cc", 56),("aa", 23), ("cc", 67)
)
// 依据第二个字段的奇偶性标记数据(切分)
val splitStream = stream1.split(
v => if (v._2 % 2 == 1 Seq("even")
else Seq("odd"))
)
3.12 Select算子(#Multi)
Select挑选算子,经过条件挑选数据会集元素,生成新的DataStream
// 挑选偶数数据
val evenStream = splitedStream.select("even")
//挑选一切数据
val allStream = splitedStream.select("even", "odd")
3.13 window窗口算子(时刻机制)
Flink的窗口算子是实时核算的中心算子,常用于某固定时刻内指标核算
1)窗口API
Flink供给了高级窗口API算子,封装底层窗口操作,包含窗口类型、触发器、侧输出等。一起依据上游输入Stream流分为Non-Keyed和Keyed两种类型。
- Non-keyed(上游为Non-KeyedStream) 直接调用windowAll(),获取全局核算
val inputStream: DataStream = ...
// 当传入为KeyedStream时,调用window()函数
inputStream.keyBy(0).window(new WindowFunc(...))
// 当传入为不做处理的Non-Keyed输入Stream流
// 直接运用windowAll()全局核算
inputStream.windowAll(new WindowFunc(...))
- keyed(上游为KeyedStream类型)
调用DataStream的内置window()
stream.keyBy(..//keyed输入流.)
.window(..//窗口类型.)
.trigger(.//触发器<可选>..)
.evictor(.//剔除器<可选>.)
.allowdedLateness(.//推迟处理机制.)
.sideOutputLateDate(.//侧输出.)
.reduce/fold.aggregate/apply(.//核算函数.)
2)窗口类型
依据窗口的分配方法分为: 翻滚
、滑动
、会话
和全局
等,别离支撑不同窗口流动方法和规模。
一起支撑事件时刻和处理时刻数据流。
-
Tumbling Window Join (翻滚窗口)
-
Sliding Window Join (滑动窗口)
-
Session Widnow Join(会话窗口)
以十分钟时刻滑动窗口核算案例说明:
val tumblingStream = inputStream
.keyBy(0)
.window(
TumblingEventTimeWindows.of(
Time.seconds(10))
).process(...)
4 DataSink输出
Flink读取数据源,经过系列Transform操作后,成果一般转存至外部存储介质或许下游,即Flink的DataSink
进程。
Flink将外部存储的衔接逻辑封装在Connector衔接器中,常见的有:
- Apache Kafka
- ElasticSearch
- Hadoop FileSystem
- Redis
- 文件体系、端口
4.1 文件|端口
支撑文件、客户端、Socket网络输出,为Flink内置算子,不需要依靠三方库
常见有writeAsCSV(本地文件)、writeToSocket(Socket网络)
// 本地csv
inputStream.writeAsCsv(
"file://path/xx.csv", WriteMode.OVERWRITE
)
// Socket网络
inputStream.writeToSocket(
host, post, new SimpleStringSchema()
)
4.2 外部第三方
依据SinkFunction界说,需要引进外部三方依靠库,设置三方体系参数
val dataStream = ...
// 界说FlinkKafkaProducer
val kafkaProducer = new FlinkKafkaProducer011[Sting] (
"localhost:9092", //kafka broker list衔接
"xxx-topic", // kafka topic
new SimpleStringSchema() //序列化
)
// 增加SinkFunc
dataStream.addSink(kafkaProducer())
5 总结
Flink内置的算子库种类全、功用强壮,熟练掌握算子的运用方法和场景使用,是实时核算的必备技术。
后面还会持续更新此系列,欢迎增加我的个人微信: youlong525
,一起学习交流~
未完待续。。
》》》更多好文,欢迎重视公众号: 大数据兵工厂