前言
为什么需求异步履行?
场景:电商系统中获取一个完好的产品信息或许分为以下几步:①获取产品基本信息 ②获取产品图片信息 ③获取产品促销活动信息 ④获取产品各品种的基本信息 等操作,假如运用串行办法去履行这些操作,假设每个操作履行1s,那么用户看到完好的产品详情就需求4s的时刻,假如运用并行办法履行这些操作,或许只需求1s就能够完结。所以这便是异步履行的好处。
JDK5的
Future
接口
Future
接口用于代表异步核算的成果,经过Future接口供给的办法能够检查异步核算是否履行完结,或许等候履行成果并获取履行成果,同时还能够撤销履行。
罗列Future
接口的办法:
-
get()
:获取使命履行成果,假如使命还没完结则会堵塞等候直到使命履行完结。假如使命被撤销则会抛出CancellationException
反常,假如使命履行进程产生反常则会抛出ExecutionException
反常,假如堵塞等候进程中被中止则会抛出InterruptedException
反常。 -
get(long timeout,Timeunit unit)
:带超时时刻的get()
办法,假如堵塞等候进程中超时则会抛出TimeoutException
反常。 -
cancel()
:用于撤销异步使命的履行。假如异步使命现已完结或许现已被撤销,或许由于某些原因不能撤销,则会回来false。假如使命还没有被履行,则会回来true而且异步使命不会被履行。假如使命现已开始履行了可是还没有履行完结,若mayInterruptIfRunning
为true,则会当即中止履行使命的线程并回来true,若mayInterruptIfRunning
为false,则会回来true且不会中止使命履行线程。 -
isCanceled()
:判别使命是否被撤销,假如使命在结束(正常履行结束或许履行反常结束)前被撤销则回来true,不然回来false。 -
isDone()
:判别使命是否现已完结,假如完结则回来true,不然回来false。需求注意的是:使命履行进程中产生反常、使命被撤销也属于使命已完结,也会回来true。
运用Future
接口和Callable
接口完结异步履行:
public static void main(String[] args) {
// 快速创立线程池
ExecutorService executorService = Executors.newFixedThreadPool(4);
// 获取产品基本信息(能够运用Lambda表达式简化Callable接口,这儿为了便于观察不运用)
Future<String> future1 = executorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "获取到产品基本信息";
}
});
// 获取产品图片信息
Future<String> future2 = executorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "获取产品图片信息";
}
});
// 获取产品促销信息
Future<String> future3 = executorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "获取产品促销信息";
}
});
// 获取产品各品种基本信息
Future<String> future4 = executorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "获取产品各品种基本信息";
}
});
// 获取成果
try {
System.out.println(future1.get());
System.out.println(future2.get());
System.out.println(future3.get());
System.out.println(future4.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}finally {
executorService.shutdown();
}
}
既然Future能够完结异步履行并获取成果,为什么还会需求CompletableFuture?
简述一下Future接口的坏处:
-
不支持手动完结
- 当提交了一个使命,可是履行太慢了,经过其他途径现已获取到了使命成果,现在无法把这个使命成果告诉到正在履行的线程,所以必须主动撤销或许一直等候它履行完结。
-
不支持进一步的非堵塞调用
- 经过Future的
get()
办法会一直堵塞到使命完结,可是想在获取使命之后履行额外的使命,由于 Future 不支持回调函数,所以无法完结这个功用。
- 经过Future的
-
不支持链式调用
- 关于Future的履行成果,想持续传到下一个Future处理运用,然后形成一个链式的pipline调用,这在 Future中无法完结。
-
不支持多个 Future 兼并
- 比如有10个Future并行履行,想在一切的Future运行结束之后,履行某些函数,是无法经过Future完结的。
-
不支持反常处理
- Future的API没有任何的反常处理的api,所以在异步运行时,假如出了反常问题不好定位。
运用Future接口能够经过get()
堵塞式获取成果或许经过轮询+isDone()
非堵塞式获取成果,可是前一种办法会堵塞,后一种会消耗CPU资源,所以JDK的Future接口完结异步履行对获取成果不太友好,所以在JDK8时推出了CompletableFuture完结异步编列。
CompletableFuture的运用
CompletableFuture概述
JDK8中新增加了一个包含50个办法左右的类CompletableFuture,供给了非常强壮的Future的扩展功用,能够帮助我们简化异步编程的复杂性,供给了函数式编程的能力,能够经过回调的办法处理核算成果,而且供给了转化和组合CompletableFuture的办法。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
CompletableFuture
类完结了Future
接口和CompletionStage
接口,即除了能够运用Future
接口的一切办法之外,CompletionStage<T>
接口供给了更多办法来更好的完结异步编列,而且大量的运用了JDK8引入的函数式编程概念。后面会细致的介绍常用的API。
① 创立CompletableFuture的办法
- 运用
new
关键字创立
// 无回来成果
CompletableFuture<String> completableFuture = new CompletableFuture<>();
// 已知回来成果
CompletableFuture<String> completableFuture = new CompletableFuture<>("result");
// 已知回来成果(底层其实也是带参数的结构器赋值)
CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("result");
创立一个回来成果类型为String的CompletableFuture,能够运用Future
接口的get()
办法获取该值(相同也会堵塞)。
能够运用无参结构器回来一个没有成果的CompletableFuture,也能够经过结构器的传参CompletableFuture设置好回来成果,或许运用CompletableFuture.completedFuture(U value)
结构一个已知成果的CompletableFuture。
- 运用CompletableFuture类的静态工厂办法(常用)
runAsync()
无回来值
// 运用默许线程池
public static CompletableFuture<Void> runAsync(Runnable runnable)
// 运用自定义线程池(引荐)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
runAsync()
办法的参数是Runnable接口,这是一个函数式接口,不允许回来值。当需求异步操作且不关怀回来成果的时分能够运用runAsync()
办法。
// 比如
public static void main(String[] args) {
// 快速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {
// 经过Lambda表达式完结Runnable接口
CompletableFuture.runAsync(()-> System.out.println("获取产品基本信息成功"), executor).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}finally {
executor.shutdown();
}
}
supplyAsync()
有回来值
// 运用默许线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 运用自定义线程池(引荐)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
supplyAsync()
办法的参数是Supplier<U>
供给型接口(无参有回来值),这也是一个函数式接口,U
是回来成果值的类型。当需求异步操作且关怀回来成果的时分,能够运用supplyAsync()
办法。
// 比如
public static void main(String[] args) {
// 快速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {
// 经过Lambda表达式完结履行内容,并回来成果经过CompletableFuture接纳
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("获取产品信息成功");
return "信息";
}, executor);
// 输出成果
System.out.println(completableFuture.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}finally {
executor.shutdown();
}
}
关于第二个参数
Executor executor
阐明
在没有指定第二个参数(即没有指定线程池)时,CompletableFuture直接运用默许的ForkJoinPool.commonPool()
作为它的线程池履行异步代码。
在实践生产中会运用自定义的线程池来履行异步代码,详细能够参阅另一篇文章深入理解线程池ThreadPoolExecutor – (),里面的第二节有生产中怎样创立自定义线程的比如,能够参阅一下。
② 取得异步履行成果
get()
堵塞式获取履行成果
public T get() throws InterruptedException, ExecutionException
该办法调用后假如使命还没完结则会堵塞等候直到使命履行完结。假如使命履行进程产生反常则会抛出ExecutionException
反常,假如堵塞等候进程中被中止则会抛出InterruptedException
反常。
get(long timeout, TimeUnit unit)
带超时的堵塞式获取履行成果
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
该办法调用后假如假如使命还没完结则会堵塞等候直到使命履行完结或许超出timeout时刻,假如堵塞等候进程中超时则会抛出TimeoutException
反常。
getNow(T valueIfAbsent)
马上获取履行成果
public T getNow(T valueIfAbsent)
该办法调用后,会马上获取成果不会堵塞等候。假如使命完结则直接回来履行完结后的成果,假如使命没有完结,则回来调用办法时传入的参数valueIfAbsent
值。
join()
不抛反常的堵塞时获取履行成果
public T join()
该办法和get()
办法作用相同,仅仅不会抛出反常。
complete(T value)
主动触发核算,回来异步是否履行结束
public boolean complete(T value)
该办法调用后,会主动触发核算成果,假如此刻异步履行并没有完结(此刻boolean值回来true),则经过get()
拿到的数据会是complete()
设置的参数value
值,假如此刻异步履行现已完结(此刻boolean值回来false),则经过get()
拿到的便是履行完结的成果。
// 比如
public static void main(String[] args) {
// 快速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {
// 经过Lambda表达式完结履行内容,并回来成果经过CompletableFuture接纳
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
// 休眠2秒,使得异步履行变慢,会导致主动触发核算先履行,此刻回来的get便是555
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
return 666;
}, executor);
// 主动触发核算,判别异步履行是否完结
System.out.println(completableFuture.complete(555));
// 输出成果
System.out.println(completableFuture.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}finally {
executor.shutdown();
}
}
/**
输出成果:
true
555
**/
③ 对履行成果进行处理
whenComplete
等候前面使命履行完再履行当时处理
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action)
在创立好的初始使命或许是上一个使命后经过链式调用该办法,会在之前使命履行完结后持续履行whenComplete
里的内容(whenComplete
传入的action仅仅对之前使命的成果进行处理),即运用该办法能够避免前面说到的Future
接口的问题,不再需求经过堵塞或许轮询的办法去获取成果,而是经过调用该办法等使命履行结束主动调用。
该办法的参数为BiConsumer<? super T, ? super Throwable> action
顾客接口,能够接纳两个参数,一个是使命履行完的成果,一个是履行使命时的反常。
// 比如
public static void main(String[] args) {
// 快速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {
CompletableFuture.supplyAsync(() -> 666, executor)
.whenComplete((res, ex) -> System.out.println("使命履行结束,成果为" + res + " 反常为" + ex)
);
} catch (Exception e) {
e.printStackTrace();
}finally {
executor.shutdown();
}
}
/**
输出成果:
使命履行结束,成果为666 反常为null
**/
除了上述的办法外,还有一些相似的办法如
XXXAsync()
或许是XXXAsync(XX,Executor executor)
,关于这些办法,这儿一致阐明,后续文章中将不会再罗列
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor)
XXXAsync()
:表明上一个使命履行完结后,不会再运用之前使命中的线程,而是从头运用从默许线程(ForkJoinPool 线程池)中从头获取新的线程履行当时使命。
XXXAsync(XX,Executor executor)
:表明不会沿用之前使命的线程,而是运用自己第二个参数指定的线程池从头获取线程履行当时使命。
④ 对履行成果进行消费
thenRun
前面使命履行完后履行当时使命,不关怀前面使命的成果,也没回来值
public CompletableFuture<Void> thenRun(Runnable action)
CompletableFuture.supplyAsync(actionA).thenRun(actionB)
像这样链式调用该办法表明:履行使命A完结后接着履行使命B,可是使命B不需求A的成果,而且履行完使命B也不会回来成果。
thenRun(Runnable action)
的参数为Runnable接口即没有传入参数。
// 比如
public static void main(String[] args) {
// 快速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {
CompletableFuture.supplyAsync(() -> 666, executor)
.thenRun(() -> System.out.println("我都没有参数怎样拿到之前的成果,我也没有回来值。")
);
} catch (Exception e) {
e.printStackTrace();
}finally {
executor.shutdown();
}
}
/**
输出成果:
我都没有参数怎样拿到之前的成果,我也没有回来值。
**/
thenAccept
前面使命履行完后履行当时使命,消费前面的成果,没有回来值
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
CompletableFuture.supplyAsync(actionA).thenRun(actionB)
像这样链式调用该办法表明:履行使命A完结后接着履行使命B,而且使命B需求A的成果,可是履行完使命B不会回来成果。
thenAccept(Consumer<? super T> action)
的参数为顾客接口,即能够传入一个参数,该参数为上一个使命的履行成果。
// 比如
public static void main(String[] args) {
// 快速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {
CompletableFuture.supplyAsync(() -> 666, executor)
.thenAccept((res) -> System.out.println("我能拿到上一个的成果" + res + ",可是我无法传出去。")
);
} catch (Exception e) {
e.printStackTrace();
}finally {
executor.shutdown();
}
}
/**
输出成果:
我能拿到上一个的成果666,可是我无法传出去。
**/
thenApply
前面使命履行完后履行当时使命,消费前面的成果,具有回来值
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
CompletableFuture.supplyAsync(actionA).thenRun(actionB)
像这样链式调用该办法表明:履行使命A完结后接着履行使命B,而且使命B需求A的成果,而且履行完使命B需求有回来成果。
thenApply(Function<? super T,? extends U> fn)
的参数为函数式接口,即能够传入一个参数类型为T,该参数是上一个使命的履行成果,而且函数式接口需求有回来值,类型为U。
// 比如
public static void main(String[] args) {
// 快速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {
CompletableFuture.supplyAsync(() -> 666, executor)
.thenApply((res) -> {
System.out.println("我能拿到上一个的成果" + res + "而且我要将成果传出去");
return res;
}
).whenComplete((res, ex) -> System.out.println("成果" + res));
} catch (Exception e) {
e.printStackTrace();
}finally {
executor.shutdown();
}
}
/**
输出成果:
我能拿到上一个的成果666而且我要将成果传出去
成果666
**/
⑤ 反常处理
exceptionally
反常捕获,只消费前面使命中出现的反常信息,具有回来值
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
能够经过链式调用该办法来获取反常信息,而且具有回来值。假如某一个使命出现反常被exceptionally
捕获到则剩余的使命将不会再履行。相似于Java反常处理的catch。
exceptionally(Function<Throwable, ? extends T> fn)
的参数是函数式接口,具有一个参数以及回来值,该参数为前面使命的反常信息。
// 比如
public static void main(String[] args) {
// 快速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {
CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) throw new RuntimeException("error");
return 666;
}, executor)
.thenApply((res) -> {
System.out.println("不出现反常,成果为" + res);
return res;
}).exceptionally((ex) -> {
ex.printStackTrace();
return -1;
});
} catch (Exception e) {
e.printStackTrace();
}finally {
executor.shutdown();
}
}
/**
输出成果:
// 这是不出现反常的状况
不出现反常,成果为666
// 这是出现反常的状况
java.util.concurrent.CompletionException: java.lang.RuntimeException: error
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: error
at com.xqsr.review.thread.ThreadTest.lambda$main$0(ThreadTest.java:15)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
... 3 more
**/
handle
反常处理,消费前面的成果及反常信息,具有回来值,不会中止后续使命
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
能够经过链式调用该办法能够跟thenApply()
相同能够消费前面使命的成果并完结自己使命内容,而且具有回来值。不同之处在于出现反常也能够接着往下履行,根据反常参数做进一步处理。
handle(BiFunction<? super T, Throwable, ? extends U> fn)
的参数是顾客接口,一个参数是使命履行成果,一个是反常信息,而且具有回来值。
// 比如
public static void main(String[] args) {
// 快速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {
CompletableFuture.supplyAsync(() -> 666, executor)
.thenApply((res) -> {
if (Math.random() < 0.5) throw new RuntimeException("error");
return res;
}).handle((res, ex) -> {
System.out.println("成果" + res + "(null表明之前出现反常导致成果无法传过来)");
return res == null ? -1 : res;
}).thenApply((res) -> {
System.out.println("成果为" + res + "(-1表明之前出现反常,经过handler使得成果处理成-1)");
return res;
}).exceptionally((ex) -> {
ex.printStackTrace();
return -1;
});
} catch (Exception e) {
e.printStackTrace();
}finally {
executor.shutdown();
}
}
/**
输出成果:
// 这是不出现反常的状况
成果666(null表明之前出现反常导致成果无法传过来)
成果为666(-1表明之前出现反常,经过handler使得成果处理成-1)
// 这是出现反常的状况
成果null(null表明之前出现反常导致成果无法传过来)
成果为-1(-1表明之前出现反常,经过handler使得成果处理成-1)
**/
能够看到经过handle
相似于Java反常处理的finally,出现反常并不会像运用exceptionally
那样中止后续的使命,而是持续履行,能够经过handle为之前出现反常无法取得的成果从头赋值(根据事务需求设置安全值之类的)。
⑥ 两组使命按次序履行
thenCompose
完结两组使命按前后次序履行
public <U> CompletableFuture<U> thenCompose(
Function<? super T, ? extends CompletionStage<U>> fn)
A.thenCompose(B)
相当于使命A要排在使命B前面,即次序的履行使命A、使命B。该办法的参数是函数式接口,函数式接口的参数是调用者的履行成果,回来值是另一个使命B。
public static void main(String[] args) {
// 快速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {
CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> {
System.out.println("使命A先履行成果为666");
return 666;
}, executor);
actionA.thenCompose((res) -> CompletableFuture.supplyAsync(() -> {
System.out.println("使命B后履行成果加上333");
return 333 + res;
})).whenComplete((res, ex) -> System.out.println(res));
} catch (Exception e) {
e.printStackTrace();
}finally {
executor.shutdown();
}
}
/**
输出成果:
使命A先履行成果为666
使命B后履行成果加上333
999
**/
⑦ 两组使命谁快用谁
applyToEither
比较两组使命履行速度,谁快消费谁的履行成果
public <U> CompletableFuture<U> applyToEither(
CompletionStage<? extends T> other, Function<? super T, U> fn)
该办法用于比较两组使命的履行速度,谁先履行完就用谁的履行成果。
传入参数阐明:第一个参数传入的是另一个使命的履行内容,第二个参数传入的是终究这两个使命谁快回来谁的成果,并经过当时函数式接口进行接纳和处理(运用函数式接口,有参且有回来值)。
// 比如
public static void main(String[] args) {
// 快速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {
CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> {
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("使命A等候久一点,履行成果为555");
return 555;
}, executor);
actionA.applyToEither(CompletableFuture.supplyAsync(() -> {
System.out.println("使命B很快,履行成果为666");
return 666;
}), (res) -> {
System.out.println("终究运用的履行成果为" + res);
return res;
});
} catch (Exception e) {
e.printStackTrace();
}finally {
executor.shutdown();
}
}
/**
输出成果:
使命B很快,履行成果为666
终究运用的履行成果为666
使命A等候久一点,履行成果为555
**/
除了
applyToEither
对使命终究成果进行获取并消费,而且具有回来值的办法外,还有两个相似的办法。
// 这个办法作用和上面的相同,比谁快拿谁的成果,不同的是这个办法只消费不具有回来值
public CompletableFuture<Void> acceptEither(
CompletionStage<? extends T> other, Consumer<? super T> action)
// 这个办法作用和上面的相同,比谁快拿谁的成果,不同的是这个办法不消费成果也不具有回来值
public CompletableFuture<Void> runAfterEither(
CompletionStage<?> other, Runnable action)
⑧ 两组使命完结后兼并
thenCombine
等候两组使命履行结束后,兼并两组使命的履行成果
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn)
该办法用于两组使命都完结后,将两组使命的履行成果一起交给当时办法的BiFunction处理。先完结的使命会等候后者使命完结。
传入参数阐明:第一个参数传入的是另一个使命的履行内容,第二个参数传入的是带两个参数的函数式接口(第一个参数是使命1的履行成果,第二个参数是使命2的履行成果,具有回来值)。
// 比如
public static void main(String[] args) {
// 快速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {
CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> {
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("使命A等候久一点,履行成果为333");
return 333;
}, executor);
CompletableFuture<Integer> actionB = CompletableFuture.supplyAsync(() -> {
System.out.println("使命B很快,履行成果为666");
return 666;
}, executor);
actionA.thenCombine(actionB, (res1, res2) -> {
System.out.println("终究运用的履行成果为" + (res1 + res2));
return res1 + res2;
});
} catch (Exception e) {
e.printStackTrace();
}finally {
executor.shutdown();
}
}
/**
输出成果:
使命B很快,履行成果为666
使命A等候久一点,履行成果为333
终究运用的履行成果为999
**/
除了
thenCombine
对使命终究成果进行获取并消费,而且具有回来值的办法外,还有两个相似的办法。
// 这个办法作用和上面的相同,获取兼并成果,不同的是这个办法只消费不具有回来值
public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action)
// 这个办法作用和上面的相同,获取兼并成果,不同的是这个办法不消费成果也不具有回来值
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
Runnable action)
⑨ 多使命组合
allOf
完结并行地履行多个使命,等候一切使命履行完结(无需考虑履行次序)
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
该办法能够完结并行地履行多个使命,适用于多个使命没有依靠联系,能够相互独立履行的,传入参数为多个使命,没有回来值。
allOf()
办法会等候一切的使命履行结束再回来,能够经过get()
堵塞保证一切使命履行结束
// 比如
public static void main(String[] args) {
// 快速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {
CompletableFuture<Void> actionA = CompletableFuture.runAsync(() -> {
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("使命A等候2秒后履行结束");
}, executor);
CompletableFuture<Void> actionB = CompletableFuture.runAsync(() -> {
System.out.println("使命B很快履行结束");
}, executor);
CompletableFuture<Void> actionC = CompletableFuture.runAsync(() -> {
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("使命C等候1秒后履行结束");
}, executor);
CompletableFuture<Void> actionD = CompletableFuture.runAsync(() -> {
try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("使命D等候5秒后履行结束");
}, executor);
CompletableFuture.allOf(actionA, actionB, actionC, actionD).get();
} catch (Exception e) {
e.printStackTrace();
}finally {
executor.shutdown();
}
}
/**
输出成果:
使命B很快履行结束
使命C等候1秒后履行结束
使命A等候2秒后履行结束
使命D等候5秒后履行结束
**/
anyOf
完结并行地履行多个使命,只要有个一个完结的便会回来履行成果
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
该办法能够完结并行地履行多个使命,传入参数为多个使命,具有回来值。该办法不会等候一切使命履行完结后再回来成果,而是当有一个使命完结时,便会回来那个使命的履行成果。
// 比如
public static void main(String[] args) {
// 快速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {
CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> {
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("使命A等候2秒后履行结束");
return 555;
}, executor);
CompletableFuture<Integer> actionB = CompletableFuture.supplyAsync(() -> {
System.out.println("使命B很快履行结束");
return 666;
}, executor);
CompletableFuture<Integer> actionC = CompletableFuture.supplyAsync(() -> {
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("使命C等候1秒后履行结束");
return 999;
}, executor);
CompletableFuture<Integer> actionD = CompletableFuture.supplyAsync(() -> {
try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("使命D等候5秒后履行结束");
return 888;
}, executor);
System.out.println("最早履行完的回来成果为" + CompletableFuture.anyOf(actionA, actionB, actionC, actionD).get());
} catch (Exception e) {
e.printStackTrace();
}finally {
executor.shutdown();
}
}
/**
输出成果:
使命B很快履行结束
最早履行完的回来成果为666
使命C等候1秒后履行结束
使命A等候2秒后履行结束
使命D等候5秒后履行结束
**/
一个运用CompletableFuture异步编列的比如
不需求关怀比如中的事务内容,运用时依照自己事务的需求,对不同的需求调用不同API即可。
编写使命时主要关怀以下几点: ① 是否需求消费之前使命的成果 ② 是否需求回来成果给其他使命消费 ③ 是否要求次序履行(是否允许并行,有没有前置要求)
/**
* 该办法用于获取单个产品的一切信息
* 1. 产品的基本信息
* 2. 产品的图片信息
* 3. 产品的出售特点组合
* 4. 产品的各种分类基本信息
* 5. 产品的促销信息
*/
@Override
public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {
// 创立产品Vo经过各个使命去完善Vo的信息
SkuItemVo skuItemVo = new SkuItemVo();
// 获取产品基本信息 查询到后设置进Vo中,回来基本信息给后续使命消费 (运用自定义的线程池进行异步)
CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
SkuInfoEntity info = this.getById(skuId);
skuItemVo.setInfo(info);
return info;
}, executor);
// 获取产品的图片信息 获取后设置进Vo中,此处不需求消费图片信息,也不需求回来成果。所以运用runAsync即可
CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
List<SkuImagesEntity> imagesEntities = skuImagesService.getImagesBySkuId(skuId);
skuItemVo.setImages(imagesEntities);
}, executor);
// 获取产品出售特点 由于要利用之前查询到的基本信息,但后续使命不需求消费出售特点(不需求回来成果),所以运用thenAcceptAsync消费之前的基本信息,不回来出售信息。
CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync((res) -> {
List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.getSaleAttrBySpuId(res.getSpuId());
skuItemVo.setSaleAttr(saleAttrVos);
}, executor);
// 获取产品各分类基本信息,相同要消费之前的基本信息,但无需回来,所以运用thenAcceptAsync即可
CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync((res) -> {
SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId());
skuItemVo.setDesc(spuInfoDescEntity);
}, executor);
// 获取产品的促销信息 这个也不需求消费之前使命的成果,也不需求回来成果。所以直接运用runAsync即可
CompletableFuture<Void> seckillFuture = CompletableFuture.runAsync(() -> {
R skuSeckilInfo = seckillFeignService.getSkuSeckilInfo(skuId);
if (skuSeckilInfo.getCode() == 0) {
SeckillSkuVo seckilInfoData = skuSeckilInfo.getData("data", new TypeReference<SeckillSkuVo>() {
});
skuItemVo.setSeckillSkuVo(seckilInfoData);
if (seckilInfoData != null) {
long currentTime = System.currentTimeMillis();
if (currentTime > seckilInfoData.getEndTime()) {
skuItemVo.setSeckillSkuVo(null);
}
}
}
}, executor);
// 运用allOf()组合一切使命,而且运用get()堵塞,等候一切使命完结。
// 弥补:infoFuture不能放入allOf中,由于allOf是并行无序履行(需求多个条件是无依靠性的)的,当上面使命中有需求消费infoFuture的成果,所以需求先履行infoFuture。
CompletableFuture.allOf(imageFuture,saleAttrFuture,descFuture,seckillFuture).get();
// 最终回来产品Vo
return skuItemVo;
}