您的位置:首页 > 汽车 > 新车 > Java 异步编编程——Java内置线程池(Executor 线程池)

Java 异步编编程——Java内置线程池(Executor 线程池)

2025/1/5 16:40:46 来源:https://blog.csdn.net/longool/article/details/139304774  浏览:    关键词:Java 异步编编程——Java内置线程池(Executor 线程池)

文章目录

  • 知道线程池是什么以及解决什么问题
  • Java 内置线程池
    • Java 内置线程池设计结构及执行机制
    • ThreadPoolExecutor 中的概念
      • 生命周期
      • 核心参数
      • 阻塞队列
      • 4 种任务拒绝策略
    • 线程池使用场景

知道线程池是什么以及解决什么问题

线程池(Thread Pool)是一种基于池化思想管理线程的工具,用于管理和复用线程的技术。它由一组预先创建的线程组成,这些线程可以在需要时被重复使用来执行多个任务,从而避免了频繁创建和销毁线程的开销。

线程创建是有一定的开销,频繁创建销毁线程的开销会降低系统性能。线程池可以维护多个已创建的线程,一方面避免了处理任务时频繁创建销毁线程开销的代价,另一方面避免了线程无上限创建导致资源浪费及过分调度问题,保证了对系统资源的充分利用。

线程池解决的核心问题就是资源管理的问题。在并发环境下,任意时刻都无法确定有多少任务需要执行,需要多少资源投入使用。由于这些不确定因素在使用多线程时会带来以下若干问题:

  1. 频繁创建和销毁线程带来较大的开销。
  2. 使用不当出现无限制地创建线程,不停的系统申请资源,缺少抑制手段,容易引发系统资源耗尽的风险。
  3. 系统无法合理有效地管理与分配资源,降低系统的稳定性。

那么,使用线程池可以带来有效解决上述可能出现的一系列问题:

  1. 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
  2. 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
  3. 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
  4. 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

Java 内置线程池

在Java 异步编程——Java内置线程调度器(Executor 框架)一文中简单介绍了 Executor 框架以及两级调度模型,现在详细介绍 Executor 框架中相关的线程池类。

Executor、ExecutorService、ThreadPoolExecutor、Executors

Java 里面线程池的顶级接口是 Executor,顶层接口 Executor 提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。但是严格意义上讲 Executor 并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是 ExecutorService (一个 ExecutorService 对象代表一个线程池),这个接口里面声明了一系列管理线程和调度的方式,如:submit()、invokeAll()、invokeAny()、shutDown() 和 shutdownNow() 等,返回 Future 对象,以及可跟踪一个或多个异步任务执行状况返回 Future 的方法。而 ThreadPoolExecutor 是Java中最核心的一个线程池实现类,下面主要以 ThreadPoolExecutor 为例对线程池展开详细介绍。Executors 类,提供了一系列工厂方法用于创建线程池,返回的线程池都实现了 ExecutorService 接口。
在这里插入图片描述

Java 内置线程池设计结构及执行机制

ThreadPoolExecutor 是Java中最核心的一个线程池实现类,这里介绍 ThreadPoolExecutor 线程池的运行机制,如下图所示:
在这里插入图片描述
执行机制(任务调度与线程调度)

  1. 在创建了线程池后,开始等待请求。
  2. 当调用 execute() 或 submit() 方法添加一个请求任务时,线程池会做出如下判断:
    1. 当任务数量少于 corePoolSize 线程池中的常驻核心线程数时,会自动创建 thread 来处理这些任务并加入线程池;
    2. 当添加任务数大于 corePoolSize 且少于 maximumPoolSize 时,不再创建线程,而是将这些任务放到阻塞队列中,等待被执行;
    3. 当阻塞队列满了之后,线程池中的线程数没有超过 maximumPoolSize 最大线程数(非核心线程数),则新建一个线程处理任务并加入到线程池中,从而加速处理阻塞队列;
    4. 当阻塞队列满了之后,且添加任务大于 maximmPoolSize 时,根据饱和策略决定是否容许继续向线程池中添加任务,默认的饱和策略是 AbortPolicy(直接丢弃)。
  3. 当一个线程完成任务时,它会从队列中取下一个任务来执行。
  4. 空闲的线程(非核心线程)会在到达 keepAliveTime 时间之后没有被使用的话就被回收(超过核心线程数的线程销毁,核心线程数以内的线程扔回池子待命)。所以线程池的所有任务完成后,线程池最终收缩只保留 corePoolSize 核心线程数的大小。

线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦开来,从而更方便地管理任务和线程。线程池的运行主要分成两部分:任务管理、线程管理。任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:(1)直接申请线程执行该任务;(2)缓冲到队列中等待线程执行;(3)拒绝该任务。线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。

ThreadPoolExecutor 中的概念

生命周期

线程池通过维护5种状态来实现线程池的生命周期:RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED。

  • RUNNING:线程池被一旦被创建,就处于RUNNING状态,线程池处在RUNNING状态时,能够接收新任务,以及对已添加到队列中的任务进行处理。

  • SHUTDOWN:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN,线程池处在SHUTDOWN状态时,不接收新任务,但会继续执行已提交到任务队列中的任务,直到队列为空。当所有任务执行完成后,线程池会进入TIDYING状态。

  • STOP:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN) -> STOP,线程池处在STOP状态时,不接收新任务,不处理队列中的任务,并且会中断正在处理的任务。同时,它会丢弃任务队列中尚未执行的任务。当所有任务执行完成或被中断后,线程池会进入TIDYING状态;

  • TIDYING:当线程池在SHUTDOWN or STOP状态下,所有任务已经执行完毕,线程池中的活动线程数量为0,线程池会进入TIDYING状态,进行一些清理操作;

    在TIDYING状态下,线程池会执行一些清理操作。当线程池完成清理操作后,会进入TERMINATED状态。

  • TERMINATED:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。线程池彻底终止,可以进行一些释放资源的操作。
    当线程池变为 TIDYING 状态时,会执行钩子函数 terminated()。terminated() 在 ThreadPoolExecutor 类中是空的,若用户想在线程池变为 TIDYING 时,进行相应的处理;可以通过重载 terminated() 函数来实现。
    在这里插入图片描述

核心参数

  1. corePoolSize:线程池中的常驻核心线程数。核心池的大小。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了 prestartAllCoreThreads() 或者 prestartCoreThread() 方法,这2个方法是预创建线程的意思,即在没有任务到来之前就创建 corePoolSize 个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到 corePoolSize 后,就会把到达的任务放到缓存队列当中;

  2. maximumPoolSize:线程池最大线程数(非核心线程数),表示在线程池中最多能创建多少个线程;

  3. keepAliveTime:多余的空闲线程的存活时间。表示线程没有任务执行时最多保持多久时间会终止。当前线程池数量超过corePoolSize时,当空闲的时间达到keepAliveTime值时,多余的空闲线程会被直接销毁直到只剩下corePoolSize个线程为止。

    默认情况下,只有当线程池中的线程数大于 corePoolSize 时,keepAliveTime 才会起作用,直到线程池中的线程数不大于 corePoolSize,即当线程池中的线程数大于 corePoolSize 时,如果一个线程空闲的时间达到 keepAliveTime,则会终止,直到线程池中的线程数不超过 corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean) 方法,在线程池中的线程数不大于 corePoolSize 时,keepAliveTime 参数也会起作用,直到线程池中的线程数为0;

  4. TimeUnit:时间单位,keepAliveTime 的单位。

  5. ThreadFactory:线程工厂,主要用来创建线程,用于创建线程一般用默认的即可。

  6. workQueue:一个阻塞队列,用来存储等待执行的任务(被提交但未被执行的任务),维护着等待执行的 Runnable 对象。

  7. RejectedExecutionHandler:饱和策略,线程拒绝策略,表示当队列满了并且工作线程大于等于线程池的最大线程数。

阻塞队列

在前面我们多次提到了任务缓存队列,即 workQueue,它用来存放等待执行的任务。

为什么出现 BlockingQueue

BlockingQueue 继承了 Queue 接口,是队列的一种。Queue 和 BlockingQueue 都是在 Java 5 中加入的。在新增的 Concurrent 包中(java.util.concurrent.BlockingQueue)。

BlockingQueue 很好的解决了多线程高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。

多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。然而,在 concurrent 包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。

作为 BlockingQueue 的使用者,再也不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切 BlockingQueue 都给你一手包办了。(在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒)。

阻塞队列的特点

阻塞队列与普通队列的区别在于:

  • 当队列是空的时,从队列中获取元素的操作将会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。

  • 当队列是满时,往队列里添加元素的操作会被阻塞。试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列。

  • 阻塞队列提供了四种处理方法:

    方法\处理方式抛出异常返回特殊值一直阻塞超时退出
    插入方法add(e)offer(e)put(e)offer(e,time,unit)
    移除方法remove()poll()take()poll(time,unit)
    检查方法element()peek()不可用不可用
    • 异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出 IllegalStateException(“Queue full”) 异常。当队列为空时,从队列里获取元素时会抛出 NoSuchElementException 异常 。

    • 返回特殊值:插入方法会返回是否成功,成功则返回 true。移除方法,则是从队列里拿出一个元素,如果没有则返回 null。

    • 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
    • 超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。

阻塞队列的成员

队列有界性数据结构
ArrayBlockingQueuebounded(有界)加锁arrayList
LinkedBlockingQueueoptionally-bounded加锁linkedList
PriorityBlockingQueueunbounded加锁heap
DelayQueueunbounded加锁heap
SynchronousQueuebounded加锁
LinkedTransferQueueunbounded加锁heap
LinkedBlockingDequeunbounded无锁heap
  • ArrayBlockingQueue:是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁。【注:每一个线程在获取锁的时候可能都会排队等待,如果在等待时间上,先获取锁的线程的请求一定先被满足,那么这个锁就是公平的。反之,这个锁就是不公平的。公平的获取锁,也就是当前等待时间最长的线程先获取锁】

    • ArrayBlockingQueue 在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。

      数组实现、有界、先进先出、队列不支持空元素、线程安全的阻塞循环队列;

      对数据操作时,共用一把锁,所以不能同时读写操作;

      enqueue()和dequeue()方法是入队和出队的核心方法,他们分别通知”队列非空”和”队列非满”,从而使阻塞中的入队和出队方法能够继续执行,以实现生产者消费者模式。

  • LinkedBlockingQueue:一个由链表结构组成的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE(65536),当大量请求任务时,容易造成内存耗尽。

    链表实现、先进先出、队列不支持空元素、线程安全的阻塞队列。

    内部有两把锁,插入和取出各一把,互不干扰,所以能同时进行读写操作。

    由于 FixedThreadPool 和 SingleThreadExecutor 的线程数是固定的,所以一般用容量无穷大的 LinkedBlockingQueue 队列来存任务。

  • PriorityBlockingQueue: 一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。

    • PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。

      数组实现、通过排序指定出对规则、不允许为null的元素插入、线程安全的无界队列。

      可以指定内部元素的排序规则,依赖Comparator来确保不同元素的排序位置。

      添加到PriorityBlockingQueue队列中的元素对应的Java类,通常需要实现Comparable接口或者是可以默认排序的对象(如数字、字符串),否则会抛出 ClassCastException。

  • DelayQueue: 一个实现 PriorityBlockingQueue 实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。(DelayQueue可以运用在以下

    应用场景:

    1. 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。

    2. 定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。)

      可实现延时、用于放置实现了Delayed接口的对象、不允许为null的元素插入、线程安全的无界队列。

  • SynchronousQueue:一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool() 就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。

    • 一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者。

      容量为0,不存储任何元素,即没有数据缓存的空间。一对一,生产者和消费者缺一就阻塞。入队线程和出队线程必须一一匹配,否则任意先到达的线程会阻塞。比如ThreadA进行入队操作,在有其它线程执行出队操作之前,ThreadA会一直等待,反之亦然

      SynchronousQueue 提供两种实现方式,分别是 队列 的方式实现。这两种实现方式中, 是属于非公平的策略,队列 是属于公平策略。默认情况下采用非公平性访问策略,当然也可以通过构造函数来设置为公平性访问策略(为true即可)。

      在java线程池newCachedThreadPool中就使用了这种阻塞队列。

  • LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue队列多了transfer和tryTransfer方法。

  • LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。

4 种任务拒绝策略

当线程池的任务缓存队列已满并且线程池中的线程数目达到 maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:

  1. ThreadPoolExecutor.AbortPolicy(默认):丢弃任务并抛出 RejectedExecutionException 异常。
  2. ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
  3. ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,并将新任务加入对列,然后重新尝试执行任务(重复此过程)
  4. ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。

自定义拒绝策略:实现 RejectedExecutionHandle 接口。

线程池使用场景

现在计算机基本都是多核CPU,为了最大程度利用CPU的多核性能,提升并行运算的能力。通过线程池管理线程获取CPU进行并发处理是基本操作,现在结合场景需求使用线程池提高并发性。

场景1:快速响应用户请求场景

描述:用户发起的实时请求,服务追求响应时间。比如:用户要查看一个商品的信息,那么我们需要将商品维度的一系列信息(如商品的价格、优惠、库存、图片等等)聚合起来,展示给用户。

分析:从用户体验角度看,这个结果响应的越快越好,如果一个页面半天都刷不出,用户可能就放弃查看这个商品了。而面向用户的功能聚合通常非常复杂,伴随着调用与调用之间的级联、多级级联等情况,业务开发同学往往会选择使用线程池这种简单的方式,将调用封装成任务并行的执行,缩短总体响应时间。另外,使用线程池也是有考量的,这种场景最重要的就是获取最大的响应速度去满足用户,所以应该不设置队列去缓冲并发任务,应该提高corePoolSize和maxPoolSize去尽可能创造多的线程快速执行任务。

场景2:快速处理批量任务场景

描述:离线的大量计算任务,需要快速执行。比如:统计某个报表,需要计算出全国各个门店中有哪些商品有某种属性,用于后续营销策略的分析,那么我们需要查询全国所有门店中的所有商品,并且记录具有某属性的商品,然后快速生成报表。

分析:这种场景需要执行大量的任务,我们也会希望任务执行的越快越好。这种情况下,也应该使用多线程策略,并行计算。但与响应速度优先的场景区别在于,这类场景任务量巨大,并不需要瞬时的完成,而是关注如何使用有限的资源,尽可能在单位时间内处理更多的任务,也就是吞吐量优先的问题。所以应该设置队列去缓冲并发任务,调整合适的corePoolSize去设置处理任务的线程数。在这里,设置的线程数过多可能还会引发线程上下文切换频繁的问题,也会降低处理任务的速度,降低吞吐量。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com