您的位置:首页 > 教育 > 锐评 > RabbitMq 消息确认和退回机制

RabbitMq 消息确认和退回机制

2024/10/6 0:32:57 来源:https://blog.csdn.net/szchen2/article/details/140126019  浏览:    关键词:RabbitMq 消息确认和退回机制

一、Rabbit中消息确认和退回机制

1、发布确认

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID (从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者 (包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,(单个)如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号。

1.1 开启发布确认

//开启发布确认
channel.confirmSelect();

1.2 发布确认的两种方式

      同步确认

      就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布waitForConfirmsOrDie(long) 这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。

public static void publishMessageIndividually() throws Exception {Channel channel = RabbitMqUtils.getChannel();//队列声明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName, true, false, false, null);//开启发布确认channel.confirmSelect();long begin = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = i + "";channel.basicPublish("", queueName, null, message.getBytes());//服务端返回 false 或超时时间内未返回,生产者可以消息重发boolean flag = channel.waitForConfirms();if (flag) {System.out.println("消息发送成功");}}long end = System.currentTimeMillis();System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");}
  异步确认 

    他是利用回调函数来达到消息可靠性传递的

//    异步确认发布private static void publishMessageAsync() throws Exception {Channel channel = RabbitMqUtils.getChannel();//队列声明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName, true, false, false, null);//开启发布确认channel.confirmSelect();/**线程安全有序的一个哈希表,适用于高并发的情况下1.轻松的将序号与消息进行关联2.轻松批量删除条目,只要给到序号3.支持高并发(多线程)**///消息确认成功,回调函数ConcurrentSkipListMap<Long,String> outstandingConfirms =new ConcurrentSkipListMap<>();ConfirmCallback ackCallback = (deliveryTag , multiple)->{ //确认了多少条,multiple:批量或单个if (multiple){  //批量的//2.轻松批量删除条目,只要给到序号ConcurrentNavigableMap<Long,String> confirmed =outstandingConfirms.headMap(deliveryTag); //消息的序号confirmed.clear();}else {  //单个的outstandingConfirms.remove(deliveryTag);}System.out.println("确认的消息:"+deliveryTag);};//消息确认失败,回调函数/*** 1.消息的标记* 2.是否为批量确认*/ConfirmCallback nackCallback = (deliveryTag , multiple)->{//3、打印一下未确认的消息都有哪些String message = outstandingConfirms.get(deliveryTag);System.out.println("未确认的消息是:"+message+":::未确认的消息:"+deliveryTag);};//准备消息的监听器 ,监听哪些消息成功了, 哪些失败了channel.addConfirmListener( ackCallback, nackCallback);//开始时间long begin = System.currentTimeMillis();//发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String message = i + "";channel.basicPublish("", queueName, null, message.getBytes());//1.轻松的将序号与消息进行关联,将每一条消息都存放hash表里面outstandingConfirms.put(channel.getNextPublishSeqNo(),message);}//结束时间long end = System.currentTimeMillis();System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");}

2、 退回机制

退回模式(return)说的是当消息到达交换机后,但是没有找到匹配的队列时,将消息回退给生产者。

默认情况下,如果消息没有匹配到队列会直接丢弃,采用退回模式可以在生产者端监听改消息是否被成功投递到队列中

channel.addReturnListener(new ReturnCallback() {@Overridepublic void handle(Return returnMessage) {System.out.println("消息被回退,原因:"+returnMessage.getReplyText());System.out.println(returnMessage.getExchange()); // 交换机System.out.println(returnMessage.getReplyCode()); // 返回原因的代码System.out.println(returnMessage.getReplyText()); // 返回信息,例如NO_ROUTESystem.out.println(returnMessage.getRoutingKey()); // 路由KEY}});

二、Spring boot 中Rabbit 实现消息确认和退回机制

1、 开启配置

spring:#配置rabbitMq 服务器rabbitmq:host: 127.0.0.1port: 5672username: rootpassword: root#虚拟host 可以不设置,使用server默认hostvirtual-host: JCcccHost#确认消息已发送到交换机(Exchange)publisher-confirm-type: correlated#确认消息已发送到队列(Queue)publisher-returns: true

2、配置消息确认和退回机制

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author : JCccc* @CreateTime : 2019/9/3* @Description :**/
@Configuration
public class RabbitConfig {@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);// 配置发布到交换机的确认rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/***correlationData :客户端在发送原始消息时提供的对象。*ack:exchange交换机是否成功收到了消息。true成功,false代表失败。*cause:失败原因。*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("ConfirmCallback:     "+"相关数据:"+correlationData);System.out.println("ConfirmCallback:     "+"确认情况:"+ack);System.out.println("ConfirmCallback:     "+"原因:"+cause);}});// 配置交换机没有找到对应的消息队列时,消息退回时的处理rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("ReturnCallback:     "+"消息:"+message);System.out.println("ReturnCallback:     "+"回应码:"+replyCode);System.out.println("ReturnCallback:     "+"回应信息:"+replyText);System.out.println("ReturnCallback:     "+"交换机:"+exchange);System.out.println("ReturnCallback:     "+"路由键:"+routingKey);}});return rabbitTemplate;}}

2.1 其他方式

package com.student.rabbitmq.config;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/*** 第二种方式*/
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;// 将我们实现的MyCallBack接口注入到RabbitTemplate中@PostConstructpublic void init() {// 设置确认消息交给谁处理rabbitTemplate.setConfirmCallback(this);// 设置回退消息交给谁处理rabbitTemplate.setReturnCallback(this);}/*** 交换机确认回调方法** @param correlationData 保存回调消息的ID以及相关信息* @param ack             表示交换机是否收到消息(true表示收到)* @param cause           表示消息接收失败的原因(收到消息为null)*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id = correlationData != null ? correlationData.getId() : "";if (ack) {System.out.println("交换机已经收到ID为:{}的消息");} else {System.out.println("交换机还未收到ID为:{}的消息,原因为:{}" + cause);}}/*** 路由出现问题的消息回退方法**/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println(new String(message.getBody())+exchange+replyText+routingKey);}}

三、参考

发布确认—发布确认逻辑和发布确认的策略

消息可靠性之发布确认、退回机制_rabbittemplate 

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com