前语

最近遇到了一个业务场景,涉及到多数据源之间的恳求的流程编列,正好看到了一篇某团介绍CompletableFuture原理和运用的技术文章,主要还是涉及运用层面。网上很多文章涉及原理的部分讲的不是特别具体且比较笼统。因为涉及到多线程的工具必需求了解原理,否则一旦遇到问题排查起来就只能凭形而上学,正好借此梳理一下CompletableFuture的作业原理

背景

咱们把Runnable了解为最基本的线程使命,只具备在线程下履行一段逻辑的才干。为了获取履行的回来值,发明了Callable和与其配合运用的Future。为了将使命之间进行逻辑编列,就诞生了CompletableFuture。关于怎样了解使命的逻辑编列,举一个简略的比方:

任务编排:CompletableFuture从入门到精通

打开电脑-更新体系这两个操作是有先后次序的,可是泡茶和这两个操作没有先后次序,是能够并行的,而开始作业必需求等候其他操作结束之后才干进行,这就形成了使命编列的履行链。

在IO密集型体系中,相似的场景有很多。因为不同数据集的查询依靠主键不同,A数据集的查询主键是B数据集的一个字段这种情况很常见,一般还需求并发查询多个数据集的数据,所以对于多线程的履行编列是有需求的。

一种解决办法是CountDownLatch,让线程履行到某个当地后进行等候,直到依靠的使命履行结束。对于一些简略的履行链是能够满意的,可是当编列逻辑杂乱起来,CountDownLatch会导致代码难以保护和调试。所以诞生了CompletableFuture用来描绘和保护使命之间的依靠联系以进行使命编列。在实践运用中,有以下两类场景是适合运用使命编列的:

  • 多数据源恳求的流程编列

  • 非堵塞化网关等NIO场景

运用办法

创立与履行

同步办法

和FutureTask相似,CompletableFuture也经过get()办法获取履行成果。可是不同的是,CompletableFuture自身能够不承载可履行的使命(比较FutureTask则必须承载一个可履行的使命Callable),经过一个用于符号履行成功并设置回来值的函数,在运用上也更为灵敏,如下:

	CompletableFuture<String> demo = new CompletableFuture<>();
	demo.complete("success");
	System.out.println(demo.get());

履行成果:success

和Future相似,get()函数也是同步堵塞的,调用get函数后线程会堵塞直到调用complete办法符号使命现已履行成功。

除了手动触发使命的完结,也能够让创立目标的一起就符号使命完结:

	CompletableFuture<String> demo = CompletableFuture.completedFuture("success");
	System.out.println(demo.get());

履行成果:success

异步办法

比较于同步办法,异步履行更为常见。比方下面这个比方:

        CompletableFuture<String> demo = CompletableFuture.supplyAsync(() -> {
            System.out.println("do something by thread" + Thread.currentThread().getName());
            return "success";
        });
        System.out.println(demo.get());

履行成果:
do something by threadForkJoinPool.commonPool-worker-9
success

supplyAsync办法接纳一个Supplier目标,逻辑函数交给线程池中的线程异步履行

任务编排:CompletableFuture从入门到精通

默许会运用ForkJoinPool的公共线程池来履行代码(不推荐),当然也能够指定线程池,如下:

	ExecutorService executor = Executors.newFixedThreadPool(4);
    CompletableFuture<String> demo = CompletableFuture.supplyAsync(() -> {
        System.out.println("do something by thread" + Thread.currentThread().getName());
        return "success";
    }, executor);
    System.out.println(demo.get());

履行成果:
do something by threadpool-1-thread-1
success

假如不需求履行成果,也能够用runAsync办法:

	CompletableFuture.runAsync(() -> {
        System.out.println("do something by thread" + Thread.currentThread().getName());
    });

履行成果:
do something by threadForkJoinPool.commonPool-worker-9

多使命编列

多使命编列是CompletableFuture的中心,这儿列举不同的场景来进行阐明

一元依靠

任务编排:CompletableFuture从入门到精通

过程2需求依靠过程1履行结束才干履行,相似主线程的次序履行,能够经过以下办法完结:

      ExecutorService executor = Executors.newFixedThreadPool(4);
      CompletableFuture<String> step1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("履行【过程1】");
        return "【过程1的履行成果】";
      }, executor);
      CompletableFuture<String> step2 = step1.thenApply(result -> {
        System.out.println("上一步操作成果为:" + result);
        return "【过程2的履行成果】";
      });
      System.out.println("过程2的履行成果:" + step2.get());

履行成果:
履行【过程1】
上一步操作成果为:【过程1的履行成果】
过程2的履行成果:【过程2的履行成果】

经过thenApply办法,接纳上一个CompletableFuture目标的回来值,其间隐含的逻辑是,该处逻辑只有等上一个CompletableFuture目标履行完后才会履行

二元依靠

比较于一元依靠的次序履行链,二元依靠更为常见,比方下面这个场景:

任务编排:CompletableFuture从入门到精通

过程1和2是并行的,而过程3需求等过程1和2履行完之后才干履行,经过CompletableFuture是这么完结的:

        ExecutorService executor = Executors.newFixedThreadPool(4);
        CompletableFuture<String> step1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("履行【过程1】");
            return "【过程1的履行成果】";
        }, executor);
        CompletableFuture<String> step2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("履行【过程2】");
            return "【过程2的履行成果】";
        }, executor);
        CompletableFuture<String> step3 = step1.thenCombine(step2, (result1, result2) -> {
            System.out.println("前两步操作成果分别为:" + result1 + result2);
            return "【过程3的履行成果】";
        });
        System.out.println("过程3的履行成果:" + step3.get());

履行成果:
履行【过程1】
履行【过程2】
前两步操作成果分别为:【过程1的履行成果】【过程2的履行成果】
过程3的履行成果:【过程3的履行成果】

经过thenCombine办法,等候step1和step2都履行结束后,获取其回来成果并履行一段新的逻辑

多元依靠

当然还可能有下面这种场景,过程M需求依靠1-N的N个前置节点:

任务编排:CompletableFuture从入门到精通

这种情况会更为杂乱,完结办法如下:

    	ExecutorService executor = Executors.newFixedThreadPool(4);
        CompletableFuture<String> step1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("履行【过程1】");
            return "【过程1的履行成果】";
        }, executor);
        CompletableFuture<String> step2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("履行【过程2】");
            return "【过程2的履行成果】";
        }, executor);
        CompletableFuture<String> step3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("履行【过程3】");
            return "【过程3的履行成果】";
        }, executor);
        CompletableFuture<Void> stepM = CompletableFuture.allOf(step1, step2, step3);
        CompletableFuture<String> stepMResult = stepM.thenApply(res -> {
           // 经过join函数获取回来值
           String result1 = step1.join();
           String result2 = step2.join();
           String result3 = step3.join();
           return result1 + result2 + result3;
        });
        System.out.println("过程M的成果:" + stepMResult.get());

履行成果:
履行【过程1】
履行【过程2】
履行【过程3】
过程M的成果:【过程1的履行成果】【过程2的履行成果】【过程3的履行成果】

经过allOf函数声明当参数中的全部使命履行结束后,才会履行下一步操作,可是要注意,allOf自身仅仅界说节点,真实堵塞的方位是thenApply函数。

和之前的办法不同,因为采用了不定变量,所以要经过CompletableFuture#join来获取每个使命的回来值。

除了allOf之外,假如咱们需求恣意一个使命完结后就履行下一步操作,能够运用anyOf办法,如下:

    // step1/2/3的界说相同
		// ...
		CompletableFuture<Object> stepM = CompletableFuture.anyOf(step1, step2, step3);
    System.out.println("过程M的成果:" + stepM.get());

履行成果:
过程M的成果:【过程1的履行成果】

与allOf不同,anyOf的回来值即为榜首个履行结束的使命

作业原理

概念

在讲原理之前,先来了解一下CompletableFuture的界说。在完结上,CompletableFuture承继了Future和CompletionStage

任务编排:CompletableFuture从入门到精通

Future毋庸置疑,CompletableFuture最基本的才干便是获取异步计算的成果。CompletionStage则是声明了编列节点的才干,每一个CompletionStage都声明了流程树上的一个节点(见下图)

任务编排:CompletableFuture从入门到精通

CompletionStage声明的接口thenXXX,包括thenApply、thenCompose等,界说了节点之间的连接办法(实践情况更为杂乱,具体原理参考下节函数剖析),经过这种办法,终究界说出一颗流程树,从而完结了多线程的使命编列。CompletionStage的办法回来值一般是另一个CompletionStage,从而构成了链式调用。

结构剖析

CompletableFuture里包括两个变量,result和stack

任务编排:CompletableFuture从入门到精通

result很好了解,便是当时节点的履行成果。stack就比较杂乱,是一个无锁并发栈,声明了当时节点履行结束后要触发的节点列表,接下来咱们具体讲一下

CompletableFuture中的栈规划

Completion是一个无锁并发栈,声明了当时节点履行结束后要触发的节点列表。在结构上是一个链式节点,其间只包括了一个指向下一个节点的next目标

任务编排:CompletableFuture从入门到精通

咱们能够看到Completion有繁多的完结类,表明不同的依靠办法。

任务编排:CompletableFuture从入门到精通

咱们知道,在CompletableFuture中的流程编列是经过thenApply、thenAccept、thenCombine等办法来完结的,

  • thenApply:接纳上一步的处理成果,进行下一步消费,并回来成果

  • thenAccept:和thenApply相似,不过无成果回来

  • thenCombine:一起接纳两个流程节点,等其都履行结束后一起处理成果

每个函数实践分别对应了一种Completion完结类,以刚才的三种函数为例,分别对应了UniApply、UniAccept、UniCombine三个目标。所以Completion能够以为是流程编列逻辑的笼统目标,能够了解为流程节点,或许使命节点。

以UniCompletion为例,结构如下:

abstract static class UniCompletion<T,V> extends Completion {
    Executor executor;                 // 线程池
    CompletableFuture<V> dep;          // 完结使命依靠的cf
    CompletableFuture<T> src;          // 完结使命所需资源地点的cf
    /**
     * 假如使命能够被履行则回来true,经过FJ符号位保证只有一个线程判别成功。
     * 假如使命是异步的,则在使命启动后经过tryFire来履行使命
     */
    final boolean claim() {
        Executor e = executor;
        if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
            if (e == null)
                return true;
            executor = null; // disable
            e.execute(this);
        }
        return false;
    }
    /**
     * 假如dep不为空回来true,用以判别当时使命节点是否已被激活
     */
    final boolean isLive() {
        return dep != null;
    }
}

先来看claim函数,这个比较简单解说,该函数用于判别使命是否可被履行。经过compareAndSetForkJoinTaskTag函数的CAS操作保证只有一个线程履行成功,主要作用便是在多线程情况下保证使命的正确履行。

接下来便是重头戏,源使命与依靠使命,这两个概念是CompletableFuture的中心,贯穿了全部逻辑的履行,只有了解了这两个概念,才干对履行原理有比较透彻的了解

源使命与依靠使命

源使命和依靠使命在UniCompletion中分别为src和dep特点,举个具体的比方,比方下面这段代码:

CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> {
    return "A";
});
CompletableFuture<String> b = a.thenApply(res -> {
    return "B " + res;
});

调用a.thenApply(Function fn)时,能够以为是生成了一个UniApply的流程节点(具体怎样生成的下文会提到),其间源使命便是a,而依靠使命则是thenApply的回来值。

换个简略的说法,在上面的代码中,咱们有a、b两个使命,b使命的完结需求依靠于a使命的完结,所以a会生成一个流程节点(UniApply目标),其间包括了b想要履行完结的全部资源(a的履行成果等),这时a使命就叫做源使命(因为a使命中有使命资源)。而b使命需求依靠a使命来完结,所以b使命叫做依靠使命。

源使命的完结会触发依靠使命的履行,这个便是使命编列的基本原理

函数剖析

在本节中,CompletableFuture因为名字太长,会以cf来代指

因为thenAccept、thenCombine函数等逻辑比较相似,咱们以最根底的thenApply函数为例进行剖析

中心函数

咱们先不要直接从thenApply、complete等函数入手,咱们先来看这几个中心函数,不明白做什么的不要紧,先了解这几个函数的原理就好

uniApply

CompletableFuture的逻辑在于“只有当X条件满意时,再履行Y逻辑”,uniApply函数便是担任这样的逻辑判别,首要看源码:

final <S> boolean uniApply(CompletableFuture<S> a,
                            Consumer<? super S> f, UniApply<S> c) {
    Object r; Throwable x;
    // 1
    if (a == null || (r = a.result) == null || f == null)
        return false;
    tryComplete: if (result == null) {
        if (r instanceof AltResult) {
            if ((x = ((AltResult)r).ex) != null) {
                completeThrowable(x, r);
                break tryComplete;
            }
            r = null;
        }
        try {
            // 2
            if (c != null && !c.claim())
                return false;
            // 3
            S s = (S) r;
            completeValue(f.apply(s));
        } catch (Throwable ex) {
            completeThrowable(ex);
        }
    }
    return true;
}

整个办法能够分为三段(已在代码中标出),咱们分隔来说。

榜首段,判别所给的使命节点是否现已履行结束,假如现已履行结束则进入下一步

第二段,假如有相关的流程节点,则经过claim函数判别当时使命是否可被履行,假如可履行则进入下一步(保证多线程情况下使命的正确履行)

第三段,履行传入的函数并把值设置到当时目标中。

整个逻辑是这样的,首要咱们传入了一个cf目标、一个函数,和一个流程节点。只有当传入的cf目标履行完结(result不为空),再履行给定的函数,并把履行成果设置到当时目标中。假如不考虑特殊情况,uniApply办法用一句话解说便是:假如给定的使命现已履行结束,就履行传入的函数并把履行成果设置到当时目标中

tryFire

uniApply函数仅仅是一个有条件的函数履行器,真实想要达到使命编列还需求其他函数的参与,咱们先来看tryFire办法:

final CompletableFuture<V> tryFire(int mode) {
            CompletableFuture<V> d; CompletableFuture<T> a;
    if ((d = dep) == null ||
        !d.uniApply(a = src, fn, mode > 0 ? null : this))
        return null;
    dep = null; src = null; fn = null;
    return d.postFire(a, mode);
}

tryFire依据相关联系的不同有多种完结,实践履行流程相差不大,这儿以常用的UniApply的完结来举例。

首要这个办法接纳了一个mode参数,有以下几种取值:

  • -1:传播形式,或许叫嵌套形式。表明使命实践现已履行结束,仅仅在传递状态

  • 0:同步形式。使命由当时线程调用处理

  • 1:异步形式。使命需求提交到指定线程池处理

依据mode的不同,实践tryFire履行的流程也会发生很大差异。不过归根结底,tryFire办法的本质是调用了uniApply履行一次使命,假如履行成功,则会清空dep、src等自身特点(清空之后isLive办法会回来false,表明使命现已履行结束),一起经过postFire办法履行该使命下的其他依靠使命,完结使命的传播履行。

postFire办法因为和tryFire办法相关比较亲近,这儿放在一起阐明:

final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
        if (a != null && a.stack != null) {
            if (mode < 0 || a.result == null)
                a.cleanStack();
            else
                a.postComplete();
        }
        if (result != null && stack != null) {
            if (mode < 0)
                return this;
            else
                postComplete();
        }
        return null;
}

这儿简略概括一下履行原理,假如是嵌套形式,则整理栈内无效使命,并回来目标自身(能够以为什么都没做);否则经过postComplete办法履行栈内依靠此使命的其他使命项

postComplete

当一个CompletionStage履行完结之后,会触发依靠它的其他CompletionStage的履行,这些Stage的履行又会触发新一批的Stage履行,这便是使命的次序编列

假如说uniApply是根底功能,是担任线程安全且恪守依靠次序地履行一个函数,那么postComplete便是中心逻辑,担任当一个使命履行结束后触发依靠该使命的其他使命项,先来看源码:

final void postComplete() {
    CompletableFuture<?> f = this; Completion h;
    // 1
    while ((h = f.stack) != null ||
           (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture<?> d; Completion t;
        // 2
        if (f.casStack(h, t = h.next)) {
            if (t != null) {
                if (f != this) {
                    pushStack(h);
                    continue;
                }
                h.next = null;    // detach
            }
            // 3
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}

在源码上符号了三个方位,分别代表三层结构,首要是榜首层while循环,只需当时目标栈中还有流程节点,那么就循环履行内部逻辑。

第二层,因为continue的存在,和榜首层结合起来看便是一个批量压栈的操作,将全部需求触发的依靠树按次序压入当时目标栈中。

第三层,经过tryFire按次序触发栈中全部的依靠使命。上节咱们能够看到tryFire函数内依据mode的不同会触发不同的逻辑,这儿mode指定为NESTED便是为了避免循环调用postComplete

履行函数

几个中心函数介绍完了,接下来咱们回到最外层,来看看使命是怎样履行的,首要咱们以thenApply为例剖析中心履行函数

supplyAsync(实践调用为asyncSupplyStage)

该办法用于提交一个使命到线程池中履行,并将该使命打包为一个CompletableFuture节点

static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<U> d = new CompletableFuture<U>();
    e.execute(new AsyncSupply<U>(d, f));
    return d;
}

其间AsyncSupply完结了Runnable接口,所以了解为一种特殊的使命即可。这种使命在履行完结后会调用completeValue将函数履行的成果设置当时目标中。

所以全体逻辑为,首要创立一个cf目标,并立即将使命添加到线程池履行,在履行结束后会将使命履行的成果保存到所创立的cf目标中。

complete

public boolean complete(T value) {
    boolean triggered = completeValue(value);
	postComplete();
	return triggered;
}

该办法直接调用completeValue办法设置值,设置完值之后调用postComplete办法来依次履行后续使命。当调用该办法时,会完结使命的依靠扩散履行

thenApply(实践调用为uniApplyStage)

private <V> CompletableFuture<V> uniApplyStage(
    	Executor e, Function<? super T,? extends V> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<V> d =  new CompletableFuture<V>();
    if (e != null || !d.uniApply(this, f, null)) {
        UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
        push(c);
        c.tryFire(SYNC);
    }
    return d;
}

结合上节剖析的中心函数,咱们很简单能够剖析该函数的流程:履行function函数,假如条件不满意则履行失败,会生成一个流程节点并压入栈,一起再经过tryFire再测验履行一次,假如条件仍然不满意,那么只能等候所依靠的使命履行完结后经过postComplete触发履行。

get

public T get() throws InterruptedException, ExecutionException {
    Object r;
    return reportGet((r = result) == null ? waitingGet(true) : r);
}

办法中心在于waitingGet,内部运用了ForkJoinPool.managedBlock来堵塞线程直到履行结束

流程剖析

在函数剖析中,咱们实践现已阐明了使命依靠履行的基本原理,这儿为了更为具体地阐明,咱们以一个简略的比方来剖析。

首要咱们抛开全部杂乱的因素,以最基本的同步串行代码来讲,咱们现在有这样一个目标:

    CompletableFuture<String> A = new CompletableFuture<>();

任务编排:CompletableFuture从入门到精通

然后咱们这时候给其加上了使命编列,增加了一个thenApply依靠

    AtomicInteger seq = new AtomicInteger(0);
    Function<String, String> func = s -> s + " | " + seq.incrementAndGet();
    CompletableFuture<String> a = new CompletableFuture<>();
    CompletableFuture<String> b = a.thenApply(func);

任务编排:CompletableFuture从入门到精通

所以咱们就有了这样一个结构,A的stack中压入了一个Completion节点,该节点的源使命指向A自身,而依靠使命指向了B,表明B使命依靠A使命的履行。

接下来咱们再加一条依靠

    AtomicInteger seq = new AtomicInteger(0);
    Function<String, String> func = s -> s + " | " + seq.incrementAndGet();
    CompletableFuture<String> a = new CompletableFuture<>();
    CompletableFuture<String> b = a.thenApply(func);
	  CompletableFuture<String> c = a.thenApply(func);

任务编排:CompletableFuture从入门到精通

咱们会发现两个特色:

  1. 和栈的性质相同,越晚添加的编列逻辑越早被履行

  2. 根据同一个目标衍生出来的流程节点的源使命是一致的

以此类推,thenXXX的其他逻辑也是相似的原理,当a调用complete函数时(无论是同步还是异步),都会依次触发A使命的stack下挂接的其他依靠使命。而只需a没有调用complete函数,那么thenApply中挂接的依靠使命无论怎样都无法履行(因为a目标的result特点为空)

注意事项

避免主使命和子使命向同一个线程池中请求线程,因为存在依靠联系,当经过join来获取子使命的值时,一旦子使命因为线程队列已满进入堵塞队列,那么将会形成死锁