本文主要有以下内容
- 介绍Java中内置的线程池
- 线程池的七大参数
- 线程池的生命周期
- 线程池的作业原理
- 自界说线程池
Java内置的线程池
线程池:为了节约系统资源,减少创立线程的开支以及更好的办理线程,Java
供给了一套Executor
结构,封装了对多线程的控制,其体系结构如下图所示:
Executor
自身是一个接口,其代码如下:
public interface Executor {
void execute(Runnable command);
}
ExecutorService
接口对该接口进行了扩展,添加许多办法
- shutdown()
- shutdowmNow()
- isShutdown()
- isTerminated()
- awaitTermination()
- submit(Callable<T>)
- submit(Runnable,T)
- submit(Runnable)
- invokeAll()等重载办法
重点关注前五个办法:
-
shutdown()
: 调用此办法告诉线程池shutdown
,调用此办法后,线程池不再承受新的使命,已经提交的使命不会受到影响,会按照顺序履行完毕。不会堵塞调用此办法的线程。 -
shutdowmNow()
,当即测验中止一切正在运转的使命,回来一个待履行的使命列表。不会堵塞调用此办法的线程。该办法除了尽力去测验中止线程外,没有任何保证,任何呼应中止失利的线程可能永久不会中止(如:经过thread.interrupted()中止线程时)。 -
isShutdown()
:回来一个boolean值,假如已经shutdown
回来true,反之false。 -
awaitTermination(timeout,timeUnit)
:堵塞直到一切使命全部完结,或者等候 timeout ,或者在等候timeout期间当时线程抛出InterruptedException
-
isTerminated()
: 回来true
假如一切的使命已经完结且关闭,不然回来false
除非在从前已经调用过shutdown()/shutdownNow()
AbstractExecutorService
是一个抽象类,完成了ExecutorService
,其子类ThreadPoolExecutor
进一步扩展了相关功能,在Java中,贴心的Doug Lea
供给了一个东西类供咱们去运用ThreadPoolExecutor
,在Executors
中供给了如下几种线程池
办法名 | 描绘 |
---|---|
newCachedThreadPool() | 必要时创立新的线程,闲暇线程保存60s |
newFixedThreadPool() | 创立固定数目的线程;闲暇线程会一向保存 |
newWorkStealThreadExecutor() | 一种适合fork-join 使命的线程池,杂乱使命拆分为简单的使命,闲暇线程会来帮助 |
newSingleThreadExecutor() | 只有一个线程的线程池,按顺序履行所提交的使命 |
newScheduledThreadPool() | 用于调度履行的固定线程池 |
newSingleThreadScheduledExecutor() | 用于调度履行的单线程池 |
虽然有这么多的线程池,但都是给ThreadPoolExecutor
的结构函数传递不同的参数算了!
上面所说到的线程池中,需求留意的一个线程池为newScheduledThreadPool()
,他的源码如下
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
回来的是一个ScheduledThreadPoolExecutor
对象,在这个类中咱们需求留意这三个办法的运用
- public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
- public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
- public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay, TimeUnit unit);
schedule()
会在给定的时间进行一次调度,后面的两办法会周期性的对使命进行调用,可是还有些许差异,scheduleWithFixedDelay()
会在上一次使命履行完毕后等候给定的delay时间后再履行,可是假如代码运转的时长大于delay,则会在运转完毕后当即运转。scheduleAtFixedRate()
则是在上次使命履行的开端时间之后的period后就履行。
scheduleAtFixedRate()
的运用:
代码示例:
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
AtomicInteger i = new AtomicInteger(1);
System.out.println("当时时间是:"+LocalDateTime.now());
executorService.scheduleAtFixedRate(() -> {
System.out.println(LocalDateTime.now());
System.out.println("得到" + i + "次履行拉");
i.getAndIncrement();
// try {
// Thread.sleep(3000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
}, 3, 2, TimeUnit.SECONDS);
运转成果如下图:
从上面的运转成果能够知道,基本上每一次调度都是在上一次开端之后的 2s
之后,
留意:假如代码运转的时间超越了等候时间,则上一次调度完毕后,立马履行。
翻开注释的代码,得到的运转成果如下:
能够看到,每一次运转的成果时间距离并不是之前的 2s
,而是 3s!!
小结: scheduleAtFixedRate的调度流程
- 先等候 delay 时间后运转
- 此时假如代码运转的时间 < period,则下次运转的时间是上一次开端调度的时间的period时间后。
- 假如代码运转时间 > period,则下次运转的时间是在上一次完毕之后立马运转。
scheduleWithFixedDelay()
的用法:
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
AtomicInteger i = new AtomicInteger(1);
System.out.println("当时时间是:"+LocalDateTime.now());
executorService.scheduleWithFixedDelay(()->{
System.out.println("运转开端: "+LocalDateTime.now());
System.out.println("得到"+i+"履行拉");
i.getAndIncrement();
// try {
// Thread.sleep(4000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
System.out.println("运转完毕: "+ LocalDateTime.now());
},3,2,TimeUnit.SECONDS);
翻开注释,得到如下运转成果:
从上面两次运转的成果能够看到 scheduleWithFixedDelay()
的调度距离和其代码的运转时间没有关系,相邻的距离时间固定。
线程池的七大参数
在ThreadPoolExecutor
供给了如下的结构办法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
-
corePoolSize:中心线程数,即一向存活的线程数量,
-
maximumPoolSize:线程池允许的最大线程数量
-
keepAliveTime:当线程池中的线程数量超越中心线程数量时,闲暇线程的最大的存活时间
-
unit:keepAliveTime的时间单位
-
workQueue:作业行列,用于存放未履行的使命的行列
-
threadFactory:线程工厂,创立新线程的当地
-
Handler:回绝战略,当线程池不承受使命时采取的战略
- DiscardPolicy:直接丢掉
- DiscardOldestPolicy:丢掉等候最长时间的使命u
- AbortPolicy:默许的回绝战略,抛出
RejectedExecutionException
异常 - CallerRunsPolicy:只需线程池不处有shutdown,则将使命交给调用者线程履行,即调用
execute()
办法的线程,假如处于shutdown,则会被丢掉履行。
在上面的表中,列出了Executors东西类中所供给的创立线程的办法,本质上便是7大参数的不同值。
如newFixedThreadPool
,中心线程等于最大线程则为固定线程
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
在比如newCachedThreadPool()
,假如当时线程池有闲暇线程可用,则当即履行,假如没有闲暇线程,则当即创立新的线程履行,不把他放入作业行列中去。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),threadFactory);
}
SynchronousQueue
是BlockingQueue
的一种完成,他没有任何的内部容量,往这个行列中进行插入操作有必要等候另一个线程的remove
操作,鄙人一篇并发容器的相关文章中做详细介绍。
线程池的生命周期
在ThreadPoolExecutor
类中界说了如下几种线程池的状态
- RUNNING:接纳新使命和处理行列中的使命
- SHUTDOWN:不在接纳新的使命,可是会处理等候行列中的使命
- STOP:不在接纳新使命且不会处理等候行列中的使命,还要中止正在运转的使命
- TIDYING:一切使命完结且作业线程数为0,调用terminate()会进入到此状态
- TERMINATED:terminate()运转完毕之后会进入这种状态
线程池的作业原理
在ThreadPoolExecutor
这个类中,其execute()
办法,展示了线程池的作业原理
源码如下:
public void execute(Runnable command) {
if (command == null) // 1
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // 2
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { //3
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) // 4
reject(command);
}
第一:查看传进来的对象是否为null,假如是则抛出空指针异常。
第二:假如作业线程数小于中心线程数则经过addWoker()
将使命交给线程池处理,即立马履行当时的使命
第三:假如作业线程数大于等于中心线程数,则将使命加入到作业行列
第四:假如作业行列已满,则交给线程池处理,假如当时线程数小于最大线程数,则创立新的线程运转使命,反之回绝该使命
自界说线程池
在思考如何自界说线程池之前,需求首要回顾下线程池的七大参数:
- 中心线程数:常驻线程池的线程
- 最大线程数:线程池的最多的线程容量
- 存活时间和时间单位:闲暇线程的最大存活时间
- 作业行列:存储还没来得及处理的使命的容器
- 线程工厂:创立新线程的当地
- 回绝战略:说 “No” 的办法
在这7大参数中:中心线程数,最大线程数,以及存活时间和时间单位在我个人看来不是那么重要!毕竟在生产中这些参数在详细场景下都会得到确认,不会有什么特别能够定制的当地,(他如同没那么重要.jpg)
而作业行列的挑选和回绝战略则能够有较多的挑选只需是完成了BlockedQueue接口的容器都能够当作作业行列,换句话说便是只需完成了该接口,都能够充当作业行列。相同的,在 jdk 中默许完成的4种回绝战略,他们都完成了RejectedExecutionHandler
接口!而这个接口的作用便是界说哪些不能够被线程池处理的使命,这个接口里面只有一个办法rejectedExecution(Runnable r, ThreadPoolExecutor executor);
这个办法的调用时机便是线程池无法接纳新的使命时!
线程工厂:ThreadFactory
接口中,界说了newThread(Runnable r);
办法进行创立新线程。因而只需完成了改接口,也能够依据自己的意愿去创立新线程!
因而在创立自界说线程池的时分,咱们能够进行如下挑选
- 完成ThreadFactory:界说创立线程的办法,在Executors中有默许完成,但也能够自己去完成!
- 完成RejectedExecutionHandler:自界说回绝战略
- 完成BlockingQueue:就能够自界说作业行列
除此之外,在Executors
东西类中,其创立线程的办法如以下几种:
// Executors
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
// ThreadPoolExecutor
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}
// Executors
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
大多都是依据不同的作业行列,创立了不同特性的线程池!
创立自界说回绝战略的线程池
参照 netty 的代码,创立如下的回绝战略:
public class MyRejectHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
System.out.println("给你时机,你不中用啊");
final Thread t = new Thread(r, "new thread execute new task");
t.start();
} catch (Throwable e) {
throw new RejectedExecutionException(
"Failed to start a new thread", e);
}
}
}
这里创立的线程池是凭借了ThreadPoolExecutor
, 假如对自己的能力自信,能够自己去完成一个自己的"ThreadPoolExecutor"
,来给人们一点小小的震慑!
ThreadPoolExecutor executor = new ThreadPoolExecutor(2,
2,
0,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(1),
new MyRejectHandler());
for (int i = 0; i < 10; i++) {
executor.execute(() -> {
System.out.println("new Task submit " + LocalDateTime.now());
});
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
}
先翻开注释,怠慢使命提交,此时没有触发回绝战略
注释掉这段线程延时代码,能够得到如下输出,打印了七次给你时机,你不中用啊
,这是由于常驻线程为2,能够处理2个使命,作业行列的容量为1,能够保存一个,因而,能够处理3个使命,而经过下面打印的时间,就能够发现在同一时间,提交了9个使命,因而处理不过来所以打印了 10- 3 = 7次
参考资料:
- Java中心技术卷1
- Java高并发程序设计
- Executors,ThreadPoolExecutor等源码