您的位置:首页 > 财经 > 金融 > Java等待异步线程池跑完再执行指定方法的三种方式(condition、CountDownLatch、CyclicBarrier)

Java等待异步线程池跑完再执行指定方法的三种方式(condition、CountDownLatch、CyclicBarrier)

2024/11/16 18:57:04 来源:https://blog.csdn.net/weixin_43564627/article/details/139525020  浏览:    关键词:Java等待异步线程池跑完再执行指定方法的三种方式(condition、CountDownLatch、CyclicBarrier)

Java等待异步线程池跑完再执行指定方法的三种方式(condition、CountDownLatch、CyclicBarrier)

@Async如何使用

使用@Async标注在方法上,可以使该方法异步的调用执行。而所有异步方法的实际执行是交给TaskExecutor的。

1.启动类添加@EnableAsync注解
2. 方法上添加@Async,类上添加@Component

三个异步方法

    @Asyncpublic void doTaskTwo( CyclicBarrier barry) throws Exception {System.out.println("开始做任务二");long start = System.currentTimeMillis();Thread.sleep(random.nextInt(3000));long end = System.currentTimeMillis();System.out.println("完成任务二,耗时:" + (end - start) + "毫秒");barry.await();}@Asyncpublic void doTaskThree( CyclicBarrier barry) throws Exception {System.out.println("开始做任务三");long start = System.currentTimeMillis();Thread.sleep(random.nextInt(5000));long end = System.currentTimeMillis();System.out.println("完成任务三,耗时:" + (end - start) + "毫秒");barry.await();}@Override@Asyncpublic void doTask2One(CountDownLatch count) throws Exception {System.out.println("开始做任务1");long start = System.currentTimeMillis();Thread.sleep(random.nextInt(3000));long end = System.currentTimeMillis();System.out.println("完成任务1,耗时:" + (end - start) + "毫秒");count.countDown();}

CyclicBarrier

@GetMapping("/doTask")public void doLogin() throws Exception {// 通过它可以实现让一组线程等待至某个状态之后再全部同时执行//第一个参数,表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。//第二个参数,表示用于在线程到达屏障时,优先执行barrierAction这个Runnable对象,方便处理更复杂的业务场景。CyclicBarrier barry = new CyclicBarrier(3, new Runnable() {@Overridepublic void run() {System.out.println("1111111111111");}});kafkaTopicService.doTaskOne(barry);kafkaTopicService.doTaskTwo(barry);kafkaTopicService.doTaskThree(barry);}

执行结果

开始做任务一
开始做任务二
开始做任务三
完成任务一,耗时:1263毫秒
完成任务二,耗时:2508毫秒
完成任务三,耗时:3753毫秒
1111111111111

注意: 接口响应成功时候 后台逻辑还在走
CyclicBarrier 的使用场景也很丰富。

比如,司令下达命令,要求 10 个士兵一起去完成项任务。

这时就会要求 10 个士兵先集合报到,接着,一起雄赳赳,气昂昂地去执行任务当 10 个士兵把自己手上的任务都执行完了,那么司令才能对外宣布,任务完成

CyclicBarrier 比 CountDownLatch 略微强大一些,它可以接收一个参数作为 barrierAction。

所谓 barrierAction 就是当计数器一次计数完成后,系统会执行的动作。

如下构造函数,其中, parties 表示计数总数,也就是参与的线程总数。

public CyclicBarrier(int parties, Runnable barrierAction) 

package com.shockang.study.java.concurrent.aqs;import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;public class CyclicBarrierDemo {public static class Soldier implements Runnable {private String soldier;private final CyclicBarrier cyclic;Soldier(CyclicBarrier cyclic, String soldierName) {this.cyclic = cyclic;this.soldier = soldierName;}public void run() {try {//等待所有士兵到齐 第一次等待cyclic.await();doWork();//等待所有士兵完成工作 第二次等待cyclic.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}void doWork() {try {Thread.sleep(Math.abs(new Random().nextInt() % 10000));} catch (InterruptedException e) {e.printStackTrace();}System.out.println(soldier + ":任务完成");}}public static class BarrierRun implements Runnable {boolean flag;int N;public BarrierRun(boolean flag, int N) {this.flag = flag;this.N = N;}public void run() {if (flag) {System.out.println("司令:[士兵" + N + "个,任务完成!]");} else {System.out.println("司令:[士兵" + N + "个,集合完毕!]");flag = true;}}}public static void main(String args[]) throws InterruptedException {final int N = 10;Thread[] allSoldier = new Thread[N];boolean flag = false;CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));//设置屏障点,主要是为了执行这个方法System.out.println("集合队伍!");for (int i = 0; i < N; ++i) {System.out.println("士兵 " + i + " 报道!");allSoldier[i] = new Thread(new Soldier(cyclic, "士兵 " + i));allSoldier[i].start();}}
}

控制台输出

集合队伍!
士兵 0 报道!
士兵 1 报道!
士兵 2 报道!
士兵 3 报道!
士兵 4 报道!
士兵 5 报道!
士兵 6 报道!
士兵 7 报道!
士兵 8 报道!
士兵 9 报道!
司令:[士兵10个,集合完毕!]
士兵 0:任务完成
士兵 3:任务完成
士兵 6:任务完成
士兵 4:任务完成
士兵 9:任务完成
士兵 8:任务完成
士兵 2:任务完成
士兵 5:任务完成
士兵 7:任务完成
士兵 1:任务完成
司令:[士兵10个,任务完成!]

说明
上述代码第 65 行创建了 CyclicBarrier 实例,并将计数器设置为 10 ,要求在计数器达到指标时,执行第 51 行的 run() 方法。

每一个士兵线程都会执行第 18 行定义的 run() 方法。

在第 24 行,每一个士兵线程都会等待,直到所有的士兵都集合完毕。

集合完毕意味着 CyclicBarrier 的一次计数完成,当再一次调用 CyclicBarrier.await() 方法时,会进行下一次计数。

第 22 行模拟了士兵的任务。

当一个士兵任务执行完,他就会要求 CyclicBarrier 开始下次计数,这次计数主要目的是监控是否所有的士兵都己经完成了任务。

一旦任务全部完成,第 42 行定义的 BarrierRun 就会被调用,打印相关信息。

2、CountDownLatch

 @GetMapping("/doTask2")public void doTask2() throws Exception {CountDownLatch count = new CountDownLatch(3);kafkaTopicService.doTask2One(count);kafkaTopicService.doTask2Two(count);kafkaTopicService.doTask2Three(count);count.await();System.out.println("11111111111111111");}

执行结果:

开始做任务1
开始做任务二
开始做任务3
完成任务1,耗时:179毫秒
完成任务3,耗时:1829毫秒
完成任务二,耗时:2376毫秒
11111111111111111

注意: 接口响应成功时候 后台逻辑已经走完

1、juc中condition接口提供的await、signal、signalAll方法,需配合lock

List<Integer> villageList = new ArrayList<>();List<Integer> villageList2 = new ArrayList<>();villageList.add(1);villageList.add(2);villageList.add(3);ExecutorService threadPool = Executors.newFixedThreadPool(2);Lock lock = new ReentrantLock();Condition cond = lock.newCondition();for(int flag = 0;flag<villageList.size();flag++){Integer i = villageList.get(flag);threadPool.execute(() -> {try {villageList2.add(i);} catch (Exception e) {e.printStackTrace();}if(villageList2.size() == villageList.size()){lock.lock();cond.signal();lock.unlock();}});}lock.lock();try {cond.await(5, TimeUnit.SECONDS);lock.unlock();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(villageList2.size());threadPool.shutdown();
}

CountDownLatch和CyclicBarrier的比较

1.CountDownLatch是减计数方式,countDown()方法只参与计数不会阻塞线程,而CyclicBarrier是加计数方式,await()方法参与计数,会阻塞线程。
2.CountDownLatch计数为0无法重置,而CyclicBarrier计数达到初始值,则可以重置,因此CyclicBarrier可以复用。

附:CountDownLatch有发令枪的效果,可以用来并发测试

public class test2 {private final static CountDownLatch CountDownLatch=new CountDownLatch(100);public static void main(String[] args) throws Exception{for (int i=0;i<100;i++){new Thread(() -> {try {CountDownLatch.await();} catch (Exception e) {e.printStackTrace();}getUUID();}).start();CountDownLatch.countDown(); //如果减到0 统一出发,发枪开炮}}public static void getUUID(){UUID uuid= UUID.randomUUID();System.out.println(uuid);}
}

注意事项

  1. CountDownLatch 对象的计数器只能减不能增,即一旦计数器为 0,就无法再重新设置为其他值,因此在使用时需要根据实际需要设置初始值。

  2. CountDownLatch 的计数器是线程安全的,多个线程可以同时调用 countDown() 方法,而不会产生冲突。

  3. 如果 CountDownLatch 的计数器已经为 0,再次调用 countDown() 方法也不会产生任何效果。

  4. 如果在等待过程中,有线程发生异常或被中断,计数器的值可能不会减少到 0,因此在使用时需要根据实际情况进行异常处理。

当worker1线程由于异常没有执行countDown()方法,最后state结果不为0,导致所有线程停在AQS中自旋(死循环)。所以程序无法结束。(如何解决这个问题呢?请看案例二)

// 等待 3 个线程完成任务
if (!latch.await(5, TimeUnit.SECONDS)) {LOGGER.warn("{} time out", worker1.name);
}// 所有线程完成任务后,执行下面的代码
LOGGER.info("all workers have finished their jobs!");
  1. CountDownLatch 可以与其他同步工具(如 Semaphore、CyclicBarrier)结合使用,实现更复杂的多线程同步。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com