异步编列
在事务开发的过程中,咱们为了下降接口耗时,经常会用到线程池,书写多线程数据获取、同步堵塞获取成果的事务逻辑。
常见的使用方法如下:
Future
@Slf4j
@SpringBootTest
public class OtherTest {
public static final ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100));
public static void main(String[] args) {
Future<Integer> submit1 = executor.submit(() -> {
// 事务耗时逻辑1
return 1;
});
Future<Integer> submit2 = executor.submit(() -> {
// 事务耗时逻辑2
return 2;
});
Future<Integer> submit3 = executor.submit(() -> {
// 事务耗时逻辑3
return 3;
});
try {
Integer integer1 = submit1.get();
Integer integer2 = submit2.get();
Integer integer3 = submit3.get();
System.out.println(integer1);
System.out.println(integer2);
System.out.println(integer3);
} catch (Exception e) {
e.printStackTrace();
}
}
}
假定一个接口涉及到3个事务逻辑,如下:
- 事务逻辑1耗时: 50ms
- 事务逻辑2耗时: 30ms
- 事务逻辑3耗时: 70ms
那么假如是传统的串行调用,接口总耗时:150ms
但假如是上面的利用线程池的方法进行调用,那么该接口耗时取决于耗时最长的那个事务逻辑,即该接口耗时为: 70ms
能够看到,接口耗时是有明显下降的~
CompletableFuture
当然,上面尽管对接口进行异步编列后,接口耗时有着下降,但是假如说咱们的耗时事务逻辑有着十几二十个?且事务逻辑之间存在依靠联系?那么咱们怎么办?
很显然,上面的Future
就不能满足咱们的需求了,所以从JDK8开端,JDK供给了CompletableFuture
东西类,为咱们异步编列供给了很大的便利~
@Slf4j
@SpringBootTest
public class OtherTest {
public static final ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100));
public static void main(String[] args) {
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
// 事务耗时逻辑1
return 1;
}, executor);
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
// 事务耗时逻辑2
return 2;
}, executor);
CompletableFuture<Integer> completableFuture3 = CompletableFuture.supplyAsync(() -> {
// 事务耗时逻辑3
return 3;
}, executor);
try {
// 等候使命全部履行结束
CompletableFuture.allOf(completableFuture1, completableFuture2, completableFuture3).get();
System.out.println(completableFuture1.get());
System.out.println(completableFuture2.get());
System.out.println(completableFuture3.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
由于案例比较简单,无法突出CompletableFuture
编列能力比较于Future
的优势地点,这个在今后的文章里专门会为咱们讲解,这不是本文的要点。
超时中止
在上面的案例中,细心的小伙伴能够发现,不管是CompletableFuture
仍是Future
,我都是进行堵塞等候使命结束。
这,其实是一个十分风险的行为,假如下游rpc
接口呈现波动,那么接口耗时会明显提升,而咱们却进行堵塞获取,线程会被一向堵塞无法及时开释,那么随着不断的请求进来,线程池线程、行列很快都会被打满,新使命都会被拒绝掉,然后影响用户体会,然后影响你的工资,然后影响你的作业。
所以,为了杜绝这种情况呈现,咱们在获取使命成果的时分需求设置等候时刻~
Future
和CompletableFuture
的get
方法都支撑传入等候时刻~
Future超时中止机制
Future供给了get
方法来供咱们堵塞获取使命成果,也支撑传入超时时刻,下面来了解下源码
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
// 参数校验
if (unit == null)
throw new NullPointerException();
int s = state;
// 堵塞等候,假如超越超时时刻使命还未完结,那么抛出超时反常
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
堵塞等候,timed
为true
代表存在超时时刻
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
long startTime = 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
int s = state;
// 使命状况 > COMPLETING阐明现已履行结束
if (s > COMPLETING) {
// 当时线程不必等候了,将等候节点里的Thread设置为null
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING)
// COMPLETING是使命履行结束到真实将使命设置为完结态的一个中间状况
// 当使命的处于COMPLETING时,阐明使命现已履行完了,但此刻cpu时刻不行没有持续履行
// 此刻需求yield一下,让其他线程履行,然后将使命正确设置为完结状况
Thread.yield();
else if (Thread.interrupted()) {
// 假如当时线程被打断了,则把当时线程从等候该使命完结的堵塞线程链表中删除
removeWaiter(q);
// 抛出打断反常
throw new InterruptedException();
}
else if (q == null) {
// 假如是超时等候,且等候时刻<=0,则直接返回当时使命状况
if (timed && nanos <= 0L)
return s;
// 初始化一个等候当时使命履行完的节点,内部包括
q = new WaitNode();
}
else if (!queued)
// 将WaitNode排队到线程等候链表中
queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
else if (timed) {
// 堵塞等候,存在超时时刻
final long parkNanos;
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
}
else
// 堵塞等候,没有超时时刻
LockSupport.park(this);
}
}
上面源码注释现已比较完善了,但咱们仍是要总结一下
- 使命
COMPLETING
状况,是使命履行结束到真实将使命设置为完结态的一个中间状况(见FutureTask的run方法
) get
方法不管是否存在超时时刻,底层都是经过LockSupport
的park、unpark
方法来达到堵塞的目的- 对于每个使命,其内部会维护一个等候当时使命完结的线程链表
waiters
CompletableFuture超时中止机制
而从JDK 9
开端,CompletableFuture
也供给了 orTimeout
、completeTimeout
方法,来进行异步超时控制。
CompletableFuture.allOf(completableFuture1, completableFuture2, completableFuture3).orTimeout(1, TimeUnit.SECONDS).get();
依据上面代码,咱们能够了解到,会等候completableFuture1, completableFuture2, completableFuture3
三个使命履行1秒钟
假如超越1秒,则会抛出java.util.concurrent.TimeoutException
源码如下:
public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) {
if (unit == null)
throw new NullPointerException();
if (result == null)
whenComplete(new Canceller(Delayer.delay(new Timeout(this),
timeout, unit)));
return this;
}
public CompletableFuture<T> completeOnTimeout(T value, long timeout,
TimeUnit unit) {
if (unit == null)
throw new NullPointerException();
if (result == null)
whenComplete(new Canceller(Delayer.delay(
new DelayedCompleter<T>(this, value),
timeout, unit)));
return this;
}
static final class Delayer {
static ScheduledFuture<?> delay(Runnable command, long delay,
TimeUnit unit) {
// 延时使命
return delayer.schedule(command, delay, unit);
}
static final ScheduledThreadPoolExecutor delayer;
static {
// 单线程
(delayer = new ScheduledThreadPoolExecutor(
1, new DaemonThreadFactory())).
setRemoveOnCancelPolicy(true);
}
}
static final class Timeout implements Runnable {
final CompletableFuture<?> f;
Timeout(CompletableFuture<?> f) { this.f = f; }
public void run() {
// 假如CompletableFuture不为null,且守时使命没有被撤销
if (f != null && !f.isDone())
// 设置超时反常
f.completeExceptionally(new TimeoutException());
}
}
static final class DelayedCompleter<U> implements Runnable {
final CompletableFuture<U> f;
final U u;
DelayedCompleter(CompletableFuture<U> f, U u) { this.f = f; this.u = u; }
public void run() {
if (f != null)
// 将使命成果设置为咱们给定的value
f.complete(u);
}
}
static final class Canceller implements BiConsumer<Object, Throwable> {
final Future<?> f;
Canceller(Future<?> f) { this.f = f; }
public void accept(Object ignore, Throwable ex) {
// 假如没有反常,且超时使命存在且没有被撤销,那么则撤销超时使命
// 由于此刻阐明,CompletableFuture的使命在超时时刻内完结了,则不需求在监控超时
if (ex == null && f != null && !f.isDone())
f.cancel(false);
}
}
经过对上面源码的了解,咱们能够知道
CompletableFuture
的orTimeout
、completeOnTimeout
底层其实都是经过ScheduledThreadPoolExecutor
来完结的
当咱们对一个CompletableFuture
设置了超时时刻后,底层其实会经过ScheduledThreadPoolExecutor
发动一个延时使命,延时时刻便是咱们设置的超时时刻,此刻有分为两种情况
- 使命在超时时刻之内完结,那么在使命完结之后,会去经过
cancel(false)
撤销延时使命 - 使命履行时刻超越设定的超时时刻,则为该使命设置
TimeoutException
,让主线程感知~
Future cancel原理
另外,咱们还能看到,
CompletableFuture
的延时使命并没有进行try-catch
,此处能够了解下->ScheduledThreadPoolExecutor有坑嗷~
而orTimeout
和completeOnTimeout
的区别就在于
- 假如是
orTimeout
,那么超时后会抛出超时反常 - 假如是
completeOnTimeout
,不会抛出反常,则是将使命成果设置为咱们传入的value
扩展知识点
在上面了解CompletableFuture
的orTimeout
、completeOnTimeout
时,咱们知道了其底层是经过ScheduledThreadPoolExecutor
来完结的,但经过源码发现,ScheduledThreadPoolExecutor
只要一个线程去处理
static final ScheduledThreadPoolExecutor delayer;
static {
(delayer = new ScheduledThreadPoolExecutor(
1, new DaemonThreadFactory())).
setRemoveOnCancelPolicy(true);
}
那么,当呈现很多设置了超时时刻且时刻个不一致的CompletableFuture
时,由所以单线程处理,或许咱们给使命设置的超时时刻是1000ms
,但实际或许由于行列排队,真实处理超时的超时时刻会 > 1000ms
也便是说orTimeout
、completeOnTimeout
设置的超时时刻并不会那么精确
结束
我是 皮皮虾 ,会在今后的日子里跟咱们一同学习,一同前进!
觉得文章不错的话,能够在 关注我,或者是我的公众号——JavaCodes。