前语
最近遇到了一个业务场景,涉及到多数据源之间的恳求的流程编列,正好看到了一篇某团介绍CompletableFuture原理和运用的技术文章,主要还是涉及运用层面。网上很多文章涉及原理的部分讲的不是特别具体且比较笼统。因为涉及到多线程的工具必需求了解原理,否则一旦遇到问题排查起来就只能凭形而上学,正好借此梳理一下CompletableFuture的作业原理
背景
咱们把Runnable了解为最基本的线程使命,只具备在线程下履行一段逻辑的才干。为了获取履行的回来值,发明了Callable和与其配合运用的Future。为了将使命之间进行逻辑编列,就诞生了CompletableFuture。关于怎样了解使命的逻辑编列,举一个简略的比方:
打开电脑-更新体系这两个操作是有先后次序的,可是泡茶和这两个操作没有先后次序,是能够并行的,而开始作业必需求等候其他操作结束之后才干进行,这就形成了使命编列的履行链。
在IO密集型体系中,相似的场景有很多。因为不同数据集的查询依靠主键不同,A数据集的查询主键是B数据集的一个字段这种情况很常见,一般还需求并发查询多个数据集的数据,所以对于多线程的履行编列是有需求的。
一种解决办法是CountDownLatch,让线程履行到某个当地后进行等候,直到依靠的使命履行结束。对于一些简略的履行链是能够满意的,可是当编列逻辑杂乱起来,CountDownLatch会导致代码难以保护和调试。所以诞生了CompletableFuture用来描绘和保护使命之间的依靠联系以进行使命编列。在实践运用中,有以下两类场景是适合运用使命编列的:
-
多数据源恳求的流程编列
-
非堵塞化网关等NIO场景
运用办法
创立与履行
同步办法
和FutureTask相似,CompletableFuture也经过get()办法获取履行成果。可是不同的是,CompletableFuture自身能够不承载可履行的使命(比较FutureTask则必须承载一个可履行的使命Callable),经过一个用于符号履行成功并设置回来值的函数,在运用上也更为灵敏,如下:
CompletableFuture<String> demo = new CompletableFuture<>();
demo.complete("success");
System.out.println(demo.get());
履行成果:success
和Future相似,get()函数也是同步堵塞的,调用get函数后线程会堵塞直到调用complete办法符号使命现已履行成功。
除了手动触发使命的完结,也能够让创立目标的一起就符号使命完结:
CompletableFuture<String> demo = CompletableFuture.completedFuture("success");
System.out.println(demo.get());
履行成果:success
异步办法
比较于同步办法,异步履行更为常见。比方下面这个比方:
CompletableFuture<String> demo = CompletableFuture.supplyAsync(() -> {
System.out.println("do something by thread" + Thread.currentThread().getName());
return "success";
});
System.out.println(demo.get());
履行成果:
do something by threadForkJoinPool.commonPool-worker-9
success
supplyAsync办法接纳一个Supplier目标,逻辑函数交给线程池中的线程异步履行
默许会运用ForkJoinPool的公共线程池来履行代码(不推荐),当然也能够指定线程池,如下:
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> demo = CompletableFuture.supplyAsync(() -> {
System.out.println("do something by thread" + Thread.currentThread().getName());
return "success";
}, executor);
System.out.println(demo.get());
履行成果:
do something by threadpool-1-thread-1
success
假如不需求履行成果,也能够用runAsync办法:
CompletableFuture.runAsync(() -> {
System.out.println("do something by thread" + Thread.currentThread().getName());
});
履行成果:
do something by threadForkJoinPool.commonPool-worker-9
多使命编列
多使命编列是CompletableFuture的中心,这儿列举不同的场景来进行阐明
一元依靠
过程2需求依靠过程1履行结束才干履行,相似主线程的次序履行,能够经过以下办法完结:
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> step1 = CompletableFuture.supplyAsync(() -> {
System.out.println("履行【过程1】");
return "【过程1的履行成果】";
}, executor);
CompletableFuture<String> step2 = step1.thenApply(result -> {
System.out.println("上一步操作成果为:" + result);
return "【过程2的履行成果】";
});
System.out.println("过程2的履行成果:" + step2.get());
履行成果:
履行【过程1】
上一步操作成果为:【过程1的履行成果】
过程2的履行成果:【过程2的履行成果】
经过thenApply办法,接纳上一个CompletableFuture目标的回来值,其间隐含的逻辑是,该处逻辑只有等上一个CompletableFuture目标履行完后才会履行
二元依靠
比较于一元依靠的次序履行链,二元依靠更为常见,比方下面这个场景:
过程1和2是并行的,而过程3需求等过程1和2履行完之后才干履行,经过CompletableFuture是这么完结的:
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> step1 = CompletableFuture.supplyAsync(() -> {
System.out.println("履行【过程1】");
return "【过程1的履行成果】";
}, executor);
CompletableFuture<String> step2 = CompletableFuture.supplyAsync(() -> {
System.out.println("履行【过程2】");
return "【过程2的履行成果】";
}, executor);
CompletableFuture<String> step3 = step1.thenCombine(step2, (result1, result2) -> {
System.out.println("前两步操作成果分别为:" + result1 + result2);
return "【过程3的履行成果】";
});
System.out.println("过程3的履行成果:" + step3.get());
履行成果:
履行【过程1】
履行【过程2】
前两步操作成果分别为:【过程1的履行成果】【过程2的履行成果】
过程3的履行成果:【过程3的履行成果】
经过thenCombine办法,等候step1和step2都履行结束后,获取其回来成果并履行一段新的逻辑
多元依靠
当然还可能有下面这种场景,过程M需求依靠1-N的N个前置节点:
这种情况会更为杂乱,完结办法如下:
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> step1 = CompletableFuture.supplyAsync(() -> {
System.out.println("履行【过程1】");
return "【过程1的履行成果】";
}, executor);
CompletableFuture<String> step2 = CompletableFuture.supplyAsync(() -> {
System.out.println("履行【过程2】");
return "【过程2的履行成果】";
}, executor);
CompletableFuture<String> step3 = CompletableFuture.supplyAsync(() -> {
System.out.println("履行【过程3】");
return "【过程3的履行成果】";
}, executor);
CompletableFuture<Void> stepM = CompletableFuture.allOf(step1, step2, step3);
CompletableFuture<String> stepMResult = stepM.thenApply(res -> {
// 经过join函数获取回来值
String result1 = step1.join();
String result2 = step2.join();
String result3 = step3.join();
return result1 + result2 + result3;
});
System.out.println("过程M的成果:" + stepMResult.get());
履行成果:
履行【过程1】
履行【过程2】
履行【过程3】
过程M的成果:【过程1的履行成果】【过程2的履行成果】【过程3的履行成果】
经过allOf函数声明当参数中的全部使命履行结束后,才会履行下一步操作,可是要注意,allOf自身仅仅界说节点,真实堵塞的方位是thenApply函数。
和之前的办法不同,因为采用了不定变量,所以要经过CompletableFuture#join来获取每个使命的回来值。
除了allOf之外,假如咱们需求恣意一个使命完结后就履行下一步操作,能够运用anyOf办法,如下:
// step1/2/3的界说相同
// ...
CompletableFuture<Object> stepM = CompletableFuture.anyOf(step1, step2, step3);
System.out.println("过程M的成果:" + stepM.get());
履行成果:
过程M的成果:【过程1的履行成果】
与allOf不同,anyOf的回来值即为榜首个履行结束的使命
作业原理
概念
在讲原理之前,先来了解一下CompletableFuture的界说。在完结上,CompletableFuture承继了Future和CompletionStage
Future毋庸置疑,CompletableFuture最基本的才干便是获取异步计算的成果。CompletionStage则是声明了编列节点的才干,每一个CompletionStage都声明了流程树上的一个节点(见下图)
CompletionStage声明的接口thenXXX,包括thenApply、thenCompose等,界说了节点之间的连接办法(实践情况更为杂乱,具体原理参考下节函数剖析),经过这种办法,终究界说出一颗流程树,从而完结了多线程的使命编列。CompletionStage的办法回来值一般是另一个CompletionStage,从而构成了链式调用。
结构剖析
CompletableFuture里包括两个变量,result和stack
result很好了解,便是当时节点的履行成果。stack就比较杂乱,是一个无锁并发栈,声明了当时节点履行结束后要触发的节点列表,接下来咱们具体讲一下
CompletableFuture中的栈规划
Completion是一个无锁并发栈,声明了当时节点履行结束后要触发的节点列表。在结构上是一个链式节点,其间只包括了一个指向下一个节点的next目标
咱们能够看到Completion有繁多的完结类,表明不同的依靠办法。
咱们知道,在CompletableFuture中的流程编列是经过thenApply、thenAccept、thenCombine等办法来完结的,
-
thenApply:接纳上一步的处理成果,进行下一步消费,并回来成果
-
thenAccept:和thenApply相似,不过无成果回来
-
thenCombine:一起接纳两个流程节点,等其都履行结束后一起处理成果
每个函数实践分别对应了一种Completion完结类,以刚才的三种函数为例,分别对应了UniApply、UniAccept、UniCombine三个目标。所以Completion能够以为是流程编列逻辑的笼统目标,能够了解为流程节点,或许使命节点。
以UniCompletion为例,结构如下:
abstract static class UniCompletion<T,V> extends Completion {
Executor executor; // 线程池
CompletableFuture<V> dep; // 完结使命依靠的cf
CompletableFuture<T> src; // 完结使命所需资源地点的cf
/**
* 假如使命能够被履行则回来true,经过FJ符号位保证只有一个线程判别成功。
* 假如使命是异步的,则在使命启动后经过tryFire来履行使命
*/
final boolean claim() {
Executor e = executor;
if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
if (e == null)
return true;
executor = null; // disable
e.execute(this);
}
return false;
}
/**
* 假如dep不为空回来true,用以判别当时使命节点是否已被激活
*/
final boolean isLive() {
return dep != null;
}
}
先来看claim函数,这个比较简单解说,该函数用于判别使命是否可被履行。经过compareAndSetForkJoinTaskTag函数的CAS操作保证只有一个线程履行成功,主要作用便是在多线程情况下保证使命的正确履行。
接下来便是重头戏,源使命与依靠使命,这两个概念是CompletableFuture的中心,贯穿了全部逻辑的履行,只有了解了这两个概念,才干对履行原理有比较透彻的了解
源使命与依靠使命
源使命和依靠使命在UniCompletion中分别为src和dep特点,举个具体的比方,比方下面这段代码:
CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> {
return "A";
});
CompletableFuture<String> b = a.thenApply(res -> {
return "B " + res;
});
调用a.thenApply(Function fn)时,能够以为是生成了一个UniApply的流程节点(具体怎样生成的下文会提到),其间源使命便是a,而依靠使命则是thenApply的回来值。
换个简略的说法,在上面的代码中,咱们有a、b两个使命,b使命的完结需求依靠于a使命的完结,所以a会生成一个流程节点(UniApply目标),其间包括了b想要履行完结的全部资源(a的履行成果等),这时a使命就叫做源使命(因为a使命中有使命资源)。而b使命需求依靠a使命来完结,所以b使命叫做依靠使命。
源使命的完结会触发依靠使命的履行,这个便是使命编列的基本原理
函数剖析
在本节中,CompletableFuture因为名字太长,会以cf来代指
因为thenAccept、thenCombine函数等逻辑比较相似,咱们以最根底的thenApply函数为例进行剖析
中心函数
咱们先不要直接从thenApply、complete等函数入手,咱们先来看这几个中心函数,不明白做什么的不要紧,先了解这几个函数的原理就好
uniApply
CompletableFuture的逻辑在于“只有当X条件满意时,再履行Y逻辑”,uniApply函数便是担任这样的逻辑判别,首要看源码:
final <S> boolean uniApply(CompletableFuture<S> a,
Consumer<? super S> f, UniApply<S> c) {
Object r; Throwable x;
// 1
if (a == null || (r = a.result) == null || f == null)
return false;
tryComplete: if (result == null) {
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
completeThrowable(x, r);
break tryComplete;
}
r = null;
}
try {
// 2
if (c != null && !c.claim())
return false;
// 3
S s = (S) r;
completeValue(f.apply(s));
} catch (Throwable ex) {
completeThrowable(ex);
}
}
return true;
}
整个办法能够分为三段(已在代码中标出),咱们分隔来说。
榜首段,判别所给的使命节点是否现已履行结束,假如现已履行结束则进入下一步
第二段,假如有相关的流程节点,则经过claim函数判别当时使命是否可被履行,假如可履行则进入下一步(保证多线程情况下使命的正确履行)
第三段,履行传入的函数并把值设置到当时目标中。
整个逻辑是这样的,首要咱们传入了一个cf目标、一个函数,和一个流程节点。只有当传入的cf目标履行完结(result不为空),再履行给定的函数,并把履行成果设置到当时目标中。假如不考虑特殊情况,uniApply办法用一句话解说便是:假如给定的使命现已履行结束,就履行传入的函数并把履行成果设置到当时目标中
tryFire
uniApply函数仅仅是一个有条件的函数履行器,真实想要达到使命编列还需求其他函数的参与,咱们先来看tryFire办法:
final CompletableFuture<V> tryFire(int mode) {
CompletableFuture<V> d; CompletableFuture<T> a;
if ((d = dep) == null ||
!d.uniApply(a = src, fn, mode > 0 ? null : this))
return null;
dep = null; src = null; fn = null;
return d.postFire(a, mode);
}
tryFire依据相关联系的不同有多种完结,实践履行流程相差不大,这儿以常用的UniApply的完结来举例。
首要这个办法接纳了一个mode参数,有以下几种取值:
-
-1:传播形式,或许叫嵌套形式。表明使命实践现已履行结束,仅仅在传递状态
-
0:同步形式。使命由当时线程调用处理
-
1:异步形式。使命需求提交到指定线程池处理
依据mode的不同,实践tryFire履行的流程也会发生很大差异。不过归根结底,tryFire办法的本质是调用了uniApply履行一次使命,假如履行成功,则会清空dep、src等自身特点(清空之后isLive办法会回来false,表明使命现已履行结束),一起经过postFire办法履行该使命下的其他依靠使命,完结使命的传播履行。
postFire办法因为和tryFire办法相关比较亲近,这儿放在一起阐明:
final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
if (a != null && a.stack != null) {
if (mode < 0 || a.result == null)
a.cleanStack();
else
a.postComplete();
}
if (result != null && stack != null) {
if (mode < 0)
return this;
else
postComplete();
}
return null;
}
这儿简略概括一下履行原理,假如是嵌套形式,则整理栈内无效使命,并回来目标自身(能够以为什么都没做);否则经过postComplete办法履行栈内依靠此使命的其他使命项
postComplete
当一个CompletionStage履行完结之后,会触发依靠它的其他CompletionStage的履行,这些Stage的履行又会触发新一批的Stage履行,这便是使命的次序编列
假如说uniApply是根底功能,是担任线程安全且恪守依靠次序地履行一个函数,那么postComplete便是中心逻辑,担任当一个使命履行结束后触发依靠该使命的其他使命项,先来看源码:
final void postComplete() {
CompletableFuture<?> f = this; Completion h;
// 1
while ((h = f.stack) != null ||
(f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d; Completion t;
// 2
if (f.casStack(h, t = h.next)) {
if (t != null) {
if (f != this) {
pushStack(h);
continue;
}
h.next = null; // detach
}
// 3
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
在源码上符号了三个方位,分别代表三层结构,首要是榜首层while循环,只需当时目标栈中还有流程节点,那么就循环履行内部逻辑。
第二层,因为continue的存在,和榜首层结合起来看便是一个批量压栈的操作,将全部需求触发的依靠树按次序压入当时目标栈中。
第三层,经过tryFire按次序触发栈中全部的依靠使命。上节咱们能够看到tryFire函数内依据mode的不同会触发不同的逻辑,这儿mode指定为NESTED便是为了避免循环调用postComplete
履行函数
几个中心函数介绍完了,接下来咱们回到最外层,来看看使命是怎样履行的,首要咱们以thenApply为例剖析中心履行函数
supplyAsync(实践调用为asyncSupplyStage)
该办法用于提交一个使命到线程池中履行,并将该使命打包为一个CompletableFuture节点
static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<U> d = new CompletableFuture<U>();
e.execute(new AsyncSupply<U>(d, f));
return d;
}
其间AsyncSupply完结了Runnable接口,所以了解为一种特殊的使命即可。这种使命在履行完结后会调用completeValue将函数履行的成果设置当时目标中。
所以全体逻辑为,首要创立一个cf目标,并立即将使命添加到线程池履行,在履行结束后会将使命履行的成果保存到所创立的cf目标中。
complete
public boolean complete(T value) {
boolean triggered = completeValue(value);
postComplete();
return triggered;
}
该办法直接调用completeValue办法设置值,设置完值之后调用postComplete办法来依次履行后续使命。当调用该办法时,会完结使命的依靠扩散履行
thenApply(实践调用为uniApplyStage)
private <V> CompletableFuture<V> uniApplyStage(
Executor e, Function<? super T,? extends V> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<V> d = new CompletableFuture<V>();
if (e != null || !d.uniApply(this, f, null)) {
UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
push(c);
c.tryFire(SYNC);
}
return d;
}
结合上节剖析的中心函数,咱们很简单能够剖析该函数的流程:履行function函数,假如条件不满意则履行失败,会生成一个流程节点并压入栈,一起再经过tryFire再测验履行一次,假如条件仍然不满意,那么只能等候所依靠的使命履行完结后经过postComplete触发履行。
get
public T get() throws InterruptedException, ExecutionException {
Object r;
return reportGet((r = result) == null ? waitingGet(true) : r);
}
办法中心在于waitingGet,内部运用了ForkJoinPool.managedBlock来堵塞线程直到履行结束
流程剖析
在函数剖析中,咱们实践现已阐明了使命依靠履行的基本原理,这儿为了更为具体地阐明,咱们以一个简略的比方来剖析。
首要咱们抛开全部杂乱的因素,以最基本的同步串行代码来讲,咱们现在有这样一个目标:
CompletableFuture<String> A = new CompletableFuture<>();
然后咱们这时候给其加上了使命编列,增加了一个thenApply依靠
AtomicInteger seq = new AtomicInteger(0);
Function<String, String> func = s -> s + " | " + seq.incrementAndGet();
CompletableFuture<String> a = new CompletableFuture<>();
CompletableFuture<String> b = a.thenApply(func);
所以咱们就有了这样一个结构,A的stack中压入了一个Completion节点,该节点的源使命指向A自身,而依靠使命指向了B,表明B使命依靠A使命的履行。
接下来咱们再加一条依靠
AtomicInteger seq = new AtomicInteger(0);
Function<String, String> func = s -> s + " | " + seq.incrementAndGet();
CompletableFuture<String> a = new CompletableFuture<>();
CompletableFuture<String> b = a.thenApply(func);
CompletableFuture<String> c = a.thenApply(func);
咱们会发现两个特色:
-
和栈的性质相同,越晚添加的编列逻辑越早被履行
-
根据同一个目标衍生出来的流程节点的源使命是一致的
以此类推,thenXXX的其他逻辑也是相似的原理,当a调用complete函数时(无论是同步还是异步),都会依次触发A使命的stack下挂接的其他依靠使命。而只需a没有调用complete函数,那么thenApply中挂接的依靠使命无论怎样都无法履行(因为a目标的result特点为空)
注意事项
避免主使命和子使命向同一个线程池中请求线程,因为存在依靠联系,当经过join来获取子使命的值时,一旦子使命因为线程队列已满进入堵塞队列,那么将会形成死锁