持续创造,加快成长!这是我参与「日新方案 10 月更文挑战」的第9天,点击查看活动概况

前语

在前面的文章自己着手写乞丐版线程池中,咱们写了一个十分简略的线程池完结,这个仅仅一个十分简略的完结,在本篇文章傍边咱们即将完结一个和JDK内部完结的线程池十分相似的线程池。

JDK线程池一瞥

咱们首先看一个JDK给咱们供给的线程池ThreadPoolExecutor的结构函数的参数:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 

参数解说:

  • corePoolSize:这个参数你能够了解为线程池傍边至少需求 corePoolSize 个线程,初始时线程池傍边线程的个数为0,当线程池傍边线程的个数小于 corePoolSize 每次提交一个使命都会创立一个线程,而且先履行这个提交的使命,然后再去使命行列里边去获取新的使命,然后再履行。
  • maximumPoolSize:这个参数指的是线程池傍边能够答应的最大的线程的数目,当使命行列满了之后假如这个时分有新的使命想要参加行列傍边,当发现行列满了之后就创立新的线程去履行使命,可是需求满足最大的线程的个数不能够超越 maximumPoolSize 。
  • keepAliveTime 和 unit:这个主要是用于时刻的表明,当行列傍边多长时刻没有数据的时分线程自己退出,前面谈到了线程池傍边使命过多的时分会超越 corePoolSize ,当线程池闲下来的时分这些剩余的线程就能够退出了。
  • workQueue:这个便是用于保存使命的阻塞行列。
  • threadFactory:这个参数倒不是很重要,线程工厂。
  • handler:这个表明回绝战略,JDK给咱们供给了四种战略:
    • AbortPolicy:抛出异常。
    • DiscardPolicy:抛弃这个使命。
    • CallerRunPolicy:提交使命的线程履行。
    • DiscardOldestPolicy:抛弃等候时刻最长的使命。

假如上面的参数你不能够了解,能够先阅读这篇文章自己着手写乞丐版线程池。依据上面谈到的参数,线程池傍边提交使命的流程大致如下图所示:

自己动手写线程池——向JDK线程池进发

自己着手完结线程池

依据前面的参数分析咱们自己完结的线程池需求完结一下功用:

  • 能够提交Runnable的使命和Callable的使命。
  • 线程池能够自己完结动态的扩容和所容,动态调整线程池傍边线程的数目,当使命多的时分能够增加线程的数目,当使命少的时分多出来的线程能够自动退出。
  • 有自己的回绝战略,当使命行列满了,线程数也达到最大的时分,需求回绝提交的使命。

线程池参数介绍

  private AtomicInteger ct = new AtomicInteger(0); // 当时在履行使命的线程个数
  private int corePoolSize;
  private int maximumPoolSize;
  private long keepAliveTime;
  private TimeUnit unit;
  private BlockingQueue<Runnable> taskQueue;
  private RejectPolicy policy;
  private ArrayList<Worker> workers = new ArrayList<>();
  private volatile boolean isStopped;
  private boolean useTimed;

参数解说如下:

  • ct:表明当时线程池傍边线程的个数。

  • corePoolSize:线程池傍边中心线程的个数,含义和上面谈到的JDK的线程池含义共同。

  • maximumPoolSize:线程池傍边最大的线程个数,含义和上面谈到的JDK的线程池含义共同。

  • keepAliveTime 和 unit:和JDK线程池的参数含义共同。

  • taskQueue:使命行列,用不保存提交的使命。

  • policy:回绝战略,主要有一下四种战略:

public enum RejectPolicy {
  ABORT,
  CALLER_RUN,
  DISCARD_OLDEST,
  DISCARD
}
  • workers:用于保存作业线程。
  • isStopped:线程池是否被封闭了。
  • useTimed:主要是用于表明是否运用上面的 keepAliveTime 和 unit,假如运用便是在一定的时刻内,假如没有从使命行列傍边获取到使命,线程就从线程池退出,可是需求确保线程池傍边最小的线程个数不小于 corePoolSize 。

完结Runnable

  // 下面这个办法是向线程池提交使命
  public void execute(Runnable runnable) throws InterruptedException {
    checkPoolState();
    if (addWorker(runnable, false)  // 假如能够参加新的线程履行使命 参加成功就直接回来
            || !taskQueue.offer(runnable) // 假如 taskQueue.offer(runnable) 回来 false 阐明提交使命失利 使命行列现已满了
            || addWorker(runnable, true)) // 运用能够运用的最大的线程数 (maximumPoolSize) 看是否能够发生新的线程
      return;
    // 假如使命行列满了而且不能够参加新的线程 则回绝这个使命
    if (!taskQueue.offer(runnable))
      reject(runnable);
  }

在上面的代码傍边:

  • checkPoolState函数是检查线程池的状况,当线程池被停下来之后就不能够在提交使命:
  private void checkPoolState() {
    if (isStopped) {
      // 假如线程池现已停下来了,就不在向使命行列傍边提交使命了
      throw new RuntimeException("thread pool has been stopped, so quit submitting task");
    }
  }
  • addWorker函数是往线程池傍边提交使命而且发生一个线程,而且这个线程履行的第一个使命便是传递的参数。max表明线程的最大数目,max == true 的时分表明运用 maximumPoolSize 否则运用 corePoolSize,当回来值等于 true 的时分表明履行成功,否则表明履行失利。
  /**
   *
   * @param runnable 需求被履行的使命
   * @param max 是否运用 maximumPoolSize
   * @return boolean
   */
  public synchronized boolean addWorker(Runnable runnable, boolean max) {
    if (ct.get() >= corePoolSize && !max)
      return false;
    if (ct.get() >= maximumPoolSize && max)
      return false;
    Worker worker = new Worker(runnable);
    workers.add(worker);
    Thread thread = new Thread(worker, "ThreadPool-" + "Thread-" + ct.addAndGet(1));
    thread.start();
    return true;
  }

完结Callable

这个函数其实比较简略,只需求将传入的Callable目标封装成一个FutureTask目标即可,由于FutureTask完结了Callable和Runnable两个接口,然后将这个成果回来即可,得到这个目标,再调用目标的 get 办法就能够得到成果。

  public <V> RunnableFuture<V> submit(Callable<V> task) throws InterruptedException {
    checkPoolState();
    FutureTask<V> futureTask = new FutureTask<>(task);
    execute(futureTask);
    return futureTask;
  }

回绝战略的完结

依据前面提到的各种战略的详细完结办法,详细的代码完结如下所示:

  private void reject(Runnable runnable) throws InterruptedException {
    switch (policy) {
      case ABORT:
        throw new RuntimeException("task queue is full");
      case CALLER_RUN:
        runnable.run();
      case DISCARD: // 直接抛弃这个使命
        return;
      case DISCARD_OLDEST:
        // 抛弃等候时刻最长的使命 也便是行列傍边的第一个使命
        taskQueue.poll();
        execute(runnable); // 从头履行这个使命
    }
  }

线程池封闭完结

一共两种办法完结线程池封闭:

  • 直接封闭线程池,不管使命行列傍边的使命是否被悉数履行完结。
  • 安全封闭线程池,先等候使命行列傍边一切的使命被履行完结,再封闭线程池,可是在这个进程傍边不答应继续提交使命了,这一点现已在函数 checkPoolState 傍边完结了。
  // 强制封闭线程池
  public synchronized void stop() {
    isStopped = true;
    for (Worker worker : workers) {
      worker.stopWorker();
    }
  }
  public synchronized void shutDown() {
    // 先表明封闭线程池 线程就不能再向线程池提交使命
    isStopped = true;
    // 先等候一切的使命履行完结再封闭线程池
    waitForAllTasks();
    stop();
  }
  private void waitForAllTasks() {
    // 当线程池傍边还有使命的时分 就不退出循环
    while (taskQueue.size() > 0) {
      Thread.yield();
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

作业线程的作业完结

    @Override
    public void run() {
      // 先履行传递过来的第一个使命 这里是一个小的优化 让线程直接履行第一个使命 不需求
      // 放入使命行列再取出来履行了
      firstTask.run();
      thisThread = Thread.currentThread();
      while (!isStopped) {
        try {
          // 是否运用时刻就在这里显示出来了
          Runnable task = useTimed ? taskQueue.poll(keepAliveTime, unit) : taskQueue.take();
          if (task == null) {
            int i;
            boolean exit = true;
            // 假如当时线程数大于中心线程数 则运用 CAS 去退出 用于确保在线程安全下的退出
            // 且确保线程的个数不小于 corePoolSize 下面这段代码需求仔细分析一下
            if (ct.get() > corePoolSize) {
              do{
                i = ct.get();
                if (i <= corePoolSize) {
                  exit = false;
                  break;
                }
              }while (!ct.compareAndSet(i, i - 1));
              if (exit) {
                return;
              }
            }
          }else {
            task.run();
          }
        } catch (InterruptedException e) {
          // do nothing
        }
      }
    }

咱们现在来仔细分析一下,线程退出线程池的时分是怎么确保线程池傍边总的线程数是不小于 corePoolSize 的!首先全体的结构是运用 CAS 进行完结,详细代码为 do … while 操作,然后在 while 操作里边运用 CAS 进行测试替换,假如没有成功再次获取 ,当线程池傍边中心线程的数目小于等于 corePoolSize 的时分也需求退出循环,由于线程池傍边线程的个数不能小于 corePoolSize 。因而运用 break 跳出循环的线程是不会退出线程池的。

线程池完结的BUG

在咱们自己完结的线程池傍边当线程退出的时分,workers 傍边还保存这指向这个线程的目标,可是当线程退出的时分咱们还没有在 workers 傍边删去这个目标,因而这个线程目标不会被废物回收器搜集掉,可是咱们这个仅仅一个线程池完结的比如罢了,并不用于出产环境,仅仅为了协助咱们了解线程池的原理。

完好代码

package cscore.concurrent.java.threadpoolv2;
import java.util.ArrayList;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadPool {
  private AtomicInteger ct = new AtomicInteger(0); // 当时在履行使命的线程个数
  private int corePoolSize;
  private int maximumPoolSize;
  private long keepAliveTime;
  private TimeUnit unit;
  private BlockingQueue<Runnable> taskQueue;
  private RejectPolicy policy;
  private ArrayList<Worker> workers = new ArrayList<>();
  private volatile boolean isStopped;
  private boolean useTimed;
  public int getCt() {
    return ct.get();
  }
  public ThreadPool(int corePoolSize, int maximumPoolSize, TimeUnit unit, long keepAliveTime, RejectPolicy policy
          , int maxTasks) {
    // please add -ea to vm options to make assert keyword enable
    assert corePoolSize > 0;
    assert maximumPoolSize > 0;
    assert keepAliveTime >= 0;
    assert maxTasks > 0;
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.unit = unit;
    this.policy = policy;
    this.keepAliveTime = keepAliveTime;
    taskQueue = new ArrayBlockingQueue<Runnable>(maxTasks);
    useTimed = keepAliveTime != 0;
  }
  /**
   *
   * @param runnable 需求被履行的使命
   * @param max 是否运用 maximumPoolSize
   * @return boolean
   */
  public synchronized boolean addWorker(Runnable runnable, boolean max) {
    if (ct.get() >= corePoolSize && !max)
      return false;
    if (ct.get() >= maximumPoolSize && max)
      return false;
    Worker worker = new Worker(runnable);
    workers.add(worker);
    Thread thread = new Thread(worker, "ThreadPool-" + "Thread-" + ct.addAndGet(1));
    thread.start();
    return true;
  }
  // 下面这个办法是向线程池提交使命
  public void execute(Runnable runnable) throws InterruptedException {
    checkPoolState();
    if (addWorker(runnable, false)  // 假如能够参加新的线程履行使命 参加成功就直接回来
            || !taskQueue.offer(runnable) // 假如 taskQueue.offer(runnable) 回来 false 阐明提交使命失利 使命行列现已满了
            || addWorker(runnable, true)) // 运用能够运用的最大的线程数 (maximumPoolSize) 看是否能够发生新的线程
      return;
    // 假如使命行列满了而且不能够参加新的线程 则回绝这个使命
    if (!taskQueue.offer(runnable))
      reject(runnable);
  }
  private void reject(Runnable runnable) throws InterruptedException {
    switch (policy) {
      case ABORT:
        throw new RuntimeException("task queue is full");
      case CALLER_RUN:
        runnable.run();
      case DISCARD:
        return;
      case DISCARD_OLDEST:
        // 抛弃等候时刻最长的使命
        taskQueue.poll();
        execute(runnable);
    }
  }
  private void checkPoolState() {
    if (isStopped) {
      // 假如线程池现已停下来了,就不在向使命行列傍边提交使命了
      throw new RuntimeException("thread pool has been stopped, so quit submitting task");
    }
  }
  public <V> RunnableFuture<V> submit(Callable<V> task) throws InterruptedException {
    checkPoolState();
    FutureTask<V> futureTask = new FutureTask<>(task);
    execute(futureTask);
    return futureTask;
  }
  // 强制封闭线程池
  public synchronized void stop() {
    isStopped = true;
    for (Worker worker : workers) {
      worker.stopWorker();
    }
  }
  public synchronized void shutDown() {
    // 先表明封闭线程池 线程就不能再向线程池提交使命
    isStopped = true;
    // 先等候一切的使命履行完结再封闭线程池
    waitForAllTasks();
    stop();
  }
  private void waitForAllTasks() {
    // 当线程池傍边还有使命的时分 就不退出循环
    while (taskQueue.size() > 0) {
      Thread.yield();
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
  class Worker implements Runnable {
    private Thread thisThread;
    private final Runnable firstTask;
    private volatile boolean isStopped;
    public Worker(Runnable firstTask) {
      this.firstTask = firstTask;
    }
    @Override
    public void run() {
      // 先履行传递过来的第一个使命 这里是一个小的优化 让线程直接履行第一个使命 不需求
      // 放入使命行列再取出来履行了
      firstTask.run();
      thisThread = Thread.currentThread();
      while (!isStopped) {
        try {
          Runnable task = useTimed ? taskQueue.poll(keepAliveTime, unit) : taskQueue.take();
          if (task == null) {
            int i;
            boolean exit = true;
            if (ct.get() > corePoolSize) {
              do{
                i = ct.get();
                if (i <= corePoolSize) {
                  exit = false;
                  break;
                }
              }while (!ct.compareAndSet(i, i - 1));
              if (exit) {
                return;
              }
            }
          }else {
            task.run();
          }
        } catch (InterruptedException e) {
          // do nothing
        }
      }
    }
    public synchronized void stopWorker() {
      if (isStopped) {
        throw new RuntimeException("thread has been interrupted");
      }
      isStopped = true;
      thisThread.interrupt();
    }
  }
}

线程池测试

package cscore.concurrent.java.threadpoolv2;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
public class Test {
  public static void main(String[] args) throws InterruptedException, ExecutionException {
    var pool = new ThreadPool(2, 5, TimeUnit.SECONDS, 10, RejectPolicy.ABORT, 100000);
    for (int i = 0; i < 10; i++) {
      RunnableFuture<Integer> submit = pool.submit(() -> {
        System.out.println(Thread.currentThread().getName() + " output a");
        try {
          Thread.sleep(10);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        return 0;
      });
      System.out.println(submit.get());
    }
    int n = 15;
    while (n-- > 0) {
      System.out.println("Number Threads = " + pool.getCt());
      Thread.sleep(1000);
    }
    pool.shutDown();
  }
}

上面测试代码的输出成果如下所示:

ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-3 output a
ThreadPool-Thread-4 output a
Number Threads = 5
ThreadPool-Thread-5 output a
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-3 output a
ThreadPool-Thread-4 output a
ThreadPool-Thread-5 output a
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-4 output a
ThreadPool-Thread-3 output a
ThreadPool-Thread-5 output a
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-4 output a
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 3
Number Threads = 2
Number Threads = 2
Number Threads = 2
Number Threads = 2

从上面的代码能够看出咱们完结了正确的使命完结成果,一起线程池傍边的中心线程数从 2 变到了 5 ,当线程池傍边使命行列悉数别履行完结之后,线程的数目从头降下来了,这确实是咱们想要达到的成果。

总结

在本篇文章傍边主要给咱们介绍了怎么完结一个类似于JDK中的线程池,里边有十分多的完结细节,咱们能够仔细捋一下其中的流程,对线程池的了解将会十分有协助。

以上便是本篇文章的一切内容了,我是LeHung,咱们下期再会!!!更多精彩内容合集可访问项目:github.com/Chang-LeHun…

关注公众号:一无是处的研究僧,了解更多计算机(Java、Python、计算机系统根底、算法与数据结构)知识。