背景

异步削峰,目的是削峰,方式是异步。面临瞬时压力,都需求异步削峰,其关键都在于拉长时刻线,削平毛刺,终究全体提高吞吐量

中心流程

秒杀场景下异步下单实现

提交下单使命:用户发出下单恳求后,先要获取订单答应,若获取成功才干将下单使命提交给音讯行列进行处理;

下单使命轮询:异步处理,不会立刻返回成果。所以用户需求必定频率来轮询处理成果

完结原理

下单答应

所谓下单答应,指的是在秒杀品开始秒杀前,依据秒杀品的库存配置特定份额(可调理) 的下单答应,只有取得下单答应的用户才干提交下单使命。个比如,假设秒杀品A有库存10000个,那么咱们能够将秒杀答应设定为10000*1.210000**1.5

  1. 下降不必要的资源竞赛与糟蹋。约束订单使命放入音讯行列,假如不设置12000的下单答应,则可能有10,000乃至100,0000的用户将恳求提交到使命行列,可是咱们只有戋戋12000的库存,这会给行列和相关计算方造成巨大压力。
  2. 提高用户体会。有了下单答应之后,当答应被抢完的时候,咱们即可当即向用户展现“售罄”或“暂无库存”等更为友好且贴近事实的提示,不存在忽悠,是真没有库存了。
  3. 保证一切库存能够卖出。依照1.2或1.5等份额设置高出库存数量的下单答应,便是为了预留必定的Buffer,允许一些无效提交但不会影响全体售卖。

提交下单使命

异步处理进程中,提交下单使命是第一步,留意事项

  • 提交下单使命之前,应经过根底且必要的账号、安全相关的校验
  • 下单要加锁,防止颤动、连续点击等导致用户层面出现重复提交问题
  • 所提交的下单使命,应该具有清晰且仅有的编码以便于跟踪,即 placeOrderTaskId,当用户取得提交答应时,应向用户提供placeOrderTaskId用以后续的成果轮询;
  • 同一用户、同一秒杀品,不应出现重复提交(能够经过placeOrderTaskId判断)
  • 下单答应的规划应结合本地缓存+中心化缓存,以下降网络恳求负载并提高处理效率
  • 在处理本地缓存和中心化缓存时,要侧重留意过期时刻的设置和更新时的锁竞赛问题;

生成 placeOrderTaskId 代码

经过userIditemId 生成 placeOrderTaskId 能够保证同一用户只能购买一个商品

/**
 * 订单使命id
 *
 * @param userId
 * @param itemId
 * @return
 */
private String generatePlaceOrderTaskId(Long userId, Long itemId) {
	String toEncrypt = userId + "_" + itemId;
	return DigestUtils.md5DigestAsHex(toEncrypt.getBytes());
}

生成下单答应代码

结合本地缓存 + 分布式缓存的完结

private final static Cache<Long, Integer> availableOrderTokensLocalCache = CacheBuilder.newBuilder().initialCapacity(20).concurrencyLevel(5).expireAfterWrite(20, TimeUnit.MILLISECONDS).build();
// 从本地获取orderToken
private Integer getAvailableOrderTokens(Long itemId) {
	Integer localAvailableToken = localAvailableTokens.getIfPresent(itemId);
	if (localAvailableToken != null) {
		logger.info("本地缓存射中|{}", itemId);
		return localAvailableToken;
	}
	return refreshLocalAvailableTokens(itemId);
}
// 从本地获取长途的缓存并更新
private synchronized Integer refreshLocalAvailableTokens(Long itemId) {
	// 再次从本地缓存获取
	Integer localAvailableToken = localAvailableTokens.getIfPresent(itemId);
	if (localAvailableToken != null) {
		logger.info("本地缓存射中|{}", itemId);
		return localAvailableToken;
	}
	String goodAvailableTokensKey = getGoodAvailableTokensKey(itemId);
	Integer goodAvailableTokensInRedis = redisCacheService.getInteger(goodAvailableTokensKey);
	if (goodAvailableTokensInRedis != null) {
		logger.info("长途缓存射中|{}", itemId);
		localAvailableTokens.put(itemId, goodAvailableTokensInRedis);
		return goodAvailableTokensInRedis;
	}
	return refreshLatestAvailableTokens(itemId);
}
// 获取最新的库存信息
private Integer refreshLatestAvailableTokens(Long itemId) {
	DistributedLock distributedLock = distributedLockFactoryService.getDistributedLock(getRefreshTokensLockKey(itemId));
	try {
		boolean isSuccessLock = distributedLock.tryLock(500, 1000, TimeUnit.MILLISECONDS);
		if (!isSuccessLock) {
			return null;
		}
		GoodStockCache availableItemStock = goodStockCacheService.getAvailableItemStock(-1L, itemId);
		if (availableItemStock == null || availableItemStock.getAvailableStock() == null || !availableItemStock.isSuccess()) {
			logger.info("库存不存在|{}", itemId);
			return null;
		}
		Integer availableOrderTokens = (int) Math.ceil(availableItemStock.getAvailableStock() * 1.5);
		// 更新长途的下单答应
		redisCacheService.put(getGoodAvailableTokensKey(itemId), availableOrderTokens, 24, TimeUnit.HOURS);
		// 存入本地缓存
		localAvailableTokens.put(itemId, availableOrderTokens);
		return availableOrderTokens;
	} catch (InterruptedException e) {
		logger.info("改写tokens失利|{}", itemId, e);
		return null;
	}
}

扣减下单答应代码

运用 Lua代码扣减 长途缓存中存储的 答应数量(本地缓存保存的时刻十分短,基本都会到分布式缓存中读取)

if (redis.call('exists', KEYS[1]) == 1) then
    local availableTokensCount = tonumber(redis.call('get', KEYS[1]))
    if availableTokensCount == 0 then
        return -1
    end
    if availableTokensCount > 0 then
        redis.call('incrby', KEYS[1], -1)
        return 1
    end
end
return -100

运用Java代码多次调用Lua脚本

为了保证可用下单答应数量的有效性,咱们给下单答应设置了过期时刻,这会导致在履行LUA脚本时数据不存在,所以为了应对这种情况,在数据不存在时当前线程会主动测验改写数据,然后继续履行LUA脚本。也便是说,当用户抢到了下单答应可是下单失利或取消订单时,系统会定时对数据进行纠正,腾出来空余的答应给后边需求的用户,保证一切库存均可对外出售

// 扣减答应
private boolean takeOrRecoverToken(PlaceOrderTask placeOrderTask) {
	ArrayList<String> keys = new ArrayList<>();
	keys.add(getGoodAvailableTokensKey(placeOrderTask.getItemId()));
	for (int i = 0; i < 3; i++) {
		Long result = redisCacheService.getRedisTemplate().execute(TAKE_ORDER_TOKEN_LUA, keys);
		if (result == null) {
			return false;
		}
		// 没有库存
		if (result == -1L) {
			logger.info("库存为0|{}", JSON.toJSONString(placeOrderTask));
			return false;
		}
		// 数据在缓存中不存在,先去更新
		if (result == -100L) {
			refreshLatestAvailableTokens(placeOrderTask.getItemId());
			continue;
		}
		return result == 1L;
	}
	return false;
}

异步处理下单使命

经过异步提交到MQ中的订单,MQ会依据订阅将音讯推送给订阅方处理。

  • 异步处理下单使命时,应先确定TOPIC和订阅关系
  • 使命处理成果存储到缓存中,方便客户端轮询;
  • 使命处理成功后,将订单ID返回给客户端,方便查看订单详情;

异步发送音讯

运用 RabbitMQ 发送音讯

@Override
public boolean post(PlaceOrderTask placeOrderTask) {
	logger.info("投递下单使命|{}", JSON.toJSONString(placeOrderTask));
	if (placeOrderTask == null) {
		logger.info("下单使命参数为空");
		return false;
	}
	String placeOrderTaskString = JSON.toJSONString(placeOrderTask);
	try {
		rabbitTemplate.convertAndSend(MqConfig.SECKILL_EXCHANGE_NAME, MqConfig.SECKILL_ROUTING_KEY, placeOrderTaskString);
		logger.info("OrderTaskPostServiceImpl|使命投递成功|{}", placeOrderTaskString);
		return true;
	} catch (AmqpException e) {
		logger.info("OrderTaskPostServiceImpl|使命投递失利|{}", placeOrderTaskString);
		return false;
	}
}

库存扣减

异步下单不存在库存扣减逻辑,由于异步并发可控,将直接在数据库层面进行竞赛扣减。使命处理完毕后,会将成果写入到缓存中以供查询。

/**
 * 处理下单使命
 *
 * @param placeOrderTask
 */
@Transactional
public void handlePlaceOrderTask(PlaceOrderTask placeOrderTask) {
	Long userId = placeOrderTask.getUserId();
	SeckillGoodResponse seckillGoodResponse = seckillGoodClient
			.getSeckillGood(userId, placeOrderTask.getActivityId(), placeOrderTask.getItemId())
			.getData();
	// 构造 实体类
	SeckillOrder seckillOrder = SeckillOrderBuilder.toDomain(placeOrderTask);
	seckillOrder.setItemTitle(seckillGoodResponse.getItemTitle());
	seckillOrder.setFlashPrice(seckillGoodResponse.getFlashPrice());
	seckillOrder.setUserId(userId);
	StockDeduction stockDeduction = new StockDeduction()
			.setItemId(placeOrderTask.getItemId())
			.setUserId(userId)
			.setQuantity(seckillOrder.getQuantity());
	Long orderId = null;
	try {
		// 正式扣减库存
		boolean isDecreaseStock = goodStockDeductionService.decreaseItemStock(stockDeduction);
		if (!isDecreaseStock) {
			logger.info("正式库存失利|{}, {}", userId, JSON.toJSONString(placeOrderTask));
			return;
		}
		// 创立订单
		logger.info("placeOrder|下单|{},{}", userId, JSON.toJSONString(placeOrderTask));
		seckillOrder.setStatus(SeckillOrderStatus.CREATED.getCode());
		boolean isSuccessSave = seckillOrderMapper.insert(seckillOrder) > 0;
		if (!isSuccessSave) {
			logger.info("订单创立失利|{},{}", userId, JSON.toJSONString(seckillOrder));
			throw new BusinessException(PLACE_ORDER_FAILED);
		}
		orderId = seckillOrder.getId();
		redisCacheService.put(PLACE_ORDER_TASK_ORDER_ID_KEY + placeOrderTask.getPlaceOrderTaskId(), orderId, 24, TimeUnit.HOURS);
		placeOrderTaskService.updateTaskHandleResult(placeOrderTask.getPlaceOrderTaskId(), true);
		logger.info("订单已创立成功|{},{}", userId, JSON.toJSONString(seckillOrder));
	} catch (Exception e) {
		// 扣减成功了才恢复
		logger.error("下单失利|{},{}", userId, JSON.toJSONString(placeOrderTask), e);
		placeOrderTaskService.updateTaskHandleResult(placeOrderTask.getPlaceOrderTaskId(), false);
		throw new BusinessException(ErrorCode.PLACE_ORDER_FAILED);
	}
}

客户端轮询成果

用户提交下单使命后,会返回使命ID,随后就能够经过使命ID来查询该使命的成果:

  • 初始提交: SUBMITTED,即没有处理;
  • 下单成功:SUCCESS,完结处理并成功下单入库;
  • 下单失利:FAILED,完结处理但下单失利;
  • 使命不存在:过错的使命ID或使命缓存已过期被删去。
    /**
     * 获取订单成果
     * @param userId
     * @param itemId
     * @param placeOrderTaskId
     * @return
     */
    public BaseResponse<SeckillOrderMessageResponse> getPlaceOrderResult(Long userId, Long itemId, String placeOrderTaskId) {
        String orderTaskId = generatePlaceOrderTaskId(userId, itemId);
        if (!orderTaskId.equals(placeOrderTaskId)) {
            logger.info("下单ID过错|{}, {}, {}", userId, itemId, placeOrderTaskId);
            return ResultUtils.error(ErrorCode.PLACE_ORDER_TASK_ID_INVALID);
        }
        // 获取订单的状况
        OrderTaskStatus taskStatus = placeOrderTaskService.getTaskStatus(placeOrderTaskId);
        if (taskStatus == null) {
            logger.info("使命状况为空|{}", placeOrderTaskId);
            return ResultUtils.error(ErrorCode.PLACE_ORDER_TASK_ID_INVALID);
        }
        if (!taskStatus.getStatus().equals(OrderTaskStatus.SUCCESS.getStatus())) {
            logger.info("订单使命没有成功|{}", placeOrderTaskId);
            return ResultUtils.success(SeckillOrderMessageResponse.ok().setCode(taskStatus.getStatus()));
        }
        // 获取订单号
        Long orderId = redisCacheService.getLong(PLACE_ORDER_TASK_ORDER_ID_KEY + placeOrderTaskId);
        if (orderId == null) {
            logger.info("订单id没有存在|{}, {}, {}", userId, itemId, placeOrderTaskId);
            return null;
        }
        // 封装对象
        SeckillOrderMessageResponse seckillOrderMessageResponse = SeckillOrderMessageResponse.ok().setOrderId(orderId)
                .setPlaceOrderTaskId(orderTaskId)
                .setCode(OrderTaskStatus.SUCCESS.getStatus());
        return ResultUtils.success(seckillOrderMessageResponse);
    }

提供接口完结轮询

在接口方面,本次方案仅增加了使命成果轮询接口。需求留意的是,该接口用于轮询成果,且没有杂乱计算和数据状况改变,在限流方面的阈值能够相对调高些

/**
 * 获取订单成果
 * @param userId
 * @param itemId
 * @param placeOrderTaskId
 * @return
 */
@GetMapping("/result/{itemId}/{placeOrderTaskId}")
public BaseResponse<SeckillOrderMessageResponse> getPlaceOrderResult(@RequestHeader("TokenInfo") Long userId,
																	 @PathVariable Long itemId,
																	 @PathVariable String placeOrderTaskId) {
	return seckillOrderService.getPlaceOrderResult(userId, itemId, placeOrderTaskId);
}
}

总结

单异步削峰的方针,即提高用户体会和提高系统吞吐能力。在中心流程上,咱们将异步下单进程拆分为三个环节:提交下单使命处理下单使命轮询下单成果