• [1.首先是springBoot的项目结构如下:]
  • [2.事务测验流程涉及的类,如下]
  • [3.运用JMeter模仿并发下单]

主要是自己在项目中(中小型项目) 有支付下单事务(仅仅处理VIP,没有涉及到产品库存),现在用户量还没有上来,现在没有出现问题,可是想到假如用户量变大,下单并发量变大,或许会出现一系列的问题,趁着闲暇时刻,做了这个demo测验相关问题。

或许遇到的问题如下:

  1. 订单重复
  2. 高并发下,功能变慢

处理方法:ThreadPoolExecutor线程池 + Queue行列

[1.首先是springBoot的项目结构如下:]

SpringBoot 引入线程池+Queue缓冲队列实现高并发下单业务

[2.事务测验流程涉及的类,如下]

– BusinessThread 类

packagecom.springboot.demo.Threads;
importorg.springframework.context.annotation.Scope;
importorg.springframework.stereotype.Component;
@Component
@Scope("prototype")//spring多例
publicclassBusinessThreadimplementsRunnable{
privateStringacceptStr;
publicBusinessThread(StringacceptStr){
this.acceptStr=acceptStr;
}
publicStringgetAcceptStr(){
returnacceptStr;
}
publicvoidsetAcceptStr(StringacceptStr){
this.acceptStr=acceptStr;
}
@Override
publicvoidrun(){
//事务操作
System.out.println("多线程现已处理订单刺进体系,订单号:"+acceptStr);
//线程阻塞
/*try{
Thread.sleep(1000);
 System.out.println("多线程现已处理订单刺进体系,订单号:"+acceptStr);
}catch(InterruptedExceptione){
e.printStackTrace();
}*/
}
}

– TestThreadPoolManager 类

packagecom.springboot.demo.Threads;
importorg.springframework.beans.BeansException;
importorg.springframework.beans.factory.BeanFactory;
importorg.springframework.beans.factory.BeanFactoryAware;
importorg.springframework.stereotype.Component;
importjava.util.Map;
importjava.util.Queue;
importjava.util.concurrent.*;
@Component
publicclassTestThreadPoolManagerimplementsBeanFactoryAware{
//用于从IOC里取对象
privateBeanFactoryfactory;//假如实现Runnable的类是通过spring的application.xml文件进行注入,可通过factory.getBean()获取,这儿仅仅提一下
//线程池保护线程的最少数量
privatefinalstaticintCORE_POOL_SIZE=2;
//线程池保护线程的最大数量
privatefinalstaticintMAX_POOL_SIZE=10;
//线程池保护线程所答应的闲暇时刻
privatefinalstaticintKEEP_ALIVE_TIME=0;
//线程池所运用的缓冲行列大小
privatefinalstaticintWORK_QUEUE_SIZE=50;
@Override
publicvoidsetBeanFactory(BeanFactorybeanFactory)throwsBeansException{
factory=beanFactory;
}
/**
*用于储存在行列中的订单,避免重复提交,在实在场景中,可用redis替代验证重复
*/
Map<String,Object>cacheMap=newConcurrentHashMap<>();
/**
*订单的缓冲行列,当线程池满了,则将订单存入到此缓冲行列
*/
Queue<Object>msgQueue=newLinkedBlockingQueue<Object>();
/**
*当线程池的容量满了,履行下面代码,将订单存入到缓冲行列
*/
finalRejectedExecutionHandlerhandler=newRejectedExecutionHandler(){
@Override
publicvoidrejectedExecution(Runnabler,ThreadPoolExecutorexecutor){
//订单参加到缓冲行列
msgQueue.offer(((BusinessThread)r).getAcceptStr());
System.out.println("体系使命太忙了,把此订单交给(调度线程池)逐个处理,订单号:"+((BusinessThread)r).getAcceptStr());
}
};
/**创立线程池*/
finalThreadPoolExecutorthreadPool=newThreadPoolExecutor(CORE_POOL_SIZE,MAX_POOL_SIZE,KEEP_ALIVE_TIME,TimeUnit.SECONDS,newArrayBlockingQueue(WORK_QUEUE_SIZE),this.handler);
/**将使命参加订单线程池*/
publicvoidaddOrders(StringorderId){
System.out.println("此订单准备添加到线程池,订单号:"+orderId);
//验证当时进入的订单是否现已存在
if(cacheMap.get(orderId)==null){
cacheMap.put(orderId,newObject());
BusinessThreadbusinessThread=newBusinessThread(orderId);
threadPool.execute(businessThread);
}
}
/**
*线程池的守时使命---->称为(调度线程池)。此线程池支持守时以及周期性履行使命的需求。
*/
finalScheduledExecutorServicescheduler=Executors.newScheduledThreadPool(5);
/**
*检查(调度线程池),每秒履行一次,检查订单的缓冲行列是否有订单记载,则重新参加到线程池
*/
finalScheduledFuturescheduledFuture=scheduler.scheduleAtFixedRate(newRunnable(){
@Override
publicvoidrun(){
//判别缓冲行列是否存在记载
if(!msgQueue.isEmpty()){
//当线程池的行列容量少于WORK_QUEUE_SIZE,则开端把缓冲行列的订单参加到线程池
if(threadPool.getQueue().size()<WORK_QUEUE_SIZE){
StringorderId=(String)msgQueue.poll();
BusinessThreadbusinessThread=newBusinessThread(orderId);
threadPool.execute(businessThread);
System.out.println("(调度线程池)缓冲行列出现订单事务,重新添加到线程池,订单号:"+orderId);
}
}
}
},0,1,TimeUnit.SECONDS);
/**获取消息缓冲行列*/
publicQueue<Object>getMsgQueue(){
returnmsgQueue;
}
/**中止订单线程池+调度线程池*/
publicvoidshutdown(){
//true表明假如守时使命在履行,当即间断,false则等待使命完毕后再中止
System.out.println("中止订单线程池+调度线程池:"+scheduledFuture.cancel(false));
scheduler.shutdown();
threadPool.shutdown();
}
}

– TestController 类

packagecom.springboot.demo;
importcom.springboot.demo.Threads.TestThreadPoolManager;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.web.bind.annotation.GetMapping;
importorg.springframework.web.bind.annotation.PathVariable;
importorg.springframework.web.bind.annotation.RestController;
importjava.util.Queue;
importjava.util.UUID;
/**
*CreatedbyAdministratoron2018/5/9.
*/
@RestController
publicclassTestController{
@Autowired
TestThreadPoolManagertestThreadPoolManager;
/**
*测验模仿下单请求进口
*@paramid
*@return
*/
@GetMapping("/start/{id}")
publicStringstart(@PathVariableLongid){
//模仿的随机数
StringorderNo=System.currentTimeMillis()+UUID.randomUUID().toString();
testThreadPoolManager.addOrders(orderNo);
return"TestThreadPoolExecutorstart";
}
/**
*中止服务
*@paramid
*@return
*/
@GetMapping("/end/{id}")
publicStringend(@PathVariableLongid){
testThreadPoolManager.shutdown();
Queueq=testThreadPoolManager.getMsgQueue();
System.out.println("封闭了线程服务,还有未处理的信息条数:"+q.size());
return"TestThreadPoolExecutorstart";
}
}

[3.运用JMeter模仿并发下单请求]

SpringBoot 引入线程池+Queue缓冲队列实现高并发下单业务

[4.成果]

打印的日志阐明,开端的订单直接履行刺进到体系,当线程池的容量现已满了,则运用RejectedExecutionHandler方法把后边的订单添加到 Queue缓冲行列,运用ScheduledFuture方法守时(我这儿是每秒一次)检查Queue行列,重新把行列里边的订单添加到线程池,履行后边的刺进使命。

部分日志如下

SpringBoot 引入线程池+Queue缓冲队列实现高并发下单业务