1、CompletableFuture介绍

CompletableFuture目标是JDK1.8版别新引入的类,这个类完结了两个接口,一个是Future接口,一个是CompletionStage接口。

CompletionStage接口是JDK1.8版别供给的接口,用于异步履行中的阶段处理,CompletionStage定义了一组接口用于在一个阶段履行结束之后,要么持续履行下一个阶段,要么对成果进行转化产生新的成果等,一般来说要履行下一个阶段都需求上一个阶段正常完结,这个类也供给了对反常成果的处理接口

2、CompletableFuture的API

2.1 提交使命

在CompletableFuture中提交使命有以下几种办法:

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

这四个办法都是用来提交使命的,不同的是supplyAsync提交的使命有回来值,runAsync提交的使命没有回来值。两个接口都有一个重载的办法,第二个入参为指定的线程池,假如不指定,则默许运用ForkJoinPool.commonPool()线程池。在运用的过程中尽量依据不同的事务来指定不同的线程池,便利对不同线程池进行监控,一同避免事务共用线程池相互影响。

2.2 成果转化

2.2.1 thenApply

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

thenApply这一组函数入参是Function,意思是将上一个CompletableFuture履行成果作为入参,再次进行转化或者计算,重新回来一个新的值。

2.2.2 handle

public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

handle这一组函数入参是BiFunction,该函数式接口有两个入参一个回来值,意思是处理上一个CompletableFuture的处理成果,一同假如有反常,需求手动处理反常。

2.2.3 thenRun

public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)

thenRun这一组函数入参是Runnable函数式接口,该接口无需入参和出参,这一组函数是在上一个CompletableFuture使命履行完结后,在履行另外一个接口,不需求上一个使命的成果,也不需求回来值,只需求在上一个使命履行完结后履行即可。

2.2.4 thenAccept

public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)

thenAccept这一组函数的入参是Consumer,该函数式接口有一个入参,没有回来值,所以这一组接口的意思是处理上一个CompletableFuture的处理成果,可是不回来成果。

2.2.5 thenAcceptBoth

public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)

thenAcceptBoth这一组函数入参包括CompletionStage以及BiConsumer,CompletionStage是JDK1.8新增的接口,在JDK中只有一个完结类:CompletableFuture,所以第一个入参便是CompletableFuture,这一组函数是用来承受两个CompletableFuture的回来值,并将其组合到一同。BiConsumer这个函数式接口有两个入参,而且没有回来值,BiConsumer的第一个入参便是调用方CompletableFuture的履行成果,第二个入参便是thenAcceptBoth接口入参的CompletableFuture的履行成果。所以这一组函数意思是将两个CompletableFuture履行成果合并到一同。

2.2.6 thenCombine

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

thenCombine这一组函数和thenAcceptBoth类似,入参都包括一个CompletionStage,也便是CompletableFuture目标,意思也是组合两个CompletableFuture的履行成果,不同的是thenCombine的第二个入参为BiFunction,该函数式接口有两个入参,一同有一个回来值。所以与thenAcceptBoth不同的是,thenCombine将两个使命成果合并后会回来一个全新的值作为出参。

2.2.7 thenCompose

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)

thenCompose这一组函数意思是将调用方的履行成果作为Function函数的入参,一同回来一个新的CompletableFuture目标。

2.3 回调办法

public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)

whenComplete办法意思是当上一个CompletableFuture目标使命履行完结后履行该办法。BiConsumer函数式接口有两个入参没有回来值,这两个入参第一个是CompletableFuture使命的履行成果,第二个是反常信息。表明处理上一个使命的成果,假如有反常,则需求手动处理反常,与handle办法的区别在于,handle办法的BiFunction是有回来值的,而BiConsumer是没有回来值的。

以上办法都有一个带有Async的办法,带有Async的办法表明是异步履行的,会将该使命放到线程池中履行,一同该办法会有一个重载的办法,最后一个参数为Executor,表明异步履行能够指定线程池履行。为了便利进行操控,最好在运用CompletableFuture时手动指定咱们的线程池。

2.4 反常处理

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

exceptionally是用来处理反常的,当使命抛出反常后,能够经过exceptionally来进行处理,也能够选择运用handle来进行处理,不过两者有些不同,hand是用来处理上一个使命的成果,假如有反常情况,就处理反常。而exceptionally能够放在CompletableFuture处理的最后,作为兜底逻辑来处理不知道反常。

2.5 获取成果

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

allOf是需求入参中一切的CompletableFuture使命履行完结,才会进行下一步;

anyOf是入参中任何一个CompletableFuture使命履行完结都能够履行下一步。

public T get() throws InterruptedException, ExecutionException
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
public T getNow(T valueIfAbsent)
public T join()

get办法一个是不带超时时刻的,一个是带有超时时刻的。

getNow办法则是当即回来成果,假如还没有成果,则回来默许值,也便是该办法的入参。

join办法是不带超时时刻的等候使命完结。

3、CompletableFuture原理

join办法相同表明获取成果,可是join与get办法有什么区别呢。

public T join() {
    Object r;
    return reportJoin((r = result) == null ? waitingGet(false) : r);
}
public T get() throws InterruptedException, ExecutionException {
    Object r;
    return reportGet((r = result) == null ? waitingGet(true) : r);
}
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        Object r;
        long nanos = unit.toNanos(timeout);
        return reportGet((r = result) == null ? timedGet(nanos) : r);
}
public T getNow(T valueIfAbsent) {
        Object r;
        return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
}

以上是CompletableFuture类中两个办法的代码,能够看到两个办法几乎一样。区别在于reportJoin/reportGet,waitingGet办法是共同的,只不过参数不一样,咱们在看下reportGet与reportJoin办法。

private static <T> T reportGet(Object r)
        throws InterruptedException, ExecutionException {
        if (r == null) // by convention below, null means interrupted
            throw new InterruptedException();
        if (r instanceof AltResult) {
            Throwable x, cause;
            if ((x = ((AltResult)r).ex) == null)
                return null;
            if (x instanceof CancellationException)
                throw (CancellationException)x;
            if ((x instanceof CompletionException) &&
                (cause = x.getCause()) != null)
                x = cause;
            throw new ExecutionException(x);
        }
        @SuppressWarnings("unchecked") T t = (T) r;
        return t;
    }
private static <T> T reportJoin(Object r) {
        if (r instanceof AltResult) {
            Throwable x;
            if ((x = ((AltResult)r).ex) == null)
                return null;
            if (x instanceof CancellationException)
                throw (CancellationException)x;
            if (x instanceof CompletionException)
                throw (CompletionException)x;
            throw new CompletionException(x);
        }
        @SuppressWarnings("unchecked") T t = (T) r;
        return t;
    }

能够看到这两个办法很附近,reportGet办法判别了r目标是否为空,并抛出了中断反常,而reportJoin办法没有判别,一同reportJoin抛出的都是运行时反常,所以join办法也是无需手动捕获反常的。

咱们在看下waitingGet办法

private Object waitingGet(boolean interruptible) {
        Signaller q = null;
        boolean queued = false;
        int spins = -1;
        Object r;
        while ((r = result) == null) {
            if (spins < 0)
                spins = SPINS;
            else if (spins > 0) {
                if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                    --spins;
            }
            else if (q == null)
                q = new Signaller(interruptible, 0L, 0L);
            else if (!queued)
                queued = tryPushStack(q);
            else if (interruptible && q.interruptControl < 0) {
                q.thread = null;
                cleanStack();
                return null;
            }
            else if (q.thread != null && result == null) {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ie) {
                    q.interruptControl = -1;
                }
            }
        }
        if (q != null) {
            q.thread = null;
            if (q.interruptControl < 0) {
                if (interruptible)
                    r = null; // report interruption
                else
                    Thread.currentThread().interrupt();
            }
        }
        postComplete();
        return r;
    }

该waitingGet办法是经过while的办法循环判别是否使命现已完结并产生成果,假如成果为空,则会一向在这儿循环,这儿需求留意的是在这儿初始化了一下spins=-1,当第一次进入while循环的时分,spins是-1,这时会将spins赋值为一个常量,该常量为SPINS。

private static final int SPINS = (Runtime.getRuntime().availableProcessors() > 1 ?
                                      1 << 8 : 0);

这儿判别可用CPU数是否大于1,假如大于1,则该常量为 1<< 8,也便是256,不然该常量为0。

第2次进入while循环的时分,spins是256大于0,这儿做了减一的操作,下次进入while循环,假如还没有成果,依然是大于0持续做减一的操作,此处用来做短时刻的自旋等候成果,只有当spins等于0,后续会进入正常流程判别。

咱们在看下timedGet办法的源码

private Object timedGet(long nanos) throws TimeoutException {
        if (Thread.interrupted())
            return null;
        if (nanos <= 0L)
            throw new TimeoutException();
        long d = System.nanoTime() + nanos;
        Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0
        boolean queued = false;
        Object r;
        // We intentionally don't spin here (as waitingGet does) because
        // the call to nanoTime() above acts much like a spin.
        while ((r = result) == null) {
            if (!queued)
                queued = tryPushStack(q);
            else if (q.interruptControl < 0 || q.nanos <= 0L) {
                q.thread = null;
                cleanStack();
                if (q.interruptControl < 0)
                    return null;
                throw new TimeoutException();
            }
            else if (q.thread != null && result == null) {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ie) {
                    q.interruptControl = -1;
                }
            }
        }
        if (q.interruptControl < 0)
            r = null;
        q.thread = null;
        postComplete();
        return r;
    }

timedGet办法依然是经过while循环的办法来判别是否现已完结,timedGet办法入参为一个纳秒值,并经过该值计算出一个deadline截止时刻,当while循环还未获取到使命成果且现已达到截止时刻,则抛出一个TimeoutException反常。

4、CompletableFuture完结多线程使命

这儿咱们经过CompletableFuture来完结一个多线程处理异步使命的例子。

这儿咱们创立10个使命提交到咱们指定的线程池中履行,并等候这10个使命悉数履行结束。

每个使命的履行流程为第一次先履行加法,第2次履行乘法,假如产生反常则回来默许值,当10个使命履行完结后顺次打印每个使命的成果。

public void demo() throws InterruptedException, ExecutionException, TimeoutException {
        // 1、自定义线程池
        ExecutorService executorService = new ThreadPoolExecutor(5, 10,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100));
        // 2、调集保存future目标
        List<CompletableFuture<Integer>> futures = new ArrayList<>(10);
        for (int i = 0; i < 10; i++) {
            int finalI = i;
            CompletableFuture<Integer> future = CompletableFuture
                    // 提交使命到指定线程池
                    .supplyAsync(() -> this.addValue(finalI), executorService)
                    // 第一个使命履行成果在此处进行处理
                    .thenApplyAsync(k -> this.plusValue(finalI, k), executorService)
                    // 使命履行反常时处理反常并回来默许值
                    .exceptionally(e -> this.defaultValue(finalI, e));
            // future目标添加到调集中
            futures.add(future);
        }
        // 3、等候一切使命履行完结,此处最好加超时时刻
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(5, TimeUnit.MINUTES);
        for (CompletableFuture<Integer> future : futures) {
            Integer num = future.get();
            System.out.println("使命履行成果为:" + num);
        }
        System.out.println("使命悉数履行完结!");
    }
    private Integer addValue(Integer index) {
        System.out.println("第" + index + "个使命第一次履行");
        if (index == 3) {
            int value = index / 0;
        }
        return index + 3;
    }
    private Integer plusValue(Integer index, Integer num) {
        System.out.println("第" + index + "个使命第2次履行,前次履行成果:" + num);
        return num * 10;
    }
    private Integer defaultValue(Integer index, Throwable e) {
        System.out.println("第" + index + "个使命履行反常!" + e.getMessage());
        e.printStackTrace();
        return 10;
    }

作者:京东物流 丁冬

来源:京东云开发者社区 自猿其说Tech