一、需求:
任务超过一个小时以后,如果还为待执行状态,则自动转为结束状态。
二、实现:
- 创建延迟队列的监听任务RedisDelayedQueueListener,消费延迟队列;
- 创建新增延迟队列的类,用于创建延迟队列;
- 整体初始化,把监听任务与spring绑定,扫描各个监听延迟队列的实现类,并开启单独线程,监听任务;
- 创建延迟任务。
三、实现步骤:
1.引入redisson依赖,这里直接引入springboot整合好的依赖,如果引用原生的依赖,需要自己配置redissonClient Bean。
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.10.5</version>
</dependency>
2.创建延时队列监听接口,定义延时队列到期事件处理方法,消费延时队列
/*** redis 队列事件监听,需要实现这个方法* @param <T>*/
public interface RedisDelayedQueueListener<T> {/*** 执行方法* @param t*/void invoke(T t);
}
3.具体的延时队列消费实现
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** 订单支付过期监听*/
@Component
@Slf4j
public class OrderPayExpirationListener implements RedisDelayedQueueListener<String>{@Autowiredprivate ITOrderService orderService;@Overridepublic void invoke(String orderId) {log.info("===" + orderId + ===");//查询到订单,判断为未支付,修改订单状态TOrder order = orderService.lambdaQuery().eq(TOrder::getOrderId, orderId).one();if (order.getOrderStatus() == 1) { //订单未支付TOrder tOrder = new TOrder();tOrder.setOrderId(orderId);tOrder.setOrderStatus(0); //更新订单为取消状态orderService.updateById(tOrder);}}
}
4.初始化,把监听任务与spring绑定
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.util.Map;/*** redis 延时队列初始化*/
@Component
@Slf4j
public class RedisDelayedQueueInit implements ApplicationContextAware {@Autowiredprivate RedissonClient redissonClient;/*** 获取应用上下文并获取相应的接口实现类* @param applicationContext* @throws BeansException*/@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {Map<String, RedisDelayedQueueListener> map = applicationContext.getBeansOfType(RedisDelayedQueueListener.class);for (Map.Entry<String, RedisDelayedQueueListener> taskEventListenerEntry : map.entrySet()) {String listenerName = taskEventListenerEntry.getValue().getClass().getName();startThread(listenerName, taskEventListenerEntry.getValue());}}/*** 启动线程获取队列* @param queueName 队列名称* @param redisDelayedQueueListener 任务回调监听*/private <T> void startThread(String queueName, RedisDelayedQueueListener redisDelayedQueueListener) {RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);//由于此线程需要常驻,可以新建线程,不用交给线程池管理Thread thread = new Thread(() -> {log.info("启动监听队列线程" + queueName);while (true) {try {T t = blockingFairQueue.take();log.info("监听队列线程{},获取到值:{}", queueName, JSON.toJSONString(t));redisDelayedQueueListener.invoke(t);} catch (Exception e) {log.info("监听队列线程错误,", e);}}});thread.setName(queueName);thread.start();}
}
5.创建延时任务
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;/*** Redis 延时队列*/
@Component
@Slf4j
public class RedisDelayedQueue {@Autowiredprivate RedissonClient redissonClient;/*** 添加对象进延时队列* @param putInData 添加数据* @param delay 延时时间* @param timeUnit 时间单位* @param queueName 队列名称* @param <T>*/private <T> void addQueue(T putInData,long delay, TimeUnit timeUnit, String queueName){log.info("添加延迟队列,监听名称:{},时间:{},时间单位:{},内容:{}" , queueName, delay, timeUnit,putInData);RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);delayedQueue.offer(putInData, delay, timeUnit);}/*** 添加队列-秒** @param t DTO传输类* @param delay 时间数量* @param <T> 泛型*/public <T> void addQueueSeconds(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {addQueue(t, delay, TimeUnit.SECONDS, clazz.getName());}/*** 添加队列-分** @param t DTO传输类* @param delay 时间数量* @param <T> 泛型*/public <T> void addQueueMinutes(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {addQueue(t, delay, TimeUnit.MINUTES, clazz.getName());}/*** 添加队列-时** @param t DTO传输类* @param delay 时间数量* @param <T> 泛型*/public <T> void addQueueHours(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {addQueue(t, delay, TimeUnit.HOURS, clazz.getName());}/*** 添加队列-天** @param t DTO传输类* @param delay 时间数量* @param <T> 泛型*/public <T> void addQueueDays(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {addQueue(t, delay, TimeUnit.DAYS, clazz.getName());}
}
6.此时只需要再下单成功的方法里面新增以下逻辑即可
@Autowiredprivate RedisDelayedQueue redisDelayedQueue;//将订单id放入延时队列,配置过期监听的处理类
redisDelayedQueue.addQueueHours(id,2, OrderPayExpirationListener.class);
以上参考:https://www.cnblogs.com/huaixiaonian/p/16978606.html
四、我的优化
4.1 此场景中,ApplicationContextAware存在的问题
介绍ApplicationContextAware和ApplicationRunner的区别
- ApplicationContextAware:在Bean初始化过程中initializeBean()函数中;(项目没启动完成)
- ApplicationRunner:在所有bean都初始化完成后调用,在AfterFinish中执行;
因此ApplicationContextAware初始化会有两个问题:
- 未完全启动完成就监听,可能会导致消费队列的相关类未全部加载完成,导致在启动完成前这段时间,消息消费异常;
- 代码里是新建线程异步消费,当有系统启动异常时,线程还在启动着,会不断打印log.info(“监听队列线程错误,”, e);
4.1.2 优化一:ApplicationRunner替代ApplicationContextAware
@Slf4j
@Component
public class RedisDelayedQueueInitRunner implements ApplicationRunner {.......
}
@Overridepublic void run(ApplicationArguments args) {String listenerName = String.format("XX", redisDelayedQueueListener.getClass().getSimpleName());startThread(listenerName, redisDelayedQueueListener);}
4.2 上次关闭的时候的消息到期了,不会马上发送
上次关闭的时候的消息到期了,不会马上发送,要等新消息来,才会消费。
原因:因为是在添加消息的时候才初始化管道的:
解决方法:这个地方吧管道开启就可以了
这个是在启动的时候去执行 要在invoke 方法里面捕获,防止启动失败了。