作者:京东科技 张天赐
前语
JDK 8 是一次重大的版本升级,新增了十分多的特性,其中之一便是CompletableFuture
。自此从 JDK 层面真正意义上的支撑了根据事件的异步编程范式,弥补了Future
的缺陷。
在咱们的日常优化中,最常用手法便是多线程并行履行。这时分就会涉及到CompletableFuture
的使用。
常见使用办法
下面举例一个常见场景。
假如咱们有两个 RPC 远程调用服务,咱们需求获取两个 RPC 的成果后,再进行后续逻辑处理。
public static void main(String[] args) {
// 使命 A,耗时 2 秒
int resultA = compute(1);
// 使命 B,耗时 2 秒
int resultB = compute(2);
// 后续事务逻辑处理
System.out.println(resultA + resultB);
}
能够预估到,串行履行最少耗时 4 秒,而且 B 使命并不依靠 A 使命成果。
对于这种场景,咱们通常会挑选并行的办法优化,Demo 代码如下:
public static void main(String[] args) {
// 仅简略举例,在出产代码中可别这么写!
// 计算耗时的函数
time(() -> {
CompletableFuture<Integer> result = Stream.of(1, 2)
// 创立异步使命
.map(x -> CompletableFuture.supplyAsync(() -> compute(x), executor))
// 聚合
.reduce(CompletableFuture.completedFuture(0), (x, y) -> x.thenCombineAsync(y, Integer::sum, executor));
// 等候成果
try {
System.out.println("成果:" + result.get());
} catch (ExecutionException | InterruptedException e) {
System.err.println("使命履行反常");
}
});
}
输出:
[async-1]: 使命履行开端:1
[async-2]: 使命履行开端:2
[async-1]: 使命履行完结:1
[async-2]: 使命履行完结:2
成果:3
耗时:2 秒
能够看到耗时变成了 2 秒。
存在的问题
剖析
看上去CompletableFuture
现有功用能够满足咱们诉求。但当咱们引进一些实际常见状况时,一些潜在的缺乏便暴露出来了。
compute(x)
假如是一个根据入参查询用户某类型优惠券列表的使命,咱们需求查询两种优惠券并组合在一起返回给上游。假如上游要求咱们 2 秒内处理完毕并返回成果,但compute(x)
耗时却在 0.5 秒 ~ 无穷大波动。这时分咱们就需求把耗时过长的compute(x)
使命成果放弃,仅处理在指守时刻内完结的使命,尽可能保证服务可用。
那么以上代码的耗时由耗时最长的服务决议,无法满足现有诉求。通常咱们会使用get(long timeout, TimeUnit unit)
来指定获取成果的超不时刻,而且咱们会给compute(x)
设置一个超不时刻,到达后自动抛反常来中止使命。
public static void main(String[] args) {
// 仅简略举例,在出产代码中可别这么写!
// 计算耗时的函数
time(() -> {
List<CompletableFuture<Integer>> result = Stream.of(1, 2)
// 创立异步使命,compute(x) 超时抛出反常
.map(x -> CompletableFuture.supplyAsync(() -> compute(x), executor))
.toList();
// 等候成果
int res = 0;
for (CompletableFuture<Integer> future : result) {
try {
res += future.get(2, SECONDS);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
System.err.println("使命履行反常或超时");
}
}
System.out.println("成果:" + res);
});
}
输出:
[async-2]: 使命履行开端:2
[async-1]: 使命履行开端:1
[async-1]: 使命履行完结:1
使命履行反常或超时
成果:1
耗时:2 秒
能够看到,只要咱们能够给compute(x)
设置一个超不时刻将使命中止,结合get
、getNow
等获取成果的办法,就能够很好地办理整体耗时。
那么问题也就转变成了,如何给使命设置异步超不时刻呢?
现有做法
当异步使命是一个 RPC 恳求时,咱们能够设置一个 JSF 超时,以到达异步超时作用。
当恳求是一个 R2M 恳求时,咱们也能够操控 R2M 衔接的最大超不时刻来到达作用。
这么看好像咱们都是在依靠三方中间件的才能来办理使命超不时刻?那么就存在一个问题,中间件超时操控才能有限,假如异步使命是中间件 IO 操作 + 本地计算操作怎么办?
用 JSF 超时举一个详细的比如,反编译 JSF 的获取成果代码如下:
public V get(long timeout, TimeUnit unit) throws InterruptedException {
// 配置的超不时刻
timeout = unit.toMillis(timeout);
// 剩余等候时刻
long remaintime = timeout - (this.sentTime - this.genTime);
if (remaintime <= 0L) {
if (this.isDone()) {
// 反序列化获取成果
return this.getNow();
}
} else if (this.await(remaintime, TimeUnit.MILLISECONDS)) {
// 等候时刻内使命完结,反序列化获取成果
return this.getNow();
}
this.setDoneTime();
// 超时抛出反常
throw this.clientTimeoutException(false);
}
当这个使命刚好卡在超时边际完结时,这个使命的耗不时刻就变成了超不时刻 + 获取成果时刻。而获取成果(反序列化)作为纯本地计算操作,耗时长短受 CPU 影响较大。
某些 CPU 使用率高的状况下,就会呈现异步使命没能触发抛出反常中止,导致咱们无法精确操控超不时刻。对上游来说,本次恳求全部失利。
处理办法
JDK 9
这类问题十分常见,如大促场景,服务器 CPU 瞬间升高就会呈现以上问题。
那么如何处理呢?其实 JDK 的开发大佬们早有研究。在 JDK 9,CompletableFuture
正式提供了orTimeout
、completeTimeout
办法,来精确完结异步超时操控。
public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) {
if (unit == null)
throw new NullPointerException();
if (result == null)
whenComplete(new Canceller(Delayer.delay(new Timeout(this), timeout, unit)));
return this;
}
JDK 9orTimeout
其完结原理是经过一个守时使命,在给守时刻之后抛出反常。假如使命在指守时刻内完结,则撤销抛反常的操作。
以上代码咱们按履行次序来看下:
首要履行new Timeout(this)
。
static final class Timeout implements Runnable {
final CompletableFuture<?> f;
Timeout(CompletableFuture<?> f) { this.f = f; }
public void run() {
if (f != null && !f.isDone())
// 抛出超时反常
f.completeExceptionally(new TimeoutException());
}
}
经过源码能够看到,Timeout
是一个完结 Runnable 的类,run()
办法担任给传入的异步使命经过completeExceptionally
CAS 赋值反常,将使命标记为反常完结。
那么谁来触发这个run()
办法呢?咱们看下Delayer
的完结。
static final class Delayer {
static ScheduledFuture<?> delay(Runnable command, long delay,
TimeUnit unit) {
// 到时刻触发 command 使命
return delayer.schedule(command, delay, unit);
}
static final class DaemonThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("CompletableFutureDelayScheduler");
return t;
}
}
static final ScheduledThreadPoolExecutor delayer;
static {
(delayer = new ScheduledThreadPoolExecutor(
1, new DaemonThreadFactory())).
setRemoveOnCancelPolicy(true);
}
}
Delayer
其实便是一个单例守时调度器,Delayer.delay(new Timeout(this), timeout, unit)
经过ScheduledThreadPoolExecutor
完结指守时刻后触发Timeout
的run()
办法。
到这里就已经完结了超时抛出反常的操作。但当使命完结时,就没必要触发Timeout
了。因此咱们还需求完结一个撤销逻辑。
static final class Canceller implements BiConsumer<Object, Throwable> {
final Future<?> f;
Canceller(Future<?> f) { this.f = f; }
public void accept(Object ignore, Throwable ex) {
if (ex == null && f != null && !f.isDone())
// 3 未触发抛反常使命则撤销
f.cancel(false);
}
}
当使命履行完结,或者使命履行反常时,咱们也就没必要抛出超时反常了。因此咱们能够把delayer.schedule(command, delay, unit)
返回的守时超时使命撤销,不再触发Timeout
。 当咱们的异步使命完结,而且守时超时使命未完结的时分,便是咱们撤销的机遇。因此咱们能够经过whenComplete(BiConsumer<? super T, ? super Throwable> action)
来完结。
Canceller
便是一个BiConsumer
的完结。其持有了delayer.schedule(command, delay, unit)
返回的守时超时使命,accept(Object ignore, Throwable ex)
完结了守时超时使命未完结后,履行cancel(boolean mayInterruptIfRunning)
撤销使命的操作。
JDK 8
假如咱们使用的是 JDK 9 或以上,咱们能够直接用 JDK 的完结来完结异步超时操作。那么 JDK 8 怎么办呢?
其实咱们也能够根据上述逻辑简略完结一个东西类来辅佐。
以下是咱们营销自己的东西类以及用法,贴出来给我们作为参阅,我们也能够自己写的更高雅一些~
调用办法:
CompletableFutureExpandUtils.orTimeout(异步使命, 超不时刻, 时刻单位);
东西类源码:
package com.jd.jr.market.reduction.util;
import com.jdpay.market.common.exception.UncheckedException;
import java.util.concurrent.*;
import java.util.function.BiConsumer;
/**
* CompletableFuture 扩展东西
*
* @author zhangtianci7
*/
public class CompletableFutureExpandUtils {
/**
* 假如在给定超时之前未完结,则反常完结此 CompletableFuture 并抛出 {@link TimeoutException} 。
*
* @param timeout 在呈现 TimeoutException 反常完结之前等候多长时刻,以 {@code unit} 为单位
* @param unit 一个 {@link TimeUnit},结合 {@code timeout} 参数,表示给定粒度单位的持续时刻
* @return 入参的 CompletableFuture
*/
public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit unit) {
if (null == unit) {
throw new UncheckedException("时刻的给定粒度不能为空");
}
if (null == future) {
throw new UncheckedException("异步使命不能为空");
}
if (future.isDone()) {
return future;
}
return future.whenComplete(new Canceller(Delayer.delay(new Timeout(future), timeout, unit)));
}
/**
* 超不时反常完结的操作
*/
static final class Timeout implements Runnable {
final CompletableFuture<?> future;
Timeout(CompletableFuture<?> future) {
this.future = future;
}
public void run() {
if (null != future && !future.isDone()) {
future.completeExceptionally(new TimeoutException());
}
}
}
/**
* 撤销不需求的超时的操作
*/
static final class Canceller implements BiConsumer<Object, Throwable> {
final Future<?> future;
Canceller(Future<?> future) {
this.future = future;
}
public void accept(Object ignore, Throwable ex) {
if (null == ex && null != future && !future.isDone()) {
future.cancel(false);
}
}
}
/**
* 单例推迟调度器,仅用于启动和撤销使命,一个线程就满足
*/
static final class Delayer {
static ScheduledFuture<?> delay(Runnable command, long delay, TimeUnit unit) {
return delayer.schedule(command, delay, unit);
}
static final class DaemonThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("CompletableFutureExpandUtilsDelayScheduler");
return t;
}
}
static final ScheduledThreadPoolExecutor delayer;
static {
delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
delayer.setRemoveOnCancelPolicy(true);
}
}
}
参阅资料
- JEP 266: JDK 9 并发包更新提案