您的位置:首页 > 游戏 > 游戏 > ios软件开发_建工厂网站的公司_在线代理浏览网站免费_企业营销咨询

ios软件开发_建工厂网站的公司_在线代理浏览网站免费_企业营销咨询

2025/3/15 14:53:15 来源:https://blog.csdn.net/qq_36070104/article/details/142991659  浏览:    关键词:ios软件开发_建工厂网站的公司_在线代理浏览网站免费_企业营销咨询
ios软件开发_建工厂网站的公司_在线代理浏览网站免费_企业营销咨询

前置推荐阅读:java并发之线程池使用-CSDN博客

自定义实现一个带监控的线程池

首先我们继承ThreadPoolExecutor,实现构造函数以及重写beforeExecute和afterExecute两个函数,具体调用我们会在代码实现层面进行详细的分析。

import java.util.concurrent.*;public class AsyncThreadPool extends ThreadPoolExecutor {/*** 任务队列*/private BlockingQueue<Runnable> workerQueue;public AsyncThreadPool(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler){super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);this.workerQueue = workQueue;}/*** 在任务执行之后**** @param r 执行任务* @param t 异常信息*/@Overrideprotected void afterExecute(Runnable r, Throwable t) {System.out.println("AsyncThreadPool afterExecute threadName:"+Thread.currentThread().getName()+", afterExecutor queueSize:"+workerQueue.size()+" !!!");}/*** 在任务执行之前**** @param t 执行线程* @param r 异常信息*/@Overrideprotected void beforeExecute(Thread t, Runnable r) {System.out.println("AsyncThreadPool beforeExecute threadName:"+Thread.currentThread().getName()+", afterExecutor queueSize:"+workerQueue.size()+" !!!");}}

创建Util并重写ThreadFactory,代码如下:

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;public class AsyncThreadPoolUtil {/*** 默认线程数(当前cpu核心数量)*/private static final int DEFAULT_CORE_THREAD_SIZE = Runtime.getRuntime().availableProcessors() * 2 + 1;/*** 默认工作队列*/private static final LinkedBlockingDeque<Runnable> DEFAULT_WORKER_QUEUE = new LinkedBlockingDeque<>(20);private ThreadPoolExecutor threadPoolExecutor;public AsyncThreadPoolUtil(String threadName){this(DEFAULT_CORE_THREAD_SIZE,DEFAULT_CORE_THREAD_SIZE,DEFAULT_WORKER_QUEUE,threadName);}public AsyncThreadPoolUtil(int coreThreadSize, int maxThreadSize, BlockingQueue<Runnable> workerQueue,String threadName){this(coreThreadSize,maxThreadSize,0L,TimeUnit.SECONDS,workerQueue,threadName);}public AsyncThreadPoolUtil(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,String threadName){this.threadPoolExecutor = new AsyncThreadPool(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,new DefaultThreadFactory(threadName),new ThreadPoolExecutor.CallerRunsPolicy());}/*** The default thread factory*/static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;DefaultThreadFactory(String threadName) {SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix = threadName +poolNumber.getAndIncrement() +"-thread-";}@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);if (t.isDaemon()){t.setDaemon(false);}if (t.getPriority() != Thread.NORM_PRIORITY){t.setPriority(Thread.NORM_PRIORITY);}return t;}}/*** 执行runnable 任务* @param runnable 提交任务*/public void execute(Runnable runnable){this.threadPoolExecutor.execute(runnable);}/*** 提交异步任务* @param task 异步任务* @param <T> T* @return Future*/public <T>Future<T> submit(Callable<T> task){return this.threadPoolExecutor.submit(task);}}

编写Test进行验证

public class Test {public static void main(String[] args) {AsyncThreadPoolUtil pool = new AsyncThreadPoolUtil("demo-test-");for (int i=0;i<200;i++){pool.execute(()->{try{TimeUnit.MILLISECONDS.sleep(500);}catch (Exception e){}});}}}

输出信息见截图,​​​​​由此我们可以在任务执行前以及执行后进行任务的监控,同时可以队列情况。

源码分析

ThreadPoolExecutor类图:

我们从AsyncThreadPool 代码中调用super函数开始看起,该函数中传入:

1.核心线程数:默认情况下不会回收,可通过allowCoreThreadTimeOut函数设置回收,或者设置为0。若无需求,不建议进行核心线程回收。

2.最大线程数:该参数必须大于等于核心线程数,非核心线程数在队列中没有要继续执行任务时会进行回收。

3.非核心线程存活时间

4.非核心线程存活时间单位

5.任务存储队列:当无空闲线城时提交的任务会进入到队列进行等待执行。

6.创建线程工厂:用于创建初始化线程

7.拒绝策略:当无空闲线程且任务队列已满则执行决绝策略。

看完了构造函数创建之后,我们来看任务的提交。在Test中,我们通过pool.execute()函数来提交一个任务到线程池执行,在该函数我们看到线程池中的线程是在提交任务后才进行的初始化。

1. workerCountOf(c)统计当核心线程数量是否已经全部初始化了,如果没有,那么则直接通过addWorker()创建线程执行任务。

2.如果当前核心线程已经全部初始化了,那么则将任务快速添加到队列中,同时校验如果当前线程池已经关闭,那么则移除任务同时执行拒绝策略。如果当前线程池存活线程是0,那么添加工作线程进行任务执行。

3.如果在第2步中添加到任务队列时队列已满,则直接尝试创建非核心线程执行,如果非核心线程也无法创建,那么执行决绝策略。

接下来我们重点分析下 addWorker(Runnable firstTask, boolean core)的函数。

  • Runnable firstTask:要执行的第一个任务,如果为null,则表示新线程将从工作队列中获取任务。
  • boolean core:指示是否为核心线程,true表示是核心线程,false表示非核心线程。
  1. 循环尝试获取线程池状态runStateOf(ctl.get())):

    • 如果线程池状态大于或等于SHUTDOWN(即线程池正在关闭或已关闭),并且不是在关闭状态下添加新任务到非空队列,那么返回false,无法添加新工作线程。
  2. 检查工作线程数量

    • 获取当前线程池的工作线程数量(workerCountOf(c))。
    • 如果线程数量已经达到最大容量(CAPACITY),或者对于核心线程来说达到了corePoolSize,对于非核心线程来说达到了maximumPoolSize,则返回false,无法添加新工作线程。
    • 如果当前线程数量小于上述限制,并且成功通过compareAndIncrementWorkerCount(c)方法增加工作线程计数,则跳出循环。
  3. 创建新工作线程

    • 尝试创建新的Worker对象,它是一个继承了Thread的类,用于执行任务。
    • 如果新线程t不为空,并且线程池状态允许新线程启动(即runStateOf(ctl.get())小于SHUTDOWN或者在关闭状态下且firstTasknull),则将新工作线程添加到线程池的workers集合中,并标记为已添加(workerAdded = true)。
  4. 启动新工作线程

    • 如果工作线程成功添加,调用t.start()启动新线程,并将workerStarted标记为true
  5. 处理线程启动失败的情况

    • 如果新线程没有成功启动,调用addWorkerFailed(w)方法来处理失败情况,这可能包括移除工作线程计数和执行其他清理工作。
  6. 返回结果

    • 返回workerStarted,表示新工作线程是否成功启动。
    /*** Checks if a new worker can be added with respect to current* pool state and the given bound (either core or maximum). If so,* the worker count is adjusted accordingly, and, if possible, a* new worker is created and started, running firstTask as its* first task. This method returns false if the pool is stopped or* eligible to shut down. It also returns false if the thread* factory fails to create a thread when asked.  If the thread* creation fails, either due to the thread factory returning* null, or due to an exception (typically OutOfMemoryError in* Thread.start()), we roll back cleanly.** @param firstTask the task the new thread should run first (or* null if none). Workers are created with an initial first task* (in method execute()) to bypass queuing when there are fewer* than corePoolSize threads (in which case we always start one),* or when the queue is full (in which case we must bypass queue).* Initially idle threads are usually created via* prestartCoreThread or to replace other dying workers.** @param core if true use corePoolSize as bound, else* maximumPoolSize. (A boolean indicator is used here rather than a* value to ensure reads of fresh values after checking other pool* state).* @return true if successful*/private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

我们接着分析创建执行任务Worker(),它继承自AbstractQueuedSynchronizer并实现了Runnable接口。Worker类主要负责维护线程的中断状态和一些次要的记账工作,同时它也实现了任务的运行。

  • private static final long serialVersionUID:序列化ID,用于序列化机制。
  • final Thread thread:当前Worker线程运行的Thread对象。如果线程工厂创建线程失败,则为null
  • Runnable firstTask:当前Worker线程需要执行的第一个任务。如果没有初始任务,则为null
  • volatile long completedTasks:此线程完成的任务数量。

构造函数Worker(Runnable firstTask)

  • 调用setState(-1)初始化锁状态为-1,表示在runWorker方法执行之前禁止中断。
  • 初始化firstTask为传入的第一个任务。
  • 通过线程工厂创建新线程,并将其赋值给thread

运行方法:

  • public void run():将控制权委托给外部的runWorker方法,开始工作线程的主运行循环。

锁方法:

Worker类继承自AbstractQueuedSynchronizer,提供了锁的获取和释放方法。这些方法用于保护任务执行,防止在等待任务时被中断。

  • protected boolean isHeldExclusively():判断当前线程是否独占锁。
  • protected boolean tryAcquire(int unused):尝试获取锁。
  • protected boolean tryRelease(int unused):尝试释放锁。
  • public void lock():获取锁。
  • public boolean tryLock():尝试获取锁,如果锁被占用则立即返回false
  • public void unlock():释放锁。
  • public boolean isLocked():判断锁是否被占用。

中断方法:

  • void interruptIfStarted():如果线程已经开始运行并且尚未中断,则尝试中断该线程。这个方法用于在工作线程等待新任务时,如果线程池正在关闭,则中断工作线程。

Worker类是ThreadPoolExecutor线程池中每个工作线程的抽象表示。它负责维护线程的运行状态、锁状态和任务执行状态。通过继承AbstractQueuedSynchronizerWorker类提供了一个简单的互斥锁,以确保在执行任务时不会被中断。此外,Worker类还提供了中断控制,以确保在适当的时候中断

工作线程,特别是在线程池关闭时。

    /*** Class Worker mainly maintains interrupt control state for* threads running tasks, along with other minor bookkeeping.* This class opportunistically extends AbstractQueuedSynchronizer* to simplify acquiring and releasing a lock surrounding each* task execution.  This protects against interrupts that are* intended to wake up a worker thread waiting for a task from* instead interrupting a task being run.  We implement a simple* non-reentrant mutual exclusion lock rather than use* ReentrantLock because we do not want worker tasks to be able to* reacquire the lock when they invoke pool control methods like* setCorePoolSize.  Additionally, to suppress interrupts until* the thread actually starts running tasks, we initialize lock* state to a negative value, and clear it upon start (in* runWorker).*/private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{/*** This class will never be serialized, but we provide a* serialVersionUID to suppress a javac warning.*/private static final long serialVersionUID = 6138294804551838833L;/** Thread this worker is running in.  Null if factory fails. */final Thread thread;/** Initial task to run.  Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker  */public void run() {runWorker(this);}// Lock methods//// The value 0 represents the unlocked state.// The value 1 represents the locked state.protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock()        { acquire(1); }public boolean tryLock()  { return tryAcquire(1); }public void unlock()      { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}

线程池的工作原理图解: 

以下是对本篇文章的的总结:

主要功能和特点:

  1. 线程池管理ThreadPoolExecutor允许你控制线程的创建和销毁,以及任务的执行和管理。
  2. 参数化配置:提供了多个参数来调整线程池的行为,包括核心线程数、最大线程数、线程存活时间、工作队列等。
  3. 任务执行:可以执行任何实现了Runnable接口的任务。
  4. 线程复用:通过重用线程来执行新任务,减少了线程创建和销毁的开销。
  5. 拒绝策略:当任务太多,无法被线程池及时处理时,可以定义拒绝策略来处理新提交的任务。

关键组件:

  • 核心线程数:即使它们是空闲的,也会保持一定数量的线程。
  • 最大线程数:线程池中允许的最大线程数量。
  • 工作队列:用于存放待执行任务的队列。
  • 线程工厂:用于创建新线程。
  • 拒绝执行处理器:当任务太多,无法被线程池及时处理时,定义了如何处理新提交的任务。

方法概览:

  • execute(Runnable command):提交一个任务给线程池执行。
  • shutdown():平滑地关闭线程池,不再接受新任务,但会处理完已提交的任务。
  • shutdownNow():尝试立即停止所有正在执行的任务,并返回等待执行的任务列表。
  • isShutdown()isTerminating()isTerminated():检查线程池的状态。
  • awaitTermination(long timeout, TimeUnit unit):等待线程池终止。
  • setCorePoolSize(int corePoolSize)setMaximumPoolSize(int maximumPoolSize):动态调整线程池的大小。
  • getQueue():获取当前的任务队列。

拒绝策略:

  • AbortPolicy:默认策略,当任务不能被接受时抛出异常。
  • CallerRunsPolicy:用调用者线程来运行任务。
  • DiscardPolicy:直接丢弃任务。
  • DiscardOldestPolicy:丢弃队列中最旧的任务,并尝试再次提交新任务。

扩展性:

ThreadPoolExecutor提供了多个钩子方法,如beforeExecute(Thread t, Runnable r)afterExecute(Runnable r, Throwable t),允许在任务执行前后进行自定义操作。

这个类是Java并发包中的核心组件,为多线程编程提供了强大的工具,使得任务的并发执行更加高效和易于管理。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com