面试|ThreadPoolExecutor处理Runnable任务过程源码分析

it2025-03-05  29

本篇博文主要介绍线程池如何接收并处理任务,在此之前你需要了解下面两个知识点:

①ThreadPoolExecutor创建过程以及各参数分析;②线程池5种状态的表示方法以及原理;

了解上面两个知识点才能更好的理解本篇文章。

0.引言

线程池处理任务的逻辑决定于当前线程数(workerCount)的大小与核心线程数(corePoolSize)、最大线程数(maximumPoolSize)大小的关系。为了便于理解,先把执行逻辑画出来: 关于线程池如何接收并处理任务的所有逻辑都在上面那个图,我们后面分析也是根据图的逻辑来的,所以可以先把图理解,后面的源码分析才不会头晕目眩。

1.execute(Runnable)

ThreadPoolExecutor只提供一个接收Runnable任务的方法executor(Runnable)。核心逻辑也在该方法内,所以有必要对该方法的源码进一步分析。 线程池通过execute(Runnable)方法接收放入线程池的任务。

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }

处理的过程分为五个步骤:

如果提交的任务为null,则抛出NPE异常;如果运行的线程数小于corePoolSize,则新创建一个线程并执行该任务;这个过程由addWorker(command,true)方法完成;如果线程池还在运行且等待队列没有满,则把任务放入等待队列;如果线程池还在运行但队列已经放满,则尝试创建新的线程并执行该任务;如果线程池没有运行,则直接执行拒绝策略;

从以上5个的简单的步骤可知,除了通过workerCountOf©方法获取当前工作线程以及isRunning©方法判断线程池是否处于RUNNING状态外,最关键的方法是addWorker(Runnable,boolean)和reject(Runnable)。

2.addWorker(Runnable,boolean)

addWorker(Runnable,boolean)被调用三次,而且每次调用时的传参不同。下面首先分析下addWorker(Runnable,boolean)方法的作用:根据线程池的当前状态以及corePoolSize和maximumPoolSize的边界值判断是否需要新增worker。Worker类是ThreadPoolExecutor类的一个内部私有类,看一下它的构造方法:

Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }

表明会利用线程工厂新建一个线程,并且把该线程和传进来的Runnable任务封装在一起。

为了进一步研究线程池是如何接收并执行任务的,我们进一步查看addWorker()方法。下面是对该方法的简要分析:

private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { // 这里用了一个自循环结构,目的是保证处理过程中状态一致 int c = ctl.get(); // 获取ctl的值,ctl包含workerCount和runState int rs = runStateOf(c); // 获取线程池的当前状态 /** * 如果线程池不处于RUNNING状态且线程池状态处于STOP/TIDYING/TERMINATED或者有任务传递进来 * 或者等待队列为空,则返回fasle **/ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // 再来一个自循环 for (;;) { // 获取当前池内的工作线程数 int wc = workerCountOf(c); // 如果工作线程数大于CAPACITY,或者大于最大线程数,则返回fasle if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 利用CAS原理给ctl值增加1,代表工作线程数+1 if (compareAndIncrementWorkerCount(c)) break retry; // break 返回retry标签,结束自循环 // 如果CAS增加失败,表明c的值被修改过,需要通过自循环重试 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; // 标记新增线程是否开启 boolean workerAdded = false; // 标记新增worker是否假如列表 Worker w = null; try { w = new Worker(firstTask); // 构造一个新的worker对象 final Thread t = w.thread; // 获取该worker对象的thread if (t != null) { // 线程不为空,则加可重入锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // 加锁之后的线程池状态 // 线程池状态为RUNNING或者 状态为SHUTDOWN且任务为null的情况下 新增的woker对象才可以加入workers列表 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; // 标记设置true,表明线程可以开启 } } finally { mainLock.unlock(); // 解锁 } // 新增worker加入workers列表,对应的新增线程可以运行 if (workerAdded) { t.start(); workerStarted = true; // 线程启动标记为为true } } } finally { // 如果线程没有启动,那么新增的worker需要从workers列表中移除 if (! workerStarted) addWorkerFailed(w); } return workerStarted; // 返回线程是否启动的标示 }

看到这么长的方法可能有点懵,我们精简以后的流程图如下: 可以对照着addWorker(Runnable,boolean)源码解析和流程图,再回顾一下executor(Runnable)方法的流程图。

3.reject(Runnable)

下面分析拒绝策略方法reject(Runnable)。 线程池拒绝策略在构造的时候由用户根据实际需要选择,共有四种拒绝策略。

3.1 AbortPolicy
public static class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code AbortPolicy}. */ public AbortPolicy() { } /** * Always throws RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }

该策略直接抛出运行时异常RejectedExecutionException(String)。

3.2 CallerRunPolicy
public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }

如果本线程池没有shutdown,那么将会直接运行Runnable任务;如果本线程池状态不为RUNNING,那么该Runnable任务直接抛弃。

3.3 DiscardOldestPolicy
public static class DiscardOldestPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardOldestPolicy} for the given executor. */ public DiscardOldestPolicy() { } /** * Obtains and ignores the next task that the executor * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }

如何线程池没有关闭,那么抛弃等待队列中最老的任务,并且执行Runnable任务;如果线程池已经关闭,那么Runnable任务直接抛弃。

3.4 DiscardPolicy
public static class DiscardPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardPolicy}. */ public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }

该策略不执行任何动作,意味着任务直接抛弃。

以上四种抛弃策略实现RejectedExecutionHandler接口以及接口中的rejectedExecutor(Runnable,ThreadPoolSize)方法,且位于ThreadPoolExecutor类内部,根据实际情况选择调用。

4.总结

本篇博文介绍的内容是线程池核心内容,它涉及到线程池如何处理一个Runnable任务,如何根据线程池的状态以及当前线程数(workerCount)判断是否需要新增一个worker,还是直接放入等待队列,还是执行拒绝策略。

最新回复(0)