您的位置:首页 > 财经 > 金融 > redis实现延时队列

redis实现延时队列

2025/2/24 9:26:22 来源:https://blog.csdn.net/Zxdwr520/article/details/140289986  浏览:    关键词:redis实现延时队列
redis的Zset特点

redis的zset它结合了set和list的特点
 1、集合内元素不会重复
 2、元素以有序的方式排列
zset中的元素都会关联一个分数score,内部将通过这个score对集合元素进行的排序。
虽然zset集合中元素不会重复,但score可以重复。

如果有两个score相同的元素,将按照元素的字典序进行排序

score保证了队列中的消息有序性
延迟队列的实现:
将数据存到redis的zset中并指定score(double),zset会对score进行排序,让最早消费的数据位于最前,拿最前的数据跟当前时间比较,时间到了则消费

延迟消息队列使用场景
  1. 打车场景,在规定时间内,没有车主接单,那么平台就会推送消息给你,提示暂时没有车主接单。
  2. 支付场景,下单了,如果没有在规定时间付款,平台通常会发消息提示订单在有效期内没有支付完成,自动取消订单。
  3. 闹钟场景,时间到了则执行播报声音。
redis作为消息队列的优缺点
  • 优点
    • 使用相对简单
    • 不用专门维护专业的消息中间件,降低服务和运维成本
  • 缺点
    • 没有ack,消息确认机制,存在消息丢失的可能
    • 没有重试机制,建议写代码补偿
    • 对消息的可靠性有很大的要求,建议还是不要使用redis作为延时消息队列

如果是简单的日志推送,消息推送等,可以使用redis队列。

代码实现

生产者

@Slf4j
@Component
public class MessageProvider  {private static ObjectMapper mapper = Jackson2ObjectMapperBuilder.json().build();@Lazy@ResourceMsgMapper msgMapper;@Lazy@ResourceRedisUtil redisUtil;//这里就不放出来了,大家都有的private static String USER_CHANNEL = "随便_CHANNEL_";public static final String KEY_PREFIX ="你的前缀_msg:";/*** @Description: 发送消息添加到延迟队列* @param delay 延迟时间(排序的score)* @Author: fan* @Date: 2024/7/2 15:55*/public void sendMessage(Long id,Long taskId,String messageContent, Long delay,String channel) {//消息体格式,可根据自己需要调整,这里就不放出来了Message message = new Message();String msgId = USER_CHANNEL + id;long time = System.currentTimeMillis();LocalDateTime dateTime = Instant.ofEpochMilli(time).atZone(ZoneOffset.ofHours(8)).toLocalDateTime();message.setCreateTime(dateTime);message.setDelayTime(delay);message.setBody(messageContent);message.setMsgId(msgId);message.setStatus(ImmobilizationEnum.not_fixed_broadcast.getCode());message.setChannel(USER_CHANNEL + channel);Boolean b = false;try {//推送到队列b = pushQUeue(message);} catch (Exception e) {log.error("[sendMessage_Exception异常] e={}", e);} finally {String value = redisUtil.getKey(YOUR_KEY_PREFIX + msgId);if (StringUtil.isEmpty(value )) {//如果没有则插入数据库,代码补偿Message msg = msgMapper.selectByMsgId(msgId);if (msg == null) {msgMapper.insert(message);redisUtil.setKey(YOUR_KEY_PREFIX + msgId, UUID.toString, 20);}}}}public Boolean pushQUeue(Message queueManager){Boolean b = false;try {String messageStr = mapper.writeValueAsString(queueManager);b = redisUtil.addZset(YOUR_QUEUE_NAME_KEY, messageStr, queueManager.getDelayTime());redisUtil.expire(YOUR_QUEUE_NAME_KEY, 20, TimeUnit.SECONDS);} catch (Exception e) {log.error("[push_Exception异常] e={} ",e);}return b;}public List<Message> pullZset(){long currentTimeMillis = System.currentTimeMillis();List<Message> messageList = new ArrayList<>();try{Set<String> strings = redisUtil.rangeByScore(YOUR_QUEUE_NAME_KEY, 0, Long.MAX_VALUE);if (CollectionUtils.isEmpty(strings)) {return null;}messageList = strings.stream().map(msg -> {Message message = null;try {message = mapper.readValue(msg,Message.class);} catch (Exception e) {log.error("[pull_Exception异常] e:{}",e);}return message;}).collect(Collectors.toList());} catch (Exception e) {log.error("[pull_Exception异常]  Exception={} ", e);} finally {if (CollectionUtils.isEmpty(messageList)) {//如果缓存没有则从数据库取并下发到队列messageList = MsgMapper.selectByStatus("你的查询条件,这里按状态,具体SQL就不贴了");}}return messageList;}}

消费者

//消费者方法
public void delayingQueueConsumer(){List<Message> msgList = pullZset();if (!CollectionUtils.isEmpty(msgList)) {long current = getCurrentTime();for (int i = 0; i < msgList.size(); i++) {Message msg = msgList.get(i);//到点执行if (current >= msg.getDelayTime()) {//你的业务}}}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com