我正在参加「启航计划」

读前问题:Map的特性和Queue的特性相结合,咱们会联想到什么功能呢?

接下来看看我的完成是不是和你的主意不约而同

布景

前一段时间保护的老项目一向呈现MQ音讯堆积的问题

其实这个音讯量本身我觉得不算是很大,但可能是之前的代码问题,导致音讯一向堆积

这个老项目是用于办理设备状况的,一些设备上报的数据会经过中台服务推到MQ中,然后这个项目会监听这部分音讯更新设备状况

之前这个项目一向跑的好好的,一共差不多就几百个设备,虽然存在少数的音讯堆积,可是设备在每天的一个时间段是不会上报数据的,这个项目在设备不上报数据的这段时间里能把之前堆积的少数音讯给处理完,所以等于是达到了一个平衡

后面因为项目需求又加了一批设备,然后MQ音讯堆积的问题就被放大了

又因为之前的同事离任了,然后。。。dddd

开端的规划

在我研讨了一段时间后(问问同事,看看代码)我发现他们之前用了最土豪的办法

那便是每收到一条数据就在数据库更新一次

这还不是最骚的

其时他们将设备的特点规划成了动态的,也便是说会有一张表来独自保存特点

id device attr value
1 10001 brightness 100
2 10001 power 30
3 10002 brightness 80
4 10002 power 20

相似上面这样的办法来保存设备的特点

所以设备的一个特点就等于一条记载,而有些设备一次性会保存几十条特点

而且他们还在更新特点的办法上还加了@Transactional

可能其时以为这个项目的体量不会很大,就怎样简略怎样来?

我仿佛能看到这个老项目一边没日没夜的处理数据一边撕扯着嗓子:臣妾做不到啊!

Queue & Map 武魂融合技:MapQueue

问题剖析 & 解决计划

其实这个问题便是服务处理数据的速度跟不上设备上报数据的速度

研讨了解的需求和代码之后有了一些优化的思路

其时就罗列了下所有能想到的解决计划再进行对比评估

扩展服务/MQ/硬件资源(老项目能不动就不动)

依据咱们之前的了解,其实代码端就有很大的优化空间,何况假如瓶颈是在数据库的话,单纯扩展服务或MQ估计没有什么作用,而且还需求额定的保护和费用

加大设备上报的时间间隔(老项目能不动就不动)

同样的,一个是代码端有很大的优化空间,而且这个计划治标不治本,假如后续设备数量又增加,不能确保不会再次呈现这个问题

最重要的是,就几百个设备,我可没有这个脸提这个计划

优化底层更新逻辑(这下不得不修正代码)

假如要在代码层面进行优化,首先这些设备特点能够直接在Redis中更新,然后守时刷到数据库

一起假如特点的值没有变化的话乃至能够直接疏忽,这样在更新数据库的时分进一步减少了数据量

可是,这种办法需求花费很多额定的时间来排查可能引发的连锁问题,特别是有很多基于设备特点更新而联动的其他模块的业务功能,而我并不想了解其他的业务模块(白眼)

运用背压战略(这下不得不修正代码)

假如咱们触摸过RxJava应该会了解关于生产者的生产速度大于顾客的消费速度时,咱们能够用特定的背压战略来避免一些问题

因为这个项目接纳的数据仅仅用于更新数据库,没有其他的处理,所以咱们只需求重视最新的数据就行了,在这之前的数据直接丢掉也没有太大问题

这样的话只需求在承受数据和处理数据之间加一个背压战略

//仅仅思路,非真实代码
public class Receiver {
    //背压战略
    private Backpressure backpressure = 
        new Backpressure().onEach(this::handle);
    //接纳数据-背压
    public void receiveNew(Object data) {
        backpressure.put(data);
    }
    //接纳数据-原有办法
    public void receive(Object data) {
        handle(data);
    }
    //处理数据
    public void handle(Object data) {
        //省掉
    }
}

就像上面的代码,收到数据后放入Backpressure中,Backpressure会将大部分中间状况的数据丢掉,将最新的状况回调到onEach

假如真的能完成这样的背压战略,就能够少处理很多数据,一起改动很小

(这儿其实也能够直接把接纳到的数据放到Redis,然后用守时使命拉Redis中的数据更新数据库,相当于是把上一个计划的逻辑移到最外层,可是我在最开端居然没想到!)

只处理每个设备最新数据的背压战略

为了完成上述背压的功能,咱们需求有一个数据筛选模型,也便是咱们怎样判别哪些数据要处理哪些数据要丢掉

规划一个缓存行列,当接纳到新的数据而行列现已满了的时分,就丢掉最早的数据?

这样确实能达到相似的作用,可是有可能只能处理到一部分设备数据,而另一部分设备数据永远都被丢掉

那么怎样确保每个设备都能处理到呢?

假如能有这样一个行列,当接纳到一个新的设备数据时,假如当时行列中现已存在对应设备的数据就更新这个设备数据,否则在最后增加这条设备数据

简略模拟一下流程,如下:

  1. 设备A上报数据A1,当时行列:A1
  2. 设备B上报数据B1,当时行列:A1,B1
  3. 设备A上报数据A2,当时行列:A2,B1
  4. 进行一次消费,最前面的A2被消费,当时行列:B1
  5. 设备B上报数据B2,当时行列:B2
  6. 设备A上报数据A3,当时行列:B2,A3
  7. 设备B上报数据B3,当时行列:B3,A3
  8. 进行一次消费,最前面的B3被消费,当时行列:A3

在行列的基础上,存在的就更新,没有的就增加,这不便是Map的特性么?而且LinkedHashMap就支持放入的次序

假如能让QueueMap结合在一起,说不定就能完成这样的功能!

而且咱们能够运用BlockingQueue的特性,运用对应的阻塞办法,用put入队,用take出队

改造 LinkedBlockingQueue(JDK8)

咱们能够组合LinkedBlockingQueueLinkedHashMap来完成咱们需求的功能

改造LinkedBlockingQueue将内部的链表改为LinkedHashMap,这样相同的设备数据会掩盖,一起会按照上报的次序进行消费,确保每个设备某一时刻的最新数据都会被消费

为什么是改造LinkedBlockingQueue而不是ArrayBlockingQueue,因为LinkedBlockingQueue用的是两把锁,入队和出队各一把锁,性能应该更好一点

将Node链表改为Map

咱们先增加咱们的Map特点用于替换链表的操作

先来看看LinkedBlockingQueue本来的链表特点

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    /**
     * Linked list node class
     */
    static class Node<E> {
        E item;
        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;
        Node(E x) { item = x; }
    }
    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;
    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;
}

现在咱们去掉链表的这些内容,一起增加咱们的Map特点

改完之后咱们的特点大概是这样的,其他的特点都保留

//暂时就叫 LinkedBlockingMapQueue 吧
public class LinkedBlockingMapQueue<K, V> {
    /**
     * The capacity bound, or Integer.MAX_VALUE if none
     */
    private final int capacity;
    /**
     * Current number of elements
     */
    private final AtomicInteger count = new AtomicInteger();
    /**
     * Lock held by take, poll, etc
     */
    private final ReentrantLock takeLock = new ReentrantLock();
    /**
     * Wait queue for waiting takes
     */
    private final Condition notEmpty = takeLock.newCondition();
    /**
     * Lock held by put, offer, etc
     */
    private final ReentrantLock putLock = new ReentrantLock();
    /**
     * Wait queue for waiting puts
     */
    private final Condition notFull = putLock.newCondition();
    //咱们增加的 Map
    private final Map<K, V> map = new LinkedHashMap<>();
}

简略的介绍一下

capacity是行列的容量

count是当时元素的数量

takeLock是出队锁

notEmpty用来操控是否能够出队

putLock是入队锁

notFull用来操控是否能够入队

改造 put 办法

咱们先来看下本来的put办法

/**
 * Inserts the specified element at the tail of this queue, waiting if
 * necessary for space to become available.
 *
 * @throws InterruptedException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        /*
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards.
         */
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
/**
 * Links node at end of queue.
 *
 * @param node the node
 */
private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

加上入队锁之后,当时行列不满的状况下,将新的节点放到链表最后,一起count+1,如有必要再唤醒一下其他线程

咱们只要把enqueue办法改成往Map中增加即可

public V put(K k, V v) throws InterruptedException {
    int c;
    V x;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        //从这儿开端修正
        if (map.containsKey(k)) {
            x = map.put(k, v);
            //假如现已存在则数量不增加
            c = count.get();
        } else {
            //假如现已存在,只需求修正不会增加数量
            //只要需求增加count才要等候
            while (count.get() == capacity) {
                notFull.await();
            }
            x = map.put(k, v);
            //假如不存在则数量+1
            c = count.getAndIncrement();
        }
        //到这儿修正完毕
        if (c + 1 < capacity) 
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0) 
        signalNotEmpty();
    return x;
}

这儿需求留意的一点是,假如Map中现已存在Key那么count就不必加了

改造 take 办法

take办法也依样画葫芦

咱们先看下本来的take办法

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
/**
 * Removes a node from head of queue.
 *
 * @return the node
 */
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

加上出队锁之后,当时行列不空的状况下,将最前面的节点拿出来,一起count-1,如有必要再唤醒一下其他线程

咱们只要把dequeue办法改成从Map中移除即可

public Map.Entry<K, V> take() throws InterruptedException {
    Map.Entry<K, V> x;
    int c;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        //从这儿开端修正
        Iterator<Map.Entry<K, V>> iterator = map.entrySet().iterator();
        x = iterator.next();
        iterator.remove();
        //到这儿修正完毕
        c = count.getAndDecrement();
        if (c > 1) 
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity) 
        signalNotFull();
    return x;
}

嘿嘿,功德圆满!

该吃饭吃饭,该下班下班,该睡觉睡觉,该干嘛干嘛

事情顺利的如同有点不太对劲

在回家了路上我一向在想LinkedBlockingQueueArrayBlockingQueue的锁的数量问题

ArrayBlockingQueue用一把锁是不是因为操作的是同一个数组,简略呈现抵触或是处理抵触的判定非常复杂

LinkedBlockingQueue用的是链表,每个节点相对独立,头部节点出队和在尾部增加节点不会相互影响

那么我现在把链表改为Map会有问题么?虽然咱们现已操控了不能一起入队或一起出队,可是在入队的一起出队是没有限制的

入队问题

咱们再来回忆一下咱们的put办法的其中一段

//加锁
if (map.containsKey(k)) {
    //假如现已存在对应的 key
    x = map.put(k, v);
    //假如现已存在则数量不增加
    c = count.get();
} else {
    //省掉
}
//解锁

假设咱们现在的行列里边有{"A": "A1"}这一个元素

现在咱们调用办法put办法增加元素{"A": "A2"}

因为现已存在A对应的元素,所以会进入上面的这个if分支

这个时分,另一个线程调用take办法把{"A": "A1"}取出,count变为0

接着执行map.put(k, v)后,map中的元素数量为1count还是0

这不就。。。出大问题了吗

咱们直接调试一下是不是真的会呈现这样的状况

Queue & Map 武魂融合技:MapQueue

我写了一段测试的代码,先增加一个{"A": "A1"},然后在增加{"A": "A2"}之前打上断点,运转

Queue & Map 武魂融合技:MapQueue

能够看到现在count = 1然后map里边有一个元素{"A": "A1"}

然后一步一步跟进去

Queue & Map 武魂融合技:MapQueue

果然进到了if分支里边,这个时分咱们调用一下take办法

Queue & Map 武魂融合技:MapQueue

能够看到count = 0一起map为空了

然后接着执行map.put(k, v)办法

Queue & Map 武魂融合技:MapQueue

好家伙,mapsize = 1count = 0

因为咱们之前判别了A存在,所以不会进行count + 1的操作

Queue & Map 武魂融合技:MapQueue

解决计划

一种计划是去掉count的保护逻辑,数量直接经过map.size()获取

可是这样就需求Map在必定程度上线程安全,而LinkedHashMap是线程不安全的

难道咱们还要改造一下LinkedHashMap,让它在一起读写的时分能够不出问题?

Queue & Map 武魂融合技:MapQueue

还有另一种计划

那便是把锁改成一把,出队的时分就不能入队,入队的时分就不能出队

决断挑选了这种计划

改成一把锁之后,整个逻辑也变得更简略了

public V put(K key, V value) throws InterruptedException {
    V x;
    lock.lockInterruptibly();
    try {
        if (map.containsKey(k)) {
            //现已存在
            //更新节点
            //count不变
            x = map.put(k, v);
        } else {
            //不存在
            while (count == capacity) {
                //满了,等未满的信号
                notFull.await();
            }
            //增加节点
            //count+1
            x = map.put(k, v);
            count++;
        }
        //发送未空信号
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return x;
}
public Map.Entry<K, V> take() throws InterruptedException {
    lock.lockInterruptibly();
    try {
        while (count == 0) {
            notEmpty.await();
        }
        Iterator<Map.Entry<K, V>> iterator = map.entrySet().iterator();
        Map.Entry<K, V> entry = iterator.next();
        iterator.remove();
        count--;
        notFull.signal();
        return entry;
    } finally {
        lock.unlock();
    }
}

来看看作用,先写一个Demo

@Slf4j
public class MapQueueSample {
    private final Map<String, String> map;
    private final BlockingQueue<String> queue;
    public MapQueueSample() {
        BlockingMapQueue<String, String> bmq = new LinkedBlockingMapQueue<>();
        bmq.addSynchronizer(new MapQueue.Synchronizer<String, String>() {
            @Override
            public void afterEnqueue(String key, String value, Map<String, String> readOnly) {
                log.info("Put: " + value + ", Current: " + readOnly.toString());
            }
            @Override
            public void afterDequeue(String key, String value, Map<String, String> readOnly) {
                log.info("Take: " + value + ", Current: " + readOnly.toString());
            }
        });
        this.map = bmq.map();
        this.queue = bmq.queue();
    }
    public void start() {
        startPut("A");
        startPut("B");
        startPut("C");
        startPut("D");
        startTake();
    }
    public void startPut(String s) {
        Thread thread = new Thread() {
            int i;
            @SneakyThrows
            @Override
            public void run() {
                while (true) {
                    String v = s + i++;
                    map.put(s, v);
                    Thread.sleep(500);
                }
            }
        };
        thread.setName("Put" + s);
        thread.start();
    }
    @SneakyThrows
    public void startTake() {
        while (true) {
            String s = queue.take();
            Thread.sleep(1200);
        }
    }
}

4个线程放ABCD,然后主线程不断读取

Queue & Map 武魂融合技:MapQueue

便是咱们要的作用!

Queue & Map 武魂融合技:MapQueue

完毕

我觉得我现在现已能够独自写一篇LinkedHashMap的源码剖析了

然后我将LinkedBlockingMapQueue发布到了Maven中央仓库上

简略介绍一下集成和运用

implementation 'com.github.linyuzai:concept-mapqueue-core:1.1.0'

<dependency>
  <groupId>com.github.linyuzai</groupId>
  <artifactId>concept-mapqueue-core</artifactId>
  <version>1.1.0</version>
</dependency>

运用办法如下

BlockingMapQueue<String, String> bmq = new LinkedBlockingMapQueue<>();
//经过 Map 操作
Map<String, String> map = bmq.map();
//经过 Queue 操作
BlockingQueue<String> queue = bmq.queue();

经过map()queue()能够获得对应的实例调用对应的办法

有兴趣的话能够看看文档和源码,顺便给个Star吧,嘿嘿