滑动窗口源码

核心代码类-SlidingWindow

package com.jd.platform.hotkey.worker.tool;
import cn.hutool.core.date.SystemClock;
import org.checkerframework.checker.units.qual.C;
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicLong;
/**
 * 滑动窗口。该窗口相同的key都是单线程核算。
 *
 * @author wuweifeng wrote on 2019-12-04.
 */
public class SlidingWindow {
    /**
     * 循环行列,便是装多个窗口用,该数量是windowSize的2倍
     */
    private AtomicLong[] timeSlices;
    /**
     * 行列的总长度
     */
    private int timeSliceSize;
    /**
     * 每个时刻片的时长,以毫秒为单位
     */
    private int timeMillisPerSlice;
    /**
     * 共有多少个时刻片(即窗口长度)
     */
    private int windowSize;
    /**
     * 在一个完好窗口期内答应经过的最大阈值
     */
    private int threshold;
    /**
     * 该滑窗的开始创立时刻,也便是第一个数据
     */
    private long beginTimestamp;
    /**
     * 终究一个数据的时刻戳
     */
    private long lastAddTimestamp;
    public static void main(String[] args) throws InterruptedException {
        //1秒一个时刻片,窗口共5个
        SlidingWindow window = new SlidingWindow(2, 40);
        //循环屏障
        CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
        CountDownLatch latch=new CountDownLatch(10);
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        //步骤一致
                        cyclicBarrier.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                    boolean hot = window.addCount(2);
                    System.out.println(hot);
                    window.print();
                    latch.countDown();
                }
            }).start();
        }
        latch.await();
        for (int i = 0; i < 100; i++) {
                System.out.println(window.addCount(2));
            window.print();
            System.out.println("--------------------------");
            try {
                Thread.sleep(102);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    private void print() {
        Arrays.asList(timeSlices).stream().forEach(e->System.out.print(e+" "));
        System.out.println();
    }
    public SlidingWindow(int duration, int threshold) {
        //超过10分钟的按10分钟
        if (duration > 600) {
            duration = 600;
        }
        //要求5秒内勘探出来的,
        if (duration <= 5) {
            this.windowSize = 5;
            this.timeMillisPerSlice = duration * 200;
        } else {
            this.windowSize = 10;
            this.timeMillisPerSlice = duration * 100;
        }
        this.threshold = threshold;
        // 确保存储在至少两个window
        this.timeSliceSize = windowSize * 2;
        reset();
    }
    public SlidingWindow(int timeMillisPerSlice, int windowSize, int threshold) {
        this.timeMillisPerSlice = timeMillisPerSlice;
        this.windowSize = windowSize;
        this.threshold = threshold;
        // 确保存储在至少两个window
        this.timeSliceSize = windowSize * 2;
        reset();
    }
    /**
     * 初始化
     */
    private void reset() {
        beginTimestamp = SystemClock.now();
        //窗口个数
        AtomicLong[] localTimeSlices = new AtomicLong[timeSliceSize];
        for (int i = 0; i < timeSliceSize; i++) {
            localTimeSlices[i] = new AtomicLong(0);
        }
        timeSlices = localTimeSlices;
    }
    /**
     * 核算当时地点的时刻片的方位
     */
    private int locationIndex() {
        long now = SystemClock.now();
        //假如当时的key现已超出一整个时刻窗口了,那么就直接初始化就行了,不用去核算了
        if (now - lastAddTimestamp > timeMillisPerSlice * windowSize) {
            reset();
        }
        int index = (int) (((now - beginTimestamp) / timeMillisPerSlice) % timeSliceSize);
        if (index < 0) {
            return 0;
        }
        return index;
    }
    /**
     * 添加count个数量
     */
    public synchronized boolean addCount(long count) {
        //当时自己地点的方位,是哪个小时刻窗(时刻片)
        int index = locationIndex();
//        System.out.println("index:" + index);
        //然后清空自己前面windowSize到2*windowSize之间的数据格的数据
        //比如1秒分4个窗口,那么数组合计8个窗口
        //当时index为5时,就清空6、7、8、1。然后把2、3、4、5的加起来便是该窗口内的总和
        //后边的一半需要整理的, 前面的一般,是要核算的, 一半是一个时刻窗的巨细
        clearFromIndex(index);
        int sum = 0;
        // 在当时时刻片里持续+1
        sum += timeSlices[index].addAndGet(count);
        //加上前面几个时刻片
        for (int i = 1; i < windowSize; i++) {
            sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
        }
        lastAddTimestamp = SystemClock.now();
        return sum >= threshold;
    }
    private void clearFromIndex(int index) {
        for (int i = 1; i <= windowSize; i++) {
            int j = index + i;
            if (j >= windowSize * 2) {
                j -= windowSize * 2;
            }
            timeSlices[j].set(0);
        }
    }
}

核心字段

/**
 * 循环行列,便是装多个窗口用,该数量是windowSize的2倍
 */
private AtomicLong[] timeSlices;
/**
 * 行列的总长度
 */
private int timeSliceSize;
/**
 * 每个时刻片的时长,以毫秒为单位
 */
private int timeMillisPerSlice;
/**
 * 共有多少个时刻片(即窗口长度)
 */
private int windowSize;
/**
 * 在一个完好窗口期内答应经过的最大阈值
 */
private int threshold;
/**
 * 该滑窗的开始创立时刻,也便是第一个数据
 */
private long beginTimestamp;
/**
 * 终究一个数据的时刻戳
 */
private long lastAddTimestamp;

SlidingWindow:结构函数

  • 10 是说采集的一个数组下标 的时刻是 10*100/1000=1s 也便是除以10 的秒数
  • 小于=5的话 就默许翻倍 除以10 也便是 5 代表着1s 而不是0.5s
  • 1秒一个时刻片,窗口共5个 大于1s的时刻片窗口是10个
    SlidingWindow window = new SlidingWindow(10, 40 );
  • 窗口 我们设置为10个时刻片
public SlidingWindow(int duration, int threshold) {
    //超过10分钟的按10分钟
    if (duration > 600) {
        duration = 600;
    }
    //要求5秒内勘探出来的,
    if (duration <= 5) {
        this.windowSize = 5;
        this.timeMillisPerSlice = duration * 200;
    } else {
        this.windowSize = 10;
        this.timeMillisPerSlice = duration * 100;
    }
    this.threshold = threshold;
    // 确保存储在至少两个window
    this.timeSliceSize = windowSize * 2;
    reset();
}

我们结构滑动窗口的时分先设置一个时刻片的时长,和阈值 一般会核算最近十个时刻片的核算数量 和阈值 比较 看是不是热度key

每个能够占用的核算时长为
timeMillisPerSlice * windowSize

locationIndex:核算当时地点的时刻片的方位

核算当时地点的时刻片的方位源码

/**
 * 核算当时地点的时刻片的方位
 */
private int locationIndex() {
    long now = SystemClock.now();
    //假如当时的key现已超出一整个时刻窗口了,那么就直接初始化就行了,不用去核算了
    if (now - lastAddTimestamp > timeMillisPerSlice * windowSize) {
        reset();
    }
    int index = (int) (((now - beginTimestamp) / timeMillisPerSlice) % timeSliceSize);
    if (index < 0) {
        return 0;
    }
    return index;
}

这个核算公式 意思便是 从开始 到现在 过了多少个时刻片, 由于运用的是 长度为timeSliceSize 的环形buffer 方式,所以再跟行列长度取模下 得到的成果便是 当时地点的时刻片的方位

int index = (int) (((now – beginTimestamp) / timeMillisPerSlice) % timeSliceSize);

有两个时刻戳 beginTimestamp 和 lastAddTimestamp

/**
 * 该滑窗的开始创立时刻,也便是第一个数据 reset 的时分更新
 */
private long beginTimestamp;
/**
 * 终究一个数据的时刻戳 addcount的时分更新
 */
private long lastAddTimestamp;

addCount:添加count个数量

/**
     * 添加count个数量
     */
    public synchronized boolean addCount(long count) {
        //当时自己地点的方位,是哪个小时刻窗(时刻片)
        int index = locationIndex();
//        System.out.println("index:" + index);
        //然后清空自己前面windowSize到2*windowSize之间的数据格的数据
        //比如1秒分4个窗口,那么数组合计8个窗口
        //当时index为5时,就清空6、7、8、1。然后把2、3、4、5的加起来便是该窗口内的总和
        //后边的一半需要整理的, 前面的一般,是要核算的, 一半是一个时刻窗的巨细
        clearFromIndex(index);
        int sum = 0;
        // 在当时时刻片里持续+1
        sum += timeSlices[index].addAndGet(count);
        //加上前面几个时刻片
        for (int i = 1; i < windowSize; i++) {
            sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
        }
        lastAddTimestamp = SystemClock.now();
        return sum >= threshold;
    }

滑动窗口存在的高并发问题

hotkey 热度核算的高并发问题

作者在进行热度核算的时分,为了提升 核算效率 消费行列运用了多线程(cpu核数)进行消费

@Bean
public Consumer consumer() {
    int nowCount = CpuNum.workerCount();
    //将实践值赋给static变量
    if (threadCount != 0) {
        nowCount = threadCount;
    } else {
        if (nowCount >= 8) {
            nowCount = nowCount / 2;
        }
    }
    List<KeyConsumer> consumerList = new ArrayList<>();
    for (int i = 0; i < nowCount; i++) {
        KeyConsumer keyConsumer = new KeyConsumer();
        keyConsumer.setKeyListener(iKeyListener);
        consumerList.add(keyConsumer);
        threadPoolExecutor.submit(keyConsumer::beginConsume);
    }
    return new Consumer(consumerList);
}

public void beginConsume() {
    while (true) {
        try {
            HotKeyModel model = QUEUE.take();
            if (model.isRemove()) {
                iKeyListener.removeKey(model, KeyEventOriginal.CLIENT);
            } else {
                iKeyListener.newKey(model, KeyEventOriginal.CLIENT);
            }
            //处理完毕,将数量加1
            totalDealCount.increment();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
@Override
public void newKey(HotKeyModel hotKeyModel, KeyEventOriginal original) {
    //cache里的key
    String key = buildKey(hotKeyModel);
    //判别是不是刚热不久
    Object o = hotCache.getIfPresent(key);
    if (o != null) {
        return;
    }
    SlidingWindow slidingWindow = checkWindow(hotKeyModel, key);
    //看看hot没
    boolean hot = slidingWindow.addCount(hotKeyModel.getCount());
    。。。。。。。
    }
}
/**
     * 添加count个数量
     */
    public synchronized boolean addCount(long count) {
        //当时自己地点的方位,是哪个小时刻窗(时刻片)
        int index = locationIndex();
        clearFromIndex(index);
        int sum = 0;
        // 在当时时刻片里持续+1
        sum += timeSlices[index].addAndGet(count);
        //加上前面几个时刻片
        for (int i = 1; i < windowSize; i++) {
            sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
        }
        lastAddTimestamp = SystemClock.now();
        return sum >= threshold;
    }

每个线程获取到HotKeyModel今后 都在进行 iKeyListener.newKey(model, KeyEventOriginal.CLIENT); 热度核算。终究都会调用

boolean hot = slidingWindow.addCount(hotKeyModel.getCount());

从而调用

sum += timeSlices[index].addAndGet(count);

假如多个线程 对同一个key进行热度核算,也就会获取同一个 滑动窗口,但是

sum += timeSlices[index].addAndGet(count);

是并发不安全的

好在 热度核算这种成果数据成果正确性要求不严格,能够必定程度承受,
就像caffiene缓存 进行内存淘汰的时分,对缓存key 的调用次数核算运用W-TinyLFU 算法,也是不精确核算,一般这种热度核算或许点击次数核算,不要求 肯定精度的事务场景,要优先考虑的时分 内存占用 核算复杂度等

解决方案

能够参阅 caffiene缓存 条状环形buff 替换掉目前的环形buff 循环行列,

/**
 * 循环行列,便是装多个窗口用,该数量是windowSize的2倍
 */
private AtomicLong[] timeSlices;

这样每个线程的核算节点都会被分散到不同的索引方位 ,一起这样也添加的设计和核算的复杂度。