需求:用户可以开启任务,暂停任务和中止任务。
用户开启任务后,可以随时暂停或者中止。暂停后又可以回到原进度继续运行。
这里写目录标题
- demo版-使用废弃的stop、suspend、resume实现
- 为什么废弃了?
- 不用stop,如何销毁线程呢?
- 正式版
- 延迟版:wait和notify、join和interrupt、LockSupport
- 非延迟版:无法实现
- 分布式集群最终版
demo版-使用废弃的stop、suspend、resume实现
一个MyTask类来实现线程,一个MyButton来模拟界面(最开始是想开线程监听控制台的,但是日志打印的频率不好控制,所以就出现了MyButton类)
package com.example.springbootproject.thread;import lombok.extern.slf4j.Slf4j;@Slf4j
public class MyTask extends Thread {@Overridepublic void run() {// 开启运行业务代码...processBusiness();}private void processBusiness() {/*** 模拟业务运行,不用sleep,因为会抛出打断异常;*/for (int i = 0; i < 1000000; i++) {log.info("business running...{}", i);for (int j = 0; j < 1000000; j++) {for (int k = 0; k < 100000; k++) {for (int l = 0; l < 100000; l++) {log.info("business running...l is" + l);for (int m = 0; m < 100000; m++) {for (int n = 0; n < 1000; n++) {int aa = i +j +k+m+n;int bb = aa *aa - m -n -i -j;for (int o = 0; o < aa; o++) {bb = aa+ bb;}}}}}}}}public void mySuspend() {this.suspend();log.info("suspend success");}public void myReStart() {this.resume();log.info("resume success");}public void myStop() {this.stop();log.info("stop success");}
}
package com.example.springbootproject.thread;import lombok.extern.slf4j.Slf4j;import javax.swing.*;
import java.awt.*;
import java.awt.event.*;@Slf4j
public class MyButton {public static void main(String[] args) {// 创建一个 JFrame 窗口JFrame frame = new JFrame("My Button");frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);// 创建一个 JPanel 面板JPanel panel = new JPanel();panel.setLayout(new FlowLayout());MyTask myTask = new MyTask();// 创建一个 JButton 按钮JButton startbutton = new JButton("start");startbutton.addActionListener(new ActionListener() {public void actionPerformed(ActionEvent e) {myTask.start();log.info("start clicked!");}});final boolean[] flag = {false};// 创建一个 JButton 按钮JButton suspendbutton = new JButton("suspend");suspendbutton.addActionListener(new ActionListener() {public void actionPerformed(ActionEvent e) {if (flag[0] == false) {myTask.mySuspend();flag[0] =true;} else {myTask.myReStart();flag[0] =false;}log.info("suspend clicked!");}});// 创建一个 JButton 按钮JButton stopButton = new JButton("stop");stopButton.addActionListener(new ActionListener() {public void actionPerformed(ActionEvent e) {log.info("stop clicked!");myTask.myStop();}});// 将按钮添加到面板panel.add(startbutton);panel.add(stopButton);panel.add(suspendbutton);// 将面板添加到窗口frame.getContentPane().add(panel);// 设置窗口的大小和可见性frame.setSize(300, 200);frame.setVisible(true);}
}
暂停后,输出是32,取消暂停后,又从33开始输出。最后停止线程
但是jdk自带的这三个方法已经废弃了,所以不用。
为什么废弃了?
比如可以随时调用,可能会破坏线程的状态;导致死锁等问题。
但具体的代码还没有分析,先占个位置吧。TODO。
不用stop,如何销毁线程呢?
resume和suspend我们有很多函数可以代替。但是stop呢?
没有好办法。只能让线程里面的代码运行完,自己去关闭。
实际中都是用线程池去提交任务。那线程池的任务cancel可以吗?不可以。因为还是需要我们自己去控制当被打断时的逻辑
futureTask的cancel原码如下。传入一个布尔值,用来控制是否需要去打断当前任务。
- 首先进行cas操作,失败直接返回false;
- 如果设置了可打断,就去打断该任务
- 最后完成任务:里面的代码就是调用LockSupport.unpark打断线程
所以如果我们没有处理该打断标志位或者没有处理好打断异常,代码还是会继续运行。
正式版
正式版是用线程池去提交任务,和实际使用保持一致。
延迟版:wait和notify、join和interrupt、LockSupport
- wait和notify原理: 这俩都是获得了monitor对象(synchronized锁对象)后才能使用。
获得了obj的对象锁,当前线程调用obj.wait(),然后当前线程在其monitor对象上去等待,直到被打断(调用wait()前被打断,调用后被打断,都会抛出异常并清除打断标志)或者 其他线程调用了obj.notify或notifyall才可能会醒来。为啥可能呢?
因为notify唤醒它之后,他还要竞争锁成功才能真正被唤醒,否则就进入阻塞状态。 - join和interrupt: join,他不需要锁。当前线程调用了obj.join(),是当前线程 陷入阻塞。除非obj线程运行完成,或者线程被打断
我觉得无法实时去响应用户的操作,因为你如何让正在运行的 业务线程 去调用wait、join、LockSupport方法呢?
延迟版可以实现,可以对任务进行分步,每一步都可以用一个标志位去判断,如果为true,表示被暂停。
下面贴一个wait和notify版的,其他的join和locksupport也都可以实现,不再赘述
package com.example.springbootproject.thread;import lombok.extern.slf4j.Slf4j;import javax.swing.*;
import java.awt.*;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;@Slf4j
public class MyButton {public static void main(String[] args) {// 创建一个 JFrame 窗口JFrame frame = new JFrame("My Button");frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);// 创建一个 JPanel 面板JPanel panel = new JPanel();panel.setLayout(new FlowLayout());ExecutorService executorService = Executors.newSingleThreadExecutor();final MyTask[] myTask = {null};// 创建一个 JButton 按钮JButton startbutton = new JButton("start");startbutton.addActionListener(new ActionListener() {public void actionPerformed(ActionEvent e) {myTask[0] = new MyTask();executorService.submit(myTask[0]);log.info("start clicked!");}});final boolean[] flag = {false};// 创建一个 JButton 按钮JButton suspendbutton = new JButton("suspend");suspendbutton.addActionListener(new ActionListener() {public void actionPerformed(ActionEvent e) {if (flag[0] == false) {myTask[0].mySuspend();flag[0] =true;} else {myTask[0].myReStart();flag[0] =false;}log.info("suspend clicked!");}});// 创建一个 JButton 按钮JButton stopButton = new JButton("stop");stopButton.addActionListener(new ActionListener() {public void actionPerformed(ActionEvent e) {log.info("stop clicked!");myTask[0].myStop();}});// 将按钮添加到面板panel.add(startbutton);panel.add(stopButton);panel.add(suspendbutton);// 将面板添加到窗口frame.getContentPane().add(panel);// 设置窗口的大小和可见性frame.setSize(300, 200);frame.setVisible(true);}
}
package com.example.springbootproject.thread;import lombok.extern.slf4j.Slf4j;@Slf4j
public class MyTask extends Thread{private Thread currentRunTask;@Overridepublic void run() {// 注意:this和Thread.currentThread不一样。因为我们使用线程池提交的任务// 前者是MyTask实例(state =new),后者是当前正在运行的线程(state=running)。// 而this就是MyTask实例代表的线程,它的状态是newcurrentRunTask = Thread.currentThread();// 开启运行业务代码...try {processBusiness();} catch (Exception e) {log.info("业务线程终止");}}private String processBusiness() throws InterruptedException {/*** 模拟业务运行,*/for (int i = 0; i < 1000000; i++) {log.info("business running...{}", i);for (int j = 0; j < 1000000; j++) {for (int k = 0; k < 100000; k++) {for (int l = 0; l < 100000; l++) {while (flag) { // 不用if。避免虚假唤醒synchronized (this) { // 获取try {log.info("业务线程 开始wait");this.wait();log.info("业务线程结束 wait");} catch (InterruptedException e) {log.info("处理业务过程中抛出一个异常");throw e;}}}log.info("business running...l is" + l);for (int m = 0; m < 100000; m++) {for (int n = 0; n < 10000; n++) {int aa = i + j + k + m + n;int bb = aa * aa - m - n - i - j;for (int o = 0; o < aa; o++) {bb = aa + bb;}}}}}}}return "0";}private boolean flag = false;public void mySuspend() {flag = true;log.info("suspend success");}public void myReStart() {synchronized (this) {flag = false;this.notifyAll();log.info("resume success");}}public void myStop() {currentRunTask.interrupt();flag = true;log.info("stop success");}
}
非延迟版:无法实现
分布式集群最终版
- 如何保证两次请求都打到同一个服务器上呢?
- 上下文切换也消耗时间和内存。如果暂停的线程多了,如何处理呢?
- 暂停多长时间合适呢?如果用户一直暂停呢?
所以,最终只能用数据库来保存才能达到要求。
按照我的需求,用户给我数据A,我处理后返回给他数据B。而且按照正常情况,用户一般不会经常点击暂停。而且用户量不大,但不能保证用户多开。
- 把数据分组(或者叫程序分步骤)。比如用户传了1w数据,我就分每1k条就去判断用户是否点击暂停或者停止。
- 每一步处理完后的数据、进度需要存数据库。可以用redis缓存进度条
- 如果用户点击暂停,使用MQ广播,判断是哪一个服务器正在运行这个任务。然后就结束这个任务。
- 如果用户点击恢复,从服务器上面重新读取文件和数据库里面的匹配,重新运行暂停前没有跑的数据(或者最开始把所有数据都存到数据库,就不用从服务器上面获取文件、解析数据这些操作了。)这里不用MQ了,因为所有的数据都在数据库里面存着,直接继续跑就行了。
- 定时任务清理表。比如每15天,清理一直暂停的任务、已经跑完的数据等。