您的位置:首页 > 游戏 > 手游 > 哈尔滨建站服务网站开发_互联网电商板块火箭发射_网站展示型推广_苏州seo关键词优化价格

哈尔滨建站服务网站开发_互联网电商板块火箭发射_网站展示型推广_苏州seo关键词优化价格

2025/1/15 16:31:46 来源:https://blog.csdn.net/a1241436267/article/details/144989603  浏览:    关键词:哈尔滨建站服务网站开发_互联网电商板块火箭发射_网站展示型推广_苏州seo关键词优化价格
哈尔滨建站服务网站开发_互联网电商板块火箭发射_网站展示型推广_苏州seo关键词优化价格

1. 消息发送者的可靠性

保证消息的可靠性可以通过发送者重连发送者确认来实现


发送者重连

发送者重连机制就是在发送信息的时候如果连接不上mq不会立即结束,而是会在一定的时间间隔之类进行重新连接,连接的次数和时间都是由我们在配置文件中指定的,具体的就是通过retry属性来

spring: rabbitmq: # rabbitmq配置 host: localhost # rabbitmq地址port: 5672 # rabbitmq端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码template: # 消息发送相关配置retry: # 重试相关配置enabled: true # 启用重试max-attempts: 3 # 最大重试次数initial-interval: 1000 # 初始重试间隔multiplier: 2 # 重试间隔倍数max-interval: 10000 # 最大重试间隔

测试

将MQ关闭,然后随便写一个消息发送案例,就能够看见效果

package com.itheima.publisher;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.util.HashMap;
import java.util.Map;@SpringBootTest
public class PublisherApplicationTest {@AutowiredRabbitTemplate rabbitTemplate;@Testpublic void test() {String exchangeName = "fanout.hamll";Map map=new HashMap();map.put("name","hamll");map.put("age",18);map.put("sex","男");rabbitTemplate.convertAndSend("fanout.hamll.query2", map);}}

发送者确认

在一般的情况下,消息很少会出现问题,但是还是有出现问题的可能性,比如:

1. 消息发送后无法路由键找不到相关队列

2. 绑定的交换机不存在

3. 消息发送出现异常

针对这一情况,MQ为我们提供了多种消息确认机制,比如:Publisher Return、Publisher Confirm

spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制

Publisher Return

着重于绑定的队列、交换机、路由是否成功,并且能够监听到相关的信息,比如交换机、路由、提示等

在使用的过程总需要一个全局的配置类

package com.itheima.publisher.config;import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;/*** RabbitMQ 配置类,用于配置 RabbitTemplate 的回调函数。*/
@Slf4j // 使用 Lombok 注解引入日志记录器
@AllArgsConstructor // 使用 Lombok 注解生成全参构造函数
@Configuration // 标记为 Spring 配置类
public class MqConfig {private final RabbitTemplate rabbitTemplate; // 注入 RabbitTemplate 实例/*** 初始化方法,在 Bean 创建后立即执行。* 设置 RabbitTemplate 的返回消息回调函数。*/@PostConstruct // 标记为初始化方法public void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {/*** 当消息被 broker 返回时触发的回调函数。* @param returned 返回的消息对象*/@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return callback,"); // 记录错误日志,表示触发了返回回调log.debug("exchange: {}", returned.getExchange()); // 记录交换机名称log.debug("routingKey: {}", returned.getRoutingKey()); // 记录路由键log.debug("message: {}", returned.getMessage()); // 记录消息内容log.debug("replyCode: {}", returned.getReplyCode()); // 记录回复代码log.debug("replyText: {}", returned.getReplyText()); // 记录回复文本}});}
}


Publisher Confirm

适用于更加复杂复杂的业务,MQ通过方法回调来告诉发送者消息是否发送成功,提供了两个方法的回调:

1. onFailure 在发送消息出现异常的时候会被捕获、并且接收了一个异常对象来返回异常信息。

2. onSuccess 在发送的时候如果成功被MQ接收到就会触发、onSuccess通常会接收两个参数作为参数(CorrelationData.Confirm )、Confirm有一个IsAck()方法来表示是否被确认:

  • true:表示消息被成功确认(ack),即消息已经被 RabbitMQ 正确接收并处理。
  • false:表示消息未被确认(nack),可能是因为 RabbitMQ 内部错误或其他原因导致消息无法被正确处理
package com.itheima.publisher;import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.util.concurrent.ListenableFutureCallback;/*** 测试类,用于验证消息发布功能。*/
@SpringBootTest
@Slf4j // 使用 Lombok 注解引入日志记录器
public class PublisherApplicationTest {@Autowiredprivate RabbitTemplate rabbitTemplate; // 注入 RabbitTemplate 实例/*** 测试方法,验证消息发布功能。* @throws InterruptedException 可能抛出的中断异常*/@Testpublic void test() throws InterruptedException {// 创建 CorrelationData 对象,用于唯一标识消息CorrelationData correlationData = new CorrelationData();// 设置回调函数,处理消息发送的结果correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {/*** 消息发送失败时的回调函数。* @param ex 异常信息*/@Overridepublic void onFailure(Throwable ex) {// 记录消息发送失败的异常信息log.info("消息发送失败、异常!{}", ex.getMessage());}/*** 消息发送成功时的回调函数。* @param result 确认结果*/@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 检查消息是否被确认if (result.isAck()) {// 记录消息发送成功的日志log.info("消息发送成功");} else {// 记录消息发送失败的原因log.info("消息发送失败!{}", result.getReason());}}});// 发送消息到指定的交换机和路由键rabbitTemplate.convertAndSend("pay.direct", "pay.success", "hello rabbitmq", correlationData);}
}

数据持久化

默认情况下MQ的数据都是临时数据,MQ故障重启后消息都会丢失,为了保证消息的可靠性就需要做持久化操作,MQ的持久化包括:

1. 交换机持久化

2. 队列持久化

3. 消息持久化


交换机持久化

可以在控制台创建的时候设置为Durable就是持久化模式,Transient就是临时模式。


如果是代码注解开发可以在参数列表通过durable为true指定持久化或者不持久化,交换机通过注解创建一般都是默认的持久化

 @RabbitListener(bindings =@QueueBinding(value = @Queue(name = "fanout.hamll.query1",durable = "true"),exchange = @Exchange(name = "fanout.hamll", type = ExchangeTypes.FANOUT,durable = "true")))

队列持久化

可以在控制台创建的时候设置为Durable就是持久化模式,Transient就是临时模式。


如果是代码注解开发可以在参数列表通过durable为true指定持久化或者不持久化,队列一般都是默认不持久化,需要手动设置
 

@RabbitListener(bindings =@QueueBinding(value = @Queue(name = "fanout.hamll.query1",durable = "true"),exchange = @Exchange(name = "fanout.hamll", type = ExchangeTypes.FANOUT,durable = "true")))

消息持久化

消费者发送的消息默认情况下都是临时的消息,在MQ重启的时候消息会丢失。而开启持久化之后消息会被永久保存在MQ,即使MQ服务器挂了也不会丢失。

在发送消息的时候会由java的api将我们传入的object转换成Message对象,默认是不会帮我们持久化的,MQ重启消息就没了

想要持久化也很简单,就是我们自己来创建Message对象然后开启持久化

 //消息持久化Message message = MessageBuilder.withBody("hello rabbitmq".getBytes(StandardCharsets.UTF_8)) // 消息内容.setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 消息持久化.build();// 构建消息rabbitTemplate.convertAndSend("pay.direct", "pay.su1ccess", message, correlationData);

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com