作者:京东科技 张天赐

前语

JDK 8 是一次重大的版本升级,新增了十分多的特性,其中之一便是CompletableFuture。自此从 JDK 层面真正意义上的支撑了根据事件的异步编程范式,弥补了Future的缺陷。

在咱们的日常优化中,最常用手法便是多线程并行履行。这时分就会涉及到CompletableFuture的使用。

常见使用办法

下面举例一个常见场景。

假如咱们有两个 RPC 远程调用服务,咱们需求获取两个 RPC 的成果后,再进行后续逻辑处理。

public static void main(String[] args) {
    // 使命 A,耗时 2 秒
    int resultA = compute(1);
    // 使命 B,耗时 2 秒
    int resultB = compute(2);
    // 后续事务逻辑处理
    System.out.println(resultA + resultB);
}

能够预估到,串行履行最少耗时 4 秒,而且 B 使命并不依靠 A 使命成果。

对于这种场景,咱们通常会挑选并行的办法优化,Demo 代码如下:

public static void main(String[] args) {
    // 仅简略举例,在出产代码中可别这么写!
    // 计算耗时的函数
    time(() -> {
        CompletableFuture<Integer> result = Stream.of(1, 2)
                                                  // 创立异步使命
                                                  .map(x -> CompletableFuture.supplyAsync(() -> compute(x), executor))
                                                  // 聚合
                                                  .reduce(CompletableFuture.completedFuture(0), (x, y) -> x.thenCombineAsync(y, Integer::sum, executor));
        // 等候成果
        try {
            System.out.println("成果:" + result.get());
        } catch (ExecutionException | InterruptedException e) {
            System.err.println("使命履行反常");
        }
    });
}
输出:
[async-1]: 使命履行开端:1
[async-2]: 使命履行开端:2
[async-1]: 使命履行完结:1
[async-2]: 使命履行完结:2
成果:3
耗时:2

能够看到耗时变成了 2 秒。

存在的问题

剖析

看上去CompletableFuture现有功用能够满足咱们诉求。但当咱们引进一些实际常见状况时,一些潜在的缺乏便暴露出来了。

compute(x)假如是一个根据入参查询用户某类型优惠券列表的使命,咱们需求查询两种优惠券并组合在一起返回给上游。假如上游要求咱们 2 秒内处理完毕并返回成果,但compute(x)耗时却在 0.5 秒 ~ 无穷大波动。这时分咱们就需求把耗时过长的compute(x)使命成果放弃,仅处理在指守时刻内完结的使命,尽可能保证服务可用。

那么以上代码的耗时由耗时最长的服务决议,无法满足现有诉求。通常咱们会使用get(long timeout, TimeUnit unit)来指定获取成果的超不时刻,而且咱们会给compute(x)设置一个超不时刻,到达后自动抛反常来中止使命。

public static void main(String[] args) {
    // 仅简略举例,在出产代码中可别这么写!
    // 计算耗时的函数
    time(() -> {
        List<CompletableFuture<Integer>> result = Stream.of(1, 2)
                                                        // 创立异步使命,compute(x) 超时抛出反常
                                                        .map(x -> CompletableFuture.supplyAsync(() -> compute(x), executor))
                                                        .toList();
        // 等候成果
        int res = 0;
        for (CompletableFuture<Integer> future : result) {
            try {
                res += future.get(2, SECONDS);
            } catch (ExecutionException | InterruptedException | TimeoutException e) {
                System.err.println("使命履行反常或超时");
            }
        }
        System.out.println("成果:" + res);
    });
}
输出:
[async-2]: 使命履行开端:2
[async-1]: 使命履行开端:1
[async-1]: 使命履行完结:1
使命履行反常或超时
成果:1
耗时:2

能够看到,只要咱们能够给compute(x)设置一个超不时刻将使命中止,结合getgetNow等获取成果的办法,就能够很好地办理整体耗时。

那么问题也就转变成了,如何给使命设置异步超不时刻呢

现有做法

当异步使命是一个 RPC 恳求时,咱们能够设置一个 JSF 超时,以到达异步超时作用。

当恳求是一个 R2M 恳求时,咱们也能够操控 R2M 衔接的最大超不时刻来到达作用。

这么看好像咱们都是在依靠三方中间件的才能来办理使命超不时刻?那么就存在一个问题,中间件超时操控才能有限,假如异步使命是中间件 IO 操作 + 本地计算操作怎么办?

用 JSF 超时举一个详细的比如,反编译 JSF 的获取成果代码如下:

public V get(long timeout, TimeUnit unit) throws InterruptedException {
    // 配置的超不时刻
    timeout = unit.toMillis(timeout);
    // 剩余等候时刻
    long remaintime = timeout - (this.sentTime - this.genTime);
    if (remaintime <= 0L) {
        if (this.isDone()) {
            // 反序列化获取成果
            return this.getNow();
        }
    } else if (this.await(remaintime, TimeUnit.MILLISECONDS)) {
        // 等候时刻内使命完结,反序列化获取成果
        return this.getNow();
    }
    this.setDoneTime();
    // 超时抛出反常
    throw this.clientTimeoutException(false);
}

当这个使命刚好卡在超时边际完结时,这个使命的耗不时刻就变成了超不时刻 + 获取成果时刻。而获取成果(反序列化)作为纯本地计算操作,耗时长短受 CPU 影响较大。

某些 CPU 使用率高的状况下,就会呈现异步使命没能触发抛出反常中止,导致咱们无法精确操控超不时刻。对上游来说,本次恳求全部失利。

处理办法

JDK 9

这类问题十分常见,如大促场景,服务器 CPU 瞬间升高就会呈现以上问题。

那么如何处理呢?其实 JDK 的开发大佬们早有研究。在 JDK 9,CompletableFuture正式提供了orTimeoutcompleteTimeout办法,来精确完结异步超时操控。

public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) {
    if (unit == null)
        throw new NullPointerException();
    if (result == null)
        whenComplete(new Canceller(Delayer.delay(new Timeout(this), timeout, unit)));
    return this;
}

JDK 9orTimeout其完结原理是经过一个守时使命,在给守时刻之后抛出反常。假如使命在指守时刻内完结,则撤销抛反常的操作。

以上代码咱们按履行次序来看下:

首要履行new Timeout(this)

static final class Timeout implements Runnable {
    final CompletableFuture<?> f;
    Timeout(CompletableFuture<?> f) { this.f = f; }
    public void run() {
        if (f != null && !f.isDone())
            // 抛出超时反常
            f.completeExceptionally(new TimeoutException());
    }
}

经过源码能够看到,Timeout是一个完结 Runnable 的类,run()办法担任给传入的异步使命经过completeExceptionallyCAS 赋值反常,将使命标记为反常完结。

那么谁来触发这个run()办法呢?咱们看下Delayer的完结。

static final class Delayer {
    static ScheduledFuture<?> delay(Runnable command, long delay,
                                    TimeUnit unit) {
        // 到时刻触发 command 使命
        return delayer.schedule(command, delay, unit);
    }
    static final class DaemonThreadFactory implements ThreadFactory {
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("CompletableFutureDelayScheduler");
            return t;
        }
    }
    static final ScheduledThreadPoolExecutor delayer;
    static {
        (delayer = new ScheduledThreadPoolExecutor(
            1, new DaemonThreadFactory())).
            setRemoveOnCancelPolicy(true);
    }
}

Delayer其实便是一个单例守时调度器,Delayer.delay(new Timeout(this), timeout, unit)经过ScheduledThreadPoolExecutor完结指守时刻后触发Timeoutrun()办法。

到这里就已经完结了超时抛出反常的操作。但当使命完结时,就没必要触发Timeout了。因此咱们还需求完结一个撤销逻辑。

static final class Canceller implements BiConsumer<Object, Throwable> {
    final Future<?> f;
    Canceller(Future<?> f) { this.f = f; }
    public void accept(Object ignore, Throwable ex) {
        if (ex == null && f != null && !f.isDone())
        // 3 未触发抛反常使命则撤销
            f.cancel(false);
    }
}

当使命履行完结,或者使命履行反常时,咱们也就没必要抛出超时反常了。因此咱们能够把delayer.schedule(command, delay, unit)返回的守时超时使命撤销,不再触发Timeout。 当咱们的异步使命完结,而且守时超时使命未完结的时分,便是咱们撤销的机遇。因此咱们能够经过whenComplete(BiConsumer<? super T, ? super Throwable> action)来完结。

Canceller便是一个BiConsumer的完结。其持有了delayer.schedule(command, delay, unit)返回的守时超时使命,accept(Object ignore, Throwable ex)完结了守时超时使命未完结后,履行cancel(boolean mayInterruptIfRunning)撤销使命的操作。

JDK 8

假如咱们使用的是 JDK 9 或以上,咱们能够直接用 JDK 的完结来完结异步超时操作。那么 JDK 8 怎么办呢?

其实咱们也能够根据上述逻辑简略完结一个东西类来辅佐。

以下是咱们营销自己的东西类以及用法,贴出来给我们作为参阅,我们也能够自己写的更高雅一些~

调用办法:

CompletableFutureExpandUtils.orTimeout(异步使命, 超不时刻, 时刻单位);

东西类源码:

package com.jd.jr.market.reduction.util;
import com.jdpay.market.common.exception.UncheckedException;
import java.util.concurrent.*;
import java.util.function.BiConsumer;
/**
 * CompletableFuture 扩展东西
 *
 * @author zhangtianci7
 */
public class CompletableFutureExpandUtils {
    /**
     * 假如在给定超时之前未完结,则反常完结此 CompletableFuture 并抛出 {@link TimeoutException} 。
     *
     * @param timeout 在呈现 TimeoutException 反常完结之前等候多长时刻,以 {@code unit} 为单位
     * @param unit    一个 {@link TimeUnit},结合 {@code timeout} 参数,表示给定粒度单位的持续时刻
     * @return 入参的 CompletableFuture
     */
    public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit unit) {
        if (null == unit) {
            throw new UncheckedException("时刻的给定粒度不能为空");
        }
        if (null == future) {
            throw new UncheckedException("异步使命不能为空");
        }
        if (future.isDone()) {
            return future;
        }
        return future.whenComplete(new Canceller(Delayer.delay(new Timeout(future), timeout, unit)));
    }
    /**
     * 超不时反常完结的操作
     */
    static final class Timeout implements Runnable {
        final CompletableFuture<?> future;
        Timeout(CompletableFuture<?> future) {
            this.future = future;
        }
        public void run() {
            if (null != future && !future.isDone()) {
                future.completeExceptionally(new TimeoutException());
            }
        }
    }
    /**
     * 撤销不需求的超时的操作
     */
    static final class Canceller implements BiConsumer<Object, Throwable> {
        final Future<?> future;
        Canceller(Future<?> future) {
            this.future = future;
        }
        public void accept(Object ignore, Throwable ex) {
            if (null == ex && null != future && !future.isDone()) {
                future.cancel(false);
            }
        }
    }
    /**
     * 单例推迟调度器,仅用于启动和撤销使命,一个线程就满足
     */
    static final class Delayer {
        static ScheduledFuture<?> delay(Runnable command, long delay, TimeUnit unit) {
            return delayer.schedule(command, delay, unit);
        }
        static final class DaemonThreadFactory implements ThreadFactory {
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setDaemon(true);
                t.setName("CompletableFutureExpandUtilsDelayScheduler");
                return t;
            }
        }
        static final ScheduledThreadPoolExecutor delayer;
        static {
            delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
            delayer.setRemoveOnCancelPolicy(true);
        }
    }
}

参阅资料

  1. JEP 266: JDK 9 并发包更新提案