文章目录
- 线程池存在的逻辑
- 线程池的继承逻辑结构
- 线程池策略模拟示意图集
- 线程池的构造方法(参数含义)
- corePoolSize
- maximumPoolSize
- keepAliveTime
- unit
- workQueue
- threadFactory
- RejectedExecutionHandler
- 线程池完整测试代码
- 线程池的高度封装类
- Executors简介
- 测试代码
线程池存在的逻辑
将一些数据进行池化
操作是我们常用的处理手段, 其实也就是一种缓存的思路, 我们已经接触过的池化
技术有以下这几种
- 字符串常量池
- 整数型常量池
- 数据库连接池
- 线程池
虽然创建线程 / 销毁线程的开销以及比进程低很多了, 但是还存在一些问题
- 想象这么⼀个场景:
在学校附近新开了⼀家快递店,⽼板很精明,想到⼀个与众不同的办法来经营。店⾥没有雇⼈,⽽是每次有业务来了,就现场找⼀名同学过来把快递送了,然后解雇同学。这个类⽐我们平时来⼀个任务,起⼀个线程进⾏处理的模式。很快⽼板发现问题来了,每次招聘 + 解雇同学的成本还是⾮常⾼的。⽼板还是很善于变通的,知道了为什么⼤家都要雇⼈了,所以指定了⼀个指标,公司业务⼈员会扩张到 3 个⼈,但还是随着业务逐步雇⼈。于是再有业务来了,⽼板就看,如果现在公司还没 3 个⼈,就雇⼀个⼈去送快递,否则只是把业务放到⼀个本本上,等着 3 个快递⼈员空闲的时候去处理。这个就是我们要带出的线程池的模式。
线程池最⼤的好处就是减少每次启动、销毁线程的损耗。除了把线程池化为线程池来降低开销以外, 还可以通过协程(轻量级线程)
的方式来减少开销, 这也是Go语言
威胁Java语言
的最重要的点(更好的解决高并发)
线程池的继承逻辑结构
Executor
接口ExecutorService
接口AbstractExecutorService
抽象类ThreadPoolExecutor
类
继承结构图
Executor
是一个接口, 里面有一个重要的方法execute
这个就是作为执行任务的入口
ExecutorService
是一个接口, 继承了Executor
接口, 里面也有许多重要的方法
awaitTermination
这个方法的作用是等待线程池的线程任务全部执行完毕之后再关闭线程
shutdown
方法的作用就是关闭线程池(任务不一定都执行完毕)
submit
方法的作用是执行当前传入的任务
AbstractExecutorService
和ThreadPoolExecutor
就是对上面接口的封装…(具体自行翻阅文档)
线程池策略模拟示意图集
我们来解释线程池模型很好的一个例子就是银行排队叫号系统
最开始的情况就是下图
初态的时候我们的银行有5个办理服务的窗口, 其中3个为常开的窗口(核心线程数
)还有2个为待机的窗口, 在客户访问量变大的时候才开启(非核心线程数
), 还有一个排队的区域, 容量为3, 其实就是阻塞队列
假设现在来了三个用户需要提供服务, 此时直接来到这三个窗口进行服务
此时我们的三个客户直接在三个窗口执行任务了(核心线程), 根本不会进入阻塞队列等待, 我们的测试代码如下
/*** 本节测试的是关于线程池相关的操作细节*/
public class ThreadTest {public static void main(String[] args) {ArrayBlockingQueue<Runnable> runnables = new ArrayBlockingQueue<>(3);ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 5,1L, TimeUnit.SECONDS,runnables, Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());// 模拟i个人进行排队for(int i = 0; i < 3; i++){int id = i;threadPool.execute(() -> {System.out.println(runnables.size());System.out.println(Thread.currentThread().getName() + " id = " + id);});}}
}
执行结果
证明我们的三个核心线程之间进行了任务的执行
假设人数再多一点, 达到5个人(小于6个人即可), 此时我们多出来的人就在队列里面排队就可以了
我们使用下面的代码测试, 每一个任务中间都有一个while(true)
循环, 所以如果一个线程抢到了一个位置就不会放弃…
测试代码
/*** 本节测试的是关于线程池相关的操作细节*/
public class ThreadTest {public static void main(String[] args) {ArrayBlockingQueue<Runnable> runnables = new ArrayBlockingQueue<>(3);ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 5,1L, TimeUnit.SECONDS,runnables, Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());// 模拟i个人进行排队for(int i = 0; i < 5; i++){int id = i;threadPool.execute(() -> {System.out.println(runnables.size());System.out.println(Thread.currentThread().getName());while(true) {}});}}
}
此时说明客户1 , 2 , 3 占用了三个线程进行任务的执行, 然后剩下的两个客户 4, 5 进入到阻塞队列进行无休止的等待, 也就是说, 只要阻塞队列中还有位置, 就不会去开新的线程
当我们的客户的数量大于核心线程数量 + 阻塞队列的容量, 并且小于最大线程数 + 阻塞队列容量这里指的取值范围也就是 [6, 8], 此时就需要把非核心线程进行启动, 假设此时有7个客户, 场景如下
我们的当前的测试代码还是之前的那个, 带while(true)
循环的版本, 不过把 i置为7
, 此时我们查看结果
此时阻塞队列中的元素有3个, 开启一个非核心线程, 也就是一共四个线程进行服务…
当我们的任务的数目大于阻塞队列的容量 + 最大线程数, 也就是 n > 8, 此时已经没有可以容纳新的任务的位置了, 此时就会触发拒绝策略, 我们当前的拒绝策略是 AbortPolicy
也就是抛出异常(其实不是一个好的方案)
此时的示意图如下
我们把i置为10
进行测试(还是上面的测试代码)
测试结果如下
此时程序抛出异常, 可以看到5个线程全部启动, 而且阻塞队列还是满的…
我们上面的关于线程池的真实的执行逻辑的测试使用图已经案例结合起来看应该是十分好懂的…
线程池的构造方法(参数含义)
我们分析的是最后一个带有七个参数的构造方法
下面是JDK线程池构造方法(七个参数版本)的源代码
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
corePoolSize
核心线程数量, 也就是核心的线程数, 也就是上面我们的举的例子中的一直开着的窗口
其实换句话说, 就是最小的线程数量, 在一创建线程池的时候就会存在
maximumPoolSize
最大的线程数量, 也就是上面的我们举的例子中的核心线程加上非核心线程的数量, 在上面柜台的例子中就是5个(3个核心的, 还有2个根据需要进行自动扩容的)
keepAliveTime
非核心线程的最大生存时间, 我们的非核心线程在任务量多的时候开启, 在任务量小的时候关闭, 但是也不是没有任务之后就立即关闭, 中间也有一个最大的缓冲时间keepAliveTime
, 假设执行任务之后并且等待的时间大于这个最大的时间, 那我们这个非核心线程就选择关闭
unit
最大等待时间的单位, 我们上面说了 keepAliveTime
的参数的含义, 这个其实就是那个时间的单位, 是一个枚举类型, 我们打开TimeUnit
进行查看
比如我们想要让传入的最大存活时间的单位为秒
ArrayBlockingQueue<Runnable> runnables = new ArrayBlockingQueue<>(3);ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 5,1L, TimeUnit.SECONDS,runnables, Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
workQueue
工作队列, 其实也就是阻塞队列, 其实也就是上面我们银行叫号的例子中的阻塞队列…
我们任务的可以组织其按照什么样的数据结构进行存储, 比如数组, 链表, 优先级队列
threadFactory
这个是给线程池中的执行任务的线程的定义, 其实这种策略也是工厂模式(一种设计模式的体现)
这其实是一个接口, 正常的使用是通过一个类实现这个接口然后重写newThread
方法来实现的
但是我们的系统提供了一个默认的可选项
Executors.defaultFactory()
RejectedExecutionHandler
这是最重要的一个参数拒绝策略
也就是上面我们说的, 当新添加的任务真的没有地方放的时候, 就只能进行拒绝策略
的逻辑, 我们的拒绝策略的相关类信息如下
本质上是一个接口, 有4个实现类, 也就是ThreadPoolExecutor
中的静态内部类来实现
ThreadPoolExecutor.AbortPolicy
: 抛出RejectedExecutionException
被拒绝任务的处理程序ThreadPoolExecutor.CallerRunsPolicy
: 被拒绝任务的处理程序直接在 execute 方法的调用线程中运行被拒绝的任务,除非执行程序已关闭,在这种情况下任务将被丢弃ThreadPoolExecutor.DiscardOldestPolicy
: 被拒绝任务的处理程序会丢弃最早的未处理请求,然后重试 execute ,除非执行程序已关闭,在这种情况下任务将被丢弃ThreadPoolExecutor.DiscardPolicy
: 被拒绝任务的处理程序,它默默地丢弃被拒绝的任务
线程池完整测试代码
/*** 本节测试的是关于线程池相关的操作细节*/
public class ThreadTest {public static void main(String[] args) {ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 5, 1L, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());// 进行任务的执行for(int i = 0; i < 10; i++){// 防止发生变量捕获, 把i的值给到idint id = i;threadPool.execute(() -> {System.out.println(Thread.currentThread().getName() + " id = " + id);});}}
}
此时的拒绝策略是 ThreadPoolExecutor.CallerRunsPolicy
, 也就是由当前的线程执行拒绝的任务
可以看到main
线程参与了任务的执行
线程池的高度封装类
Executors简介
由于我们关于线程池的构造比较的复杂, 所以JDK提供了另外一套类来对线程池进行更加抽象的封装, 这个类就是Executors
之前说过, 这个类其实也是一个工厂类(存在大量的静态工厂方法), 关键的方法也就三个(第一个我们上面说, 获取默认的ThreadFactor
已经说过了)
newCachedThreadPool()
: 构造并返回一个线程池, 这个线程池可以自行的扩容(上限是一个非常大的值)newFixedThreadPool(int nThreads)
: 返回一个固定线程大小的线程池, 这个线程池的容量是确定的- 返回的实际上是一个
ExecutorService
类型, 这是一个接口,ThreadPoolExecutor
实现了这个接口
在任务执行完毕之后, 使用下面的方法关闭线程池
// 这个方法是等待所有的任务结束之后, 或者是超时之后关闭线程池threadPool.awaitTermination(1000, TimeUnit.SECONDS);// 启动有序关闭,其中执行先前提交的任务,但不会接受新任务threadPool.shutdown();
但是, 在阿里巴巴开发手册这本书中提到, 实际的开发中, 我们要使用带有完整参数版本的线程池构造, 不可以使用封装之后的线程池构造, 因为这种高度的封装让参数都是隐式的, 不好控制所以要具体问题, 具体规则具体分析应用, 可能不同公司不一样
测试代码
/*** 由于线程池的参数比较的多也比较的复杂, 我们的JDK还提供了一种更简单的获取线程池的方式*/
public class ThreadTest01 {public static void main(String[] args) throws InterruptedException {ExecutorService threadPool = Executors.newCachedThreadPool();for(int i = 0; i < 10; i++){int id = i;threadPool.submit(() -> {System.out.println(Thread.currentThread().getName() + " id = " + id);});}// 这个方法是等待所有的任务结束之后, 或者是超时之后关闭线程池threadPool.awaitTermination(1000, TimeUnit.SECONDS);// 这个方法就是之间关闭线程池threadPool.shutdown();}
}
另外还有关于submit
和execute
的区别, 目前还没有感觉到…