for update
select column from table where column = ... for update
在select的sql上加上for update会对此记录加上行级锁,在超时,提交,回滚会进行开释。
缺陷
- 当恳求等候锁开释时,不能灵敏的操控加锁时刻、等候锁的时刻
- 假如在一个事务中,开始的时分就运用for update的话,则需要这个事务履行完提交或回滚才能够解锁,不能很好的操控锁的粒度,并发性会降低。
- 在Repeatable Read的隔离级别下有可能会产生死锁。www.cnblogs.com/micrari/p/8…
项目中的redis锁
public ResultMap<IDCardOCRVo> IDCardOCR(IDCardOCRDto dto){
//部分省掉。。。
//经过redis防重提交
Boolean ifAbsent = stringRedisTemplate.opsForValue().setIfAbsent(userId, "1");
if (ifAbsent) {
stringRedisTemplate.expire(userId, 15, TimeUnit.SECONDS);
}else {
throw new BusinessException(ResultCode.NOT_FREQUENTLY_OPERATE);
}
}
假如履行到if (ifAbsent)服务挂掉,那么这个userId就会一向存在redis中,其他恳求一向获取不到,相当于死锁。
Redisson
地址
github.com/redisson/re…
特点
Redisson 是架设在 Redis 基础上的一个 Java 驻内存数据网格结构, 充分利用 Redis 键值数据库供给的一系列优势, 基于 Java 实用工具包中常用接口, 为运用者供给了 一系列具有分布式特性的常用工具类
- 指定一个 key 作为锁标记,存入 Redis 中,指定一个 仅有的用户标识 作为 value。
- 当 key 不存在时才能设置值,确保同一时刻只有一个客户端进程取得锁,满意 互斥性 特性。
- 设置一个过期时刻,防止因体系反常导致没能删除这个 key,满意 防死锁 特性。
- 当处理完事务之后需要铲除这个 key 来开释锁,铲除 key 时需要校验 value 值,需要满意 只有加锁的人才能开释锁。
- WatchDog 机制 能够很好的解决锁续期的问题,防备死锁。
- 能够灵敏的设置加锁时刻,等候锁时刻,开释锁失利后锁的存在时刻。
流程图
原理
构建进程
org.redisson.Redisson#getLock
public RLock getLock(String name) {
return new RedissonLock(connectionManager.getCommandExecutor(), name);
}
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
//异步处理的指令履行器
this.commandExecutor = commandExecutor;
//生成仅有id
this.id = commandExecutor.getConnectionManager().getId();
//锁存活时刻,默许30s
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
//将id和事务key拼接,作为实际的key
this.entryName = id + ":" + name;
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
加锁进程
org.redisson.RedissonLock#lock()
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
try {
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
try {
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
咱们直接调用的lock办法,这时leaseTime为-1,不履行if分支。
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
这时leaseTime为默许的30s,这段lua的履行是要点:
- 首要呢,他先用exists指令判别了待获取锁的key anyLock 存不存在,假如不存在,就运用hset指令将锁key testlock作为key的map结构中存入一对键值对,4afd01d9-48e8-4341-9358-19f0507a9dcc:397 1
- 一起还运用了pexpire指令给anyLock设置了过期时刻30000毫秒,然后回来为空;
- 假如anyLock已经存在了,会走另一个分支,此时会判别anyLock Map中是否存在37f75873-494a-439c-a0ed-f102bc2f3204:1,假如存在的话,就调用hincrby指令自增这个key的值,而且将anyLock的过期时刻设置为30000毫秒,而且回来空。
- 假如上面俩种情况都不是,那么就回来这个anyLock的剩余存活时刻。
脚本也能够确保履行指令的原子性。然后呢就直接回来了一个RFuture ttlRemainingFuture,而且给他加了一个监听器,假如当时的这个异步加锁的过程完结的时分调用,假如履行成功,就直接同步获取一个Long类型的ttlRemaining。经过加锁的lua脚本可知,假如加锁或许重入锁成功的话会发现TTLRemaining是为null的,那么就会履行下面的这一行代码,咱们能够看到注释 锁已取得。
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
以上咱们剖析了redisson加锁的进程,总结来说,流程不复杂,代码也很直观,首要是异步经过lua脚本履行了加锁的逻辑。
看门狗机制
其间,咱们注意到了一些细节,比如 RedissonLock中的变量internalLockLeaseTime,默许值是30000毫秒,还有调用tryLockInnerAsync()传入的一个从连接管理器获取的getLockWatchdogTimeout(),他的默许值也是30000毫秒,这些都和redisson官方文档所说的watchdog机制有关,看门狗,还是很形象的描绘这一机制,那么看门狗到底做了什么,为什么怎样做呢?下面咱们就来剖析和探讨一下。
加锁成功后的问题
- 假设在一个分布式环境下,多个服务实例恳求获取锁,其间服务实例1成功获取到了锁,在履行事务逻辑的进程中,服务实例忽然挂掉了或许hang住了,那么这个锁会不会开释,什么时分开释?
- 回答这个问题,天然想起来之前咱们剖析的lua脚本,其间第一次加锁的时分运用pexpire给锁key设置了过期时刻,默许30000毫秒,由此来看假如服务实例宕机了,锁终究也会开释,其他服务实例也是能够持续获取到锁履行事务。可是要是30000毫秒之后呢,要是服务实例1没有宕机可是事务履行还没有完毕,所开释掉了就会导致线程问题,这个redisson是怎样解决的呢?这个就一定要完结主动延伸锁有效期的机制。
之前,咱们剖析到异步履行完lua脚本履行完结之后,设置了一个监听器,来处理异步履行完毕之后的一些作业
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
renewExpiration();
}
}
- 首要,会先判别在expirationRenewalMap中是否存在了entryName,这是个map结构,首要还是判别在这个服务实例中的加锁客户端的锁key是否存在,假如已经存在了,就直接回来;第一次加锁,肯定是不存在的。
- 接下来便是搞了一个TimeTask,推迟internalLockLeaseTime/3之后履行,这儿就用到了文章一开始就提到奇妙的变量,算下来便是大约10秒钟履行一次,调用了一个异步履行的办法,renewExpirationAsync办法,也是调用异步履行了一段lua脚本
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
首要判别这个锁key的map结构中是否存在对应的4afd01d9-48e8-4341-9358-19f0507a9dcc:397,假如存在,就直接调用pexpire指令设置锁key的过期时刻,默许30000毫秒。
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
- 在上面使命调度的办法中,也是异步履行而且设置了一个监听器,在操作履行成功之后,会回调这个办法,假如调用失利会打一个错误日志并回来,更新锁过期时刻失利;
- 然后获取异步履行的结果,假如为true,就会调用自身,如此说来又会推迟10秒钟去履行这段逻辑,所以,这段逻辑在你成功获取到锁之后,会每隔十秒钟去履行一次,而且,在锁key还没有失效的情况下,会把锁的过期时刻持续延伸到30000毫秒,也便是说只要这台服务实例没有挂掉,而且没有主动开释锁,看门狗都会每隔十秒给你续约一下,确保锁一向在你手中。完美的操作。
其他实例没有取得锁的进程
这时假如有其他服务实例来测验加锁又会产生什么情况呢?或许当时客户端的其他线程来获取锁呢?很显然,肯定会阻塞住,咱们来经过代码看看是怎样做到的。还是把眼光放到之前剖析的那段加锁lua代码上。
当加锁的锁key存在的时分而且锁key对应的map结构中当时客户端的仅有key也存在时,会去调用hincrby指令,将仅有key的值自增一,而且会pexpire设置key的过期时刻为30000毫秒,然后回来nil,能够幻想这儿也是加锁成功的,也会持续去履行守时调度使命,完结锁key过期时刻的续约,这儿呢,就完结了锁的可重入性。
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
那么当以上这种情况也没有产生呢,这儿就会直接回来当时锁的剩余有效期,相应的也不会去履行续约逻辑。此时一向回来到上面的办法:
假如加锁成功就直接回来,不然就会进入一个死循环,去测验加锁,而且也会在等候一段时刻之后一向循环测验加锁,阻塞住,直到第一个服务实例开释锁。对于不同的服务实例测验会获取一把锁,也和上面的逻辑相似,都是这样完结了锁的互斥。
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
try {
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
try {
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
开释锁
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
return;
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
result.trySuccess(null);
});
return result;
}
判别当时客户端对应的仅有key的值是否存在,假如不存在就会回来nil;不然,值自增-1,判别仅有key的值是否大于零,假如大于零,则回来0;不然删除当时锁key,并回来1。
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
回来到上一层办法,也是针对回来值进行了操作,假如回来值是1,则会去撤销之前的守时续约使命,假如失利了,则会做一些相似设置状态的操作。
void cancelExpirationRenewal(Long threadId) {
ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (task == null) {
return;
}
if (threadId != null) {
task.removeThreadId(threadId);
}
if (threadId == null || task.hasNoThreads()) {
Timeout timeout = task.getTimeout();
if (timeout != null) {
timeout.cancel();
}
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
}
}
现在来说,redis分布式锁,redisson去加锁,也便是去redis集群中挑选一台master实例去完结锁机制,而且能由于一台master可能会挂载多台slave实例,这样也就完结了高可用性。可是呢,不得不去考虑,假如master和salve同步的进程中,master宕机了,偏偏在这之前某个服务实例刚刚写入了一把锁,这时分就为难了,salve还没有同步到这把锁,就被切换成了master,那么这时分能够说就有问题了,另一个服务实例在新的master上获取到一把新锁,这时分就会呈现俩台服务实例都持有锁,履行事务逻辑的场景,这个是有问题的。也是在出产环境中咱们需要去考虑的一个问题。
参考资料
- blog.csdn.net/ice24for/ar…
- blog.csdn.net/ice24for/ar…
- mp.weixin.qq.com/s?__biz=MzU…