假设有这样一个场景,在一个购票软件上买一张票,可是此刻剩下票数只要一张或几张,这个时分有几十个人都在一起运用这个软件购票。在不考虑任何影响下,正常的逻辑是首要判别当前是否还有剩下的票,假如有,那么就进行购买并扣减库存数,否则就会提示票数缺乏,购买失利。伪代码如下:

void buyTicket() {
    int stockNum = byTicketMapper.selectStockNum();
    if(stockNum>0){
        //TODO 买票流程....
        byTicketMapper.reduceStock(); // 扣减库存
    }else{
        log.info("=====>票卖完了<====");
    }
}

这段代码在逻辑上没有问题,可是在并发场景下,可能会存在一个严峻的问题。当剩下票数为1时,有A,B两个用户一起点击了购买按钮,A用户经过了库存大于0的校验并开端履行购票逻辑,可是因为一些原因形成A用户的购票线程有短暂的堵塞。而在这个堵塞的进程中,用户B建议了购买恳求,而且也经过了库存大于0的校验,直到整个购买流程履行完结而且扣减了库存。那么这个时分剩下库存刚好为0,不会再有用户建议购买恳求,这时用户A的购买恳求堵塞被唤醒,因为在此之前现已校验过库存大于0,所以履行完购买流程后,库存还会被扣减一次。那么此刻的库存为-1,这便是常听到的超卖问题。

Redis分布式锁存在的问题

为了避免这个问题,咱们能够经过加锁了办法,来确保并发的安全性。像JVM供给的内置锁synchronized,JUC供给的重入锁ReentrantLock,可是这两种锁只能确保单机环境下并发安全问题,一般在实践工作中很少会布置单节点的项目,一般都是多节点集群布置,这两个锁就失去了含义。这个时分就能够借助redis来完成分布式锁

setnx

在集群布置的情况下,一般运用redis来完成分布式锁。其中redis供给了setnx指令,标识只要key不存在时才能设值成功,然后到达加锁的效果。下面经过redis来改造上述的代码,其办法是购票线程首要获取锁,假如获取锁成功,那么持续履行购票事务流程,直到一切流程履行完结并扣减库存后,终究在开释锁。假如获取锁失利,那么就给出一个友爱的体系提示。

void buyTicket() {
    // 获取锁
    Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "1");
    if (lock) {
        int stockNum = byTicketMapper.selectStockNum();
        if(stockNum>0){
            //TODO 买票流程....
            byTicketMapper.reduceStock(); // 扣减库存
        }else{
            log.info("=====>票卖完了<====");
        }
        // 开释锁
        redisTemplate.delete("lock");
    } else {
        log.info("=====>体系繁忙,请稍后!<====");
    }
}

问题1:死锁问题

经过上面的一顿梭哈,你以为这样就能够了吗,其实不然。幻想一下,假如线程A在获取锁成功后,在履行购票的逻辑中呈现了反常,那么这个时分就会形成锁得不到开释,其他线程一直获取不到锁,这就形成严峻的死锁问题。为了避免死锁问题的呈现,咱们能够对反常进行捕获,在finally中去开释锁,这样不论事务履行成功或失利,最终都会去开释锁。

void buyTicket() {
    Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "1");
    if (lock) {
        try {
            int stockNum = byTicketMapper.selectStockNum();
            if (stockNum > 0) {
                //TODO 买票流程....
                byTicketMapper.reduceStock(); // 扣减库存
            } else {
                log.info("=====>票卖完了<====");
            }
        }finally {
            redisTemplate.delete("lock");   // 开释锁
        }
    } else {
        log.info("=====>体系繁忙,请稍后!<====");
    }
}

你以为这就完毕了吗?死锁就不会产生了吗?假如你以为这样就能避免死锁的产生,那你就太不细心啦。假如在程序刚想像履行开释锁的逻辑时,redis服务忽然宕机了,那么这时锁开释就失利了。在将redis服务重启后,加锁的数据又被康复了,这样又呈现了死锁的现象。为了避免这个问题,能够为锁设置一个过期时刻,这样即使redis重启康复数据后,也会很快的过期掉。不过需求留意的是,在设置锁的过期时刻时,必定要确保原子性操作,不然还是会呈现死锁问题。


//不是原子操作,会呈现死锁问题
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "1");
//假如刚要履行该句子时,redis宕机了。上面的锁无法开释
redisTemplate.expire("lock",Duration.ofSeconds(5L));
//原子操作
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "1", Duration.ofSeconds(5L));

问题2:锁被其他线程开释问题

经过上面的又一顿梭哈,死锁的问题能够避免了,这样在高并发的情况下就能安全的履行了吗。假如锁的过期时刻设置了5秒,当A线程建议购票恳求并获取到了锁,可是A线程在履行购票流程时花费了6秒,此刻线程A的锁现已过期。这时线程B从头获取了锁而且也开端履行购票流程,可是A线程要比B线程履行的要快,当A线程开释锁时,问题就呈现了。因为A线程履行的进程锁现已过期了,那么在履行开释锁的流程时,终究被开释的是线程B的锁,这就导致B的锁被A线程开释问题。

Redis分布式锁存在的问题

关于这个现象,能够给每个锁设置一个仅有标识,比方像UUID,线程ID。在开释锁时,校验一下这个锁的标识是否为需求删去的锁,假如是,在进行锁的开释。

public void buyTicket() {
    String uuid = UUID.randomUUID().toString();
    // 为锁设置一个仅有标识
    Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, Duration.ofSeconds(5L));
    if (lock) {
        try {
            int stockNum = byTicketMapper.selectStockNum();
            if (stockNum > 0) {
                //TODO 买票流程....
                byTicketMapper.reduceStock(); // 扣减库存
            } else {
                log.info("=====>票卖完了<====");
            }
        }finally {
            String lockValue = redisTemplate.opsForValue().get("lock");
            if(lockValue.equals(uuid)){ //校验标识,经过则开释锁
                redisTemplate.delete("lock");   
            }
        }
    } else {
        log.info("=====>体系繁忙,请稍后!<====");
    }
}

问题3:锁续期问题

运用setnx指令做分布式锁时,无法避免的一个问题便是:线程没有履行完结,可是锁现已过期。在处理锁被其他线程误删的代码中,并不是100%能处理的,问题点在于下面这段代码。假如线程A现已履行到了if句子而且经过了判别,当刚要履行开释锁的逻辑时,线程A的锁过期了而且线程B从头获取到了锁,那么线程A在开释锁时,开释的是B的锁。为了彻底能够处理这个问题,能够选用锁续期的办法,其完成办法是独自开一个线程用来定时监听线程的锁是否还被持有,假如还持有,那么就给这把锁添加一些过期时刻,这样就不会呈现上述问题了。现在市面上现已为咱们供给了锁自动续期的中间件,比方redisson

 String lockValue = redisTemplate.opsForValue().get("lock");
  if(lockValue.equals(uuid)){ // 线程A的锁过期
      redisTemplate.delete("lock");   // 线程A删去了线程B的锁
   }

Redisson

redisson一般运用最多的场景便是分布式锁了,它不仅确保了并发场景下线程安全的问题,也处理了锁续期的问题。运用办法也比较简略,以3.5.7版别为例,首要需求装备redisson信息,依据自己的redis集群形式自由选择装备。在装备完结后,再来改造上面的购票办法。

    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        // 单机装备
        config.useSingleServer().setAddress("redis://127.0.0.1:3306").setDatabase(0);
        // 主从装备
        // config.useMasterSlaveServers().setMasterAddress("").addSlaveAddress("","");
        // 岗兵装备
        // config.useSentinelServers().addSentinelAddress("").setMasterName("");
        // Cluster装备
        //config.useClusterServers().addNodeAddress("");
        return Redisson.create(config);
    }

关于redisson运用起来也非常简略,经过getLock办法获取到RLock目标。经过RLock的tryLock或lock办法来进行加锁(底层都是经过Lua脚原本完成的)。当获取到锁而且扣减库存后,能够运用unlock办法进行锁开释。

void buyTicket() {
    RLock lock = redissonClient.getLock("lock");
    if (lock.tryLock()) {  // 获取锁
        try {
            int stockNum = byTicketMapper.selectStockNum();
            if (stockNum > 0) {
                //TODO 买票流程....
                byTicketMapper.reduceStock(); // 扣减库存
            } else {
                log.info("=====>票卖完了<====");
            }
        } finally {
            lock.unlock(); //开释锁
        }
    } else {
        log.info("=====>体系繁忙,请稍后!<====");
    }
}
  • Watch Dog机制

那redisson是怎么做到锁续期的呢?其实在redisson内部有一个看watch dog机制(看门狗机制),可是看门狗机制并不是在加锁时就能启动的。需求留意的是在加锁时,假如运用tryLock(long t1,long t2, TimeUnit unit)或lock(long t1,long t2, TimeUnit unit)办法而且将t2参数值设为了一个不为-1的值,那么看门口将无法生效。看门狗在启动后会监听主线程还在履行,假如还在履行那么将会经过Lua脚本每10秒给锁续期30秒。watchlog的延时时刻默以为30秒,这个值能够在装备config时自己界说。

private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1L) { // 假如leaseTime不是-1,那么将无法运用看门狗
        return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    } else {
        RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        ttlRemainingFuture.addListener(new FutureListener<Boolean>() {
            public void operationComplete(Future<Boolean> future) throws Exception {
                if (future.isSuccess()) {
                    Boolean ttlRemaining = (Boolean)future.getNow();
                    if (ttlRemaining) {
                        // 看门口机制
                        RedissonLock.this.scheduleExpirationRenewal(threadId);
                    }
                }
            }
        });
        return ttlRemainingFuture;
    }
}
private long lockWatchdogTimeout = 30000L; //默认30秒
private void scheduleExpirationRenewal(final long threadId) {
    if (!expirationRenewalMap.containsKey(this.getEntryName())) {
        // 每10秒履行续期
        Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            public void run(Timeout timeout) throws Exception {
            // 经过LUA脚本为锁续期
                RFuture<Boolean> future = RedissonLock.this.commandExecutor.evalWriteAsync(RedissonLock.this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(RedissonLock.this.getName()), new Object[]{RedissonLock.this.internalLockLeaseTime, RedissonLock.this.getLockName(threadId)});
                future.addListener(new FutureListener<Boolean>() {
                    public void operationComplete(Future<Boolean> future) throws Exception {
                        RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());
                        if (!future.isSuccess()) {
                            RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", future.cause());
                        } else {
                            if ((Boolean)future.getNow()) {
                                RedissonLock.this.scheduleExpirationRenewal(threadId);
                            }
                        }
                    }
                });
            }
        }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); // 每10秒履行一次
        if (expirationRenewalMap.putIfAbsent(this.getEntryName(), task) != null) {
            task.cancel();
        }
    }
}

问题4:主从切换导致锁丢失问题

尽管redisson协助咱们处理了锁续期的问题,可是在redis集群架构中,因为主从复制具有必定的延时,那么在极端情况下就会呈现这样一个问题:当一个线程获取锁成功,而且成功向主节点保存了锁信息,当主节点还未像从节点同步锁信息时,主节点宕机了,那么当产生毛病转移从节点切换为主节点时,线程加的锁就丢失了。为了处理这个问题,redis引入了红锁RedLock,RedLock与大多数中间件的推举机制类似,选用过半的办法来决议操作成功还是不成功。

RedLock

  • 加锁

RedLock在工作中,并不承受redis的集群架构,无论是主从,岗兵还是Cluster。每台redis服务都是独立的,都是一台独立的Master节点。在加锁的进程中,RedLock会记载开端加锁时的时刻以及加锁成功后的时刻,这两个时刻差便是一台机器加锁成功所需求的时刻。比方启动了5个redis服务,线程A设置锁的超时时刻为5秒,当像第一台redis服务加锁成功后花费了1秒,像第二台服务加锁成功后也花费了一秒。这个时分加到第二台机器时,现已花费了两秒的时刻,可是加锁数并未过半,还需求加锁一台才能彻底算加锁成功,这个时分第三台机器加锁成功又花费了1秒。那么总的加锁时刻便是3秒,锁的实践过期时刻就为2秒。特别需求留意的是,在向redis服务建立网络衔接时,要设置一个超时时刻,避免redis服务宕机时,客户端还在傻傻的等待回应,这里超时时刻官方给到主张是5-50毫秒之间,当衔接超时时,客户端会持续向下一个节点建议衔接。

Redis分布式锁存在的问题

  • 加锁失利

假如因为某些原因,获取锁失利(加锁没有超对折或者取锁时刻现已超过了有用时刻),客户端应该在一切的Redis实例上进行解锁,即便某些Redis实例根本就没有加锁成功。

  • 失利重试

在并发场景下,RedLock会呈现这样一个问题,比方有三个线程一起去获取了同一张票的锁,此刻A线程现已成功给redis-1和reids-2加上了锁,线程B现已成功给redis-3,reids-4加上了锁,线程C成功的给reids-5加上了锁,这个时分三个线程再去加锁时,没有机器可加了,发现加锁成功数都未过半,那么就导致客户端一直获取不到锁。

Redis分布式锁存在的问题

当客户端无法取到锁时,应该在随机推迟必定时刻,然后进行重试,避免多个客户端在一起争夺同一资源的锁。

  • 开释锁

开释锁比较简略,向一切的Redis实例发送开释锁指令即可,不用关心之前有没有从Redis实例成功获取到锁.

在了解了RedLock后,最终再来改造购票的代码逻辑。首要需求依据redis的实例数来界说对应的Bean实例,redis的实例最少要有三台。

@Bean
public RedissonClient redissonClient() {
    Config config = new Config();
    // 单机装备
    config.useSingleServer().setAddress("redis://192.168.36.128:3306").setDatabase(0);
    return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient2() {
    Config config = new Config();
    // 单机装备
    config.useSingleServer().setAddress("redis://192.168.36.130:3306").setDatabase(0);
    return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient3() {
    Config config = new Config();
    // 单机装备
    config.useSingleServer().setAddress("redis://192.168.36.131:3306").setDatabase(0);
    return Redisson.create(config);
}

在装备完结后,为每台实例都设置同一把锁,最终在调用RedissonRedLock供给的tryLock和unlock进行加锁和解锁。

void buyTicket(){
    RLock lock = redissonClient.getLock("lock");
    RLock lock2 = redissonClient2.getLock("lock");
    RLock lock3 = redissonClient3.getLock("lock");
    RedissonRedLock redLock = new RedissonRedLock(lock,lock2,lock3); // 分别像三台实例加锁
    if (redLock.tryLock()) {
        try {
            int stockNum = byTicketMapper.selectStockNum();
            if (stockNum > 0) {
                //TODO 买票流程....
                byTicketMapper.reduceStock(); // 扣减库存
            } else {
                log.info("=====>票卖完了<====");
            }
        } finally {
            redLock.unlock();  //开释锁
        }
    } else {
        log.info("=====>体系繁忙,请稍后!<====");
    }
}

总结

在运用reids做分布式锁时,并没有幻想中的那么简略,高并发场景下简单呈现死锁,锁被其他线程误删,锁续期,锁丢失等问题,在实践开发中应该考虑到这些问题并依据相应的处理办法来处理这些问题,然后确保体系的安全性。本文中可能会存在一些遗失或错误,后续会持续跟进。