本文正在参加「金石方案」

概述

最近写一个批量事务接口,为了下降响应时刻,遂使用了自定义线程池 + CompletableFuture异步履行使命,堵塞获取成果,

但在测验过程中,发现一个很奇怪的问题

下面造一个测验案例来复现问题,也方便带咱们了解~


问题案例

@SpringBootTest
public class OtherTest {
​
  @Resource
  private ThreadPoolExecutor threadPoolExecutor;
​
  @Test
  public void test() {
   List<Integer> list = new ArrayList<>(100);
   for (int i = 0; i < 100; i++) {
     list.add(i);
    }
​
   List<CompletableFuture<Integer>> futureList = list.stream().map(num -> {
     return CompletableFuture.supplyAsync(() -> {
      // 模仿事务
      return num + 1;
     }, threadPoolExecutor);
    }).collect(Collectors.toList());
​
   CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
​
   for (CompletableFuture<Integer> completableFuture : futureList) {
     try {
      System.out.println(completableFuture.get());
     } catch (Exception e) {
      e.printStackTrace();
     }
    }
  }
​
}

履行案例后,咱们会发现主线程一向被堵塞,下面的打印底子没有履行!这 便是我遇到的问题

【坑】: 谁教你的自定义拒绝策略不抛异常?


为啥被堵塞了?

咱们直接Dump Threads,能够看到主线程处于WAITING状况,经过仓库盯梢,咱们能够看到是被LockSupport.park

【坑】: 谁教你的自定义拒绝策略不抛异常?

由于咱们终究会CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();堵塞获取成果

依据join办法一路盯梢,终究到达java.util.concurrent.CompletableFuture.Signaller#block,也契合上图的仓库信息

【坑】: 谁教你的自定义拒绝策略不抛异常?

ok,问题算是找到了,但是为啥会一向堵塞呢?

依据咱们之前的运行成果,能够发现,打印出了使命被回绝的error日志,难不成是由于被回绝的原因???

【坑】: 谁教你的自定义拒绝策略不抛异常?

想到这里,我直接调大了线程和行列容量

这里是之前的线程池装备参数

@Slf4j
@Configuration
public class ThreadPoolConfig {
​
  @Bean(name = "threadPoolExecutor")
  public ThreadPoolExecutor prepareThreadPoolExecutor() {
   return new ThreadPoolExecutor(2, 2, 1, TimeUnit.SECONDS,
      new LinkedBlockingQueue<>(5), new RejectedExecutionHandler() {
     @Override
     public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      log.error("ThreadPoolExecutor 使命被回绝,请及时查看!!!");
     }
    });
  }
​
}

下面是调整之后的线程池参数,增加了行列容量和最大线程数

@Slf4j
@Configuration
public class ThreadPoolConfig {
​
  @Bean(name = "threadPoolExecutor")
  public ThreadPoolExecutor prepareThreadPoolExecutor() {
   return new ThreadPoolExecutor(2, 10, 1, TimeUnit.SECONDS,
      new LinkedBlockingQueue<>(500), new RejectedExecutionHandler() {
     @Override
     public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      log.error("ThreadPoolExecutor 使命被回绝,请及时查看!!!");
     }
    });
  }
​
}

调整之后再次履行使命,能够看到履行成功

【坑】: 谁教你的自定义拒绝策略不抛异常?


看源码,找问题~

经过上面咱们已经了解到,是由于使命被回绝,导致主线程LockSupport.park一向处于WAITING状况,无法继续往下履行。

已然被park了,那肯定要unpark才行

那,什么时候会unpark呢?

当然是使命都履行完之后啦,才回去unpark主线程

【坑】: 谁教你的自定义拒绝策略不抛异常?

但之前咱们经过日志看到,有些使命被回绝了,使命被回绝了就不会被履行呀,那堵塞获取该使命的线程就不会unpark,导致之后的使命也不履行,所以一向不能去unpark主线程,所以主线程一向被堵塞住。

下面再看看线程池的源码:

public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();
​
  int c = ctl.get();
  if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
      return;
    c = ctl.get();
   }
  if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
      reject(command);
    else if (workerCountOf(recheck) == 0)
      addWorker(null, false);
   }
  else if (!addWorker(command, false))
    reject(command);
}

线程池的八股文相信咱们都知道,看上面源码咱们知道,当线程数已经不小于中心线程数且行列容量也满的情况下,会再去测验addWorker

// 此刻core为false
private boolean addWorker(Runnable firstTask, boolean core) {
  retry:
  for (int c = ctl.get();;) {
    // Check if queue empty only if necessary.
    if (runStateAtLeast(c, SHUTDOWN)
      && (runStateAtLeast(c, STOP)
        || firstTask != null
        || workQueue.isEmpty()))
      return false;
​
    for (;;) {
      if (workerCountOf(c)
        >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
        // 当时线程数超过中心线程数,直接回来false去触发回绝策略
        return false;
      if (compareAndIncrementWorkerCount(c))
        break retry;
      c = ctl.get(); // Re-read ctl
      if (runStateAtLeast(c, SHUTDOWN))
        continue retry;
      // else CAS failed due to workerCount change; retry inner loop
     }
   }
  // ...省掉
  return workerStarted;
}

addWorker中当时线程数超过中心线程数,直接回来false去触发回绝策略,且不会把使命添加到行列中

【坑】: 谁教你的自定义拒绝策略不抛异常?


CompletableFuture不可,那Future呢?

可能有小伙伴会说已然CompletableFuture不可,那我用Future。

其实原理上都是一样的,Future也不可

@SpringBootTest
public class OtherTest {
​
  @Resource
  private ThreadPoolExecutor threadPoolExecutor;
​
  @Test
  public void test() {
   List<Integer> list = new ArrayList<>(100);
   for (int i = 0; i < 100; i++) {
     list.add(i);
    }
​
   List<Future<Integer>> futureList = list.stream().map(num -> {
     return threadPoolExecutor.submit(() -> {
      return num + 1;
     });
    }).collect(Collectors.toList());
​
   for (Future<Integer> future : futureList) {
     try {
      System.out.println(future.get());
     } catch (Exception e) {
      e.printStackTrace();
     }
    }
​
  }
​
}

使用Future同样会导致主线程park,处于WAITING状况

【坑】: 谁教你的自定义拒绝策略不抛异常?

下面是FutureTask的get源码,会经过LockSupport.park(this);进行堵塞获取

具体解说见 -> 异步超时中断,知其然,也要知其所以然~

public V get() throws InterruptedException, ExecutionException {
  int s = state;
  if (s <= COMPLETING)
    s = awaitDone(false, 0L);
  return report(s);
}
​
private int awaitDone(boolean timed, long nanos)
 throws InterruptedException {
 long startTime = 0L;
 WaitNode q = null;
 boolean queued = false;
 for (;;) {
  int s = state;
  if (s > COMPLETING) {
   if (q != null)
    q.thread = null;
   return s;
   }
  else if (s == COMPLETING)
   Thread.yield();
  else if (Thread.interrupted()) {
   removeWaiter(q);
   throw new InterruptedException();
   }
  else if (q == null) {
   if (timed && nanos <= 0L)
    return s;
   q = new WaitNode();
   }
  else if (!queued)
   queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
  else if (timed) {
   final long parkNanos;
   if (startTime == 0L) {
    startTime = System.nanoTime();
    if (startTime == 0L)
     startTime = 1L;
    parkNanos = nanos;
    } else {
    long elapsed = System.nanoTime() - startTime;
    if (elapsed >= nanos) {
     removeWaiter(q);
     return state;
     }
    parkNanos = nanos - elapsed;
    }
   if (state < COMPLETING)
    LockSupport.parkNanos(this, parkNanos);
   }
  else
   LockSupport.park(this);
  }
}

而,unpark机遇便是在使命正常履行完后的finishCompletion,该办法会去唤醒一切堵塞获取该使命的线程

但便是使命被回绝了,所以无法去唤醒堵塞获取该使命的线程,造就了惨案~

CompletableFuture其实也是这个原因

本质上都是由于使命被回绝无法履行,导致无法去唤醒线程

public void run() {
  if (state != NEW ||
    !RUNNER.compareAndSet(this, null, Thread.currentThread()))
    return;
  try {
    Callable<V> c = callable;
    if (c != null && state == NEW) {
      V result;
      boolean ran;
      try {
        result = c.call();
        ran = true;
       } catch (Throwable ex) {
        result = null;
        ran = false;
        setException(ex);
       }
      if (ran)
        set(result);
     }
   } finally {
   // ...省掉
   }
}
​
​
protected void set(V v) {
 if (STATE.compareAndSet(this, NEW, COMPLETING)) {
  outcome = v;
  STATE.setRelease(this, NORMAL); // final state
  // 唤醒堵塞获取该使命的线程
  finishCompletion();
  }
}

解决方案

已然了解到坑的原因所在,那么找出对应的解决方案也不难

  1. 回绝策略记得抛出反常,防止主线程一向等待,比方DiscardPolicyDiscardOldestPolicy都会存在必定的问题,没有对外抛出反常。
  2. 设置超时时刻,及时开释资源,比方CompletableFuture在jdk9后,能够经过orTimeout设置超时时刻,Future的话,他的get办法也支撑设置等待时刻。

相关文章

异步超时中断,知其然,也要知其所以然~

揭秘Future cancel迷惑性boolean入参~

ScheduledThreadPoolExecutor有坑嗷~