滑动窗口源码
核心代码类-SlidingWindow
package com.jd.platform.hotkey.worker.tool;
import cn.hutool.core.date.SystemClock;
import org.checkerframework.checker.units.qual.C;
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicLong;
/**
* 滑动窗口。该窗口相同的key都是单线程核算。
*
* @author wuweifeng wrote on 2019-12-04.
*/
public class SlidingWindow {
/**
* 循环行列,便是装多个窗口用,该数量是windowSize的2倍
*/
private AtomicLong[] timeSlices;
/**
* 行列的总长度
*/
private int timeSliceSize;
/**
* 每个时刻片的时长,以毫秒为单位
*/
private int timeMillisPerSlice;
/**
* 共有多少个时刻片(即窗口长度)
*/
private int windowSize;
/**
* 在一个完好窗口期内答应经过的最大阈值
*/
private int threshold;
/**
* 该滑窗的开始创立时刻,也便是第一个数据
*/
private long beginTimestamp;
/**
* 终究一个数据的时刻戳
*/
private long lastAddTimestamp;
public static void main(String[] args) throws InterruptedException {
//1秒一个时刻片,窗口共5个
SlidingWindow window = new SlidingWindow(2, 40);
//循环屏障
CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
CountDownLatch latch=new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
//步骤一致
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
boolean hot = window.addCount(2);
System.out.println(hot);
window.print();
latch.countDown();
}
}).start();
}
latch.await();
for (int i = 0; i < 100; i++) {
System.out.println(window.addCount(2));
window.print();
System.out.println("--------------------------");
try {
Thread.sleep(102);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void print() {
Arrays.asList(timeSlices).stream().forEach(e->System.out.print(e+" "));
System.out.println();
}
public SlidingWindow(int duration, int threshold) {
//超过10分钟的按10分钟
if (duration > 600) {
duration = 600;
}
//要求5秒内勘探出来的,
if (duration <= 5) {
this.windowSize = 5;
this.timeMillisPerSlice = duration * 200;
} else {
this.windowSize = 10;
this.timeMillisPerSlice = duration * 100;
}
this.threshold = threshold;
// 确保存储在至少两个window
this.timeSliceSize = windowSize * 2;
reset();
}
public SlidingWindow(int timeMillisPerSlice, int windowSize, int threshold) {
this.timeMillisPerSlice = timeMillisPerSlice;
this.windowSize = windowSize;
this.threshold = threshold;
// 确保存储在至少两个window
this.timeSliceSize = windowSize * 2;
reset();
}
/**
* 初始化
*/
private void reset() {
beginTimestamp = SystemClock.now();
//窗口个数
AtomicLong[] localTimeSlices = new AtomicLong[timeSliceSize];
for (int i = 0; i < timeSliceSize; i++) {
localTimeSlices[i] = new AtomicLong(0);
}
timeSlices = localTimeSlices;
}
/**
* 核算当时地点的时刻片的方位
*/
private int locationIndex() {
long now = SystemClock.now();
//假如当时的key现已超出一整个时刻窗口了,那么就直接初始化就行了,不用去核算了
if (now - lastAddTimestamp > timeMillisPerSlice * windowSize) {
reset();
}
int index = (int) (((now - beginTimestamp) / timeMillisPerSlice) % timeSliceSize);
if (index < 0) {
return 0;
}
return index;
}
/**
* 添加count个数量
*/
public synchronized boolean addCount(long count) {
//当时自己地点的方位,是哪个小时刻窗(时刻片)
int index = locationIndex();
// System.out.println("index:" + index);
//然后清空自己前面windowSize到2*windowSize之间的数据格的数据
//比如1秒分4个窗口,那么数组合计8个窗口
//当时index为5时,就清空6、7、8、1。然后把2、3、4、5的加起来便是该窗口内的总和
//后边的一半需要整理的, 前面的一般,是要核算的, 一半是一个时刻窗的巨细
clearFromIndex(index);
int sum = 0;
// 在当时时刻片里持续+1
sum += timeSlices[index].addAndGet(count);
//加上前面几个时刻片
for (int i = 1; i < windowSize; i++) {
sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
}
lastAddTimestamp = SystemClock.now();
return sum >= threshold;
}
private void clearFromIndex(int index) {
for (int i = 1; i <= windowSize; i++) {
int j = index + i;
if (j >= windowSize * 2) {
j -= windowSize * 2;
}
timeSlices[j].set(0);
}
}
}
核心字段
/**
* 循环行列,便是装多个窗口用,该数量是windowSize的2倍
*/
private AtomicLong[] timeSlices;
/**
* 行列的总长度
*/
private int timeSliceSize;
/**
* 每个时刻片的时长,以毫秒为单位
*/
private int timeMillisPerSlice;
/**
* 共有多少个时刻片(即窗口长度)
*/
private int windowSize;
/**
* 在一个完好窗口期内答应经过的最大阈值
*/
private int threshold;
/**
* 该滑窗的开始创立时刻,也便是第一个数据
*/
private long beginTimestamp;
/**
* 终究一个数据的时刻戳
*/
private long lastAddTimestamp;
SlidingWindow:结构函数
- 10 是说采集的一个数组下标 的时刻是 10*100/1000=1s 也便是除以10 的秒数
- 小于=5的话 就默许翻倍 除以10 也便是 5 代表着1s 而不是0.5s
- 1秒一个时刻片,窗口共5个 大于1s的时刻片窗口是10个
SlidingWindow window = new SlidingWindow(10, 40 ); - 窗口 我们设置为10个时刻片
public SlidingWindow(int duration, int threshold) {
//超过10分钟的按10分钟
if (duration > 600) {
duration = 600;
}
//要求5秒内勘探出来的,
if (duration <= 5) {
this.windowSize = 5;
this.timeMillisPerSlice = duration * 200;
} else {
this.windowSize = 10;
this.timeMillisPerSlice = duration * 100;
}
this.threshold = threshold;
// 确保存储在至少两个window
this.timeSliceSize = windowSize * 2;
reset();
}
我们结构滑动窗口的时分先设置一个时刻片的时长,和阈值 一般会核算最近十个时刻片的核算数量 和阈值 比较 看是不是热度key
每个能够占用的核算时长为
timeMillisPerSlice * windowSize
locationIndex:核算当时地点的时刻片的方位
核算当时地点的时刻片的方位源码
/**
* 核算当时地点的时刻片的方位
*/
private int locationIndex() {
long now = SystemClock.now();
//假如当时的key现已超出一整个时刻窗口了,那么就直接初始化就行了,不用去核算了
if (now - lastAddTimestamp > timeMillisPerSlice * windowSize) {
reset();
}
int index = (int) (((now - beginTimestamp) / timeMillisPerSlice) % timeSliceSize);
if (index < 0) {
return 0;
}
return index;
}
这个核算公式 意思便是 从开始 到现在 过了多少个时刻片, 由于运用的是 长度为timeSliceSize 的环形buffer 方式,所以再跟行列长度取模下 得到的成果便是 当时地点的时刻片的方位
int index = (int) (((now – beginTimestamp) / timeMillisPerSlice) % timeSliceSize);
有两个时刻戳 beginTimestamp 和 lastAddTimestamp
/**
* 该滑窗的开始创立时刻,也便是第一个数据 reset 的时分更新
*/
private long beginTimestamp;
/**
* 终究一个数据的时刻戳 addcount的时分更新
*/
private long lastAddTimestamp;
addCount:添加count个数量
/**
* 添加count个数量
*/
public synchronized boolean addCount(long count) {
//当时自己地点的方位,是哪个小时刻窗(时刻片)
int index = locationIndex();
// System.out.println("index:" + index);
//然后清空自己前面windowSize到2*windowSize之间的数据格的数据
//比如1秒分4个窗口,那么数组合计8个窗口
//当时index为5时,就清空6、7、8、1。然后把2、3、4、5的加起来便是该窗口内的总和
//后边的一半需要整理的, 前面的一般,是要核算的, 一半是一个时刻窗的巨细
clearFromIndex(index);
int sum = 0;
// 在当时时刻片里持续+1
sum += timeSlices[index].addAndGet(count);
//加上前面几个时刻片
for (int i = 1; i < windowSize; i++) {
sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
}
lastAddTimestamp = SystemClock.now();
return sum >= threshold;
}
滑动窗口存在的高并发问题
hotkey 热度核算的高并发问题
作者在进行热度核算的时分,为了提升 核算效率 消费行列运用了多线程(cpu核数)进行消费
@Bean
public Consumer consumer() {
int nowCount = CpuNum.workerCount();
//将实践值赋给static变量
if (threadCount != 0) {
nowCount = threadCount;
} else {
if (nowCount >= 8) {
nowCount = nowCount / 2;
}
}
List<KeyConsumer> consumerList = new ArrayList<>();
for (int i = 0; i < nowCount; i++) {
KeyConsumer keyConsumer = new KeyConsumer();
keyConsumer.setKeyListener(iKeyListener);
consumerList.add(keyConsumer);
threadPoolExecutor.submit(keyConsumer::beginConsume);
}
return new Consumer(consumerList);
}
public void beginConsume() {
while (true) {
try {
HotKeyModel model = QUEUE.take();
if (model.isRemove()) {
iKeyListener.removeKey(model, KeyEventOriginal.CLIENT);
} else {
iKeyListener.newKey(model, KeyEventOriginal.CLIENT);
}
//处理完毕,将数量加1
totalDealCount.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void newKey(HotKeyModel hotKeyModel, KeyEventOriginal original) {
//cache里的key
String key = buildKey(hotKeyModel);
//判别是不是刚热不久
Object o = hotCache.getIfPresent(key);
if (o != null) {
return;
}
SlidingWindow slidingWindow = checkWindow(hotKeyModel, key);
//看看hot没
boolean hot = slidingWindow.addCount(hotKeyModel.getCount());
。。。。。。。
}
}
/**
* 添加count个数量
*/
public synchronized boolean addCount(long count) {
//当时自己地点的方位,是哪个小时刻窗(时刻片)
int index = locationIndex();
clearFromIndex(index);
int sum = 0;
// 在当时时刻片里持续+1
sum += timeSlices[index].addAndGet(count);
//加上前面几个时刻片
for (int i = 1; i < windowSize; i++) {
sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
}
lastAddTimestamp = SystemClock.now();
return sum >= threshold;
}
每个线程获取到HotKeyModel今后 都在进行 iKeyListener.newKey(model, KeyEventOriginal.CLIENT); 热度核算。终究都会调用
boolean hot = slidingWindow.addCount(hotKeyModel.getCount());
从而调用
sum += timeSlices[index].addAndGet(count);
假如多个线程 对同一个key进行热度核算,也就会获取同一个 滑动窗口,但是
sum += timeSlices[index].addAndGet(count);
是并发不安全的
好在 热度核算这种成果数据成果正确性要求不严格,能够必定程度承受,
就像caffiene缓存 进行内存淘汰的时分,对缓存key 的调用次数核算运用W-TinyLFU 算法,也是不精确核算,一般这种热度核算或许点击次数核算,不要求 肯定精度的事务场景,要优先考虑的时分 内存占用 核算复杂度等
解决方案
能够参阅 caffiene缓存 条状环形buff 替换掉目前的环形buff 循环行列,
/**
* 循环行列,便是装多个窗口用,该数量是windowSize的2倍
*/
private AtomicLong[] timeSlices;
这样每个线程的核算节点都会被分散到不同的索引方位 ,一起这样也添加的设计和核算的复杂度。