布景

众所周知,数据库单行并发写才能极为有限,比如 MySQL单行并发写大概在300~500TPS之间。所以,将数据分桶存储能够线性提升并发写入才能。分桶处理的是单个数据库的并发才能

分桶模型

秒杀中的分桶策略 —— 提高单台数据库的性能

每一个咱们能够当作单个数据库中的一行记载,将原来的一行记载存储100件库存,变为用5行记载别离存储20件库存。在对库存进行操作的时分,就能够通过对用户ID取模,确认该用户操纵的是那一行记载,然后进步单个数据库的并发才能

秒杀场景中,咱们要对库存的数量进行缓存,所以也要对缓存进行分桶。每一个桶当作Redis中的一个记载即可。

缓存中的数据相当于库存的预扣减,预扣减成功那么就让该请求去修改数据库,失利直接拒绝该请求即可。这里尽量坚持缓存数据(弱共同)与数据库中的数据(强共同)的共同性,但是缓存和数据库分桶之间的关系是一定要确保的。

分桶规划与完成

分桶编列思路

在整个分桶编列的过程,有几个重要的点:

  1. 在进行分桶编列之前,要先暂停分桶服务,设置为维护状况,此时用户无法下单
  2. 暂停分桶服务时,有必要运用独立业务手动提交,确保在继续执行分桶前,分桶状况已经提交到数据库;
  3. 分桶保存到数据库后,应同步数据到缓存中;
  4. 全量和增量:全量分桶意味着将当时传入的库存总量作为最终总量,从头计算分桶数据;而增量分桶则是将传入的库存总量累加到已有的库存中,然后再从头计算分桶数据;
  5. 有无前史分桶数据:假如此前已有分桶数据,那么在分桶时则要先进行库存收回,随后再统一分配;假如此前无分桶数据,则直接创立新的分桶集;
  6. 分桶中出现任何反常应抛出以触发业务回滚
  7. 不管分桶成功或失利,最终都要从头翻开分桶服务,即撤销分桶维护状况,不然秒杀品将无法售卖;

分桶编列代码完成

分桶编列代码

    public void arrangeStockBuckets(Long itemId, Integer totalStocksAmount, Integer bucketsQuantity, Integer arrangementMode) {
        logger.info("arrangeBuckets|预备库存分桶|{},{},{}", itemId, totalStocksAmount, bucketsQuantity);
        if (itemId == null || totalStocksAmount == null || totalStocksAmount < 0 || bucketsQuantity == null || bucketsQuantity <= 0) {
            throw new StockBucketException(ErrorCode.INVALID_PARAMS);
        }
        // 确保只要一个线程对itemId进行更新
        DistributedLock distributedLock = distributedLockFactoryService.getDistributedLock(ITEM_STOCK_BUCKETS_SUSPEND_KEY + itemId);
        try {
            boolean tryLock = distributedLock.tryLock(5, 5, TimeUnit.SECONDS);
            if (!tryLock) {
                logger.info("arrangeStockBuckets|获取锁失利|{}", itemId);
                return;
            }
            // 手动添加业务
            TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
            try {
                // 设置为禁用状况
                logger.info("suspendBuckets|禁用库存分桶|{}", itemId);
                int updateStatusByItemId = seckillBucketMapper.updateStatusByItemId(itemId, SeckillBucketStatus.DISABLED.getCode());
                if (updateStatusByItemId < 0) {
                    logger.info("arrangeBuckets|关闭库存分桶失利|{}", itemId);
                    throw new StockBucketException(ErrorCode.ARRANGE_STOCK_BUCKETS_FAILED);
                }
                logger.info("suspendBuckets|库存分桶已禁用|{}", itemId);
                dataSourceTransactionManager.commit(transactionStatus);
            } catch (Exception e) {
                logger.info("arrangeBuckets|关闭分桶失利回滚中|{}", itemId, e);
                dataSourceTransactionManager.rollback(transactionStatus);
            }
            List<SeckillBucket> seckillBuckets = seckillBucketMapper.selectByItemId(itemId);
            if (seckillBuckets == null || seckillBuckets.size() == 0) {
                initStockBuckets(itemId, totalStocksAmount, bucketsQuantity);
                return;
            }
            // 依据总量分桶
            if (ArrangementMode.isTotalAmountMode(arrangementMode)) {
                arrangeStockBucketsBasedTotalMode(itemId, totalStocksAmount, bucketsQuantity, seckillBuckets);
            }
            // 依据增量分桶
            if (ArrangementMode.isIncrementalAmountMode(arrangementMode)) {
                rearrangeStockBucketsBasedIncrementalMode(itemId, totalStocksAmount, bucketsQuantity, seckillBuckets);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

构建(初始化)分桶代码:

   private void initStockBuckets(Long itemId, Integer totalStocksAmount, Integer bucketsQuantity) {
        SeckillBucket primaryBucket = new SeckillBucket()
                .initPrimary()
                .setItemId(itemId)
                .setTotalStocksAmount(totalStocksAmount);
        List<SeckillBucket> presentBuckets = buildBuckets(itemId, totalStocksAmount, bucketsQuantity, primaryBucket);
        submitBucketsToArrange(itemId, presentBuckets);
    }
    private List<SeckillBucket> buildBuckets(Long itemId, Integer totalStocksAmount, Integer bucketsQuantity, SeckillBucket primaryBucket) {
        if (itemId == null || totalStocksAmount == null || bucketsQuantity == null || bucketsQuantity <= 0) {
            throw new StockBucketException(ErrorCode.INVALID_PARAMS);
        }
        List<SeckillBucket> seckillBucketList = new ArrayList<>();
        Integer averageStockAmount = totalStocksAmount / bucketsQuantity;
        Integer remainStockAmount = totalStocksAmount % bucketsQuantity;
        for (int i = 0; i < bucketsQuantity; i++) {
            if (i == 0) {
                if (primaryBucket == null) {
                    primaryBucket = new SeckillBucket();
                }
                primaryBucket
                        .setAvailableStocksAmount(averageStockAmount)
                        .setSerialNo(i)
                        .setStatus(SeckillBucketStatus.ENABLED.getCode());
                seckillBucketList.add(primaryBucket);
                continue;
            }
            SeckillBucket seckillBucket = new SeckillBucket()
                    .setSerialNo(i)
                    .setStatus(SeckillBucketStatus.ENABLED.getCode())
                    .setItemId(itemId);
            if (i < bucketsQuantity - 1) {
                seckillBucket.setAvailableStocksAmount(averageStockAmount)
                        .setTotalStocksAmount(averageStockAmount);
            }
            if (i == bucketsQuantity - 1) {
                Integer restAvailableStocksAmount = averageStockAmount + remainStockAmount;
                seckillBucket.setAvailableStocksAmount(restAvailableStocksAmount)
                        .setTotalStocksAmount(restAvailableStocksAmount);
            }
            seckillBucketList.add(seckillBucket);
        }
        return seckillBucketList;
    }

存储入缓存和数据库代码

先入数据库再入缓存。

private void submitBucketsToArrange(Long itemId, List<SeckillBucket> presentBuckets) {
        logger.info("arrangeBuckets|编列库存分桶|{},{}", itemId, JSON.toJSONString(presentBuckets));
        if (itemId == null || itemId <= 0 || CollectionUtils.isEmpty(presentBuckets)) {
            logger.info("arrangeBuckets|库存分桶参数过错|{}", itemId);
            throw new BusinessException(ErrorCode.INVALID_PARAMS);
        }
        // 先删除再参加
        seckillBucketMapper.deleteById(itemId);
        int insertBatch = seckillBucketMapper.insertBatch(presentBuckets);
        if (insertBatch > 1) {
            // 存入缓存
            presentBuckets.forEach((seckillBucket -> {
                distributedCacheService.put(getBucketAvailableStocksCacheKey(itemId, seckillBucket.getSerialNo()), seckillBucket.getAvailableStocksAmount());
                distributedCacheService.put(getItemStockBucketsQuantityCacheKey(itemId), presentBuckets.size());
            }));
        } else {
            logger.info("submitBucketsToArrange|库存分桶过错|{}, {}", itemId, JSON.toJSONString(presentBuckets));
            throw new StockBucketException(ErrorCode.ARRANGE_STOCK_BUCKETS_FAILED);
        }
    }

依据全量分桶

private void arrangeStockBucketsBasedTotalMode(Long itemId, Integer totalStocksAmount, Integer bucketsQuantity, List<SeckillBucket> existingBuckets) {
        // 计算子桶的剩下的库存数
        int remainAvailableStocks = existingBuckets.stream()
                .filter(SeckillBucket::isSubSeckillBucket)
                .mapToInt(SeckillBucket::getAvailableStocksAmount).sum();
        Optional<SeckillBucket> optionalSeckillBucket = existingBuckets.stream().filter(SeckillBucket::isPrimarySeckillBucket).findFirst();
        if (!optionalSeckillBucket.isPresent()) {
            throw new StockBucketException(ErrorCode.PRIMARY_BUCKET_IS_MISSING);
        }
        // 收回分桶库存到主桶
        SeckillBucket primarySeckillBucket = optionalSeckillBucket.get();
        primarySeckillBucket.addAvailableStocks(remainAvailableStocks);
        // 已售出的库存
        int soldStocksAmount = primarySeckillBucket.getTotalStocksAmount() - primarySeckillBucket.getAvailableStocksAmount();
        if (soldStocksAmount > totalStocksAmount) {
            throw new StockBucketException(799, "已售库存大于当期所设库存总量!");
        }
        // 设置最新库存,从头分桶
        primarySeckillBucket.setTotalStocksAmount(totalStocksAmount);
        List<SeckillBucket> seckillBucketList = buildBuckets(itemId, totalStocksAmount, bucketsQuantity, primarySeckillBucket);
        submitBucketsToArrange(itemId, seckillBucketList);
    }

依据增量分桶

	private void rearrangeStockBucketsBasedIncrementalMode(Long itemId, Integer incrementalStocksAmount, Integer bucketsQuantity, List<SeckillBucket> existingBuckets) {
        Optional<SeckillBucket> optionalSeckillBucket = existingBuckets.stream().filter(SeckillBucket::isPrimarySeckillBucket).findFirst();
        if (!optionalSeckillBucket.isPresent()) {
            throw new StockBucketException(ErrorCode.PRIMARY_BUCKET_IS_MISSING);
        }
        // 收回分桶库存 (获取当时一切桶剩下的可用库存数)
        int remainAvailableStocks = existingBuckets.stream().mapToInt(SeckillBucket::getAvailableStocksAmount).sum();
        // 加上要添加的库存数
        Integer totalAvailableStocks = remainAvailableStocks + incrementalStocksAmount;
        int presentAvailableStocks = remainAvailableStocks + incrementalStocksAmount;
        if (presentAvailableStocks < 0) {
            throw new StockBucketException(ErrorCode.STOCK_NOT_ENOUGH);
        }
        SeckillBucket primarySeckillBucket = optionalSeckillBucket.get();
        primarySeckillBucket.increaseTotalStocksAmount(incrementalStocksAmount);
        List<SeckillBucket> seckillBucketList = buildBuckets(itemId, totalAvailableStocks, bucketsQuantity, primarySeckillBucket);
        submitBucketsToArrange(itemId, seckillBucketList);
    }

不同分桶之间的数量差异

存在问题

用户在访问可用库存的时分,会存在一个问题:路由到不同分桶的流量或许存在差异和不均,这会导致不同分桶的余量不同,展现到不同用户上的数量就会不同。例如:#1桶中库存为0,但#2桶中库存大于0。

处理方法

  1. 规划库存借用机制,当某个分桶库存缺乏时,能够从其他桶借库存;
  2. 主桶和分桶留有一定冗余库存,分桶库存缺乏时能够向主桶申请;
  3. 答应不同用户看到不同的库存余量,所路由到的分桶没有库存时直接展现无库存;

在秒杀场景中,咱们一般挑选第三种,因为它满足的简略高效,重点维护服务端的数据共同性极致的功能。前面两种方法会大大添加体系的复杂度,在挑选的时分要慎重考虑

扣减库存完成代码

先扣除缓存在扣减数据库,缓存充当一个预扣减的作用,这里不再详细讨论。参考文章:/post/718520…

扣减缓存库存的Lua脚本

--- 对应的库存键不存在
if (redis.call('exists', KEYS[1]) == 0) then
    return -996
end
--- 分桶禁用锁
if (redis.call('exists', KEYS[2]) == 1) then
    return -998
end
--- 库存调度锁
if (redis.call('exists', KEYS[3]) == 1) then
    return -997
end
if (redis.call('exists', KEYS[1]) == 1) then
    local stocksAmount = tonumber(redis.call('get', KEYS[1]))
    local quantity = tonumber(ARGV[1])
    --- 库存不行
    if (stocksAmount < quantity) then
        return -1
    end
    if (stocksAmount >= quantity) then
        redis.call('incrby', KEYS[1], 0 - quantity)
        return 1
    end
end
return -10000

运用达观锁扣减数据库库存

<update id="decreaseBucketStock">
        update seckill_bucket
        set available_stocks_amount = available_stocks_amount - #{quantity,jdbcType=NUMERIC}
        where item_id = #{itemId,jdbcType=NUMERIC}
          AND serial_no = #{serialNo,jdbcType=NUMERIC}
          AND available_stocks_amount = #{oldAvailableStocksAmount,jdbcType=NUMERIC}
          AND available_stocks_amount <![CDATA[  >= ]]> #{quantity,jdbcType=NUMERIC}
</update>