首要咱们一起回忆一些并发的场景

首要最基本的,咱们要弄清楚什么的并发嘞?我简单粗暴的了解便是:一段代码,在同一时间段内,被多个线程一起处理的状况便是并发现象。下面简单画了个图:

分享一些对于并发问题的处理思路
那么只要是并发现象就需求咱们进行并发处理吗?那肯定不是滴。咱们就拿咱们都能了解的订单事务来举例,比方说下面两种简单的场景:

  1. 关于C端事务来讲,基本上是由一串随机的序列号组成,能够为UUID、数字串、年月日商户(加密)+随机仅有序列号等等方法。这样的目的也是为了确保商户订单量的安全,防止他人去进行恶意分析。
  2. 关于B端事务来讲,基本上都是由商户+年月日+次序递加序列号的方法组成。这样便利客户方进行订单的汇总以及后期的追溯事务。

以上两种场景的差异基本上便是随机仅有序列号和次序递加序列号的差异。伪代码如下:

public void addOrder() {
    // 1.获取当时年月日以及商户标识
    String currentDate = "yyyyMMddHHmmss";
    String businessman = "商户标识";
    // 2.获取获取序列号
    long index = getIndex();
    // 3.拼接订单号
    String orderNum = businessman + currentDate + index;
    // 4.生成订单
    save(订单对象);
}

那么关于C端的随机仅有序列号来讲,我以为肯定是没必要进行并发操控的,只要写一个生成随机仅有序列号的算法就好了,这样生成出来的订单号必定是仅有的。

public String getIndex() {
    // 依据算法生成仅有序列号
    return buildIndexUtils.build();
}

但关于B端的次序递加序列号来讲,就需求进行并发操控了。由于既然要确保次序递加,我在生成当时序列号的一起就必定需求之前上一个单子的序列号是什么,因此我就必定需求一个地方去存储这个序列号。伪代码如下:

public String getIndex() {
    // 1.获取当时商户、当时单据已生成的最大序列号
    Integer index = dao.getIndex(商户, 单据) + 1;
    // 2.序列号 + 1
    index = index++;
    // 3.修正当时商户、当时单据已生成的最大序列号
    dao.update(商户, 单据, index);
    // 4.回来序列号
    return index + "";
}

此刻假如事务为可重复读,Thread1敞开事务并获取并修正序列号,此刻在Thread1未提交事务之前Thread2敞开事务并获取序列号。此刻两个线程获取到的序列号必定是一致的,这样就会呈现订单号重复的问题。

假如更换阻隔等级呢?是否能够解决这个问题?

  1. 读已提交?相同假如在Thread1提交事务之前Thread2就履行完第一步获取最大序列号呢?相同有问题。
  2. 读未提交?相同的呀,在两个Thread都履行完第一步,但没有履行update的状况。
  3. 串行化?那就和加同步锁没啥差异的,而且是堵塞式的。一堆恳求占用数据库衔接堵塞在这儿,假如呈现资源耗尽的状况就比较严重了。
  4. 不用事务?这个假如遇到2中的场景也相同的。

那么加锁呢?

  1. 单机环境下咱们能够挑选Synchronized或Lock来进行处理。众所周知,JDK1.6之后就对Synchronized进行了改善,不再是单纯的堵塞,而是先进行自旋处理,在必定程度上也到达了自旋节省资源的效果。可是Synchronized或Lock仍是要依据实践状况来进行处理的。假如咱们为了省劲而运用Synchronized对事务代码进行加锁的话,首要咱们要确保防止长事务的呈现,不然响应超时了,而事务还没有释放,那就比较严重了,异常状况堪比锁表。
  2. 分布式环境下咱们能够依赖Redis或Zookeeper来完成分布式锁。这儿需求留意的是,假如要依赖Redis完成的话,尽或许确保Redis选用单实例或分片集群的方法进行布置。主从的布置方法在某种极端状况下呈现节点宕机时会导致误判的状况。毕竟Redis是AP性质的。
  3. 还能够经过数据库来完成,比方经过select for update来完成行锁、经过version字段完成达观锁、增加仅有束缚的方法。首要select for update完成行锁和上面的串行化事务不同不大,都是数据库衔接的堵塞,不主张运用。而达观锁和仅有束缚的计划更适用于作为一个保底计划,不然人家并发恳求的时候只有一个恳求能成功,其他的都失败。这样的用户体会也不好。

最终咱们能得出一个结论。是否进行并发操控要依据该并发操作是否会造成数据安全问题来决定的。好了,下面向咱们共享一些在学习作业中关于并发问题的处理思路

由于恳求重试导致的并发安全问题

在与第三方体系交互或许微服务内部跨模块交互时,咱们一般会选用HTTP或RPC等方法,并设置最大恳求时间以及重试次数。由于咱们绝对不允许由于下流服务的异常问题而拖累当时服务的正常运转。而一般状况下,最大恳求时间也是依据两个服务之间的实践事务以及下流接口进行多次测验而设定的,一般来说不会随意的呈现恳求超时的状况。可是一旦下流事务的接口由于某种原因(比方网络卡顿或许呈现功率问题)导致恳求超时的状况,就很有或许由于上游服务的重试而导致下流服务数据重复的问题。

这种状况从本质上来说也便是个重复消费的问题。咱们只需求两边合作做好幂等就好了。

  1. 首要,假如涉及到前端,比方说点击前端的按钮触发事务而且调用下流服务的事务。这个时候既要考虑前端重复提交也要考虑后端的重复发送以及重复消费问题。前端最常用的方法便是做一个进度条或进行防抖处理,防止一个用户频频点击按钮。

    那么假如是多个用户一起提交同一条数据呢?这个状况主要是在B端事务中呈现,比方说多个用户均具有这条数据的修正权限,此刻也并发点击按钮提交了这条数据。一般来说,这种状况呈现的概率仍是极少数的,也不会有多少并发量。因此咱们直接选用数据库的达观锁进行保底操控就好了,只允许一个人操作成功,其他人操作失败并提示该数据已被修正。

/**
 * @param id     数据ID
 * @param status 数据的状况
 */
public void update(Long id, Integer status) {
    // 1.依据ID查询数据
    PO po = dao.select(id);
    // 2.判别数据的状况是否契合修正要求(这一步主要是应对两个线程都进入Controller层,其间线程1刚好提交事务后,线程2开端事务的状况)
    if(!status.equals(po.getStatus())) {
        throw new TJCException("数据已被修正,请改写后重试");
    }
    // 3.修正数据(启用达观锁机制,主要应对线程1提交事务之前线程2敞开事务的状况)
    int i = dao.update("update table set xxx = ?, version = version + 1 where id = ? and version > ?");
    if(i == 0) {
        throw new TJCException("数据已被修正,请改写后重试");
    }
    // 持续履行下面事务
}
  1. 上游服务恳求下流服务时,在恳求头或音讯中增加音讯仅有ID。下流服务第一次接收到这个音讯后首要将音讯保存在缓存中并依据测验成果设置合理的有效期(有效期尽或许比正常恳求时间长个一两分钟就好)。这样就能够阻拦上述所说的重试导致的重复消费问题。
// 上游服务发送音讯
public void request() {
    String messageId = "xxxx";
    rpc.request(messageId, message);
}
// 下流服务消费音讯
public void consume(String messageId, String message) {
    // 将messageId存储在redis中, 单机环境也能够直接找个map去存或许存在Guava中
    Boolean flag = stringRedisTemplate.opsForValue()
                .setIfAbsent(messageId, "1", 60, TimeUnit.SECONDS);
    if(!flag) {
       log.error("重复音讯阻拦");
       return;
    }
    // 持续履行下面事务
     .....
    // 事务完成后(提交/回滚),删除标识
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCompletion(int status) {
            stringRedisTemplate.delete(messageId);
        }
    });
}

在这儿是否有小伙伴会有这样的一个疑问,假如重复发送的音讯中messageId不一致或许上游服务接口自身就被调用了多次怎么办?

(1)首要,我觉得在上游服务接口自身就被调用了多次的状况下,第一点中的第2步骤(判别数据状况)这种方法就能够把它阻拦掉。

(2)其次,假如呈现重复发送的音讯中messageId不一致的状况,我以为这就属于程序员问题了,能够不放在这儿进行考虑。假如硬要考虑的话,形似也没什么更好的方法,那就加锁吧。

次序递加订单号问题

在最初咱们经过引证这个生成订单号的例子分析了一些什么状况下需求进行并发处理问题,而且上面是选用加锁方法处理的。那么是否还有其他的方法比加锁更好一些呢?比较加锁影响吞吐量呀,哈哈。非必要状况下,我是不会进行加锁处理的,除非在定制开发的过程中,用户的要求是能用就行,那就能够偷懒了哈哈,节省时间去摸鱼!!!!

下面给咱们共享一些我常用的一种方法:Redis+Lua。咱们都知道操作内存肯定是比操作数据库要更快一些的,那么咱们能够干脆将各个单据的序列号增加到Redis中。而且订单号是依据年月日来进行重置的,所以咱们能够将序列号的过期时间设置为24小时。

伪代码如下:

// 序列号的key能够设置为(模块名:orderIndex:订单类型:yyyyMMdd)
String dateFormat = getCurrentDateFormat("yyyyMMdd");
// key
String key = 模块名 + ":" + orderIndex + ":" + 订单类型 + ":" + dateFormat;
String script = "if (redis.call('exists', KEYS[1]) == 0) then redis.call('setex', KEYS[1], ARGV[1], ARGV[2]) return 1 else return redis.call('incr', KEYS[1]) end";
DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>();
defaultRedisScript.setResultType(Long.class);
defaultRedisScript.setScriptText(script);
long count = stringRedisTemplate.execute(defaultRedisScript, Arrays.asList(key), (3600 * 24) + "", "1");

咱们都清楚,Redis多指令履行是没方法确保原子性的。所以咱们要凭借Lua脚本将多个Redis履行以脚本的方法履行来确保多指令履行的原子性,再合作Redis根据内存以及单线程履行指令的优势,能够代替锁来赋予功能更大的吞吐量。

计数计算问题

在作业中我还做过这样一个需求。首要经过音讯队列接收、主动拉取数据源的方法获取用户在实践事务中产生的源数据并依据设置的规矩比对校验生成契合条件的数据保存在数据库中。而且对经过各个维度对生成的数据进行计数计算并推送下流单据。

比方说其间有一个计算维度为“在各个班的作业时间内,依据次数计算契合条件的数据并汇总推送下流单据”。那么要做这项事务,首要咱们要对各个班的数据进行分别计数,当时班开端作业时同步敞开计数,结束作业时中止计数,当计数器到达设置的规范后,将这些数据进行计算处理后推送下流单据。

依据上面的事务,一般来说有两种方法解决:

  1. 将班、计数量、数据ID等数据存储在数据库中,并对获取数据、处理数据、计数、推送下流单据等操作一致加锁进行处理,确保数据计数的准确性。
  2. 依然是经过Redis+Lua的方法进行处理。

最终经过实践的事务分析决定选用Redis+Lua的方法进行处理。只不过这次的Lua要写相对复杂的事务了。

伪代码如下:

/**
 * @param indexStdId         规范ID
 * @param currentTeamClassId 班ID
 * @param dataId             数据ID
 * @param count              计数要求
 */
public List<Long> countMonitor(Long indexStdId, Long currentTeamClassId, Long 
dataId, Integer count) {
        StringBuilder countMonitorLua = new StringBuilder();
        countMonitorLua.append("if (redis.call('hget', KEYS[1], KEYS[2]) == ARGV[2]) ");
        countMonitorLua.append("then ");
        countMonitorLua.append("    if (redis.call('hget', KEYS[1], KEYS[3]) == ARGV[3]) ");
        countMonitorLua.append("    then ");
        countMonitorLua.append("        redis.call('hset', KEYS[1], KEYS[3], 0) ");
        countMonitorLua.append("        redis.call('lpush', KEYS[4], ARGV[1]) ");
        countMonitorLua.append("        local list = redis.call('lrange', KEYS[4], 0, -1) ");
        countMonitorLua.append("        redis.call('del', KEYS[4]) ");
        countMonitorLua.append("        return list ");
        countMonitorLua.append("    else ");
        countMonitorLua.append("        redis.call('lpush', KEYS[4], ARGV[1]) ");
        countMonitorLua.append("        redis.call('hincrby', KEYS[1], KEYS[3], 1) ");
        countMonitorLua.append("        return {} ");
        countMonitorLua.append("    end ");
        countMonitorLua.append("else ");
        countMonitorLua.append("    redis.call('del', KEYS[4]) ");
        countMonitorLua.append("    redis.call('lpush', KEYS[4], ARGV[1]) ");
        countMonitorLua.append("    redis.call('hset', KEYS[1], KEYS[3], 1) ");
        countMonitorLua.append("    redis.call('hset', KEYS[1], KEYS[2], ARGV[2]) ");
        countMonitorLua.append("    if (redis.call('hget', KEYS[1], KEYS[3]) == ARGV[4]) ");
        countMonitorLua.append("    then ");
        countMonitorLua.append("        redis.call('hset', KEYS[1], KEYS[3], 0) ");
        countMonitorLua.append("        local list2 = redis.call('lrange', KEYS[4], 0, -1) ");
        countMonitorLua.append("        redis.call('del', KEYS[4]) ");
        countMonitorLua.append("        return list2 ");
        countMonitorLua.append("    else ");
        countMonitorLua.append("        return {} ");
        countMonitorLua.append("    end ");
        countMonitorLua.append("end ");
        DefaultRedisScript<List> defaultRedisScript = new DefaultRedisScript<>();
        defaultRedisScript.setResultType(List.class);
        defaultRedisScript.setScriptText(countMonitorLua.toString());
        List<String> keys = new ArrayList<>();
        keys.add(COUNTMONITOR_HASH.replace("${indexStd}", indexStdId.toString()));
        keys.add(COUNTMONITOR_HASH_CURRENTTEAMCLASSID);
        keys.add(COUNTMONITOR_HASH_COUNT);
        keys.add(COUNTMONITOR_LIST.replace("${indexStd}", indexStdId.toString()));
        List dataIdList = stringRedisTemplate.execute(defaultRedisScript, keys, gapDataId.toString(), currentTeamClassId.toString(), (count - 1) + "", count + "");
        List<Long> collect = null;
        if(!gapDataIdList.isEmpty()) {
            collect = (List<Long>) gapDataIdList.stream().map(o -> Long.valueOf(o.toString())).collect(Collectors.toList());
        }
        return collect;
    }

以上代码是依据我实践的事务代码改编成的伪代码,这个段代码没必要看懂哈,首要是伪代码,其实这个事务比较复杂,我也没写注释。更多的仍是共享一下优化的处理思路:

首要计数量是由客户定的,能够设置的很小也能够设置的很大。由于这一点考虑,我将计数分成的两部分,一个是String类型的key做计数器,一个是List类型的key用来记录正在被计数的数据ID。这个List有或许是一个大key。所以咱们不会去频频的读取它的数量进行判别,而是经过读取这个String类型的计数器来校验计数。当计数契合条件后就将List取出来。这样做的优点是节省了频频读取大key的耗时(实践上Redis读取大Key是十分耗时的,咱们在实践开发中要时间留意这一点)。

总结

总体来说,优化并发问题本质上便是经过优化各种恳求的耗时(例如事务的耗时、数据库衔接的耗时、http/rpc的耗时)来提高功能的吞吐量,到达用最少的资源浪费处理更多的工作。

我处理并发问题的思路总体上也便是经过同步锁、数据库锁以及仅有束缚、Redis单线程的天然优势这三点上进行归纳考虑,挑选中更适合事务场景的一种处理方法。实践上退一万步说,关于一些B端的事务,用户的需求只是能用就行,那咱们做定制开发的小伙伴们就直接一个锁就解决问题了,这样何乐而不为呢?还能节省出更多的摸鱼时间!哈哈!!!

但关于做通用产品来说,仍是要尽或许的考虑更大的吞吐量。有的小伙伴或许有有疑问,Redis一般的运用规范不是只允许寄存那些查询频率十分高的热门数据吗?嗯,那是关于大多数C端互联网项目而言的。而B端项目普遍事务要愈加的复杂,而在这个基础上咱们要想追求更大的吞吐量,其有用一用Redis也未尝不可哈。毕竟B端的QPS相比于C端来说要根本不在一个数量级。就算是偶尔呈现几个大Key,能有什么关系呢,只要咱们规划的谨慎一点,能够把控整体的资源就好啦。