您的位置:首页 > 健康 > 美食 > rabbitMQ消息的可靠性

rabbitMQ消息的可靠性

2024/10/5 17:26:02 来源:https://blog.csdn.net/2302_77426533/article/details/141435876  浏览:    关键词:rabbitMQ消息的可靠性

一、发送者的可靠性

        1.生产者的重连

因网络问题连接MQ失败,解决在配置文件中配置失败后的重连机制(阻塞式的,影响业务)

spring:rabbitmq:host: 192.168.88.129port: 5672virtual-host: /hmallusername: hmallpassword: #connection-timeout: 1s #设置MQ的连接超时时间template:retry:enabled:true #开启超时重试机制initial-interval:1000ms  #失败后的初始等待时间multiplier:1  #失败后下次的等待时长倍数,下次等待时长=initial-interval * multipliermax-attempts:3  #最大重试次数

 2.生产者确认

  有两种 Publisher Confirm和Publisher Return两种确认机制。开启确机制认后,在MQ成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况:
        消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
        临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
        持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
        其它情况都会返回NACK,告知投递失败

3.解决

在发送者的配置文件中配置

rabbitmg :publisher-confirm-type:correlated #开publisher confirm机制,并设置confirm类型publisher-returns:true #开启publisher return机制
#配置说明:
#这里publisher-confirm-type有三种模式可选:
#  none:关闭confirm机制
#  simple:同步阻塞等待MQ的回执消息
#  correlated:MO异步回调方式返回回执消息

写一个配置类

package com.itheima.publisher.Config;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class MqConfirmConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 配置回调rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){@Overridepublic void returnedMessage(ReturnedMessage returned) {log.debug("收到消息的return callback, exchange:{},key:{},msg:{}, code:{}, text:{}",returned.getExchange(), returned.getRoutingKey(), returned.getMessage(),returned.getReplyCode(), returned.getReplyText());}});}
}

在每次发消息时

 @Testvoid testConfirmCallback() throws InterruptedException {//1.创建cdCorrelationData cd = new CorrelationData(UUID.randomUUID().toString());//2.添加Confirm// Callbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {log.error("消息回调失败", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {log.debug("收到confirm callback回执");if (result.isAck()) {//消息发送成功log.debug("消息发送成功,收到ack");} else {//消息发送失败log.error("消息发送失败,收到nack,原因:{}", result.getReason());}}});rabbitTemplate.convertAndSend("hmall.direct", "red", "hello", cd);Thread.sleep(2000);}

二、消息队列的可靠性


  1 分析

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题

一旦MQ宕机,内存中的消息会丢失

内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞

 2.解决

 2.1数据持久化(交换机持久化,队列持久化和消息持久化)

 2.2 Lazy Queue(推荐)

基于注解

  @RabbitListener(queuesToDeclare = @Queue(name="lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode",value = "lazy")))public void listenDelayQueues(String msg){log.info("接收到了delay。queue的消息{}",msg);}

三、接收者的可靠性

1.消费者确认机制

     为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制,当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值

ack:成功处理消息,RabbitMQ从队列中删除该消息
nack:消息处理失败,RabbitMQ需要再次投递消息
reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

1.1 在接收者的配置文件中配置

spring:rabbitmq:    listener:simple:prefetch: 1acknowledge-mode: auto #确认机制  :none关闭ack,manual手动ack,auto自动ack

2.失败重试机制,当接收者程序抛异常后,会不断地重新发送再抛异常

当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次reaueue,无限循环,导致mq的消息处理飙升,带来不必要的压力。

我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列

2.1配置

2.2失败消息处理策略

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(建议)

新建一个队列,投递进去进行人工处理


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorConfiguration {@Beanpublic DirectExchange errorExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue");}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorExchange){return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");}@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){log.debug("加载RepublishMessageRecoverer");return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

3.业务的幂等性

1.创建唯一消息id

消费者和发送者的启动类配一个bean

2.结合业务

四、延迟消息

延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。延迟任务:设置在一定时间之后才执行的任务

1.死信交换机实现

    当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):

消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false

消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费

要投递的队列消息堆积满了,最早的消息可能成为死信

如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead LetterExchange,简称DLX)。

2.延迟消息插件

1.执行命令安装插件

docker exec -it mq rabbitmg-plugins enable rabbitmq_delayed_message_exchange

2.使用(使用在延迟时间较短的场景)

2.1定义延迟交换机

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com