ThreadPoolExecutor JDK源码解读

it2022-05-05  255

java.util.concurrent.ThreadPoolExecutor(请特别注意代码中我的中文代码注释(是我笔记))

(想自学习编程的小伙伴请搜索圈T社区,更多行业相关资讯更有行业相关免费视频教程。完全免费哦!)

1.如下创建固定线程池后,先扔5个,隔5秒在扔11个,隔5秒再扔10个,任务执行时间假设很长30分钟,执行顺序是怎么样的?那又或者随机很长很短呢?

new ThreadPoolExecutor(5, //coresize (池核心线程数,最小运行线程数) 10, //maxsize(池允许最大线程数) 10000L, //time 这里空闲时间为10秒 TimeUnit.MILLISECONDS, //timeunit 毫秒 new ArrayBlockingQueue<Runnable>(10) //taskqueue 任务队列 );

答: 假设执行时间固定很长,扔完这26个所有线程还在忙,那场景应该是:扔进5,启动5个core,再扔进11,优先放队列(taskqueue=10),队列放不下,第11个在尝试启动线程6,在扔10个时候,这10个的前四个可以启动线程,直至max线程数为10,假设执行任务还是很长,一个线程都没有结束,第三次扔的10个里的第5个起线程也启动不了,则reject(runable)给用户user code处理 理由jdk源码:

//java.util.concurrent.ThreadPoolExecutor public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { //如果小于最小(核心)线程数 if (addWorker(command, true)) //则尝试启动一个线程,并且带入这个任务作为FirstTask任务执行 return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { //有可运行线程数,则尝试将command任务加入队列 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) //加入任务到队列后,双重检查:如果所有运行的线程死了(取反为true),接着执行从队列里移走,刚刚加入的comand任务,并且执行拒绝操作 reject(command); //拒绝任务,抛错给user的代码(非jdk代码) else if (workerCountOf(recheck) == 0) //加入任务到队列后,如果有可运行的线程为0了,在启动一个线程 addWorker(null, false); } else if (!addWorker(command, false)) //如果没有可运行的线程数,直接尝试启动一个线程,并且带入这个任务作为FirstTask任务执行 reject(command); //如果启动线程失败,则拒绝这个任务,抛错给user的代码(因为执行任务肯定也失败了) }

复制代码 2.上面三次分不同批次提交的任务都执行完了,超过了10秒(队列空闲)没有任务提交,线程数为什么(怎么会减少为5的(coresize=5)) 答案如下:

//java.util.concurrent.ThreadPoolExecutor private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? //假设当前线程数为10个 则wc>coresize=true,则使用poll超时拉取任务,否则小于或者等于coresize(5),则使用take函数一直阻塞,那如果被以外终止,导致小于coresize呢,这个问题问的好,见后面的问题4的保证机制(每次执行任务后校验最少线程数,不足则补数) workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; //在死循环的下一轮里 timed=true && timedOut=true 则减少线程数,这样反复操作,使当前运行的线程数wc=10,逐渐减少至coresize=5,再追问:是否运行线程数会少于5个呢?肯定有嘛,因为中断异常,或者用户线程异常情况,都会导致小于5个?那什么机制会保证池的运行线程数wc尽量靠近coresize=5呢? } catch (InterruptedException retry) { timedOut = false; } } }

3.问题1里面的addWorker(null,…)启动线程了,没有传递task(command),那怎么执行任务的?

else if (workerCountOf(recheck) == 0) //加入任务到队列后,如果有可运行的线程为0了,在启动一个线程 addWorker(null, false);

答:

//java.util.concurrent.ThreadPoolExecutor /* * Methods for creating, running and cleaning up after workers */ /** * Checks if a new worker can be added with respect to current * pool state and the given bound (either core or maximum). If so, * the worker count is adjusted accordingly, and, if possible, a * new worker is created and started, running firstTask as its * first task. This method returns false if the pool is stopped or * eligible to shut down. It also returns false if the thread * factory fails to create a thread when asked. If the thread * creation fails, either due to the thread factory returning * null, or due to an exception (typically OutOfMemoryError in * Thread.start()), we roll back cleanly. * * @param firstTask the task the new thread should run first (or * null if none). Workers are created with an initial first task * (in method execute()) to bypass queuing when there are fewer * than corePoolSize threads (in which case we always start one), * or when the queue is full (in which case we must bypass queue). * Initially idle threads are usually created via * prestartCoreThread or to replace other dying workers. * execute提交任务时候,wc(当前运行线程数)<coresize时,要新启动线程,并且新线程应该执行提交传参过来的task,也就是新线程应该创建并执行附带过来的一个任务【敲黑板:不是从任务队列取,而是直接执行,所以任务会是FIFO的观点错误】 * execute提交任务时候 wc>=coresize时,并且任务taskqueue已经满了,maxsize>coresize的话,会按需尝试启动>coresize但是小于maxsize的线程数,并且执行execute提交过来的任务【敲黑板:不是从任务队列取,而是直接执行,所以任务会是FIFO的观点错误】 * 空闲的线程创建方式通常是新建线程替换corethread(重启那5个核心线程数),或者替换脏线程,但是这里用词是“usually”是通常情况,那么也就是说也有不是重启corethread和replace dying workers,我推测就是直接new出来的 worker了 * * @param core if true use corePoolSize as bound, else * maximumPoolSize. (A boolean indicator is used here rather than a * value to ensure reads of fresh values after checking other pool * state). * @return true if successful */ private boolean addWorker(Runnable firstTask, boolean core) { ...... w = new Worker(firstTask); final Thread t = w.thread; ...... if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }

因为 t.start();执行了,所以会执行run()

/** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) {//如果task是null,则取队列去任务执行,反复执行,这里,gettask会在队列空闲时候并且长时间没有任务丢进池(可能直接丢给queue,可能直接以firsttask丢给新加的worker线程,所以这里不能笼统说是丢给queue)的时候,死循环干死多余的非coresize线程数 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); //真正执行任务了,可能是execute(task)带过来的task,也可能是从queue里取的task } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; //有缺陷的完成任务=false,即无错误的完美完成了任务 } finally { processWorkerExit(w, completedAbruptly); } }

4.所有work执行完任务后会自己销毁吗? 我这里认为是,但是不知道是否是真的是?依据如下,上段代码里的processWorkerExit:

private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { //优雅的完成了任务,前面又把他tryTerminate()了,所以这里要校验下最少可使用的线程数 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) //等于线程数可能为0是因为 允许核心线程数超时 min = 1;//队列也空了,核心线程数可能为0(允许超时),则还是要保证一个线程在那里运行的 if (workerCountOf(c) >= min) //当前线程数达标,不用新加线程了,那么直接return即可 return; // replacement not needed } addWorker(null, false); //如果线程不够了,那就加呗,前面够的情况,就已经return了,不会到这种不够的情况了,回答了前面池尽量保证运行线程数=coresize(5)的问题 } }

5.execute(task)是借助在线程池初始化的时候的taskqueue来保证任务的先进先出先完成的吗? 不是,见问题3里的中文翻译,而且,即使假设task都是放到queue,FIFO,没有firsttask随新建线程执行的机制,一样也保证不了任务的是按顺序完成的,因为cpu时间分片,难说,谁先完成。

6.分析完这个wang作ba者dan的代码后,最后在问一个最基本的问题:假设没有任务提交的时候,线程池的线程数永远是coresize=5吗? 答:不是,这个让我想起薛定谔的猫:你打开看,猫就死了,或者猫活着,当然这里要表达的意思是: 如果你创建了线程池,但是一个任务都没有提交,其实看源码,他是一个线程都没有的,也就是coresize=0,只有你提交任务后,才会触发addworker添加线程的工作,触发执行gettask的死循环,触发校验最小线程数的逻辑


最新回复(0)