线程池
线程池:一个容纳多个线程的容器,容器中的线程可以重复使用,省去了频繁创建和销毁线程对象的操作。
线程池的作用
- 降低资源消耗,避免频繁的创建和销毁线程。每个工作线程都可以被重复利用,可执行多个任务。
- 提高响应速度,当任务到达时,如果有线程可以直接用,不会出现系统僵死。
- 对线程进行管理,防止无休止的创建线程而引起资源匮乏进而导致的系统崩溃,对其进行统一的分配,调优和监控。
核心思想
线程池的核心思想是线程复用,即同一个线程可以被重复使用,来处理多个任务。也就是使用池化技术 (Pool),一种编程技巧,核心思想是资源复用,在请求量大时能优化应用性能,降低系统频繁建连的资源开销。
在线程池中,同一个线程可以从阻塞队列中不断获取新任务来执行,并不是每次执行任务都会调用 Thread.start() 来创建新线程,而是让每个线程去执行一个“循环任务”,在这个“循环任务”中不停的检查是否有任务需要被执行。
⭐Java的线程池
Java 主要是通过构建ThreadPoolExecutor
来创建线程池的。接下来我们看一下线程池是如何构造出来的
其构造方法如下:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
从其构造方法可以看出,其参数有7个,分别是corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler。下边是对每一个参数的解释:
- corePoolSize:线程池中用来工作的核心线程数量,核心线程,即使空闲也不会被销毁。
- maximumPoolSize:最大线程数,线程池允许创建的最大线程数,非核心线程数:maximumPoolSize-corePoolSize
- keepAliveTime:空闲线程存活时间。一个线程如果处于空闲状态,并且当前的线程数量大于corePoolSize,那么在指定时间后,这个空闲线程会被销毁,这里的指定时间由keepAliveTime来设定,只影响非核心线程
- unit:空闲线程存活时间单位,keepAliveTime的时间单位。
- workQueue:任务队列,是一个阻塞队列,当线程数达到核心线程数后,会将任务存储在阻塞队列中。JDK提供了许多队列,常见的如下:
- ArrayBlockingQueue:FIFO,基于数组的有界阻塞队列,可以防止资源耗尽问题。
- LinkedBlockingQueue:基于链表的无界阻塞队列(其实最大容量为Interger.MAX),使用该队列时,就不会创建非核心线程,容易溢出。
- SynchronousQuene:SynchronousQueue没有容量,生产者放入一个任务,必须等待一个消费者取出,继续放入新的任务。
- PriorityBlockingQueue:具有优先级的无界阻塞队列,优先级通过参数Comparator实现
- threadFactory :线程池内部创建线程所用的工厂,创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon线程等等。
- handler:拒绝策略;当队列已满并且线程数量达到最大线程数量时,会调用该方法处理任务。JDK内置的拒绝策略如下:
- AbortPolicy:直接抛出异常,阻止系统正常运行。
- CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。
- DiscardOldestPolicy:丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。
- DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢失,这是最好的一种方案
注意的几个点:
- keepAliveTime设置的只是非核心线程的存活时间,不影响核心线程。
- 当核心线程已经全部工作,此时还有任务来,就会被存储在任务队列中;如果任务队列已满,并且核心线程没有空闲,那么就会创建非核心线程(数量不超过maximumPoolSize-corePoolSize)来去处理新来的线程,而不是去任务队列中拿。
贴一个简单的线程池的实现
ThreadPool.java:
package com.qcby.threadpool;public interface ThreadPool<T extends Runnable> {void execute(T job);void shutdown();void addWorks(int num);void removeWorks(int num);int getJobSize();
}
SimpleThreadPool.java
package com.qcby.threadpool;import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;public class SimpleThreadPool<Job extends Runnable> implements ThreadPool<Job> {/*** 最大的工作者数量也即线程数量*/private static final int MAX_WORKER_NUMBERS = 10;/*** 默认的线程数量*/private static final int DEFAULT_WORKER_NUMBERS = 5;/*** 最小的线程数量*/private static final int MIN_WORKER_NUMBERS = 1;/*** 任务队列*/private final LinkedList<Job> jobs = new LinkedList<>();private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());/*** 用来标记每个线程,使用线程安全的计数器*/private AtomicLong threadNum = new AtomicLong();/*** 工作者数量*/private int workerNum = DEFAULT_WORKER_NUMBERS;public SimpleThreadPool() {initializeWorkers(DEFAULT_WORKER_NUMBERS);}public SimpleThreadPool(int num) {this.workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;initializeWorkers(workerNum);}/*** 初始化线程** @param num*/private void initializeWorkers(int num) {for (int i = 0; i < num; i++) {Worker worker = new Worker();workers.add(worker);Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());thread.start();}}/*** 执行任务** @param job*/@Overridepublic void execute(Job job) {if (job != null) {synchronized (jobs) {jobs.addLast(job);jobs.notify();}}}/*** 关闭线程*/@Overridepublic void shutdown() {for (Worker worker : workers) {worker.shutdown();}}/*** 使用synchroized保证线程安全* 创建新的线程,保证总的线程数量不超过MAX_WORKER_NUMBERS** @param num*/@Overridepublic void addWorks(int num) {synchronized (jobs) {if (num + workerNum > MAX_WORKER_NUMBERS) {num = MAX_WORKER_NUMBERS - workerNum;}initializeWorkers(num);workerNum += num;}}@Overridepublic void removeWorks(int num) {synchronized (jobs) {if (num >= workerNum) {throw new IllegalArgumentException("beyond workNum");}int count = 0;while (count < num) {Worker worker = workers.get(count);if (workers.remove(worker)) {worker.shutdown();count++;}}workerNum -= count;}}@Overridepublic int getJobSize() {return jobs.size();}class Worker implements Runnable {private volatile boolean running = true;@Overridepublic void run() {while (running) {Job job = null;synchronized (jobs) {while (jobs.isEmpty()) {try {jobs.wait();} catch (InterruptedException e) {Thread.currentThread().interrupt();return;}}job = jobs.removeFirst();}if (job != null) {job.run();}}}public void shutdown() {running = false;}}
}