阻塞队列
概述和架构
分类和核心方法
这里是在讲 为了区分在不同场景下 调用的不同组实现方法
核心方法演示
package com.example.juc.queue;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;public class BlockingQueueDemo {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);// 第一组
// System.out.println(blockingQueue.add("a"));
// System.out.println(blockingQueue.add("b"));
// System.out.println(blockingQueue.add("c"));
// System.out.println(blockingQueue.element());// Exception in thread "main" java.lang.IllegalStateException: Queue full
// System.out.println(blockingQueue.add("d"));// System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove());// Exception in thread "main" java.util.NoSuchElementException
// System.out.println(blockingQueue.remove());// 第二组
// System.out.println(blockingQueue.offer("a"));
// System.out.println(blockingQueue.offer("b"));
// System.out.println(blockingQueue.offer("c"));
// // a
// System.out.println(blockingQueue.peek());
//
// // false
// System.out.println(blockingQueue.offer("d"));
//
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());
// // null
// System.out.println(blockingQueue.poll());// 第三组
// blockingQueue.put("a");
// blockingQueue.put("b");
// blockingQueue.put("c");
// // 会阻塞住 由于用的是定长的 ArrayBlockingQueue
blockingQueue.put("www");
//
//
// System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take());
// // 阻塞
// System.out.println(blockingQueue.take());// 第四组System.out.println(blockingQueue.offer("a"));System.out.println(blockingQueue.offer("b"));System.out.println(blockingQueue.offer("c"));// 阻塞 3S 后退出 返回 false
// System.out.println(blockingQueue.offer("w", 3L, TimeUnit.SECONDS));System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());// 阻塞 3S 后退出 返回 nullSystem.out.println(blockingQueue.poll(3L, TimeUnit.SECONDS));}
}
线程池
概念
接口介绍以及提交任务方式对比
Executor
我们把实现了 Runnable
或者 Callable
接口的任务提交给线程池
Executor
框架负责任务的执行
提交任务可以使用 submit
和 execute
ExecutorService 中 execute 和 submit 的区别
ExecutorService 中的 execute 和 submit 方法都用于提交任务,但它们有一些关键区别:
1.返回类型:
execute(Runnable command): 这个方法没有返回值。它只接受一个 Runnable 对象并执行它。
submit(Callable task): 这个方法返回一个 Future 对象。它可以接受一个 Callable 对象或一个 Runnable 对象,并返回一个 Future,可以用来检查任务的状态或获取任务的结果。
- 异常处理:
execute: 如果任务在执行过程中抛出未捕获的异常,异常会直接传播到调用者线程。
submit: 如果任务在执行过程中抛出未捕获的异常,异常会被捕获并存储在返回的 Future 对象中。调用 Future.get() 方法时会抛出 ExecutionException,其 getCause() 方法返回实际的异常。
任务类型:
execute: 只能接受 Runnable 对象。
submit: 可以接受 Runnable 或 Callable 对象
public static void main(String[] args) {ExecutorService threadPool = Executors.newFixedThreadPool(5);// 使用 execute 提交任务threadPool.execute(() -> {System.out.println(Thread.currentThread().getName() + " 使用 execute 办理业务");});// 使用 submit 提交任务Future<String> future = threadPool.submit(() -> {System.out.println(Thread.currentThread().getName() + " 使用 submit 办理业务");return "任务完成";});try {// 获取 submit 任务的结果String result = future.get();System.out.println("submit 任务结果: " + result);} catch (InterruptedException | ExecutionException e) {e.printStackTrace();} finally {threadPool.shutdown();}}
总结: execute 功能较简单,submit 可以看做增强版本,可以获取一个 future
ScheduledThreadThreadPoolExecutor
添加了调度任务执行的功能
常见线程池和创建线程池的底层原理
package com.example.juc.pool;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;// 演示线程池 三种 常用分类
public class ThreadPoolDemo1 {public static void main(String[] args) {// 一池五线程ExecutorService threadPool1 = Executors.newFixedThreadPool(5);// 一池一线程ExecutorService threadPool2 = Executors.newSingleThreadExecutor();// 一池可扩容线程ExecutorService threadPool3 = Executors.newCachedThreadPool();// 10 个顾客请求try {for (int i = 0; i <= 10; i++) {// 执行threadPool3.execute(() -> {System.out.println(Thread.currentThread().getName() + " 办理业务");});}} catch (Exception e) {e.printStackTrace();} finally {threadPool3.shutdown();}}
}
底层都是创建 ThreadPoolExecutor
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}// 引出下面的一小节,ThreadPoolExecutor 总共有 七个参数
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
- newFixedThreadPool
- 没有救急线程,也就无需空闲时间,阻塞队列无限
- 适合任务量已知,相对耗时任务
- newSingleThreadExecutor
- 希望多个任务排队执行,线程数固定一个,多余任务排队执行
- newCachedThreadPool
- 全部是救急线程,60s空闲回收时间,救急线程无限创建
- 适合任务比较密集,任务执行时间短
七个参数介绍
重点注意 keepAliveTIme 和 unit ,是用于救急线程在空闲后的回收
常用阻塞队列
- 容量为
Integer.MAX_VALUE
的<font style="color:#DF2A3F;">LinkedBlockingQueue</font>
(无界队列):FixedThreadPool
和SingleThreadExector
。FixedThreadPool
最多只能创建核心线程数的线程(核心线程数和最大线程数相等),SingleThreadExector
只能创建一个线程(核心线程数和最大线程数都是 1),二者的任务队列永远不会被放满。
public class LinkedBlockingQueueExample {public static void main(String[] args) {// 创建一个容量为 5 的 LinkedBlockingQueue 队列LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(5);// 创建生产者线程Thread producer = new Thread(() -> {try {for (int i = 1; i <= 5; i++) {String element = "Element-" + i;System.out.println("Producer is putting: " + element);queue.put(element); // 阻塞直到有空间可放入元素System.out.println("Producer has put: " + element);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}});// 创建消费者线程Thread consumer = new Thread(() -> {try {for (int i = 1; i <= 5; i++) {System.out.println("Consumer is waiting to take an element...");String element = queue.take(); // 阻塞直到有元素可取System.out.println("Consumer has taken: " + element);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}});// 启动生产者和消费者线程producer.start();consumer.start();}
}
<font style="color:#DF2A3F;">SynchronousQueue</font>
(同步队列):CachedThreadPool
。SynchronousQueue
没有容量,不存储元素,插入时阻塞,直到元素被取出。DelayedWorkQueue
(延迟阻塞队列):ScheduledThreadPool
和SingleThreadScheduledExecutor
。DelayedWorkQueue
的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。DelayedWorkQueue
添加元素满了之后会自动扩容原来容量的 1/2,即永远不会阻塞,最大扩容可达Integer.MAX_VALUE
,所以最多只能创建核心线程数的线程。- ArrayBlockingQueue: 有界阻塞队列,内部是一个数组。需要指定队列的容量。适用于有界任务队列的场景。
- PriorityBlockingQueue:支持优先级排序的无界阻塞队列。元素按照优先级顺序出队。适用于需要任务优先级调度的场景。
工作流程和拒绝策略
- 如果当前运行的线程数小于核心线程数,那么就会新建一个线程来执行任务。
- 如果当前运行的线程数等于或大于核心线程数,但是小于最大线程数,那么就把该任务放入到任务队列里等待执行。
- 如果向任务队列投放任务失败(任务队列已经满了),但是当前运行的线程数是小于最大线程数的,就新建一个线程来执行任务。
- 如果当前运行的线程数已经等同于最大线程数了,新建线程将会使当前运行的线程超出最大线程数,那么当前任务会被拒绝,拒绝策略会调用
RejectedExecutionHandler.rejectedExecution()
方法。
自定义线程池
package com.example.juc.pool;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;// 自定义线程池
public class ThreadPoolDemo2 {public static void main(String[] args) {ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,5,2L,TimeUnit.SECONDS,new ArrayBlockingQueue<>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());// 10 个顾客请求try {for (int i = 0; i <= 150; i++) {// 执行threadPool.execute(() -> {System.out.println(Thread.currentThread().getName() + " 办理业务");});}} catch (Exception e) {e.printStackTrace();} finally {threadPool.shutdown();}}
}