实际工作中,数据到来不会总是有序的,所以window需要结合水印(watermark)来使用,以满足实际场景。但是即便用了水印,也有可能存在漏网之鱼(迟到数据),这时就要用到侧流(side output)来将漏网数据收住,提高数据准确度。
运行方法
1、将下述代码贴到工程里,注意需要依赖flink的相关包
2、调试步骤1:在linux上打开nc工具(例如 nc -lk 9527),启动控制台,准备数据输入
3、调试步骤2:启动工程,观察侧流与主流的数据
示例数据
/**
* 1000,a,1
* 2000,a,1
* 4998,a,1
* 4999,a,1
* 6999,a,1
* 12000,a,1
* 当输入到12000时,触发执行的是第2个窗口内的数据
* 第1个窗口:[0000,5000)
* 第2个窗口:[5000,10000)
* 注:flink窗口是左闭右开的
*/
代码如下
package com.flink.watermarks;
import cn.hutool.core.date.format.FastDateFormat;
import lombok.val;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class EventTimeWMApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
test01(env);
env.execute("WindowApp");
}
/**
* EventTime结合WM运用
* 输入数据格式:时刻字段、单词、次数
* @param env
*/
public static void test01(StreamExecutionEnvironment env){
//用于接纳推迟到来的数据(窗口已完毕,还有相应的时刻段的数据进来)
OutputTag<Tuple2<String,Integer>> outputTag = new OutputTag<Tuple2<String,Integer>>("late-data"){};
DataStreamSource<String> source = env.socketTextStream("localhost",9527);
//开端建数据源时,直接建watermarker更好一些,而不是在过程中建。
SingleOutputStreamOperator<String> lines = source.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
//乱序的watermarker处理器
@Override
public long extractTimestamp(String element) {
return Long.parseLong(element.split(",")[0]);//获取数据中的第一列,被当成触发5秒一个窗口的时刻序列(对比值)
}
}
);
SingleOutputStreamOperator<Tuple2<String,Integer>> mapStream = lines.map(new MapFunction<String, Tuple2<String,Integer>>(){
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
String[] splits = value.split(",");
try{
return Tuple2.of(splits[1],Integer.parseInt(splits[2].trim()));
}catch (Exception e){
e.printStackTrace();
return new Tuple2("null",Integer.MIN_VALUE);
}
}
});
/**
* 1000,a,1
* 2000,a,1
* 4998,a,1
* 4999,a,1
* 6999,a,1
* 12000,a,1
* 当到了12000时,履行的时,第二个窗口内的数据
* 第1个窗口:[0000,5000)
* 第2个窗口:[5000,10000)
* 注:flink窗口是左闭右开的
*/
SingleOutputStreamOperator window = mapStream.keyBy(x -> x.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
//sideOutputLateData(OutputTag<T> outputTag) ,用于处理窗口完毕,还有数据进来的情况
.sideOutputLateData(outputTag)//接纳推迟数据
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> v1, Tuple2<String, Integer> v2) throws Exception {
System.out.println(" ---reduce invoked ---" v1.f0 "[(" v1.f1 "," v2.f1 ")]===>" (v1.f1 v2.f1));
return Tuple2.of(v1.f0, v1.f1 v2.f1);
}//以增量的方法聚合
}, new ProcessWindowFunction<Tuple2<String, Integer>, Object, String, TimeWindow>() {
FastDateFormat format = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
//双窗口的方法
@Override
public void process(String s, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Object> out) throws Exception {
for (Tuple2<String, Integer> element : elements) {
out.collect("[" format.format(context.window().getStart()) "]===> " format.format(context.window().getEnd()) "]," element.f0 "===>" element.f1);
//});
} //窗口的开端时刻
}
});
//.sum(1)
//干流数据
window.print();
//侧流数据
DataStream<Tuple2<String,Integer>> sideOutput = window.getSideOutput(outputTag); //得到推迟数据
sideOutput.print();
}
}