RabbitMQ高级特性1
- 一.消息确认
- 1.消息确认机制
- 2.手动确认代码
- 肯定确认
- 否定确认1
- 否定确认2
- Spring中的代码
- 二.持久性
- 1.交换机持久化
- 2.队列的持久化
- 3.消息的持久化
- 非持久化代码实现
- 三方面都持久化,数据也会丢失
- 三.发送方确认
- 1.Confirm确认模式
- 2.return返回模式
- 四.总结
- RabbitMQ保证消息可靠传输
一.消息确认
1.消息确认机制
-
自动确认:当autoAck等于true时, RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。自动确认模式适合对于消息可靠性要求不高的场景。
-
手动确认:当autoAck等于false时,RabbitMQ会等待消费者显式地调用Basic.Ack命令,回复确认信号后才从内存(或者磁盘)中移去消息。这种模式适合对消息可靠性要求比较高的场景.
2.手动确认代码
肯定确认
Channel.basicAck(long deliveryTag, boolean multiple)
RabbitMQ已知道该消息并且成功的处理消息,可以将其丢弃了。
参数说明
1)deliveryTag:消息的唯⼀标识,它是⼀个单调递增的64位的长整型值。 deliveryTag 是每个通道(Channel)独立维护的,所以在每个通道上都是唯⼀的。当消费者确认(ack)⼀条消息时,必须使用对应的通道上进行确认。
2)multiple:是否批量确认。在某些情况下,为了减少网络流量,可以对⼀系列连续的 deliveryTag 进行批量确认。值为true则会⼀次性把ack所有小于或等于指定deliveryTag的消息。值为false,则只确认当前指定deliveryTag的消息。
否定确认1
Channel.basicReject(long deliveryTag, boolean requeue)
RabbitMQ在2.0.0版本开始引⼊了 Basic.Reject 这个命令,消费者客户端可以调用
channel.basicReject方法来告诉RabbitMQ拒绝这个消息。
参数说明
1)deliveryTag:参考channel.basicAck。
2)requeue:表示拒绝后,这条消息如何处理。如果requeue参数设置为true,则RabbitMQ会重新将这条消息存入队列,以便可以发送给下⼀个订阅的消费者。如果requeue参数设置为false,则RabbitMQ会把消息从队列中移除,而不会把它发送给新的消费者。
否定确认2
Channel.basicNack(long deliveryTag, boolean multiple,boolean requeue)
Basic.Reject命令⼀次只能拒绝⼀条消息,如果想要批量拒绝消息,则可以使用Basic.Nack这个命令。消费者客户端可以调用channel.basicNack方法来实现。
参数说明
参数介绍参考上面两个方法。
multiple参数设置为true则表示拒绝deliveryTag编号之前所有未被当前消费者确认的消息。
Spring中的代码
- AcknowledgeMode.NONE
这种模式下,消息⼀旦投递给消费者,不管消费者是否成功处理了消息,RabbitMQ就会自动确认消息,从RabbitMQ队列中移除消息,如果消费者处理消息失败,消息可能会丢失。
ym配置
spring:application:name: rabbitmqdemorabbitmq:addresses: amqp://账号:密码@IP:端口号/虚拟机listener:simple:acknowledge-mode: none
- AcknowledgeMode.AUTO(默认)
这种模式下,消费者在消息处理成功时会自动确认消息,但如果处理过程中抛出了异常,则不会确认消息,但是会一直尝试重发消息。
将yml配置中的 acknowledge-mode改成auto。
上述两种模式代码相同
Configuration
package com.example.rabbitmqdemo.config;import com.example.rabbitmqdemo.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfiguration {//消息确认//队列@Bean("ackQueue")public Queue ackQueue() {return QueueBuilder.durable(Constants.ACK_QUEUE).build();}//虚拟机@Bean("directExchange")public DirectExchange directExchange() {return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).build();}//队列和虚拟机绑定@Bean("ackBinding")public Binding ackBinding(@Qualifier("ackQueue") Queue queue, @Qualifier("directExchange") DirectExchange directExchange) {return BindingBuilder.bind(queue).to(directExchange).with("ack");}
}
** Constants**
package com.example.rabbitmqdemo.constant;
public class Constants {public static final String ACK_QUEUE = "ack.queue";public static final String ACK_EXCHANGE = "ack.exchange";
}
Controller
package com.example.rabbitmqdemo.controller;import com.example.rabbitmqdemo.constant.Constants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/producer")
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack() {rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack","ack is ok");return "ack is ok!";}
}
Listener
@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void handleMessage(Message message, Channel channel) throws UnsupportedEncodingException {//消费者逻辑System.out.printf("接收到消息: %s , deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//不做具体实现的消费者业务逻辑}}
- AcknowledgeMode.MANUAL
手动确认模式下,消费者必须在成功处理消息后显式调用 basicAck 方法来确认消息。如果消息未被确认,RabbitMQ会认为消息尚未被成功处理,并且会在消费者可用时重新投递该消息,这种模式提高了消息处理的可靠性,因为即使消费者处理消息后失败,消息也不会丢失,而是可以被重新处理。
将yml配置中的 acknowledge-mode改成manual。
Listener
package com.example.rabbitmqdemo.listener;import com.example.rabbitmqdemo.constant.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.io.UnsupportedEncodingException;/*** Created with IntelliJ IDEA.* Description:* User: hp* Date: 2025-04-03* Time: 9:26*/
@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void handleMessage(Message message, Channel channel) throws IOException {//消费者逻辑System.out.printf("接收到消息: %s , deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//不做具体实现的消费者业务逻辑try {//int sum = 3 / 0;//确认消息(肯定)channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch (Exception e) {//否定确认//最后一个参数为true,则发生异常重新入队,false,为不再入队channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);}}}
二.持久性
1.交换机持久化
交换器的持久化是通过在声明交换机时是将durable参数置为true实现的。
相当于将交换机的属性在服务器内部保存,当MQ的服务器发生意外或关闭之后,重启 RabbitMQ 时不需要重新去建⽴交换机,交换机会自动建立,相当于⼀直存在。
如果交换器不设置持久化,那么在 RabbitMQ 服务重启之后,相关的交换机元数据会丢失,对⼀个长期使用的交换器来说,建议将其置为持久化的。
设置交换机的持久化
2.队列的持久化
队列的持久化是通过在声明队列时将 durable 参数置为true实现的。
如果队列不设置持久化,那么在RabbitMQ服务重启之后,该队列就会被删掉,此时数据也会丢失。(队列没有了,消息也无处可存了)
队列的持久化能保证该队列本⾝的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不会丢失。要确保消息不会丢失,需要将消息设置为持久化。
咱们前面用的创建队列的方式都是持久化的。
队列持久化
队列非持久化
3.消息的持久化
消息实现持久化,需要把消息的投递模式( MessageProperties 中的 deliveryMode )设置为2,也就是MessageDeliveryMode.PERSISTENT
设置了队列和消息的持久化,当 RabbitMQ 服务重启之后,消息依旧存在。如果只设置队列持久化,重启之后消息会丢失。
如果只设置消息的持久化,重启之后队列消失,继而消息也丢失。所以单单设置消息持久化而不设置队列的持久化显得毫无意义
非持久化代码实现
交换机、队列和绑定
//非持久化队列@Bean("presQueue")public Queue presQueue() {return QueueBuilder.nonDurable(Constants.PRES_QUEUE).build();}//非持久化交换机@Bean("presExchagne")public DirectExchange presExchange() {return ExchangeBuilder.directExchange(Constants.PRES_EXCHANGE).durable(false).build();}@Bean("presBinding")public Binding presBinding(@Qualifier("presQueue") Queue queue,@Qualifier("presExchagne") Exchange exchange) {//如果参数传递的是Exchange类型而不是DirectExchang类型就需要使用noargs作为收尾return BindingBuilder.bind(queue).to(exchange).with("pres").noargs();}
Producer
@RequestMapping("/pres")public String pres() {Message message = new Message("Presistent test...".getBytes(),new MessageProperties());message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);rabbitTemplate.convertAndSend(Constants.PRES_EXCHANGE,"pres",message);return "pres is ok!";}
RabbitMQ服务器的虚拟机和队列
三方面都持久化,数据也会丢失
-
从消费者来说,如果在订阅消费队列时将autoAck参数设置为true,那么当消费者接收到相关消息之后,还没来得及处理就宕机了,这样也算数据居丢失。这种情况很好解决,将autoAck参数设置为false,并进行手动确认。
-
在持久化的消息正确存入RabbitMQ之后,还需要有⼀段时间(虽然很短,但是不可忽视)才能存⼊磁盘中。RabbitMQ并不会为每条消息都进行同步存盘(调用内核的fsync方法)的处理,可能仅仅保存到操作系统缓存之中而不是物理磁盘之中。如果在这段时间内RabbitMQ服务节点发生了宕机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失。
三.发送方确认
1.Confirm确认模式
Producer在发送消息的时候,对发送端设置⼀个ConfirmCallback的监听,无论消息是否到达Exchange,这个监听都会被执行,如果Exchange成功收到,ACK( Acknowledge character ,确认字符)为true,如果没收到消息,ACK就为false。
yml配置
spring:application:name: rabbitmqdemorabbitmq:addresses: amqp://账号:Miami@IP:端口号/虚拟机listener:simple:#acknowledge-mode: none#acknowledge-mode: autoacknowledge-mode: manualpublisher-confirm-type: correlated #消息发送确认
Configuration
package com.example.rabbitmqdemo.config;import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);return rabbitTemplate;}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//设置回调函数rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行了confirm方法");if (ack) {System.out.printf("接收到消息, 消息ID: %s \n",correlationData == null ? null : correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n",correlationData == null ? null : correlationData.getId(),cause);//相应的业务处理}}});return rabbitTemplate;}
}
Producer
@RequestMapping("/confirm")public String confirm() {CorrelationData correlationData = new CorrelationData("1");rabbitTemplateConfig.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm","confirm test...",correlationData);return "confirm is ok!";}
2.return返回模式
Configuration
@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//设置回调函数rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行了confirm方法");if (ack) {System.out.printf("接收到消息, 消息ID: %s \n",correlationData == null ? null : correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n",correlationData == null ? null : correlationData.getId(),cause);//相应的业务处理}}});//return模式rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println("消息退回: " + returnedMessage);}});return rabbitTemplate;}
四.总结
RabbitMQ保证消息可靠传输
Producer -> Broker:发送方确认
- Producer -> Exchange :Confirm模式(网络问题)
- Exchange -> Queue : return模式(代码或者配置层错误,导致消息路由失败)
- 队列移除:死信等
Broker:持久化(RabbitMQ服务器宕机导致消息丢失)
- 交换机持久化
- 队列持久化
- 消息持久化
Broker -> Consumer 消息确认方式(消费者未来得及消费信息,就宕机了)
- 自动确认
- 手动确认