目录
线程池的七大参数
线程池的拒绝策略
线程池的执行过程 | 执行原理 | 任务投递后处理的优先级
为什么非核心优先执行投递的任务?
核心线程与非核心线程有什么区别?
线程池源码
线程池使用方式
线程池的核心属性
线程池状态
线程池处理任务的整体执行流程——execute
添加工作线程——addWorker
执行任务——runWorker
从工作队列中获取任务——getTask
工作线程结束——porocessWorkerExit
其他问题-源码
线程池的拒绝策略不满足需求怎么办
为什么线程池要添加非核心并且没有任务的工作线程addWroker(null,false);
Worker中的锁是干嘛的
在没任务时,线程池中的工作线程在干嘛?
工作线程出现异常会导致什么问题?是否抛出异常、影响其他线程吗、工作线程会嘎嘛?
工作线程继承AQS的目的是什么?
线程池的七大参数
public ThreadPoolExecutor(int corePoolSize,核心线程数int maximumPoolSize, 最大线程数long keepAliveTime, 最大摸鱼时间……TimeUnit unit, 摸鱼时间单位。BlockingQueue<Runnable> workQueue, 工作队列ThreadFactory threadFactory, 线程工厂RejectedExecutionHandler handler) { 拒绝策略……
线程池的拒绝策略
-
AbortPolicy:抛出异常!
-
CallerRunsPolicy:让提交任务的线程处理这个任务!
-
DiscardPolicy:啥也不做,任务没了!
-
DiscardOldestPolicy:扔掉队列最前面的任务,尝试把当前任务添加进去!
线程池的执行过程 | 执行原理 | 任务投递后处理的优先级
任务投递到线程池之后
-
如果当前线程池的线程个数,不满足你设定的核心线程数,那么就会创建核心线程去处理投递过来的任务。
-
如果线程个数等于设定的核心线程数,那么任务会尝试投递到工作队列中排队。
-
工作队列有长度,如果工作队列的长度大于排队的任务数,任务会被正常的投递到工作队列。
-
-
工作队列中的任务数和现在排队的任务数一致,任务无法投递到工作队列,此时需要创建一个非核心线程来处理刚刚投递过来的任务。
-
创建非核心线程时,还需要判断一下线程个数是否小于你设定的最大线程数,小于才会正常创建。
-
-
如果线程个数等于你设定的最大线程数,会执行拒绝策略。
任务处理流程:
主线程执行execute添加任务,线程池创建工作线程,执行任务,执行任务,再次拉取工作队列任务,直到工作队列没有任务,阻塞工作线程
工作线程阻塞在工作队列,主线程执行execute添加任务到工作队列,工作线程被唤醒,拿到工作队列中的任务执行,执行完毕,再次拉取工作队列任务,直到工作队列没有任务,阻塞工作线程
上述情况中能够看到,非核心优先执行投递的任务,为什么?
为什么非核心优先执行投递的任务?
首先,线程池的使用是为了提交任务给线程池,让线程池异步处理。
而在提交任务的这个过程中,其实是业务线程在执行的。
希望业务线程提交任务的过程要尽可能的短一些,让业务线程尽快的执行后续的逻辑。
如果让业务线程创建的非核心线程直接去处理提交过去的任务,速度相对是最快的一种形式。
如果让业务线程创建的非核心线程优先去拉取队列中最早投递的任务,然后业务线程再将任务投递到工作队列这种形式,就会让任务投递的过程变慢。
核心线程与非核心线程有什么区别?
没区别,核心线程跟非核心线程只有在创建的时候会区分,因为他要根据核心与非核心来决定判断哪个参数。是判断核心线程数,还是最大线程数。 而在干活的时候,他俩都是普通线程。
线程池只关注数量,无论你创建的时候,走的是核心的逻辑,还是非核心的逻辑,我只看数量。即便创建的时候走的是核心线程的逻辑,但是根据线程个人情况,多了一个线程,到达了最大空闲时间,也会干掉这个核心线程。
线程池源码
线程池使用方式
public static void main(String[] args) {// 线程池的核心线程数如何设置// 任务可以分为两种:CPU密集,IO密集。ThreadPoolExecutor executor = new ThreadPoolExecutor(1,2,1,TimeUnit.SECONDS,new ArrayBlockingQueue<>(1),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);// ...return t;}},new ThreadPoolExecutor.AbortPolicy());executor.execute(任务);executor.submit(有返回结果的任务);
}
线程池的核心属性
// AtomicInteger,就是一个int,写操作用CAS实现,保证了原子性
// ctl维护这线程池的2个核心内容:
// 1:线程池状态(高3位,维护着线程池状态)
// 2:工作线程数量(核心线程+非核心线程,低29位,维护着工作线程个数)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// COUNT_BITS=29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 工作线程的最大个数
// 00100000 00000000 00000000 00000000 - 1
// 000111111111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;// 拿到线程池状态
// 011...
// 111...
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 拿到工作线程个数
// ...0000000111111
// ...1111111111111
private static int workerCountOf(int c) { return c & CAPACITY; }
线程池状态
线程池处理任务的整体执行流程——execute
public void execute(Runnable command) {// 非空!!if (command == null)throw new NullPointerException();// 拿到ctlint c = ctl.get();// 通过ctl获取当前工作线程个数if (workerCountOf(c) < corePoolSize) {// true:代表是核心线程,false:代表是非核心线程if (addWorker(command, true))// 如果添加核心线程成功,return结束掉return;// 如果添加失败,重新获取ctlc = ctl.get();}// 核心线程数已经到了最大值、添加时,线程池状态变为SHUTDOWN/STOP// 判断线程池是否是运行状态 && 添加任务到工作队列if (isRunning(c) && workQueue.offer(command)) {// 再次获取ctl的值int recheck = ctl.get();// 再次判断线程池状态。 DCL// 如果状态不是RUNNING,把任务从工作队列移除。if (! isRunning(recheck) && remove(command))// 走一波拒绝策略。reject(command);// 线程池状态是RUNNING。// 判断工作线程数是否是0个。// 可以将核心线程设置为0,所有工作线程都是非核心线程。// 核心线程也可以通过keepAlived超时被销毁,所以如果恰巧核心线程被销毁,也会出现当前效果else if (workerCountOf(recheck) == 0)// 添加空任务的非核心线程去处理工作队列中的任务addWorker(null, false);}// 可能工作队列中的任务存满了,没添加进去,到这就要添加非核心线程去处理任务else if (!addWorker(command, false))// 执行拒绝策略!reject(command);
}
添加工作线程——addWorker
private boolean addWorker(Runnable firstTask, boolean core) {xxx:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 判断线程池状态if (rs >= SHUTDOWN &&// 判断如果线程池的状态为SHUTDOWN,还要处理工作队列中的任务// 【SHUTDOWN】不接收新任务// 如果你添加工作线程的方式,是任务的非核心线程,并且工作队列还有任务! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))return false;// 判断工作线程个数for (;;) {int wc = workerCountOf(c);// 判断1:工作线程是否已经 == 工作线程最大个数// 判断2-true判断:判断是核心线程么?如果是判断是否超过核心线程个数// 判断2-false判断:如果是非核心线程,查看是否超过设置的最大线程数if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))return false;// 对工作线程进行 + 1操作if (compareAndIncrementWorkerCount(c))// +1成功,跳出外层循环,执行添加工作线程的业务// 以CAS方式,对ctl+1,多线程并发操作,只有会有一个成功break xxx;// 重新拿ctl,c = ctl.get();// 判断线程池状态是否有变化if (runStateOf(c) != rs)continue xxx;}}// 添加工作线程的业务 // 工作线程启动了吗?boolean workerStarted = false;// 工作线程添加了吗?boolean workerAdded = false;// Worker就是工作线程Worker w = null;try {// 创建工作线程,将任务传到Worker中w = new Worker(firstTask);final Thread t = w.thread;// 只有你写的线程工厂返回的是null,这里才会为nullif (t != null) {// 获取锁资源final ReentrantLock mainLock = this.mainLock;// 加锁。 因为我要在启动这个工作线程时,避免线程池状态发生变化,加锁。mainLock.lock();try {// 重新获取ctl,拿到线程池状态int rs = runStateOf(ctl.get());// DCL i think you know~~~if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// 判断Worker中的thread是否已经启动了,一般不会启动,除非你在线程工厂把他启动了if (t.isAlive()) throw new IllegalThreadStateException();// 将工作线程存储到hashSet中workers.add(w);// 获取工作线程个数,判断是否需要修改最大工作线程数记录。int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;// 工作线程添加成功 0workerAdded = true;}} finally {mainLock.unlock();}// 如果添加成功if (workerAdded) {// 启动工作线程t.start();// 设置标识为trueworkerStarted = true;}}} finally {// 如果工作线程启动失败if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}// 如果添加工作线程失败,执行
private void addWorkerFailed(Worker w) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 说明worker可能存放到了workers的hashSet中。if (w != null)// 移除!workers.remove(w);// 减掉workerCount的数值 -1decrementWorkerCount();// 尝试干掉自己tryTerminate();} finally {mainLock.unlock();}
}
执行任务——runWorker
final void runWorker(Worker w) {// 拿到当前线程对象Thread wt = Thread.currentThread();// 拿到worker中存放的RunnableRunnable task = w.firstTask;// 将worker中的任务清空w.firstTask = null;// 揍是一个标识boolean completedAbruptly = true;try {// 如果Worker自身携带任务,直接执行// 如果Worker携带的是null,通过getTask去工作队列获取任务while (task != null || (task = getTask()) != null) {w.lock();// 判断线程池状态是否大于等于STOP,如果是要中断当前线程if ((runStateAtLeast(ctl.get(), STOP) ||// 中断当前线程(DCL)(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())wt.interrupt();try {// 前置钩子beforeExecute(wt, task);Throwable thrown = null;try {// 执行任务task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 后置钩子afterExecute(task, thrown);}} finally {task = null;// 当前工作执行完一个任务,就++w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}
}
从工作队列中获取任务——getTask
private Runnable getTask() {// 超时-falseboolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// 线程池状态判断// 如果线程池状态为SHUTDOWN && 工作队列为空// 如果线程池状态为STOPif (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {// 对工作线程个数--decrementWorkerCount();return null;}// 对数量的判断。int wc = workerCountOf(c);// 判断核心线程是否允许超时?// 工作线程个数是否大于核心线程数boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// 判断工作线程是否超过了最大线程数 && 工作队列为nullif ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {// 工作线程数有问题,必须-1,干掉当前工作线程// 工作线程是否超过了核心线程,如果超时,就干掉当前线程// 对工作线程个数--if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 如果是非核心,走poll,拉取工作队列任务,// 如果是核心线程,走take一直阻塞,拉取工作队列任务Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :// 当工作队列没有任务时,这时就会被Condition通过await阻塞线程// 当有任务添加到工作线程后,这是添加完任务后,就会用过Condition.signal唤醒阻塞的线程workQueue.take();if (r != null)return r;// 执行的poll方法,并且在指定时间没拿到任务,timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}
工作线程结束——porocessWorkerExit
其他问题-源码
线程池的拒绝策略不满足需求怎么办
线程池自带四种,如何可以满足业务需求,直接用即可,如果不满足,可以自行实现RejectedExecutionHandler接口,重写功能。
为什么线程池要添加非核心并且没有任务的工作线程addWroker(null,false);
当前工作线程个数为0,但是工作队列中有任务
此时就需要添加一个非核心并且空任务的工作线程去处理阻塞队列中的任务
Worker中的锁是干嘛的
Worker中基于AQS实现了一个非可重入锁。
Worker为了避免中断线程时,Worker还没有初始化完成,导致出现问题。
在没任务时,线程池中的工作线程在干嘛?
线程会挂起,默认核心线程是WAITING状态,非核心是TIMED_WAITING
如果是核心线程,默认情况下,会在阻塞队列的位置执行take方法,直到拿到任务为止。
如果是非核心线程,默认情况下,会在阻塞队列的位置执行poll方法,等待最大空闲时间,如果没任务,直接拉走咔嚓掉,如果有活,那就正常干。
工作线程出现异常会导致什么问题?是否抛出异常、影响其他线程吗、工作线程会嘎嘛?
如果任务是execute方法执行的,工作线程会将异常抛出。
如果任务是submit方法执行的futureTask,工作线程会将异常捕获并保存到FutureTask里,可以基于futureTask的get得到异常信息
出现异常的工作线程不会影响到其他的工作线程。
runWorker中的异常会被抛到run方法中,run方法会异常结束,run方法结束,线程就嘎了!
如果是submit,异常没抛出来,那就不嘎。
工作线程继承AQS的目的是什么?
工作线程的本质就是Worker对象,继承AQS跟shutdown和shutdownNow有关系。
如果是shutdown,会中断空闲的工作线程,基于Worker实现的AQS中的state的值来判断能否中断工作线程。
如果工作线程的state是0,代表空闲,可以中断,如果是1,代表正在干活。
如果是shutdownNow,直接强制中断所有工作线程