做过企业微信开发的同学应该知道,企业微信有一个很厌烦的报错–接口并发超越约束(45033)。报错的原因呢便是由于有多个线程在一起调用企业微信的接口,为了不让接口调用一向报错,我的服务就也要有一个接口并发操控体系。
首要想到的便是用线程池来完成
刚开始看起来好像没什么问题,可是企业微信的接口之间并发约束是互相阻隔的,那就意味着,我需要创立几十个线程池。。。你以为这就完了?其实不同企业用户调用的时分又是互相阻隔的,所以,我需要为每个企业。。。。
线程池肯定是用不了了,我猛然想起前天晚上做梦,大师好像跟我讲了信号量能够用来操控并发,考虑到线上环境有多个节点,一起Redisson也供给了Semaphore,打算用一下试试。想想间隔上次运用信号量已经过去了∞天(究竟我从来都没有用过),我直接翻开IDEA,先用java的试试水,public static void main一气呵成
public static void main(String[] args) {
//创立一个信号量
Semaphore semaphore = new Semaphore(1);
//假如没获取到就抛出反常
if (!semaphore.tryAcquire()) {
throw new RuntimeException();
}
//try不能括到上面,获取不到的时分是不需要开释的
try{
//做一些事务
System.out.println("do something");
}finally {
//开释掉
semaphore.release();
}
}
又检查了一下逻辑,看起来好像没什么问题,然后我突发奇想,假如我直接release()会怎么样
Semaphore semaphore = new Semaphore(1);
semaphore.release();
System.out.println(semaphore.availablePermits());
。。。0.0
此刻我的心态出现了亿点变化,我意识到,其实初始化的new Semaphore(1)
仅仅设置了一个初始值,并不是一个最大值,直接调用release()
,可获取信号量的值会直接+1,这时我赶忙用Redisson的api redissonClient.getSemaphore("test");
(其他代码和上面的相同)试了一下,假如呢
果然作用和Java的是一模相同。
不过,问题好像也不是很大?假如我确保事务在拿到信号后必定会在finally{..}
中开释掉,并且semaphore.trySetPermits(1);
这个办法只初始化一次,并且必定是单线程,那其实就没有问题。
企业微信的这个并发约束,没有明文规定,那上线之后,大概是要在违法的边缘多试探试探的,所以就涉及到更新信号量的问题了
能够看到,假如在多线程的情况下,轻率更新信号量,由于模型没有最大值的概念,更新后的信号量总量比起期望值是只多不少的。
问题首要还是出在release()
这个办法没有上限概念上,那我来弄一个有上限的release办法就好了。此刻第一个思路便是继承RedissonSemaphore
这个类重写一下release()
,可是后来发现自己的子类无法用redissonClient
初始化出来,计划就pass掉了。所谓工作时间长了,什么都敢干,我直接翻开源码,我只需仿照着写个相同的是不就行了!
public RFuture<Void> releaseAsync(int permits) {
...
RFuture<Void> future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
"local value = redis.call('incrby', KEYS[1], ARGV[1]); " +
"redis.call('publish', KEYS[2], value); ",
Arrays.asList(getRawName(), getChannelName()), permits);
...
}
好家伙,核心逻辑其实就一行lua脚本redis.call('incrby', KEYS[1], ARGV[1]);
,我只需一个判别上限的就好了。虽然之前也没写过lua脚本,可是好在逻辑不杂乱,只需要一个if
判别就行了,redisson源码中有许多这样的判别,现学现卖就能够了。
另外,鉴于面向目标的思想,假如我自己工具类写一个release()
办法,并让这个办法直接出现在事务中多少有一些不妥。所以新建了一个目标,封装一下获取和开释两个办法,之后项目中所有信号量的相关事务,都用这个目标,redisson的信号量概念就不会再出现在具体的事务中了,能够进步事务代码的可读性,也会信号量相关操作的修改操控在这个类中。
经过了一番折腾,总算是把功能完成了。部分代码如下,tryAcquire()
办法还是运用redisson供给的,可是release()
办法用了自己完成的~
public class SemaphoreModel {
/**
* 信号量本体
*/
private RedissonSemaphore semaphore;
private RedissonClient redissonClient;
/**
* 获取到信号的mark
*/
private boolean mark = false;
public void release() {
//获取信号量的值
String script = "local value = redis.call('get', '" + semaphore.getRawName() + "'); " +
//假如没有设置过(false意味着undefind)或者当时值小于5(我设置的最大值)就做release的逻辑
"if (value ~= false and tonumber(value) < 5) then " +
//信号量加一
"local val = redis.call('incrby', '" + semaphore.getRawName() + "', 1); " +
"redis.call('publish', '" + RedissonSemaphore.getChannelName(semaphore.getRawName()) + "', val); " +
"return 1; " +
"end; " +
"return 0;";
redissonClient.getScript().eval(RScript.Mode.READ_WRITE, script, RScript.ReturnType.VALUE);
}
/**
* 获取一个信号量
*/
public boolean tryAcquire() {
return semaphore.tryAcquire();
}
}
经过了这样的规划,就造出了一个有上限的信号量,问题也都处理了。
本文的内容到这里就结束了,假如有不对的地方,欢迎在评论区指出,觉得有用的话别忘了点个赞哦~