Java多线程
下面的运行顺序仅供参考,开启多线程运行顺序不一定是按照顺序和我给的顺序执行的。
创建线程池的个数
- 任务性质:任务是CPU密集型还是I/O密集型?CPU密集型任务可能需要更多的线程来充分利用CPU资源,而I/O密集型任务可能需要更少的线程,因为线程会在I/O操作时阻塞。
- 系统资源:考虑系统的硬件资源情况,包括CPU核数、内存等。通常线程池的大小不应该超过CPU核数的几倍,以免过度消耗系统资源。
- 任务响应时间:线程池过小可能导致任务排队等待执行,影响任务的响应时间;线程池过大可能会增加线程切换开销,影响系统性能。
- 任务量:考虑任务的提交速率和处理速度,如果任务提交速率较快,可能需要更大的线程池来处理任务,以减少任务排队等待时间。
一般来说,可以根据以下经验法则来选择线程池的大小:
- 对于CPU密集型任务,线程池大小可以设置为
N + 1
,其中N
是CPU核数。 - 对于I/O密集型任务,线程池大小可以设置为
2N
到2N + 1
,其中N
是CPU核数。
创建线程
- 使用
thread
可以创建线程,关于创建线程注意事项看下面 - 使用
Runnable
创建线程
使用Thread创建线程
- 使用
Thread
创建线程不能使用run()
去启动线程,run()
是启动线程,但是不是开启线程,也就是说如果使用run()
启动是同步启动并不是开启线程启动。
public class ThreadTest0 {public static void main(String[] args) {System.out.println("这是开头内容。。。");// 创建线程Thread thread = new Thread(() -> {System.out.println("使用Thread创建线程");});// 启动线程thread.start();// 创建线程,并启动线程new Thread(() -> {System.out.println("使用Thread创建线程2");}).start();System.out.println("这是最后内容");}
}
输出顺序,输出顺序不一定会是我这样的,因为创建线程时机不确定
这是开头内容。。。 使用Thread创建线程 这是最后内容 使用Thread创建线程2
使用Thread和Runnable创建线程
错误示例
- 使用
Runnable
;如果直接调用run()
方法,那么线程的代码将会在当前线程中同步执行,而不会创建一个新的线程来执行。
Runnable runnable = () -> System.out.println("使用Runnable启动线程");
runnable.run();
正确示例
public class ThreadTest1 {public static void main(String[] args) {System.out.println("线程学习开始。。。");Runnable runnable = () -> {System.out.println("线程学习。。。");};System.out.println("线程结束---1。。。");Thread thread = new Thread(runnable);thread.start();System.out.println("线程结束---2。。。");}
}
大致顺序如下:
线程学习开始。。。 线程结束---1。。。 线程结束---2。。。 线程学习 Runnable。。。
使用继承方式创建线程
实现Runnable
public class RunnableTest1 implements Runnable {/*** Runs this operation.*/@Overridepublic void run() {System.out.println("实现 Runnable 接口");}
}
继承Thread
public class ThreadExtendTest extends Thread {public void run() {System.out.println("继承并运行线程。。。");}
}
使用创建和继承类
public class ThreadTest2 {public static void main(String[] args) {// 继承Thread类ThreadExtendTest threadExtendTest = new ThreadExtendTest();threadExtendTest.start();// 实现Runnable接口RunnableTest1 runnableTest1 = new RunnableTest1();runnableTest1.run();}
}
运行顺序
继承并运行线程。。。 实现 Runnable 接口
线程的竞态竞争
产生条件
竞态条件(Race Condition)是多线程编程中常见的一个问题,它指的是多个线程在访问共享资源时,由于执行顺序不确定或者操作时序不当导致的不确定性问题。竞态条件产生的条件包括:
-
共享资源: 多个线程同时访问同一个共享资源,比如共享变量、共享对象、文件等。
-
至少有一个线程对共享资源进行了写操作: 至少有一个线程对共享资源进行了写操作,而不仅仅是读操作。
-
并发访问: 多个线程同时访问共享资源,且执行顺序不确定。
当这些条件同时满足时,就有可能产生竞态条件。在竞态条件下,由于线程执行顺序的不确定性,可能会导致程序出现意料之外的结果,甚至导致程序崩溃。
示例代码
- 按照下面代码所示,正常在单线程运行中所给的值应当是
10*1000=10000
但是在开启线程情况下,这个值是不确定的,每一次运行的值都是不一样的。 - 但是如果将
Integer
换成int
出现值不一样的概率会小一点但是还是会出现值不一样,但是为什么换成Interger
出现概率会小一点?- 使用
int
的情况:- 对于
int
类型,它是基本数据类型,具有原子性操作,即对int
类型的读取和赋值操作是原子的,不会被打断。 - 如果使用
int
类型来声明count
和threadCount
,虽然仍然存在资源竞争,但由于int
的操作是原子的,所以可能出现的结果相对更加可控,不容易出现值不一样的情况。
- 对于
- 使用
Integer
的情况:- 而对于
Integer
类型,它是对象类型,不具备原子性操作。对Integer
类型的操作可能会涉及到自动装箱(autoboxing)和拆箱(unboxing)的过程,这些过程不是原子的。 - 当多个线程同时对
Integer
类型进行操作时,可能会出现线程安全问题,导致值不一样的情况更加频繁地发生。
- 而对于
- 使用
public class ThreadTest3 {private static Integer count = 0;private static Integer threadCount = 0;/*** * 资源竞争代码示例*/public static void main(String[] args) {// 这是单线程下没有竞争的示例for (int i = 0; i < 10; i++) {for (int j = 0; j < 1000; j++) {count++;}}// 有资源竞争,值不确定是多少,每一次运行都不一样for (int i = 0; i < 10; i++) {new Thread(() -> {for (int j = 0; j < 1000; j++) {threadCount++;}}).start();}try {Thread.sleep(1000);} catch (InterruptedException e) {System.out.println(e.getMessage());}System.out.println("count 值:" + count);System.out.println("threadCount 值:" + threadCount);}
}
运行第一次的值:
count 值:10000 threadCount 值:3104
运行第二次的值
count 值:10000 threadCount 值:8964
尽管使用
int
类型仍然存在资源竞争,但由于int
的操作是原子的,因此相对于Integer
类型,可能会减少值不一样的概率。而使用Integer
类型时,由于其非原子性操作,容易出现竞态条件,导致值不一样的情况更为频繁。
解决办法
使用线程同步,为了效果更佳明显,我们还是使用Interger
类型去存储。
使用锁解决
public class ThreadTest3 {private static int count = 0;private static Integer threadCount = 0;private static Integer lockCount = 0;private static Integer lockReentrantLockCount = 0;private static final Object lockObject = new Object();private static final ReentrantLock lock = new ReentrantLock();/*** * 资源竞争代码示例*/public static void main(String[] args) {// 这是单线程下没有竞争的示例for (int i = 0; i < 10; i++) {for (int j = 0; j < 1000; j++) {count++;}}// 有资源竞争,值不确定是多少,每一次运行都不一样for (int i = 0; i < 10; i++) {new Thread(() -> {for (int j = 0; j < 1000; j++) {threadCount++;}}).start();}// 使用对象锁解决这个问题for (int i = 0; i < 10; i++) {new Thread(() -> {synchronized (lockObject) {for (int j = 0; j < 1000; j++) {lockCount++;}}}).start();}// 使用锁解决for (int i = 0; i < 10; i++) {new Thread(() -> {lock.lock();try {for (int j = 0; j < 1000; j++) {lockReentrantLockCount++;}} finally {lock.unlock();}}).start();}try {Thread.sleep(1000);} catch (InterruptedException e) {System.out.println(e.getMessage());}System.out.println("count 值:" + count);System.out.println("threadCount 值:" + threadCount);System.out.println("lockCount 值:" + lockCount);System.out.println("lockReentrantLockCount 值:" + lockReentrantLockCount);}
}
使用对象锁
使用对象锁之后值不一样的问题就不存在了。使用对像锁,常使用:一个类的实例或者系统提供的Object
作为锁对象。
-
对象锁是通过
synchronized
关键字或lock
对象来实现的,它可以确保同一时间只有一个线程可以获取到锁,从而保证了互斥访问共享资源的目的。对象锁的好处包括:- 互斥性:对象锁可以确保同一时间只有一个线程可以执行被锁定的代码块或方法,避免了多个线程同时访问共享资源导致的数据不一致性和竞态条件问题。
- 可见性:对象锁不仅提供互斥性,还提供了可见性。当一个线程获取到对象锁后,它会释放之前的所有修改,使得其他线程在获取锁后可以看到最新的共享资源状态。
- 等待与唤醒机制:对象锁提供了等待与唤醒机制,使得线程可以在获取不到锁时进入等待状态,并在适当的时机被唤醒继续执行。
-
需要注意的是,对象锁是基于对象的,每个对象都可以拥有一个锁。在使用对象锁时,需要注意以下几点:
- 锁的粒度:锁的粒度应该尽量小,只锁定需要保护的共享资源,避免锁定过多的代码,以提高并发性能。
- 锁的选择:应该选择合适的对象作为锁,通常选择私有的、不可变的对象作为锁,避免与其他部分代码共享锁,减少锁冲突。
- 避免锁的过度竞争:在设计并发程序时,应该尽量避免多个线程竞争同一个锁,以充分发挥并发性能。
- 不适合作为对象锁的东西:不适合作为对象锁的东西包括字符串常量、基本类型的包装类对象、数组等,因为它们是可变的或在多个地方共享,容易导致锁的不准确性。
-
什么是:字符串常量、基本类型的包装类对象、数组等?
Boolean
:对应基本类型boolean
的包装类。Byte
:对应基本类型byte
的包装类。Short
:对应基本类型short
的包装类。Integer
:对应基本类型int
的包装类。Long
:对应基本类型long
的包装类。Float
:对应基本类型float
的包装类。Double
:对应基本类型double
的包装类。Character
:对应基本类型char
的包装类。
-
数组是一种容纳多个相同类型元素的数据结构,可以是基本类型的数组或对象数组。例如,
int[]
是基本类型int
的数组,String[]
是对象类型String
的数组。
public static void main(String[] args) {// 这是单线程下没有竞争的示例for (int i = 0; i < 10; i++) {for (int j = 0; j < 1000; j++) {count++;}}// 有资源竞争,值不确定是多少,每一次运行都不一样for (int i = 0; i < 10; i++) {new Thread(() -> {for (int j = 0; j < 1000; j++) {threadCount++;}}).start();}// 使用对象锁解决这个问题for (int i = 0; i < 10; i++) {new Thread(() -> {synchronized (lockObject) {for (int j = 0; j < 1000; j++) {lockCount++;}}}).start();}try {Thread.sleep(1000);} catch (InterruptedException e) {System.out.println(e.getMessage());}System.out.println("count 值:" + count);System.out.println("threadCount 值:" + threadCount);System.out.println("lockCount 值:" + lockCount);
}
}
运行结果
count 值:10000 threadCount 值:3368 lockCount 值:10000
对数组操作的竞争条件
如果只涉及到添加操作而不涉及删除和修改,并且每个线程在数组中添加内容的位置不重叠,通常情况下不会发生竞争条件。但是为了确保线程安全,最好还是考虑使用线程安全的数据结构或适当的同步机制。
public class ThreadTest4 {public static void main(String[] args) throws Exception {// 同步添加数组ArrayList<Integer> list = new ArrayList<>();for (int i = 0; i < 10; i++) {for (int j = 0; j < 1000; j++) {list.add(j);}}// 线程添加数组ArrayList<Integer> list1 = new ArrayList<>();// 会有竞争条件ConcurrentLinkedQueue<Integer> concurrentLinkedQueue = new ConcurrentLinkedQueue<>();// 不会出现竞争条件CopyOnWriteArrayList<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();// 不会出现竞争条件List<Integer> synchronizedList = Collections.synchronizedList(list);// 这个数组内容会是正常的两倍(没有竞争条件的数组),因为复制的是之前存在的数组for (int i = 0; i < 10; i++) {new Thread(() -> {for (int j = 0; j < 1000; j++) {list1.add(j);copyOnWriteArrayList.add(j);synchronizedList.add(j);concurrentLinkedQueue.add(j);}}).start();}Thread.sleep(1000);System.out.println("list 长度:" + list.size());System.out.println("list1 长度:" + list1.size());System.out.println("copyOnWriteArrayList 长度:" + copyOnWriteArrayList.size());System.out.println("synchronizedList 长度:" + synchronizedList.size());System.out.println("concurrentLinkedQueue 长度:" + concurrentLinkedQueue.size());}
}
执行结果
list 长度:20000 list1 长度:9965 copyOnWriteArrayList 长度:10000 synchronizedList 长度:20000 concurrentLinkedQueue 长度:10000
除了
CopyOnWriteArrayList
,Java中还提供了其他线程安全的集合类。以下是一些常见的线程安全集合类:
Vector
:它是一个传统的线程安全的动态数组,类似于ArrayList
,但所有的操作都是同步的。然而,由于性能原因,推荐使用ArrayList
或CopyOnWriteArrayList
。
Hashtable
:它是一个传统的线程安全的哈希表实现,类似于HashMap
,但所有的操作都是同步的。然而,由于性能原因,推荐使用HashMap
或ConcurrentHashMap
。
Collections.synchronizedList
:这是一个工具方法,可以将非线程安全的List
转换为线程安全的List
。例如:List<Integer> list = new ArrayList<>(); List<Integer> synchronizedList = Collections.synchronizedList(list);
Collections.synchronizedSet
:这是一个工具方法,可以将非线程安全的Set
转换为线程安全的Set
。
Collections.synchronizedMap
:这是一个工具方法,可以将非线程安全的Map
转换为线程安全的Map
。
ConcurrentHashMap
:它是一个高效的并发哈希表实现,提供了线程安全的操作。它采用了分段锁的机制,可以支持多个线程同时进行读操作,而不需要互斥。
ConcurrentSkipListSet
:它是一个基于跳表(Skip List)的并发有序集合,提供了线程安全的操作。
ConcurrentSkipListMap
:它是一个基于跳表(Skip List)的并发有序映射表,提供了线程安全的操作。
创建线程池
创建线程池示例
这里写个示例代码,下面都会使用try-resource
方式去写,这种方式最后需要关闭资源,但是try-resource
。
ExecutorService pool1 = Executors.newFixedThreadPool(4);
pool1.execute(() -> {});
pool1.submit(() -> {});pool1.shutdown();
pool1.shutdownNow();
pool1.close();
关于Executors如何创建线程池
不需要过多解释的函数API
pool.isShutdown()
:判断线程池是否已经调用了shutdown()
方法进行关闭。pool.isTerminated()
:判断线程池中的所有任务是否已经执行完成并且线程池已经关闭。pool.shutdownNow()
:立即关闭线程池,并尝试终止所有正在执行的任务。该方法会返回一个包含未执行完成的任务的List<Runnable>
对象。pool.shutdown()
:优雅地关闭线程池。该方法会等待线程池中的所有任务执行完成后再关闭。- pool.awaitTermination(long timeout, TimeUnit unit):等待线程池中的所有任务执行完成,或者等待超时。该方法会阻塞当前线程,直到满足条件或超时。
public class ThreadTest6 {public static void main(String[] args) {// 创建一个可缓存的线程池,该线程池的线程数量可以根据任务的需求进行自动调整。// 当有新的任务提交时,如果有空闲线程,则立即执行;如果没有空闲线程,则创建新的线程。// 当线程空闲一段时间后,如果线程池中的线程数量超过了核心线程数(默认为0),则这些空闲线程将被终止。try (ExecutorService pool = Executors.newCachedThreadPool()) {pool.submit();pool.execute();pool.awaitTermination();pool.invokeAll();pool.isShutdown();pool.isTerminated();pool.shutdownNow();pool.shutdown();}// 创建一个固定大小的线程池,该线程池中的线程数量始终保持不变。// 当有新的任务提交时,如果线程池中有空闲线程,则立即执行;// 如果没有空闲线程,则任务将等待,直到有线程可用为止。try (ExecutorService pool = Executors.newFixedThreadPool(4)) {}// 创建一个单线程的线程池,该线程池中只有一个工作线程。// 所有提交的任务按照顺序执行,即使任务抛出异常也不会影响后续任务的执行。try (ExecutorService pool = Executors.newSingleThreadExecutor()) {}// 创建一个固定大小的线程池,该线程池可以执行定时任务和周期性任务。// 除了执行普通任务外,还可以使用 schedule() 和 scheduleAtFixedRate() 方法调度任务的执行。try (ExecutorService pool = Executors.newScheduledThreadPool(4)) {}// 创建一个单线程的线程池,该线程池可以执行定时任务和周期性任务。// 与 newScheduledThreadPool() 类似,但只有一个工作线程。try (ExecutorService pool = Executors.newSingleThreadScheduledExecutor()) {}// 创建一个工作窃取线程池,该线程池基于 Fork/Join 框架。// 它根据可用处理器的数量创建并行线程来执行任务,并且可以自动处理任务的分割和合并。try (ExecutorService pool = Executors.newWorkStealingPool()) {}}
}
submit()提交作用
将任务提交给线程池进行执行,并返回一个表示任务执行结果的 Future 对象。该方法适用于不需要立即获取任务执行结果的情况。
-
在代码中,可以执行关于
Runnable
表达式;线程池会根据可用的线程资源来调度任务的执行。具体的执行顺序可能受到多个因素的影响,例如线程池中的线程数量、任务的执行时间等。无法保证多个任务的执行顺序与调用submit
方法的顺序完全一致。 -
在使用
try-resource
中当try
代码块执行结束时,会自动调用ExecutorService
对象的close()
方法来关闭线程池,并释放相关资源。因此,在这种情况下,不需要显式调用shutdown()
方法来关闭线程池。
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;public class ThreadTest7 {public static void main(String[] args) throws ExecutionException, InterruptedException {try (ExecutorService pool = Executors.newFixedThreadPool(4)) {Runnable runnable = () -> {System.out.println("执行的任务1");};pool.submit(runnable);Future<Integer> future = pool.submit(() -> {System.out.println("执行的任务2");return 1;});System.out.println(future.get());}}
}
执行顺序
执行的任务1 执行的任务2 1
submit
方法和execute
方法的主要区别在于返回值。submit
方法会返回一个Future
对象,可以通过该对象获取任务的执行结果或取消任务的执行。而execute
方法没有返回值,无法获取任务的执行结果。
execute()
将任务提交给线程池进行执行,但不返回任务执行结果。该方法适用于不需要获取任务执行结果或无需处理任务执行异常的情况。
pool.execute(() -> {System.out.println("execute 执行任务");
});
pool.execute(() -> {System.out.println("execute 执行任务2");
});
invokeAll()
提交一组任务给线程池执行,并返回一个包含所有任务执行结果的 List<Future> 对象。该方法会阻塞当前线程,直到所有任务执行完成。
public class ThreadTest8 {public static void main(String[] args) throws Exception {try (ExecutorService pool = Executors.newFixedThreadPool(4)) {Callable<String> task1 = () -> {Thread.sleep(2000);return "Task 1";};Callable<String> task2 = () -> {Thread.sleep(3000);return "Task 2";};Callable<String> task3 = () -> {Thread.sleep(1500);return "Task 3";};List<Callable<String>> tasks = Arrays.asList(task1, task2, task3);// 执行所有任务List<Future<String>> results = pool.invokeAll(tasks);results.forEach(result -> {try {String s = result.get();System.out.println(s);} catch (Exception e) {System.out.println(e.getMessage());}});}}
}
SpringBoot中设置ThreadPoolExecutor
ThreadPoolExecutor
的相关API和Executors
差不多就不介绍了,只是ThreadPoolExecutor
可以定制化比较灵活- 常用配置如下
setCorePoolSize(int corePoolSize)
:设置核心线程数,即线程池中始终保持的最小线程数。setMaxPoolSize(int maxPoolSize)
:设置最大线程数,即线程池中允许的最大线程数。setQueueCapacity(int queueCapacity)
:设置任务队列的容量,即允许的最大等待任务数。setKeepAliveSeconds(int keepAliveSeconds)
:设置线程空闲后的存活时间,超过这个时间的空闲线程会被销毁。setThreadNamePrefix(String threadNamePrefix)
:设置线程名称的前缀,用于区分线程池中的线程。setRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler)
:设置拒绝策略,用于处理任务执行达到上限时的拒绝行为。setTaskDecorator(TaskDecorator taskDecorator)
:设置任务装饰器,用于包装提交到线程池的任务。setAwaitTerminationSeconds(int awaitTerminationSeconds)
:设置线程池关闭时等待所有任务完成的超时时间。setThreadFactory(ThreadFactory threadFactory)
:设置线程工厂,用于创建线程。setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut)
:设置是否允许核心线程超时。setWaitForTasksToCompleteOnShutdown(boolean waitForTasksToCompleteOnShutdown)
:设置线程池关闭时是否等待所有任务完成。setAwaitTerminationSeconds(int awaitTerminationSeconds)
:设置线程池关闭时等待所有任务完成的超时时间。
@Bean
public ThreadPoolTaskExecutor threadPoolExecutor() {ThreadPoolTaskExecutor poolExecutor = new ThreadPoolTaskExecutor();// 设置核心线程数为2,即线程池中始终保持的最小线程数poolExecutor.setCorePoolSize(2);// 设置最大线程数为4,即线程池中允许的最大线程数poolExecutor.setMaxPoolSize(4);// 设置任务队列的容量为1000,即允许的最大等待任务数poolExecutor.setQueueCapacity(1000);// 初始化线程池poolExecutor.initialize();return poolExecutor;
}
异步方法
Callable和Future
Callable常常和Future一起使用,提供异步方法。
get
获取结果,阻塞当前代码,直到等到结果或者超时,超时抛出异常get(时间,时间类型)
设置超时时间
isDone
是否完成cancel
取消任务isCancelled
是否在完成前取消
public class AsyncTest {public static void main(String[] args) throws Exception {try (ExecutorService executorService = Executors.newSingleThreadExecutor()) {Callable<Integer> task = () -> {Thread.sleep(1000);return 42;};Future<Integer> future = executorService.submit(task);// 是否完成boolean done = future.isDone();System.out.println("是否完成:" + done);// 等待任务执行完成并获取结果,方法会阻塞,直到等到结果或者结果超时Integer result = future.get();// 取消操作boolean cancel = future.cancel(true);System.out.println("是否取消:" + cancel);// 是否在任务完成前取消,如果是返回trueboolean cancelled = future.isCancelled();System.out.println("是否在任务完成前取消:" + cancelled);System.out.println("任务结果: " + result);}}
}
输出顺序
是否完成:false 是否取消:false 是否在任务完成前取消:false 任务结果: 42
Future
public class FutureTest {public static void main(String[] args) throws Exception {try (ExecutorService executorService = Executors.newSingleThreadExecutor()) {Future<Integer> future = executorService.submit(() -> {// 模拟一个耗时的计算任务Thread.sleep(1000);return 42;});// 检查任务是否完成if (future.isDone()) {// 等待任务执行完成并获取结果Integer result = future.get();System.out.println("任务结果: " + result);} else {System.out.println("任务尚未完成");}Integer result = future.get();System.out.println("任务结果: " + result);executorService.shutdown();}}
}
FutureTask
public class FutureTaskTest1 {public static void main(String[] args) throws Exception {FutureTask<Integer> futureTask = new FutureTask<>(() -> {// 模拟一个耗时的计算任务Thread.sleep(1000);return 42;});Thread thread = new Thread(futureTask);thread.start();// 等待任务执行完成并获取结果Integer result = futureTask.get();System.out.println("任务结果: " + result);}
}
CompletableFuture
常规使用示例
可以将这个当成Stream流的形式调用,在处理上会方便点
public class CompletableFutureTest8 {public static void main(String[] args) {CompletableFuture.completedFuture("https://leetcode.cn/problemset/algorithms/?difficulty=EASY&page=1&status=NOT_STARTED&sorting=W3sic29ydE9yZGVyIjoiQVNDRU5ESU5HIiwib3JkZXJCeSI6IkZST05URU5EX0lEIn1d").thenApply(url -> {try {HttpRequest httpRequest = HttpRequest.newBuilder().GET().uri(new URI(url)).build();System.out.println("httpRequest构建完成。。。");HttpClient client = HttpClient.newBuilder().build();return client.send(httpRequest, HttpResponse.BodyHandlers.ofString());} catch (Exception e) {System.out.println(e.getMessage());return null;}}).thenAccept(response -> {System.out.println(response.body());});}
}
基础案例
public class CompletableFutureTest {public static void main(String[] args) throws Exception {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {// 模拟一个耗时的计算任务try {Thread.sleep(1000);} catch (InterruptedException e) {System.out.println(e.getMessage());}return 42;});future.thenAccept(result -> {System.out.println("任务结果1: " + result);});// 等待所有任务完成CompletableFuture.allOf(future).join();// 返回结果为voidfuture.thenAccept(result -> System.out.println("任务结果2: " + result));// thenApply 的链式调用CompletableFuture<String> thenApply = future.thenApply(result -> {System.out.println("任务结果3: " + result);return "返回内容";}).thenApply(result -> {System.out.println("继续调用,看下之前结果:" + result);return "最后一次返回";});System.out.println(thenApply.get());}
}
执行顺序
任务结果1: 42 任务结果2: 42 任务结果3: 42 继续调用,看下之前结果:返回内容 最后一次返回
allOf
static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
该方法接受一组 CompletableFuture
对象,并返回一个新的 CompletableFuture
,在所有输入的 CompletableFuture
对象都完成后完成。
可以使用 join()
方法等待所有任务完成。
public class CompletableFutureTest2 {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");// 传入所有的任务返回(CompletableFuture)// 等待所有任务都完成CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);// 阻塞线程allFutures.join();System.out.println("所有任务都完成了。。。");}
}
anyOf
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
该方法接受一组 CompletableFuture
对象,并返回一个新的 CompletableFuture
,在任意一个输入的 CompletableFuture
对象完成后完成。
可以使用 join()
方法获取第一个完成的任务结果。
public class CompletableFutureTest3 {public static void main(String[] args) {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");// 可以传入多个任务(CompletableFuture)// 任意一个任务完成CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);// 阻塞获取结果Object result = anyFuture.join();System.out.println("任意一个任务完成: " + result);}
}
thenApply
CompletableFuture<T> thenApply(Function<? super T,? extends U> fn)
该方法接受一个 Function
对象作为参数,用于对任务的结果进行转换。返回一个新的 CompletableFuture
,其结果类型为转换后的类型。
链式调用和JavaScript中promise有点像
public class CompletableFutureTest4 {public static void main(String[] args) {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42);// 链式调用和JavaScript中promise有点像CompletableFuture<String> transformedFuture = future.thenApply(result -> "Result1: " + result).thenApply(result -> "Result2: " + result);String result = transformedFuture.join();System.out.println(result);}
}
thenAccept
public class CompletableFutureTest5 {public static void main(String[] args) {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "对");CompletableFuture<Void> consumedFuture = future.thenAccept(result -> System.out.println("我说的对吧: " + result)).thenAccept(result -> System.out.println("Result2: " + result)).thenAccept(result -> System.out.println("这个也可以链式调用,只是没有返回值" + result));consumedFuture.join();}
}
执行顺序
我说的对吧: 对 Result2: null 这个也可以链式调用,只是没有返回值null
thenCompose
CompletableFuture<T> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
该方法接受一个 Function
对象作为参数,用于将任务的结果传递给下一个异步操作。返回一个新的 CompletableFuture
,其结果类型为下一个异步操作的结果类型。
public class CompletableFutureTest6 {public static void main(String[] args) {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42);// 对结果调用函数并执行返回 FutureCompletableFuture<String> composedFuture = future.thenCompose(result -> CompletableFuture.supplyAsync(() -> "Result: " + result));// 拿到返回值String result = composedFuture.join();System.out.println(result);}
}
执行顺序
Result: 42
exceptionally
CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
该方法接受一个 Function
对象作为参数,用于处理异常情况并返回一个默认值。返回一个新的 CompletableFuture
,其结果类型为原始任务的结果类型。
public class CompletableFutureTest7 {public static void main(String[] args) {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {// 模拟一个抛出异常的任务throw new RuntimeException("Task failed");});CompletableFuture<Integer> handledFuture = future.exceptionally(ex -> {System.out.println("Exception 消息是什么呢?: " + ex.getMessage());return 0;});int result = handledFuture.join();System.out.println("Result: " + result);}
}
执行顺序
Exception 消息是什么呢?: java.lang.RuntimeException: Task failed Result: 0
剩下几种
-
thenRun(Runnable action)
和thenRunAsync(Runnable action)
thenRun(Runnable action)
:当前一个任务完成时,运行指定的Runnable
,不关心前一个任务的结果。thenRunAsync(Runnable action)
:与thenRun
类似,但是会在新的线程中异步执行指定的Runnable
。
示例代码:
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> System.out.println("Task 1")).thenRun(() -> System.out.println("Task 2")).thenRunAsync(() -> System.out.println("Task 3")); future.join();
-
whenComplete(BiConsumer<? super T, ? super Throwable> action)
和whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
whenComplete(BiConsumer<? super T, ? super Throwable> action)
:当任务完成时,对任务的结果或异常进行处理。whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
:异步执行whenComplete
中指定的操作。
示例代码:
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42).thenApply(result -> result * 2).whenComplete((result, ex) -> {if (ex == null) {System.out.println("Result: " + result);} else {System.out.println("Exception occurred: " + ex.getMessage());}}); future.join();
-
exceptionallyCompose(Function<? super Throwable, ? extends CompletionStage<T>> fn)
- 该方法用于处理异常情况,并返回一个新的
CompletionStage
对象以继续执行。
示例代码:
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {throw new RuntimeException("Task failed"); }).exceptionallyCompose(ex -> CompletableFuture.completedFuture(0));int result = future.join(); System.out.println("Result: " + result);
- 该方法用于处理异常情况,并返回一个新的
-
completeOnTimeout(T value, long timeout, TimeUnit unit)
和orTimeout(long timeout, TimeUnit unit)
completeOnTimeout(T value, long timeout, TimeUnit unit)
:如果任务在指定的时间内未完成,则将结果设置为指定的值。orTimeout(long timeout, TimeUnit unit)
:如果任务在指定的时间内未完成,则会抛出TimeoutException
。
-
thenCombine(CompletionStage<U> other, BiFunction<? super T, ? super U, ? extends V> fn)
- 该方法接受另一个
CompletionStage
和一个BiFunction
,并在两个阶段都完成后将它们的结果一起传递给BiFunction
。
示例代码:
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);CompletableFuture<Integer> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + result2);int result = combinedFuture.join(); System.out.println("Combined Result: " + result);
- 该方法接受另一个