作者:京东科技 张石磊

1 案例引入

名词简介:

资源:可以理解为一条内容,或者图+文字+链接的载体。

档位ID: 资源的分类组,资源必须归归于档位。

问题描述:当同一个档位下2条资源同时批阅经过时,收到擎天批阅体系2条音讯,消费者应用布置了2台机器,此刻正好由2台机器别离消费,在并发消费时,先更新资源状况,然后写缓存,每次取前100条资源,相似select * from resource where gear_id=xxx limit 100 order by id desc;

在写档位缓存,此刻事务未提交,并发查询时依据档位Id查询时查询不到对方的数据,全量写缓存时导致后写的缓存覆盖了先写的缓存,即缓存被覆盖,导致投进资源缺失。

计划考虑 :

计划1:一台机器消费mq–单点问题

计划2:将同档位ID的资源路由到同一个queue,需求批阅体系合作依据档位Id做路由,批阅体系发的音讯不只是cms批阅数据,此计划不适用。

计划3:在档位等级加分布式锁。

经比较,终究采用计划3是适宜的计划.

2 锁说明和分布式锁选择

synchronized锁的粒度是JVM进程维度,集群形式下,不能对共享资源加锁,此刻需求跨JVM进程的分布式锁。

分布式锁方法中心完成方法长处缺点剖析

1 数据库:

失望锁,lock

达观锁,经过版别号完成version

完成简单,不依赖中间件

数据库IO瓶颈,功能差

单实例存在单点问题,主从架构存在数据不一致,主从切换时其他客户端可重复加锁。

2 zookeeper

创立暂时节点

CP模型,可靠性高,不存在主从切换不一致问题

频繁创立和销毁暂时节点,且

集群形式下,leader数据需求同步到follower才算加锁成功,功能不如redis

主从切换服务不可用

3 redis集群

setnx+expire

功能高

有封装好的框架redission

支撑超时自动删去锁

集群支撑高可用,AP模型

主从切换时其他客户端可重复加锁。

R2M是基于开源的Redis cluster(Redis 3.0以上版别)构建的高功能分布式缓存体系,咱们体系一直在运用,3.2.5版别开端支撑分布式锁。

3 r2m分布式锁原理

示例代码:

String lockKey = CacheKeyHelper.getGearLockKey(EnvEnum.getEnvFlagEnum(envCode),resource.getGearId());
R2mLock lock = (R2mLock) r2mClusterClient.getLock(lockKey);
//获取锁,锁的默许有效期30s,获取不到锁就阻塞
lock.lock();
try {
    //事务代码
    resourceService.afterApprovedHandle(resource);
}  finally {
    //开释锁
    lock.unlock();
}

1 加锁中心流程:

加锁流程图

一文理清R2M分布式锁原理底层逻辑

1):测验获取锁,经过履行加锁Lua脚原本做;

2):若第一步未获取到锁,则去redis订阅解锁音讯

3):一旦持有锁的线程开释了锁,就会播送解锁音讯,其他线程自旋重新测验获取锁。

中心加锁原理:运用lua脚本封装了hset和pexpire命令,确保是一个原子操作, KEYS[1]是加锁的key,argv[2]是加锁的客户端ID(UUID+线程ID),ARGV[1]是锁的有效期,默许30s.

private Object acquireInternal(List<String> args) {
if (!this.setLocked() && this.getHolderId() != Thread.currentThread().getId()) {
return -1L;
} else {
try {
//hash结构,hash的key是加锁的key,键值对的key为客户端的UUID+线程id,value为锁的重入计数器值。
return this.lockSha() != null ? this.executor.evalR2mLockSha(this.lockSha(),
"if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;
return -2;", Collections.singletonList(this.lockName), args) : this.executor. == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return -2;", Collections.singletonList(this.lockName), args);
} catch (Exception var3) {
this.setUnlocked();
throw new R2mLockException("Failed to acquire lock " + this.lockName + ".", var3);
}
}
}

args参数

private List<String> acquireArgs(long leaseTime) {
        List<String> args = new ArrayList();
        if (leaseTime != -1L) {
            args.add(String.valueOf(leaseTime));
        } else {
             //默许30s
            args.add(String.valueOf(this.internalLockLeaseTime()));
        }
        //UUID+当时线程id
        args.add(this.currentThreadLockId(Thread.currentThread().getId()));
        return args;
    }


获取锁失利订阅锁的channel

//获取锁失利,订阅开释锁的音讯
 private boolean failedAcquire() {
            this.subLock();
            return false;
        }
 private void subLock() {
            CompletableFuture<Void> cf = R2mLock.this.executor.lockSubscribe(R2mLock.this.lockPubSub(), R2mLock.this.getLockChannelName(), R2mLock.this);
            if (cf != null) {
                cf.handleAsync(this::reSubIfEx);
            }
        }

锁开释后,订阅者经过自旋测验获取锁。

//tryAcquire获取锁,!tryAcquire便是获取锁失利,锁开释后,通知线程唤醒后回来false,然后经过自旋,测验获取锁,
public final void acquire(long arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
 final boolean acquireQueued(final Node node, long arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            //内部自旋获取锁
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

2 开释锁中心逻辑:

1)删去分布式锁key(假如可重入锁计数为0)

  1. 发开释锁的播送音讯

3)撤销watchdog

private Object unlockInternal(List<String> args) {
        logger.debug("{} trying to unlock.", Thread.currentThread().getId());
        Object var2;
        try {
     //判别锁 key 是否存在,假如存在,然后递减hash的value值,当value值为0时再删去锁key,并且播送开释锁的音讯
            if (this.unlockSha() == null) {
                var2 = this.executor. == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.lockName, this.getLockChannelName()), args);
                return var2;
            }
            var2 = this.executor.evalR2mLockSha(this.unlockSha(), "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.lockName, this.getLockChannelName()), args);
        } catch (Exception var6) {
            throw new R2mLockException("Failed to unlock " + this.lockName + ".", var6);
        } finally {
            this.finalizeRelease();
        }
        return var2;
    }
//撤销当时线程的watchdog
 private void finalizeRelease() {
        long threadId = Thread.currentThread().getId();
        R2mLock.ExpirableEntry entry = (R2mLock.ExpirableEntry)this.entryCache.get(threadId);
        if (entry != null) {
            entry.release(threadId);
            if (entry.isReleased()) {
                //撤销这个线程watchdog定时使命
                entry.getExtTask().cancel();
                this.expEntry.compareAndSet(entry, (Object)null);
                //从缓存watchdog线程的map中删去该线程
                this.entryCache.remove(threadId);
            }
        }
    }

3 锁的健壮性考虑

1 事务没履行完,锁超时过期怎么办?

客户端加锁默许有效期30s,超越有效期后假如事务没履行完,还需求持有这把锁,r2m客户端提供了续期机制,也便是watchdog机制。

watchdog原理:客户端线程维度(UUID+线程ID,客户端保护一个MAP,key便是UUID+线程ID)的后台定时线程,获取锁成功后,假如客户端还持有当时锁,每隔10s(this.internalLockLeaseTime() / 3L),去延伸锁的有效期(internalLockLeaseTime)

//watchdog中心机制 ,internalLockLeaseTime默许30s
private void extendLock(long holderId) {
        if (this.expEntry.get() != null) {
            R2mLock.ExpirableEntry holderEntry = (R2mLock.ExpirableEntry)this.entryCache.get(holderId);
            if (holderEntry != null) {
                 //每隔10s,假如当时线程持有锁,则续期30s
                if (this.expEntry.compareAndSet(holderEntry, holderEntry)) {
                    Timeout task = this.timer().newTimeout((timeout) -> {
                        if (this.extendLockInternal(holderId)) {
                            this.extendLock(holderId);
                        }
                    }, this.internalLockLeaseTime() / 3L, TimeUnit.MILLISECONDS);
                    if (this.expEntry.get() != null) {
                        ((R2mLock.ExpirableEntry)this.expEntry.get()).setExtTask(task);
                    }
                }
            }
        }
    }
 //履行续期lua脚本
 private boolean extendLockInternal(long threadId) {
        Object result;
        try {
           //只续期
            if (this.extendLockSha() != null) {
                result = this.executor.evalR2mLockSha(this.extendLockSha(), "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.lockName), this.extendLockArgs(threadId));
            } else {
                result = this.executor. == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.lockName), this.extendLockArgs(threadId));
            }
        } catch (Exception var5) {
            return false;
        }
        return Long.parseLong(result.toString()) == 1L;
    }

2 客户端宕机,锁如何开释?

分布式锁是有效期的,客户端宕机后,watchdog机制失效,锁过期自动失效。

3 redis分布式锁集群形式下缺点

r2m集群形式,极点状况,master加锁成功,宕机,还未来得及同步到slave,主从切换,slave切换成master,可以继续加锁,对于非及其严厉加锁场景,该计划可满意,归于AP;对于严厉场景下的分布式锁,可采用基于zookeeper的分布式锁,归于CP,leader宕机,folllower推举时不可用。功能上redis更优。

4 锁的开释问题

留意锁的开释在finally中开释,必须由锁的持有者开释,不能由其他线程开释他人的锁,示例代码中lock放到try的外面。