本篇博文主要介绍线程池如何接收并处理任务,在此之前你需要了解下面两个知识点:
①ThreadPoolExecutor创建过程以及各参数分析;②线程池5种状态的表示方法以及原理;了解上面两个知识点才能更好的理解本篇文章。
线程池处理任务的逻辑决定于当前线程数(workerCount)的大小与核心线程数(corePoolSize)、最大线程数(maximumPoolSize)大小的关系。为了便于理解,先把执行逻辑画出来: 关于线程池如何接收并处理任务的所有逻辑都在上面那个图,我们后面分析也是根据图的逻辑来的,所以可以先把图理解,后面的源码分析才不会头晕目眩。
ThreadPoolExecutor只提供一个接收Runnable任务的方法executor(Runnable)。核心逻辑也在该方法内,所以有必要对该方法的源码进一步分析。 线程池通过execute(Runnable)方法接收放入线程池的任务。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }处理的过程分为五个步骤:
如果提交的任务为null,则抛出NPE异常;如果运行的线程数小于corePoolSize,则新创建一个线程并执行该任务;这个过程由addWorker(command,true)方法完成;如果线程池还在运行且等待队列没有满,则把任务放入等待队列;如果线程池还在运行但队列已经放满,则尝试创建新的线程并执行该任务;如果线程池没有运行,则直接执行拒绝策略;从以上5个的简单的步骤可知,除了通过workerCountOf©方法获取当前工作线程以及isRunning©方法判断线程池是否处于RUNNING状态外,最关键的方法是addWorker(Runnable,boolean)和reject(Runnable)。
addWorker(Runnable,boolean)被调用三次,而且每次调用时的传参不同。下面首先分析下addWorker(Runnable,boolean)方法的作用:根据线程池的当前状态以及corePoolSize和maximumPoolSize的边界值判断是否需要新增worker。Worker类是ThreadPoolExecutor类的一个内部私有类,看一下它的构造方法:
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }表明会利用线程工厂新建一个线程,并且把该线程和传进来的Runnable任务封装在一起。
为了进一步研究线程池是如何接收并执行任务的,我们进一步查看addWorker()方法。下面是对该方法的简要分析:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { // 这里用了一个自循环结构,目的是保证处理过程中状态一致 int c = ctl.get(); // 获取ctl的值,ctl包含workerCount和runState int rs = runStateOf(c); // 获取线程池的当前状态 /** * 如果线程池不处于RUNNING状态且线程池状态处于STOP/TIDYING/TERMINATED或者有任务传递进来 * 或者等待队列为空,则返回fasle **/ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // 再来一个自循环 for (;;) { // 获取当前池内的工作线程数 int wc = workerCountOf(c); // 如果工作线程数大于CAPACITY,或者大于最大线程数,则返回fasle if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 利用CAS原理给ctl值增加1,代表工作线程数+1 if (compareAndIncrementWorkerCount(c)) break retry; // break 返回retry标签,结束自循环 // 如果CAS增加失败,表明c的值被修改过,需要通过自循环重试 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; // 标记新增线程是否开启 boolean workerAdded = false; // 标记新增worker是否假如列表 Worker w = null; try { w = new Worker(firstTask); // 构造一个新的worker对象 final Thread t = w.thread; // 获取该worker对象的thread if (t != null) { // 线程不为空,则加可重入锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // 加锁之后的线程池状态 // 线程池状态为RUNNING或者 状态为SHUTDOWN且任务为null的情况下 新增的woker对象才可以加入workers列表 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; // 标记设置true,表明线程可以开启 } } finally { mainLock.unlock(); // 解锁 } // 新增worker加入workers列表,对应的新增线程可以运行 if (workerAdded) { t.start(); workerStarted = true; // 线程启动标记为为true } } } finally { // 如果线程没有启动,那么新增的worker需要从workers列表中移除 if (! workerStarted) addWorkerFailed(w); } return workerStarted; // 返回线程是否启动的标示 }看到这么长的方法可能有点懵,我们精简以后的流程图如下: 可以对照着addWorker(Runnable,boolean)源码解析和流程图,再回顾一下executor(Runnable)方法的流程图。
下面分析拒绝策略方法reject(Runnable)。 线程池拒绝策略在构造的时候由用户根据实际需要选择,共有四种拒绝策略。
该策略直接抛出运行时异常RejectedExecutionException(String)。
如果本线程池没有shutdown,那么将会直接运行Runnable任务;如果本线程池状态不为RUNNING,那么该Runnable任务直接抛弃。
如何线程池没有关闭,那么抛弃等待队列中最老的任务,并且执行Runnable任务;如果线程池已经关闭,那么Runnable任务直接抛弃。
该策略不执行任何动作,意味着任务直接抛弃。
以上四种抛弃策略实现RejectedExecutionHandler接口以及接口中的rejectedExecutor(Runnable,ThreadPoolSize)方法,且位于ThreadPoolExecutor类内部,根据实际情况选择调用。
本篇博文介绍的内容是线程池核心内容,它涉及到线程池如何处理一个Runnable任务,如何根据线程池的状态以及当前线程数(workerCount)判断是否需要新增一个worker,还是直接放入等待队列,还是执行拒绝策略。