前言
最近因为一些个人原因,未能抽出太多精力更新Java多线程系列,一搁置便是好几个月,先向读者诸君致歉。
在本系列的其他文章中,现已说到过线程之间的相互协作, 经过分工,将程序体系的不同使命进行线程别离,充分运用机器功能、提升特定线程的运用率和程序的体验感。
详见拙作:Java多线程根底–线程生命周期与线程协作详解.
并在线程池相关文章中说到:作为程序构建者,咱们更关怀线程(组)的特性和它们所履行的使命,并不愿意分神去做线程操作。
详见拙作:Java多线程根底–线程的创立与线程池办理
然而实际开发中,咱们同样关怀一个使命对程序体系发生的影响,习惯上称之为使命的的履行成果。
Runnable的局限性
在前文中咱们谈到,经过编码完成Runnable接口,将取得具有边界性的 “使命”,在指定的线程(或许线程池)中运转。
从头调查该接口,不难发现它并没有办法回来值:
public interface Runnable {
void run();
}
在JDK1.5之前,想运用使命的履行成果,需求小心的操作线程拜访临界区资源。运用 回调
进行解耦是十分不错的选择。
练手小Demo — 回顾既往文章知识
注意,为了减少篇幅运用了lambda,但jdk1.5之前并不支撑lambda
将计算使命别离到其他线程履行,再回到主线程消费成果
咱们将计算、IO等耗时使命丢到其他线程,让主线程专注于自身事务,设想它在接受用户输入以及处理反应,但咱们省略这一部分
咱们能够设计出类似下面的代码:
尽管它还有许多不合理之处值得优化,但也足以用于演示
class Demo {
static final Object queueLock = new Object();
static List<Runnable> mainQueue = new ArrayList<>();
static boolean running = true;
static final Runnable FINISH = () -> running = false;
public static void main(String[] args) {
synchronized (queueLock) {
mainQueue.add(Demo::onStart);
}
while (running) {
Runnable runnable = null;
synchronized (queueLock) {
if (!mainQueue.isEmpty())
runnable = mainQueue.remove(0);
}
if (runnable != null) {
runnable.run();
}
Thread.yield();
}
}
public static void onStart() {
//...
}
public static void finish() {
synchronized (queueLock) {
mainQueue.clear();
mainQueue.add(FINISH);
}
}
}
再模仿一个计算的线程和使命回调:
interface Callback {
void onResultCalculated(int result);
}
class CalcThread extends Thread {
private final Callback callback;
private final int a;
private final int b;
public CalcThread(Callback callback, int a, int b) {
this.callback = callback;
this.a = a;
this.b = b;
}
@Override
public void run() {
super.run();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
final int result = a + b;
System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis());
synchronized (queueLock) {
mainQueue.add(() -> callback.onResultCalculated(result));
}
}
}
填充一下onStart事务:
class Demo {
public static void onStart() {
System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis());
new CalcThread(result -> {
System.out.println("threadId" + Thread.currentThread().getId() + ",onResultCalculated:" + result + ";" + System.currentTimeMillis());
finish();
}, 200, 300).start();
}
}
复习:优化为运用Runnable
在前文咱们说到,假如事务仅重视使命的履行,并不过于关怀线程本身,则能够运用Runnable:
class Demo {
static class CalcRunnable implements Runnable {
private final Callback callback;
private final int a;
private final int b;
public CalcRunnable(Callback callback, int a, int b) {
this.callback = callback;
this.a = a;
this.b = b;
}
@Override
public void run() {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
final int result = a + b;
System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis());
synchronized (queueLock) {
mainQueue.add(() -> callback.onResultCalculated(result));
}
}
}
public static void onStart() {
System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis());
new Thread(new CalcRunnable(result -> {
System.out.println("threadId" + Thread.currentThread().getId() + ",onResultCalculated:" + result + ";" + System.currentTimeMillis());
finish();
}, 200, 300)).start();
}
}
不难想象出:咱们十分需求
- 让特定线程、特定类型的线程方便地接纳使命,回顾本系列文章中的 线程池篇 ,线程池是应运而生
- 具有比Synchronize更轻量的机制
- 具有更方便的数据结构
至此,咱们能够体会到:JDK1.5之前,因为JDK的功能不足,Java程序关于线程的运用 较为粗糙。
为异步而生的Future
总算在JDK1.5中,迎来了新特性: Future
以及从前文章中说到的线程池, 时光荏苒,一晃将近20年了。
/**
* 略
* @since 1.5
* @author Doug Lea
* @param <V> The result type returned by this Future's {@code get} method
*/
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
尽管现已移除了API注释,但仍然能够了解每个API的含义,不多做赘述。
清楚明了,为了增加回来值,没有必要用如此复杂的 接口来替代 Runnable
。简略思考后能够对回来值的状况进行概括:
- 回来Runnable中事务的成果,例如计算、读取资源等
- 单纯的在Runnable履行完毕后回来一个成果
从事务层上看,仅需求如下接口即可,它增加了回来值、并能够更友爱地让运用者处理反常:
作者按:抛开底层完成,仅看事务方编码需求
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
* 防盗戳 leobert-lan https:///user/2066737589654327
*/
V call() throws Exception;
}
明显,JDK需求供给后向兼容能力:
- Runnable 不能够丢弃,也不应当丢弃
- 不能要求运用者彻底的重构代码
所以同时供给了适配器,让运用者进行简略的部分重构即可用上新特性
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
而Future恰如其名,它代表了在 “未来” 的一个成果和状况,为了更方便地处理异步而生。
而且内置了 FutureTask
,在 FutureTask详解 章节中再行展开。
类图
在JDK1.8的根底上,看一下精简的类图结构:
FutureTask详解
构造函数
public class FutureTask {
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
}
生命周期
public class FutureTask {
//新建
private static final int NEW = 0;
//处理中
private static final int COMPLETING = 1;
//正常
private static final int NORMAL = 2;
//反常
private static final int EXCEPTIONAL = 3;
//已撤销
private static final int CANCELLED = 4;
//中止中
private static final int INTERRUPTING = 5;
//已中止
private static final int INTERRUPTED = 6;
}
可能的生命周期转化如下:
- NEW -> COMPLETING -> NORMAL
- NEW -> COMPLETING -> EXCEPTIONAL
- NEW -> CANCELLED
- NEW -> INTERRUPTING -> INTERRUPTED
JDK中原汁原味的解说如下:
The run state of this task, initially NEW. The run state transitions to a terminal state only in methods set, setException, and cancel. During completion, state may take on transient values of COMPLETING (while outcome is being set) or INTERRUPTING (only while interrupting the runner to satisfy a cancel(true)). Transitions from these intermediate to final states use cheaper ordered/lazy writes because values are unique and cannot be further modified.
核心办法
本节从以下三块下手阅览源码
- 状况判别
- 撤销
- 获取成果
状况判别API的完成十分简略
public class FutureTask {
public boolean isCancelled() {
return state >= CANCELLED;
}
public boolean isDone() {
return state != NEW;
}
}
撤销:
- 当时状况为
NEW
且 CAS修正 state 成功,否则回来撤销失利 - 假如
mayInterruptIfRunning
则中止在履行的线程并CAS修正state为INTERRUPTED - 调用 finishCompletion
- 删除并通知所有等候的线程
- 调用done()
- 设置callable为null
public class FutureTask {
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) {
return false;
}
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null; ) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (; ; ) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
}
获取成果: 先判别状况,假如未进入到 COMPLETING
(即为NEW状况),则阻塞等候状况改动,回来成果或抛出反常
public class FutureTask {
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V) x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable) x);
}
}
如何运用
而运用则十分简略,也十分的朴素。
咱们以文中的的例子进行改造:
- 沿用原Runnable逻辑
- 移除回调,增加
CalcResult
- 将
CalcResult
目标作为既定回来成果,Runnable中设置其特点
class Demo {
static class CalcResult {
public int result;
}
public static void onStart() {
System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis());
final CalcResult calcResult = new CalcResult();
Future<CalcResult> resultFuture = Executors.newSingleThreadExecutor().submit(() -> {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
final int result = 200 + 300;
System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis());
calcResult.result = result;
}, calcResult);
System.out.println("threadId" + Thread.currentThread().getId() + "横竖干点什么," + System.currentTimeMillis());
if (resultFuture.isDone()) {
try {
final int ret = resultFuture.get().result;
System.out.println("threadId" + Thread.currentThread().getId() + ",get result:" + ret + ";" + System.currentTimeMillis());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
finish();
}
}
假如直接运用新特性Callback,则如下:
直接回来成果,当然也能够直接回来Integer,不再包裹一层
class Demo {
public static void onStart() {
System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis());
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<CalcResult> resultFuture = executor.submit(() -> {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
final int result = 200 + 300;
System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis());
final CalcResult calcResult = new CalcResult();
calcResult.result = result;
return calcResult;
});
System.out.println("threadId" + Thread.currentThread().getId() + "横竖干点什么," + System.currentTimeMillis());
if (resultFuture.isDone()) {
try {
final int ret = resultFuture.get().result;
System.out.println("threadId" + Thread.currentThread().getId() + ",get result:" + ret + ";" + System.currentTimeMillis());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
executor.shutdown();
finish();
}
}
信任读者诸君会有这样的疑惑:
为何运用Future比原先的回调看起来粗糙?
首先要清晰一点:文中前段的回调Demo,尽管达成了既定目标,但效率并不高!!在当时计算很贵重的布景下,并不会如此莽撞地运用!
而在JDK1.5开始,供给了很多内容支撑多线程开发。考虑到篇幅,会在系列文章中逐步展开。
另外,FutureTask中的CAS与Happens-Before本篇中亦不做展开。
接下来,再做一些引申,简略看一看多线程事务形式。
引申,多线程事务形式
常用的多线程设计形式包含:
- Future形式
- Master-Worker形式
- Guarded Suspension形式
- 不变形式
- 生产者-消费
Future形式
文中关于Future的运用方法遵循了Future形式。
事务方在运用时,现已清晰了使命被别离到其他线程履行时有等候期,在此期间,能够干点别的工作,不必糟蹋体系资源。
Master-Worker形式
在程序体系中设计两类线程,并相互协作:
- Master线程(单个)
- Worker线程
Master线程负责接受使命、分配使命、接纳(必要时进一步组合)成果并回来;
Worker线程负责处理子使命,当子使命处理完成后,向Master线程回来成果;
作者按:此刻可再次回想一下文章最初的Demo
Guarded Suspension形式
- 运用缓存行列,使得 服务线程/服务进程 在未安排妥当、繁忙时能够推迟处理恳求。
- 运用等候-通知机制,将消费
服务的回来成果
的方法规范化
不变形式
在并行开发过程中,为保证数据的一致性和正确性,有必要对目标进行同步,而同步操作会对程序体系的功能发生适当的损耗。
因此,运用状况不可改动的目标,依托其不变性来保证 并行操作 在 没有同步机制 的状况下,坚持一致性和正确性。
- 目标创立后,其内部状况和数据不再发生改动
- 目标被同享、被多个线程拜访
生产者-消费
设计两类线程:若干个生产者线程和若干个顾客线程。
生产者线程负责提交用户恳求,顾客线程负责处理用户恳求。生产者和顾客之间经过同享内存缓冲区进行通信。
内存缓冲区的意义:
- 处理是数据在多线程间的同享问题
- 缓解生产者和顾客之间的功能差
这几种形式从不同角度出发处理特定问题,但亦有一定的相似之处,不再展开。
后记
至此,咱们现已进入结尾,JDK1.5中,对多线程的支撑迎来一波井喷。本文以及系列文章中关于线程池的内容也仅仅是根底中的根底,仍旧有很多的内容值得深入,本篇不再往下挖掘。
在后续的系列文章中,咱们将展开AQS、HAPPENS-BEFORE等内容,以及和本文高度相关的CompleteFutureTask,JUC东西等。