前言

咱们都知道分布式环境下要运用分布式锁才行。那么分布式锁都需求有哪些特色呢?单机redis怎么加锁?redis集群加锁有哪些坑呢?别急,下面咱们一步步解开Redis分布式锁的面纱。

欢迎重视个人大众号【好好学技术】交流学习

分布式锁的特色

  • 1.独占性

不论在任何情况下都只能有一个线程持有锁。

  • 2.高可用

redis集群环境不能由于某一个节点宕机而呈现获取锁或开释锁失利。

  • 3.防死锁

必须有超时操控机制或许吊销操作。

  • 4.不乱抢

自己加锁,自己开释。不能开释他人加的锁。

  • 5.重入性

同一线程能够多次加锁。

redis单机怎么完成

一般情况下都是运用setnx+lua脚本完成。

直接贴代码

package com.fandf.test.redis;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
/**
 * redis 单机锁
 *
 * @author fandongfeng
 * @date 2023/3/29 06:52
 */
@Slf4j
@Service
public class RedisLock {
    @Resource
    RedisTemplate<String, Object> redisTemplate;
    private static final String SELL_LOCK = "kill:";
    /**
     * 模仿秒杀
     *
     * @return 是否成功
     */
    public String kill() {
        String productId = "123";
        String key = SELL_LOCK + productId;
        //锁value,解锁时 用来判别当时锁是否是自己加的
        String value = IdUtil.fastSimpleUUID();
        //加锁 十秒钟过期 防死锁
        Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, value, 10, TimeUnit.SECONDS);
        if (!flag) {
            return "加锁失利";
        }
        try {
            String productKey = "good123";
            //获取商品库存
            Integer stock = (Integer) redisTemplate.opsForValue().get(productKey);
            if (stock == null) {
                //模仿录入数据, 实践应该加载时从数据库读取
                redisTemplate.opsForValue().set(productKey, 100);
                stock = 100;
            }
            if (stock <= 0) {
                return "卖完了,下次早点来吧";
            }
            //扣减库存, 模仿随机卖出数量
            int randomInt = RandomUtil.randomInt(1, 10);
            redisTemplate.opsForValue().decrement(productKey, randomInt);
            // 修正db,能够丢到行列里渐渐处理
            return "成功卖出" + randomInt + "个,库存剩下" + redisTemplate.opsForValue().get(productKey) + "个";
        } finally {
//            //这种办法会存在删去他人加的锁的或许
//            redisTemplate.delete(key);
//            if(value.equals(redisTemplate.opsForValue().get(key))){
//                //由于if条件的判别和 delete不是原子性的,
//                //if条件判别成功后,刚好锁到期自己解锁
//                //此刻别的线程假如持有锁了,就会把他人的锁删去去
//                redisTemplate.delete(key);
//            }
            //运用lua脚本确保判别和删去的原子性
            String luaScript =
                    "if (redis.call('get',KEYS[1]) == ARGV[1]) then " +
                            "return redis.call('del',KEYS[1]) " +
                            "else " +
                            "return 0 " +
                            "end";
            redisTemplate.execute(new DefaultRedisScript<>(luaScript, Boolean.class), Collections.singletonList(key), value);
        }
    }
}

进行单元测验,模仿一百个线程同时进行秒杀

package com.fandf.test.redis;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;
/**
 * @Description:
 * @author: fandongfeng
 * @date: 2023-3-24 16:45
 */
@SpringBootTest
class SignServiceTest {
    @Resource
    RedisLock redisLock;
    @RepeatedTest(100)
    @Execution(CONCURRENT)
    public void redisLock() {
        String result = redisLock.kill();
        if("加锁失利".equals(result)) {
        }else {
            System.out.println(result);
        }
    }
}

只有三个线程抢到了锁

成功卖出5个,库存剩下95个
成功卖出8个,库存剩下87个
成功卖出7个,库存剩下80

redis锁有什么问题?

总的来说有两个:

  • 1.无法重入。
  • 2.咱们为了防止死锁,加锁时都会加上过期时刻,这个时刻大部分情况下都是依据经历对现有业务评估得出来的,可是假如程序阻塞或许反常,导致执行了很长时刻,锁过期就会主动开释了。此刻假如别的线程拿到锁,执行逻辑,就有或许呈现问题。

那么这两个问题有没有办法解决呢?有,接下来咱们就来讲讲Redisson

Redisson完成分布式锁

Redisson是什么?

Redisson是一个在Redis的基础上完成的Java驻内存数据网格(In-Memory Data Grid)。它不仅供给了一系列的分布式的Java常用对象,还供给了许多分布式服务。其中包括(BitSet,Set,Multimap,SortedSet,Map,List,Queue,BlockingQueue,Deque,BlockingDeque,Semaphore,Lock,AtomicLong,CountDownLatch,Publish / Subscribe,Bloom filter,Remote service,Spring cache,Executor service,Live Object service,Scheduler service) Redisson供给了运用Redis的最简略和最快捷的办法。Redisson的主旨是促进运用者对Redis的重视分离(Separation of Concern),然后让运用者能够将精力更集中地放在处理业务逻辑上。

springboot集成Redisson

集成很简略,只需两步

  1. pom引入依赖
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
</dependency>
  1. application.yml增加redis装备
spring:
  application:
    name: test
  redis:
    host: 127.0.0.1
    port: 6379

运用也很简略,只需求注入RedissonClient即可

package com.fandf.test.redis;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * @author fandongfeng
 */
@Component
@Slf4j
public class RedissonTest {
    @Resource
    RedissonClient redissonClient;
    public void test() {
        RLock rLock = redissonClient.getLock("anyKey");
        //rLock.lock(10, TimeUnit.SECONDS);
        rLock.lock();
        try {
            // do something
        } catch (Exception e) {
            log.error("业务反常", e);
        } finally {
            rLock.unlock();
        }
    }
}

或许不了解redisson的小伙伴会不禁宣布疑问。
what?加锁时不需求加过期时刻吗?这样会不会导致死锁啊。解锁不需求判别是不是自己持有吗?
哈哈,别着急,咱们接下来一步步揭开redisson的面纱。

Redisson lock()源码盯梢

咱们来一步步跟着lock()办法看下源码(本地redisson版本为3.20.0)

//RedissonLock.class
@Override
public void lock() {
    try {
        lock(-1, null, false);
    } catch (InterruptedException e) {
        throw new IllegalStateException();
    }
}

检查lock(-1, null, false);办法

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        //获取当时线程id
        long threadId = Thread.currentThread().getId();
        //加锁代码块, 回来锁的失效时刻
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }
        CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
        pubSub.timeout(future);
        RedissonLockEntry entry;
        if (interruptibly) {
            entry = commandExecutor.getInterrupted(future);
        } else {
            entry = commandExecutor.get(future);
        }
        try {
            while (true) {
                ttl = tryAcquire(-1, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }
                // waiting for message
                if (ttl >= 0) {
                    try {
                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                    if (interruptibly) {
                        entry.getLatch().acquire();
                    } else {
                        entry.getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
            unsubscribe(entry, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }

咱们看下它是怎么上锁的,也就是tryAcquire办法

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    //真假加锁办法 tryAcquireAsync
    return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    super(commandExecutor, name);
    this.commandExecutor = commandExecutor;
    this.internalLockLeaseTime = commandExecutor.getServiceManager().getCfg().getLockWatchdogTimeout();
    this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime > 0) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        //waitTime和leaseTime都是-1,所以走这儿   
        //过期时刻internalLockLeaseTime初始化的时候赋值commandExecutor.getServiceManager().getCfg().getLockWatchdogTimeout();
        //跟进去源码发现默许值是30秒, private long lockWatchdogTimeout = 30 * 1000;
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);
    ttlRemainingFuture = new CompletableFutureWrapper<>(s);
    //加锁成功,开启子线程进行续约
    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
        // lock acquired
        if (ttlRemaining == null) {
            if (leaseTime > 0) {
                //假如指定了过期时刻,则不续约
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                //没指定过期时刻,或许小于0,在这儿完成锁主动续约
                scheduleExpirationRenewal(threadId);
            }
        }
        return ttlRemaining;
    });
    return new CompletableFutureWrapper<>(f);
}

上面代码里边包括加锁和锁续约的逻辑,咱们先来看看加锁的代码

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
            "if ((redis.call('exists', KEYS[1]) == 0) " +
                        "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                "end; " +
                "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

这儿就看的很理解了吧,redisson运用了lua脚本来确保了指令的原子性。
redis.call(‘hexists’, KEYS[1], ARGV[2]) 检查 key value 是否存在。

Redis Hexists 指令用于检查哈希表的指定字段是否存在。
假如哈希表含有给定字段,回来 1 。 假如哈希表不含有给定字段,或 key 不存在,回来 0 。

127.0.0.1:6379> hexists 123 uuid
(integer) 0
127.0.0.1:6379> hincrby 123 uuid 1
(integer) 1
127.0.0.1:6379> hincrby 123 uuid 1
(integer) 2
127.0.0.1:6379> hincrby 123 uuid 1
(integer) 3
127.0.0.1:6379> hexists 123 uuid
(integer) 1
127.0.0.1:6379> hgetall 123
1) "uuid"
2) "3"
127.0.0.1:6379>

当key不存在,或许现已含有给定字段(也就是现已加过锁了,这儿是为了完成重入性),直接对字段的值+1
这个字段的值,也就是ARGV[2], 取得是getLockName(threadId)办法,咱们再看看这个字段的值是什么

    protected String getLockName(long threadId) {
        return id + ":" + threadId;
    }
    public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        this.commandExecutor = commandExecutor;
        this.id = commandExecutor.getServiceManager().getId();
        this.internalLockLeaseTime = commandExecutor.getServiceManager().getCfg().getLockWatchdogTimeout();
        this.entryName = id + ":" + name;
    }
    //commandExecutor.getServiceManager() 的id默许值
    private final String id = UUID.randomUUID().toString();

这儿就理解了,字段名称是 uuid + : + threadId

接下来咱们看看锁续约的代码scheduleExpirationRenewal(threadId);

protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    //判别该实例是否加过锁
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        //重入次数+1
        oldEntry.addThreadId(threadId);
    } else {
        //第一次加锁
        entry.addThreadId(threadId);
        try {
            //锁续约核心代码
            renewExpiration();
        } finally {
            if (Thread.currentThread().isInterrupted()) {
                //假如线程反常终止,则封闭锁续约线程
                cancelExpirationRenewal(threadId);
            }
        }
    }
}

咱们看看renewExpiration()办法

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    //新建一个线程执行
    Timeout task = commandExecutor.getServiceManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            //设置锁过期时刻为30秒
            CompletionStage<Boolean> future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock {} expiration", getRawName(), e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                //检查锁是还否存在
                if (res) {
                    // reschedule itself 10后调用自己
                    renewExpiration();
                } else {
                    //封闭续约
                    cancelExpirationRenewal(null);
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    //留意上行代码internalLockLeaseTime / 3,
    //internalLockLeaseTime默许30s,那么也就是10s检查一次
    ee.setTimeout(task);
}
//设置锁过期时刻为internalLockLeaseTime  也就是30s  lua脚本确保原子性
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}

OK,剖析到这儿咱们现已知道了,lock(),办法会默许加30秒过期时刻,并且开启一个新线程,每隔10秒检查一下,锁是否开释,假如没开释,就将锁过期时刻设置为30秒,假如锁现已开释,那么就将这个新线程也关掉。

咱们写个测验类看看

package com.fandf.test.redis;
import org.junit.jupiter.api.Test;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
/**
 * @Description:
 * @author: fandongfeng
 * @date: 2023-3-2416:45
 */
@SpringBootTest
class RedissonTest {
    @Resource
    private RedissonClient redisson;
    @Test
    public void watchDog() throws InterruptedException {
        RLock lock = redisson.getLock("123");
        lock.lock();
        Thread.sleep(1000000);
    }
}

检查锁的过期时刻,及是否续约

127.0.0.1:6379> keys *
1) "123"
127.0.0.1:6379> ttl 123
(integer) 30
127.0.0.1:6379> ttl 123
(integer) 26
127.0.0.1:6379> ttl 123
(integer) 24
127.0.0.1:6379> ttl 123
(integer) 22
127.0.0.1:6379> ttl 123
(integer) 21
127.0.0.1:6379> ttl 123
(integer) 20
127.0.0.1:6379> ttl 123
(integer) 30
127.0.0.1:6379> ttl 123
(integer) 28
127.0.0.1:6379>

咱们再改改代码,看看是否可重入和字段名称是否和咱们预期一致

package com.fandf.test.redis;
import org.junit.jupiter.api.Test;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
/**
 * @Description:
 * @author: fandongfeng
 * @date: 2023-3-24 16:45
 */
@SpringBootTest
class RedissonTest {
    @Resource
    private RedissonClient redisson;
    @Test
    public void watchDog() throws InterruptedException {
        RLock lock = redisson.getLock("123");
        lock.lock();
        lock.lock();
        lock.lock();
        //加了三次锁,此刻重入次数为3
        Thread.sleep(3000);
        //解锁一次,此刻重入次数变为3
        lock.unlock();
        Thread.sleep(1000000);
    }
}
127.0.0.1:6379> keys *
1) "123"
127.0.0.1:6379>
127.0.0.1:6379> ttl 123
(integer) 24
127.0.0.1:6379> hgetall 123
1) "df7f4c71-b57b-455f-acee-936ad8475e01:12"
2) "3"
127.0.0.1:6379>
127.0.0.1:6379> hgetall 123
1) "df7f4c71-b57b-455f-acee-936ad8475e01:12"
2) "2"
127.0.0.1:6379>

咱们加锁了三次,重入次数是3,字段值也是 uuid+:+threadId,和咱们预期结果是一致的。

Redlock算法

redisson是根据Redlock算法完成的,那么什么是Redlock算法呢?

假设当时集群有5个节点,那么运转redlock算法的客户端会一次执行下面过程

  • 1.客户端记载当时体系时刻,以毫秒为单位
  • 2.顺次测验从5个redis实例中,运用相同key获取锁
    当redis请求获取锁时,客户端会设置一个网络连接和响应超时时刻,避免由于网络故障等原因导致阻塞。
  • 3.客户端运用当时时刻减去开始获取锁时刻(过程1的时刻),得到获取锁消耗的时刻
    只有当半数以上redis节点加锁成功,并且加锁消耗的时刻要小于锁失效时刻,才算锁获取成功
  • 4.假如获取到了锁,key的真实有效时刻等于锁失效时刻 减去 获取锁消耗的时刻
  • 5.假如获取锁失利,所有的redis实例都会进行解锁
    防止由于服务端响应音讯丢掉,可是实践数据又增加成功导致数据不一致问题

这儿有下面几个点需求留意:

  • 1.咱们都知道单机的redis是cp的,可是集群情况下redis是ap的,所以运转Redisson的节点必须是主节点,不能有从节点,防止主节点加锁成功未同步从节点就宕机,而客户端却收到加锁成功,导致数据不一致问题。
  • 2.为了提高redis节点宕机的容错率,能够运用公式2N(n指宕机数量)+1,假设宕机一台,Redisson还要继续运转,那么至少要部署2*1+1=3台主节点。