线程池的运用场景

一般来说,我运用线程池最多的当地有两点:

  • 对大量耗时使命的并发处理
  • 对频发事务的线程池阻隔

通过一年来在作业中关于线程池的运用,也总结了一些还算是比较实用的经历。假如各位小伙伴有更好的经历,欢迎指正和共享!

线程池的创立办法

我信任很多小伙伴都很了解一些常用线程池的创立办法,比如说通过Executors来获取JDK封装的那几个常用的线程池,以及通过new ThreadPoolExecutor()的办法来创立自定义线程池。

网上关于这两种创立办法的解说材料能够说现已烂大街了哈,在这儿就不再一一解说(不了解的小伙伴能够先去学习一下线程池的根本运用和阿里开发者手册中关于线程池的运用标准)。

那么在这儿就先聊一下我个人关于他们的认识吧:

首先我觉得关于线程池的运用并不需求一直的保持那么严苛,就像下面这种办法:

// 参数随便举个例子
ExecutorService executor = new ThreadPoolExecutor(
        2, 
        10, 
        1000, 
        TimeUnit.MICROSECONDS, 
        new LinkedBlockingQueue<>(100)
);

其实我觉得还是要去区别必定的事务场景的。我觉得这种场景愈加适用于数据量规模较为明确,但某个时刻段内的数据量并不均匀的状况。

但除此之外的另一种场景,我觉得愈加适用于固定线程数的线程池。就比如说,我一个List中有100条数据,每条数据的处理时刻为100ms。那么针关于这种数据量固定且并不是很大的场景来说,我觉得咱们只将要点放在用多少线程才能用最短的时刻去向理完事务就好了,这个时分就能够直接去运用Executors.newFixedThreadPool(),简略粗暴!

ExecutorService executorService = Executors.newFixedThreadPool(10);

还有另外一种创立线程池的办法,也是我平常最喜欢用的。便是运用Spring为咱们封装的ThreadPoolTaskExecutor。
我为什么喜欢它呢?由于他能够更便利的自定义线程前缀,更利于咱们后期关于线程池的监控和故障排查。用ThreadPoolExecutor的话还得去手动创立线程工厂,不喜欢那么麻烦!

他的创立办法如下:

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(2);
// 最大线程数
executor.setMaxPoolSize(10);
/**
 * 堵塞行列长度
 * 长度=0时,运用SynchronousQueue
 * 长度>0时,运用LinkedBlockingQueue
 */
executor.setQueueCapacity(100);
// 临时线程在闲暇状态下的存活时刻(单位为秒)
executor.setKeepAliveSeconds(1);
// 回绝战略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// 线程名前缀
executor.setThreadNamePrefix("executor-");
// 显式调用shutdown时,等候线程池中所有的使命均完成后再毁掉线程池,默认为false
executor.setWaitForTasksToCompleteOnShutdown(true);
// 显式调用shutdown时,假如在指定时刻内线程池内的线程还没有悉数履行完成,就强制毁掉线程池(单位为秒)
executor.setAwaitTerminationSeconds(30);

以上是一些常规线程池的运用,关于调度线程池的运用不再本文进行共享,后续会共享一篇关于使命调度的文章。大家敬请期待呀,哈哈!

对大量耗时使命的并发处理

这种事务场景一般来说会有两种处理思路,主要还是依据具体的事务场景来进行挑选:

  • 后端线程池异步处理,主线程不进行等候
  • 主线程等候线程池处理完毕后再结束

下面针关于第二种状况进行共享,由于第一种状况比较简略。第二种状况的处理便是在第一种状况的基础上凭借CountDownLatch来完成主线程的等候。

// 要处理的数据
List list = new ArrayList();
// 创立固定线程池
ExecutorService executor = new ThreadPoolExecutor(
        list.size(), list.size(), 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()
);
// 创立计数器,初始数量为要处理的使命数量
CountDownLatch latch = new CountDownLatch(list.size());
// 创立数据列表
for(int i = 0; i < list.size(); i++) {
    executor.execute(() -> {
        try {
            // 要处理的事务逻辑
        } catch (Exception e) {
            // 事务反常时的处理
        } finally {
            // 计数器-1
            latch.countDown();
        }
    });
}
// 持续处理线程池中剩余的使命且不再接纳新使命
executor.shutdown();
try {
    // 计数器 > 0时,主线程堵塞等候
    latch.await();
} catch (InterruptedException e) {
    e.printStackTrace();
}

以上这种状况是通过运用固定线程数的线程池进行事务处理的。一般来说在这种状况下,假如List不造成OOM的话、不呈现长事务的话,根本上也就不会呈现什么问题。

这儿需求注意的便是:

  1. List中的数据量要操控好,别搞成OOM了。
  2. 通过屡次的压测来确认适宜的线程数。
  3. CountDownLatch的初始化容量必定要等于List的长度,否则有你舒适的。
  4. 线程池内必定要用try-catch-fianlly,而且latch.countDown()必定要放在finally中,保证无论如何也要保证计数器的自减。
  5. 关于事务的处理必定要把控好总体时刻,别搞的长事务了。能够通过指定主线程堵塞的最大时刻来作为保底方案,防止长事务的呈现。例如:latch.await(10, TimeUnit.SECONDS);

对频发事务的线程池阻隔

一般状况下,咱们也需求对访问量较大的接口或下流事务运用线程池阻隔的办法,防止由于某一项事务将整个体系的线程池资源占用的状况。

关于某个访问量较大的接口进行线程池阻隔

这种处理方案便是客户端调用接口后,主线程将使命分配给线程池中的子线程去履行。这种解决方案虽然说在必定程度上保证了体系的可靠性,但是我觉得运用信号量阻隔要更好一些(便是咱们常说的限流)。

对下流事务进行线程池阻隔

比如说现在产品产出事务履行完毕后要通过异步使命去履行关于下流事务接口参数的封装以及接口调用的作业。前面两篇关于异步调用的文章中现已共享了异步的运用和优化

而咱们也都清楚,在运用@Async进行异步使命调用的时分,关于一些恳求量较大的下流事务,咱们都会给它分配一个线程池进行阻隔,防止当时事务频频调用时拖垮整个体系的线程池。

@Async("pushWMSExecutor")
public void pushWMS() {
    ...
}
@Bean("pushWMSExecutor")
public Executor executorService() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(2);
    executor.setMaxPoolSize(10);
    executor.setQueueCapacity(100);
    executor.setKeepAliveSeconds(1);
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
    executor.setThreadNamePrefix("executor-");
    return executor;
}

但是咱们在运用这种办法前要注意一个工作,便是回绝战略。假如在某个场景下,这个事务调用频率出乎了咱们的预料,就很有或许触发回绝战略。这个回绝战略咱们必定要依据实际的事务场景进行挑选好,否则一旦呈现事端便是非常严峻的。

JDK为咱们供给了四种回绝战略:

回绝战略 描述
AbortPolicy(默认) 丢掉当时使命并抛出运行时反常RejectedExecutionException
DiscardPolicy 丢掉当时使命,不会有回来任何的提示
DiscardOldestPolicy 丢掉最早进入行列且未被消费的那个使命,并将新使命放进行列中
CallerRunsPolicy 使命不进入线程池,而是由当时线程履行

但是仔细想一想,在这个事务中,这四种回绝战略好像并没有适宜的。假如选用DiscardPolicy或DiscardOldestPolicy的话,要合作定时使命去扫描未处理的数据进行补偿;假如运用CallerRunsPolicy,但是假如使命过多呢?都用主线程履行吗?显然不太合理的!

这个时分咱们但是去自定义一个回绝战略。看看线程池的源码就知道,要自定义一个回绝战略无非便是完成它的反常处理器接口,然后重写他的办法就ok了。以上四种也是这么干的。

public class CustomRejectionPolicy implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        .....
    }
}

在这儿咱们能够依据自己的需求对触发回绝战略的使命进行移送备用行列进行延时缓存处理或做一些其他的工作。