RokcetMQ的介绍和基础知识见这篇博客——RocketMQ学习(1) 快速入门
本篇为上一篇的深入学习,很多基础知识不再赘述。
目录
- 消息重复消费问题(去重;幂等)
- 布隆过滤器
- 重试机制
- 死信消息
- SpringBoot集成RocketMQ
- 集成SpringBoot发送不同消息模式(同步消息)
- 异步消息
- 单向消息
- 延迟消息
- 顺序消息
- 批量消息
- 事务消息
- 发送对象消息和集合消息
- 消息标签,tag
- 消息key
- 负载均衡和广播,两种消息消费模式
- 消息堆积问题
- 消息丢失问题
- 消息安全问题
- 底层设计小探
消息重复消费问题(去重;幂等)
为什么会出现重复消费问题呢?
消息重复消费的主要原因有两个:
- 生产者多次投递:同一条消息可能被生产者多次发送。
- 消费者方重试机制:消费者在扩容或其他情况下会重试消费。
广播模式(BROADCASTING)
在广播模式下,所有注册的消费者都会收到并处理消息。通常,这些消费者是集群部署的微服务,因此每台机器都会消费同一条消息。虽然这是根据需求选择的模式,但会导致多台机器重复消费相同的消息。
负载均衡模式(CLUSTERING)
在负载均衡模式下,如果一个topic被多个consumer group消费,也会出现重复消费的情况。即使在同一个consumer group中,一个队列只会分配给一个消费者,但以下情况仍可能导致重复消费:
- 负载均衡重新分配:当一个新的消费者加入或一个现有的消费者下线时,同组的所有消费者需要重新进行负载均衡。新的消费者需要获取之前的消费偏移量(offset)。如果之前的消费者已经消费了一条消息但尚未提交offset,那么新的消费者可能会重新消费这条消息。
- 顺序消费模式(Orderly):在顺序消费模式下,前一个消费者解锁后,新的消费者加锁再进行消费。虽然这种方式比并发消费(concurrently)更严格,但由于加锁的线程和提交offset的线程不同,极端情况下仍会出现重复消费。
- 批量消息处理:当发送批量消息时,整个批量消息会被当作一条消息处理。如果批量中的部分消息处理成功而其他消息失败,重新消费时会导致已经成功处理的消息被再次消费。
如何避免重复消费?
在负载均衡模式下,并且在同一个消费者组中,如果不希望消息被重复消费,可以进行去重操作。具体方法如下:
消息唯一标识:为每条消息设置唯一的标识(如msgId或自定义的唯一key)。
去重逻辑:在消费者端实现去重逻辑,通过检查消息的唯一标识来判断消息是否已经被消费过。
查看官方文档可知:
RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)
msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。
还是强烈推荐学习一下官方文档
其中实际过程要考虑原子性问题”是指确保判断消息是否存在和插入消息这两个操作要作为一个不可分割的整体进行,这样才能避免并发情况下的竞态条件,确保数据的一致性。
为了解决这个问题,我们需要确保“判断消息是否存在”和“插入消息”这两个操作是原子的,可以通过数据库的唯一约束(如唯一键)和原子操作来实现。
利用唯一约束和插入操作:
可以使用数据库的唯一键约束和插入操作的组合来确保原子性。如果插入操作因唯一键约束失败,则说明消息已经存在,避免重复消费。
唯一标识如何存储?
想法很好,但是消息的体量是非常大的,可能在生产环境中会到达上千万甚至上亿条,那么我们该如何选择一个容器来保存所有消息的标识,并且又可以快速的判断是否存在呢?
内存里的map可以吗?不行,内存重启就没了,且集群之间不共享内存。
redis可以吗?mysql可以吗?
Mysql可以使用去重表,在数据库操作中使用唯一约束,确保相同的操作不会被执行多次。而Redis的setnx命令天然就支持幂等。
使用redis的优点:高性能、数据结构丰富、分布式特性、TTL (过期时间)支持
使用redis的缺点:内存大小限制、持久化机制不如传统关系型数据库、数据丢失风险
使用MySQL 的优点:持久化存储、查询能力强、事务支持
使用MySQL 的缺点:性能瓶颈、扩展性差不如Redis、维护成本高
所以对于高并发大流量场景可以使用Redis,对于数据持久化要求高、历史数据分析需求强的场景可以使用Mysql
在实际项目中,可以根据具体需求,将两者结合使用
这里贴一下使用Mysql的方法:
生产者
@Test
void repeatProducer() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("repeat-producer-group");producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);producer.start();String key = UUID.randomUUID().toString();System.out.println(key);// 测试 发两个key一样的消息Message m1 = new Message("repeatTopic", null, key, "扣减库存-1".getBytes());Message m1Repeat = new Message("repeatTopic", null, key, "扣减库存-1".getBytes());producer.send(m1);producer.send(m1Repeat);System.out.println("发送成功");producer.shutdown();
}
消费者
/*** 幂等性(mysql的唯一索引, redis(setnx) )* 多次操作产生的影响均和第一次操作产生的影响相同* 新增:普通的新增操作 是非幂等的,唯一索引的新增,是幂等的* 修改:看情况* 查询: 是幂等操作* 删除:是幂等操作* ---------------------* 我们设计一个去重表 对消息的唯一key添加唯一索引* 每次消费消息的时候 先插入数据库 如果成功则执行业务逻辑 [如果业务逻辑执行报错 则删除这个去重表记录]* 如果插入失败 则说明消息来过了,直接签收了** @throws Exception*/@Testvoid repeatConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("repeat-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("repeatTopic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {// 先拿keyMessageExt messageExt = msgs.get(0);String keys = messageExt.getKeys();// 原生方式操作Connection connection = null;try {connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2B8&useSSL=false", "root", "123456");} catch (SQLException e) {e.printStackTrace();}PreparedStatement statement = null;try {// 插入数据库 因为我们 key做了唯一索引statement = connection.prepareStatement("insert into order_oper_log(`type`, `order_sn`, `user`) values (1,'" + keys + "','123')");} catch (SQLException e) {e.printStackTrace();}try {// 新增 要么成功 要么报错 修改 要么成功,要么返回0 要么报错statement.executeUpdate();} catch (SQLException e) {System.out.println("executeUpdate");if (e instanceof SQLIntegrityConstraintViolationException) {// 唯一索引冲突异常// 说明消息来过了System.out.println("该消息来过了");return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}e.printStackTrace();}// 处理业务逻辑// 如果业务报错 则删除掉这个去重表记录 delete order_oper_log where order_sn = keys;// 要不然下次来的时候它依然还在里面 就没法去重试了System.out.println(new String(messageExt.getBody()));System.out.println(keys);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();}
布隆过滤器
还以选择布隆过滤器(BloomFilter)
布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都比一般的算法要好的多,缺点是有一定的误识别率和删除困难。
在hutool的工具中我们可以直接使用,当然你自己使用redis的bitmap类型手写一个也是可以的。
在微服务架构中,各个服务实例通常运行在不同的机器或容器上,并且内存不共享,此时可以将布隆过滤器存储在分布式缓存系统中,比如 Redis 或 Memcached。这些缓存系统支持跨多个实例的数据共享,并且可以提供高效的读取和写入操作。比如使用 Redis 的布隆过滤器模块(如 RedisBloom)来创建和管理布隆过滤器。
还将布隆过滤器存储在一个共享的外部存储系统中,例如将布隆过滤器序列化并存储在数据库或共享文件系统中。这样所有的微服务实例都可以访问相同的布隆过滤器。
生产者
@Test
public void testRepeatProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer = new DefaultMQProducer("test-group");// 设置nameServer地址producer.setNamesrvAddr("localhost:9876");// 启动实例producer.start();// 我们可以使用自定义key当做唯一标识String keyId = UUID.randomUUID().toString();System.out.println(keyId);Message msg = new Message("TopicTest", "tagA", keyId, "我是一个测试消息".getBytes());SendResult send = producer.send(msg);System.out.println(send);// 关闭实例producer.shutdown();
}
添加hutool的依赖
<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.7.11</version>
</dependency>
消费者
/*** 在boot项目中可以使用@Bean在整个容器中放置一个单例对象*/
public static BitMapBloomFilter bloomFilter = new BitMapBloomFilter(100);@Test
public void testRepeatConsumer() throws Exception {// 创建默认消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");consumer.setMessageModel(MessageModel.BROADCASTING);// 设置nameServer地址consumer.setNamesrvAddr("localhost:9876");// 订阅一个主题来消费 表达式,默认是*consumer.subscribe("TopicTest", "*");// 注册一个消费监听 MessageListenerConcurrently是并发消费// 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {// 拿到消息的keyMessageExt messageExt = msgs.get(0);String keys = messageExt.getKeys();// 判断是否存在布隆过滤器中if (bloomFilter.contains(keys)) {// 直接返回了 不往下处理业务return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}// 这个处理业务,然后放入过滤器中// do sth...bloomFilter.add(keys);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}
重试机制
生产者重试:
@Test
public void retryProducer() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("retry-producer-group");producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);producer.start();// 生产者发送消息 重试次数 一般也不会去改它 情况比较少见// 一般连接也不会断开 断开了重试也没用producer.setRetryTimesWhenSendFailed(2); // 同步消息重试次数 默认2次producer.setRetryTimesWhenSendAsyncFailed(2); // 异步消息重试次数 默认2次String key = UUID.randomUUID().toString();System.out.println(key);Message message = new Message("retryTopic", "vip1", key, "我是vip666的文章".getBytes());producer.send(message);System.out.println("发送成功");producer.shutdown();
}
// 失败的情况重发3次
producer.setRetryTimesWhenSendFailed(3);
// 消息在1S内没有发送成功,就会重试
producer.send(msg, 1000);
在消费者放return ConsumeConcurrentlyStatus.RECONSUME_LATER;后就会执行重试,下面代码中说明了,我们在实际生产过程中,一般重试3-5次,如果还没有消费成功,则可以把消息签收了,通知人工等处理。
消息分为消息体和消息头,消息头里有延迟等级、队列di、broker名字、发送时间、重试次数等信息,如下所示我们就在业务逻辑里打出了重试次数
死信消息定义见下面代码注释,死信消息会被存放在一个死信主题中去 主题的名称:%DLQ%retry-consumer-group
,即%DLQ%消费者组名称
/*** 重试的时间间隔* 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h* 默认重试16次* 1.能否自定义重试次数 consumer.setMaxReconsumeTimes(2);* 2.如果重试了16次(并发模式) 顺序模式下(int最大值次)都是失败的? 是一个死信消息 则会放在一个死信主题中去 主题的名称:%DLQ%retry-consumer-group* 3.当消息处理失败的时候 该如何正确的处理? 再写一个消费者专门去消费死信消息* --------------* 重试的次数 一般的业务 5次就够了* @throws Exception*/
@Test
public void retryConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("retryTopic", "*");// 设定重试次数consumer.setMaxReconsumeTimes(2);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {MessageExt messageExt = msgs.get(0);System.out.println(new Date());// 打印重试次数System.out.println(messageExt.getReconsumeTimes());System.out.println(new String(messageExt.getBody()));// 业务报错了 返回null 返回 RECONSUME_LATER 都会重试return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});consumer.start();System.in.read();
}
测试效果:
死信消息
上面的案例我们知道当一条消息初次消费失败,RocketMQ会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。此时,该消息不会立刻被丢弃,而是将其发送到该消费者对应的特殊队列中,这类消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue),死信队列是死信Topic下分区数唯一的单独队列。如果产生了死信消息,那对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName
,死信队列的消息将不会再被消费。可以利用RocketMQ Admin工具或者RocketMQ Dashboard上查询到对应死信消息的信息。我们也可以去监听死信队列,然后进行自己的业务上的逻辑
死信队列监听逻辑
/// 直接监听死信主题的消息 消费是消费不了了 那么就记录下 通知人工接入处理
@Test
public void retryDeadConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-dead-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("%DLQ%retry-consumer-group", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {MessageExt messageExt = msgs.get(0);System.out.println(new Date());System.out.println(new String(messageExt.getBody()));System.out.println("记录到特别的位置 文件 or mysql 通知人工处理");// 业务报错了 返回null 返回 RECONSUME_LATER 都会重试return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}
但是现实场景里可能有几十个主题,那么就有几十个死信主题,那么写几十个消费者都去监听吗?不切实际了,所以现实开发一般这样写,在业务处理逻辑中对重试次数进行判断,如果超过某个次数了,就直接在业务逻辑里进行死信的记录、人工的通知逻辑等。有的小公司都不写这个逻辑,直接让运维去dashBoard定期检查死信主题里有没有消息,然后进行人工操作。
第二种方案 用法比较多@Test
public void retryConsumer2() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("retryTopic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {MessageExt messageExt = msgs.get(0);System.out.println(new Date());// 业务处理try {handleDb();} catch (Exception e) {// 重试int reconsumeTimes = messageExt.getReconsumeTimes();if (reconsumeTimes >= 3) {// 不要重试了System.out.println("记录到特别的位置 文件 mysql 通知人工处理");return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}return ConsumeConcurrentlyStatus.RECONSUME_LATER;}// 业务报错了 返回null 返回 RECONSUME_LATER 都会重试return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}private void handleDb() {int i = 10 / 0;
} //模拟现实的业务处理 比如一些orderService userService的操作
SpringBoot集成RocketMQ
首先来搭建rocketmq-producer(消息生产者)
完整的pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.11</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.powernode</groupId><artifactId>b-rocketmq-boot-p</artifactId><version>0.0.1-SNAPSHOT</version><name>b-rocketmq-boot-p</name><description>b-rocketmq-boot-p</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.25</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>
修改配置文件application.yml
spring:application:name: rocketmq-producer
rocketmq:name-server: 你的服务ip:9876 # rocketMq的nameServer地址producer:group: powernode-group # 生产者组别send-message-timeout: 3000 # 消息发送的超时时间 默认就是3sretry-times-when-send-async-failed: 2 # 异步消息发送失败重试次数 默认就是2max-message-size: 4194304 # 消息的最大长度 默认就是这个 4M大小
我们在测试类里面测试发送消息
/*** 注入rocketMQTemplate,我们使用它来操作mq*/
@Autowired
private RocketMQTemplate rocketMQTemplate;/*** 测试发送简单的消息** @throws Exception*/
@Test
public void testSimpleMsg() throws Exception {// 往powernode的主题里面以同步的方式发送一个简单的字符串消息SendResult sendResult = rocketMQTemplate.syncSend("bootTestTopic", "我是boot的一个消息");// 拿到消息的发送状态System.out.println(sendResult.getSendStatus());// 拿到消息的idSystem.out.println(sendResult.getMsgId());
}
这里rocketMQTemplate的send方法有很多,可以自己去尝试,然后看看,syncSend表示同步发送,asyncSend表示异步发送,sendOneWay表示单向发,sendOneWayOrderly表示单向顺序发,syncSendOrderly表示同步顺序发,asyncSendOrderly表示异步顺序发等等。
此时查看控制台可以看到这条消息。
然后搭建rocketmq-consumer(消息消费者)
完整的pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.11</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.powernode</groupId><artifactId>c-rocketmq-boot-c</artifactId><version>0.0.1-SNAPSHOT</version><name>c-rocketmq-boot-c</name><description>c-rocketmq-boot-c</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.25</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>
修改配置文件application.yml
spring:application:name: rocketmq-consumer
server:port: 8081
rocketmq:name-server: 你的服务ip:9876 # rocketMq的nameServer地址# 一个boot项目中可以写很多个消费者成像,但是一般在开发中一个boot项目只对应一个消费者# 主题名也一般不在这里指定 在listener里写
添加一个监听的类SimpleMsgListener
@Component // 需要交给IOC容器管理
// 注解里指定主题和消费者组
@RocketMQMessageListener(topic = "bootTestTopic", consumerGroup = "boot-test-consumer-group")
public class ABootSimpleMsgListener implements RocketMQListener<MessageExt> { /*** 这个方法就是消费者的方法 注意要实现接口RocketMQListener* 如果接口泛型制定了固定的类型 那么onMessage就会把消息体转换成那个类型 比如String* 我们这里写成MessageExt 这个类型是消息的所有内容* ------------------------* 方法也不需要return* 没有报错 就签收了* 如果报错了 就是拒收 就会重试** @param message*/@Overridepublic void onMessage(MessageExt message) {System.out.println(new String(message.getBody()));}
}
然后启动rocketmq-consumer的启动项,查看控制台,发现我们已经监听到消息了
集成SpringBoot发送不同消息模式(同步消息)
上面是经典的同步发送消息,理解为:消息由消费者发送到broker后,会得到一个确认,是具有可靠性的。这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知等。
我们在上面的快速入门中演示的消息,就是同步消息,即
rocketMQTemplate.syncSend()
rocketMQTemplate.send()
rocketMQTemplate.convertAndSend()
这三种发送消息的方法,底层都是调用syncSend,发送的是同步消息
下面我们来玩一玩不同的消息模式
异步消息
rocketMQTemplate.asyncSend()
// 异步
rocketMQTemplate.asyncSend("bootAsyncTestTopic", "我是boot的一个异步消息", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("成功");}@Overridepublic void onException(Throwable throwable) {System.out.println("失败" + throwable.getMessage());}
});
// 测试一下异步的效果
System.out.println("谁先执行");
// 挂起jvm 不让方法结束
System.in.read();
单向消息
这种方式主要用在不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,例如日志信息的发送
// 单向
rocketMQTemplate.sendOneWay("bootOnewayTopic", "单向消息");
延迟消息
还记得我们使用rocketmq原生的api怎么发延迟消息的吗,我们是将消息new出来后,通过消息来设置延迟时间,但是这里的Message对象和之前的其实不同了,之前是org.apache.rocketmq.common.message.Message;现在是org.springframework.messaging.Message;这个消息对象没有给设置延迟时间的api,而是通过重载syncSend方法实现延迟时间的设置:
这里第三个参数是连接mq的超时时间,第四个是消息的延迟等级,这个delayLevel才对应我们之前的延迟消息发送设置。
// 延迟Message<String> msg = MessageBuilder.withPayload("我是一个延迟消息").build();rocketMQTemplate.syncSend("bootMsTopic", msg, 3000, 3);
顺序消息
还是上一篇博客的订单类案例,看不懂的兄弟一定要先去看第一篇博客,对应起来学就很容易了。
这里也注意api的使用,第一个参数订阅主题,第二个参数对象,第三个参数传入一个HashKey取到某一个消息的唯一标识,用在我们选择哪一个队列时,这里我们传入订单号,这样同一个订单就会被放入同一个队列了,并且这里不需要我们手动判断选择哪个队列了。
// 顺序消息 发送者放 需要将一组消息 都发在同一个队列中去 消费者 需要单线程消费
List<MsgModel> msgModels = Arrays.asList(new MsgModel("qwer", 1, "下单"),new MsgModel("qwer", 1, "短信"),new MsgModel("qwer", 1, "物流"),new MsgModel("zxcv", 2, "下单"),new MsgModel("zxcv", 2, "短信"),new MsgModel("zxcv", 2, "物流")
);
msgModels.forEach(msgModel -> {// 发送 第二个参数虽然是Object类型 但是一般都是以json的方式进行处理rocketMQTemplate.syncSendOrderly("bootOrderlyTopic", JSON.toJSONString(msgModel), msgModel.getOrderSn());
});
这里的消费者代码因为与一般的代码有点区别,所以再单独写一下
@Component
@RocketMQMessageListener(topic = "bootOrderlyTopic",consumerGroup = "boot-orderly-consumer-group",consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式默认是并发模式 因为是顺序消息 要修改成单线程maxReconsumeTimes = 5 // 消费重试的次数
)
public class BOrderlyMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {MsgModel msgModel = JSON.parseObject(new String(message.getBody()), MsgModel.class);System.out.println(msgModel);}
}
重启rocketmq-consumer服务,可以看到是局部顺序消费的:
批量消息
List<String> messages = Arrays.asList(new String("批量消息-消息1"),new String("批量消息-消息2"),new String("批量消息-消息3"),new String("批量消息-消息4")
);
List<Message<?>> messageList = messages.stream().map(msg -> MessageBuilder.withPayload(msg).build()).collect(Collectors.toList());// RocketMQ每批次消息的最大字节限制是4MB,需根据此限制进行拆分
int size = messageList.size();
int batchSize = 100; // 每批发送100条消息
for (int i = 0; i < size; i += batchSize) {int end = Math.min(size, i + batchSize);List<Message<?>> subList = messageList.subList(i, end);rocketMQTemplate.syncSend("bootBatchTopic", subList);
}
}
由于RocketMQ每批次消息的最大字节限制是4MB,因此需要对消息进行分批处理发送。这里简单设定每批次发送100条消息,可以根据实际情况调整。
事务消息
这个可以先不学了,比较鸡肋,基本不用,后面我们会专门学分布式事务seata,它是专门解决分布式事务问题的。这里也附上代码,有兴趣可以学一下。
生产者
/*** 测试事务消息* 默认是sync(同步的)* 事务消息会有确认和回查机制* 事务消息都会走到同一个监听回调里面,所以我们需要使用tag或者key来区分过滤** @throws Exception*/
@Test
public void testTrans() throws Exception {// 构建消息体Message<String> message = MessageBuilder.withPayload("这是一个事务消息").build();// 发送事务消息(同步的) 最后一个参数才是消息主题TransactionSendResult transaction = rocketMQTemplate.sendMessageInTransaction("powernode", message, "消息的参数");// 拿到本地事务状态System.out.println(transaction.getLocalTransactionState());// 挂起jvm,因为事务的回查需要一些时间System.in.read();
}
消费者
/*** 事务消息的监听与回查* 类上添加注解@RocketMQTransactionListener 表示这个类是本地事务消息的监听类* 实现RocketMQLocalTransactionListener接口* 两个方法为执行本地事务,与回查本地事务*/
@Component
@RocketMQTransactionListener(corePoolSize = 4,maximumPoolSize = 8)
public class TmMsgListener implements RocketMQLocalTransactionListener {/*** 执行本地事务,这里可以执行一些业务* 比如操作数据库,操作成功就return RocketMQLocalTransactionState.COMMIT;* 可以使用try catch来控制成功或者失败;* @param msg* @param arg* @return*/@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 拿到消息参数System.out.println(arg);// 拿到消息头System.out.println(msg.getHeaders());// 返回状态COMMIT,UNKNOWNreturn RocketMQLocalTransactionState.UNKNOWN;}/*** 回查本地事务,只有上面的执行方法返回UNKNOWN时,才执行下面的方法 默认是1min回查* 此方法为回查方法,执行需要等待一会* xxx.isSuccess() 这里可以执行一些检查的方法* 如果返回COMMIT,那么本地事务就算是提交成功了,消息就会被消费者看到** @param msg* @return*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {System.out.println(msg);return RocketMQLocalTransactionState.COMMIT;}
}
测试发送事务,建议断点启动
- 消息会先到事务监听类的执行方法,
- 如果返回状态为COMMIT,则消费者可以直接监听到
- 如果返回状态为ROLLBACK,则消息发送失败,直接回滚
- 如果返回状态为UNKNOW,则过一会会走回查方法
- 如果回查方法返回状态为UNKNOW或者ROLLBACK,则消息发送失败,直接回滚
- 如果回查方法返回状态为COMMIT,则消费者可以直接监听到
发送对象消息和集合消息
这个上面已经说过了其实,主要就是监听的时候泛型中写对象的类型即可
生产者
/*** 测试发送对象消息** @throws Exception*/
@Test
public void testObjectMsg() throws Exception {Order order = new Order();order.setOrderId(UUID.randomUUID().toString());order.setOrderName("我的订单");order.setPrice(998D);order.setCreateTime(new Date());order.setDesc("加急配送");// 往powernode-obj主题发送一个订单对象rocketMQTemplate.syncSend("powernode-obj", order);
}
消费者也添加一个Order类(拷贝过来)
/*** 创建一个对象消息的监听* 1.类上添加注解@Component和@RocketMQMessageListener* 2.实现RocketMQListener接口,注意泛型的使用*/
@Component
@RocketMQMessageListener(topic = "powernode-obj", consumerGroup = "powernode-obj-group")
public class ObjMsgListener implements RocketMQListener<Order> {/*** 消费消息的方法** @param message*/@Overridepublic void onMessage(Order message) {System.out.println(message);}
}
发送集合消息
和对象消息同理,创建一个Order的集合,发送出去,监听方注意修改泛型中的类型为Object即可,这里就不做重复演示了
同时这里注意与发送批量消息进行区分,发送批量消息时,发送的类型是List<Message<?>>
,此时List里的消息批量发送到一个队列,才会被一条一条的消费。
消息标签,tag
我们从源码注释得知,tag带在主题后面用:来携带,感谢注释
我们往下去看源码,在
org.apache.rocketmq.spring.support.RocketMQUtil 的getAndWrapMessage方法里面看到了具体细节,我们也知道了keys在消息头里面携带
// topic:tag
rocketMQTemplate.syncSend("bootTagTopic:tagA", "我是一个带tag的消息");
消费者:
@Component
@RocketMQMessageListener(topic = "bootTagTopic",consumerGroup = "boot-tag-consumer-group",selectorType = SelectorType.TAG,// 选择类型 默认就是tag过滤模式selectorExpression = "tagA || tagB" // 默认"*"代表所有
// selectorType = SelectorType.SQL92,// sql92过滤模式
// selectorExpression = "a in (3,5,7)" // 相当于"3 || 5 || 7"
// SQL92需要broker.conf中开启enbalePropertyFilter=true
)
public class CTagMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {System.out.println(new String(message.getBody()));}
}
过滤模式也可以使用sql92模型,一般几乎不用,默认也是不支持的
需要在配置文件broker.conf中开启enbalePropertyFilter=true,查看源码可知其语法:
比如主题是bootTagTopic:3,那么可以这样写过滤表达式:“a > 5”,消息过不来,这种写法几乎用不到
消息key
// key是写带在消息头的
Message<String> message = MessageBuilder.withPayload("我是一个带key的消息").setHeader(RocketMQHeaders.KEYS, "qwertasdafg").build();
rocketMQTemplate.syncSend("bootKeyTopic", message);
在消费者里也可以取key,在消息头里,不再赘述。
负载均衡和广播,两种消息消费模式
之前说过Rocketmq消息消费的模式分为两种:负载均衡模式和广播模式
负载均衡模式表示多个消费者交替消费同一个主题里面的消息
广播模式表示每个每个消费者都消费一遍订阅的主题的消息
其中在注解中的messageModel属性可以配置消息模式,默认就是MessageModel.CLUSTERING
,即集群模式(负载均衡),点进源码可以看到另外一种模式是MessageModel.BROADCASTING
,即广播模式
下面进行演示,建两个类做两个消费者监听器DC1和DC2
/*** [CLUSTERING] 集群模式下 队列会被消费者分摊, 注意队列数量>=消费者数量 消息的消费位点 mq服务器会记录处理* BROADCASTING 广播模式下 消息会被每一个消费者都处理一次, mq服务器不会记录消费点位,也不会重试*/
@Component
@RocketMQMessageListener(topic = "modeTopic",consumerGroup = "mode-consumer-group-a",messageModel = MessageModel.CLUSTERING, // 集群模式 负载均衡consumeThreadNumber = 40)
public class DC1 implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("我是mode-consumer-group-a组的第一个消费者:" + message);}
}
@Component
@RocketMQMessageListener(topic = "modeTopic",consumerGroup = "mode-consumer-group-a",messageModel = MessageModel.CLUSTERING // 集群模式
)
public class DC2 implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("我是mode-consumer-group-a组的第二个消费者:" + message);}
}
发送10条消息测试一下,发现消息是被均匀分配的。
/ 测试消息消费模式 集群模块 广播模式@Test
void modeTest() throws Exception {for (int i = 1; i <= 10; i++) {rocketMQTemplate.syncSend("modeTopic", "我是第" + i + "个消息");}
}
再加一个消费者3,发现第2个消费者消费了5条消息。
这里再强调一下上篇博客的内容,如果是广播模式,那好说,一个组内所有的消费者都会拿到每个队列里的消息,如果是负载均衡模式,假设有c1、c2两个消费者,那么每个队列是要固定联系好消费者的,即比如队列1、2的消息只会给c1,2、3的消息只会给c2,比如4个队列都指定了消费者,组内再有一个消费者,那它永远没有消息。
所以最好队列数量>=组内消费者数量
另外一般最好先启动消费者再启动生产者,假设先启动生产者,然后有10条消息堆积,再启动消费者,消费者组可能有多个消费者,如果某个消费者c1启动的快,它可能认为该组只有它一个,就会去消费,等c2启动好就没有消息了。
再来玩一下广播模式,我们再开3个消费者,到另一个消费者组去,也订阅这个主题,两个消费者组都能消费到消息,观察一下它们的区别,DC4~DC6的代码就贴个DC4:
@Component
@RocketMQMessageListener(topic = "modeTopic",consumerGroup = "mode-consumer-group-b",messageModel = MessageModel.BROADCASTING // 广播模式
)
public class DC4 implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("我是mode-consumer-group-b组的第一个消费者:" + message);}
}
然后我们发5条消息,观察发现b组消费了15条消息,a组只消费了5条。
观察dashboard也可发现,a组是负载均衡模式会分配着来,差值都为0,正常被消费,而b组的终端都是1个,且差值不对应,还有差值,因为广播模式终端是不会帮你处理点位的,即广播模式下,消息消费成功与否都不会重试!
消息堆积问题
当差值很大,即MQ里堆积了很多消息没有消费,一般认为单条队列消息差值>=10w时 算堆积问题。
这个问题如何解决?这个问题要分析问题发生的情况来分析
什么情况下会出现堆积:1. 生产太快了;2. 消费者消费出现问题
如果是生产太快了,那么
- 生产方可以做业务限流,常见的限流算法我这篇博客有写
- 增加消费者数量,但是也得消费者数量<=队列数量,适当的设置最大的消费线程数量(根据任务划分,mq大部分处理的是IO密集型(2n)/如果是CPU计算型(n+1))。这个可以在
@RocketMQMessageListener
注解的consumeThreadNumber
属性配置 - 动态扩容队列数量,从而增加消费者数量(dashboard可以操作,注意别在消息还没消费完的时候去缩容,要不然消息就没了)
如果是消费者消费出现问题,那么就排查消费者程序的问题,还有就是优化消费逻辑,减少单条消息的处理时间。我这篇博客有写已经上线的bug如何排查,还有这篇博客也写了JVM的排查
除此之外要监控系统的消息堆积情况,及时扩容 Broker 或调整负载均衡策略。有些第三方监控工具专门用于监控消息中间件的状态,例如 Prometheus、Grafana、Datadog 等,也可以自定义监控脚本。
PS:补充一些很少使用的操作
dashboard还提供了跳过堆积的操作,选中对应的消费者组,该堆积的消息将不会被该消费者消费了,即位点后移。
还有重置消费位点的操作,消费过的消息想再次消费,那么可以前移消费位点。可以选择给某个消费者组的位点重置到前面的某个时间上。
消息丢失问题
broker刷盘丢失
这是最容易发生丢失问题的情况。
生产者把消息发送给broker后,broker要对消息进行持久化,持久化分为同步刷盘和异步刷盘。同步刷盘会在broker端开启一块磁盘区进行顺序IO,等到持久化成功然后告诉生产者,同步刷盘比较安全一般不会出现丢失问题,但是性能不高。而异步刷盘有一个内存buffer,统一刷盘,性能高,但是容易出现消息丢失问题。
解决这个问题可以采用同步刷盘但是性能很低,大部分情况下不会使用同步刷盘,如果使用异步刷盘如何避免消息丢失,消息丢失不可怕,怕不知道哪一条丢了,那么我们可以在生产者端也进行持久化,发送消息后自己持久化log到文件或者mysql中。保存消息的key creatTime status(发送成功),消费者在处理完业务逻辑后,去更新这个消息的状态,key handleTime status(处理成功)。然后我们可以通过定时任务定期检查数据库,哪些消息发送很久但还没被消费,进行补发,同时结合幂等操作防止重复消费。
这个问题确实深挖起来还是有很多可以讲的,收集了在网上一些博主对这个问题的看法,消息丢失问题可能因为这些原因
在消息发送端:
- Producer故障,在消息成功发送broker之前,Producer发送故障或者宕机
- 网络中断,在Producer与Broker之间的消息传递过程中,网络连接中断或延迟
- Broker未确认,接收到消息后由于某些原因未能向Producer发送确认
解决方案:增强发送端的可靠性
- 对于同步发送,确保消息发送成功后才继续后续操作。
- 对于异步发送,设置回调函数,使用重试机制,Producer在发送消息的时候如果没有收到Broker的确认,可以采用重试策略
- 事务消息,rocketmq支持事务消息,可以确保在特定业务的操作中消息发送的可靠性
在消息存储端:
- 单点故障,如果消息只保存在Master Broker上,那么一旦Master发生故障,消息就可能丢失,
- 磁盘故障,存储消息的磁盘损坏会导致数据丢失
解决方案:强化消息存储的安全性
- Master-Slave同步,确保每一条消息都在Master和至少一个Slave上有保存
- 设置合理的刷盘策略,例如同步刷盘(SYNC_FLUSH)模式,确保消息写入磁盘后才返回成功
- 使用RAID,在硬件层面采用RAID技术来增强磁盘的容错能力。(RAID(Redundant Array of Independent Disks)技术是一种数据存储技术,通过将多个独立的硬盘组合起来,以提高数据的可靠性、性能或者两者兼顾的存储系统。)
在消息中间件Broker端:
- 同步延迟,Master与Slave之间的同步延迟会导致Master的节点未及时同步到Slave节点
- Slave故障,在同步的过程中Slave宕机或出现问题
解决方案:优化消息同步策略
- 选择适合当前业务特性的Master-Slave同步机制,比如同步复制或者异步复制
- 增加监控的机制去实时监控Master和Slave的同步状态,一旦出现延迟或者故障立刻触发告警(Dashboard 可视化、编写监控工具设计合理的监控指标进行历史数据分析) 有些第三方监控工具专门用于监控消息中间件的状态,例如 Prometheus、Grafana、Datadog 等。
在消息消费端:
- 消费者在处理消息时出现异常,消息未能正确处理,导致消息丢失
- 消费者在处理消息后未能正确提交 offset,导致重复消费或丢失消息
解决方案:强化消息消费的可靠性
- 在消费逻辑中手动增加异常处理机制,确保消息处理失败时能够重新消费,也能自定义去查消息体里的重试次数,避免自动重试过多,也避免额外的死信监听
- 使用手动提交 offset 的方式,确保在消息成功处理后才提交 offset
- 使用幂等性设计,确保消息重复消费时不会造成业务错误
还有就是消息堆积导致丢失,这个可见上一小节。
消息轨迹监控
rocketmq提供了消息轨迹监控功能,会影响性能,一般不开。
1.在broker.conf中开启消息追踪 traceTopicEnable=true
2.重启broker即可
3.生产者的yml配置文件开启发送方消息轨迹 enable-msg-trace: true
原生的api直接在定义消费者组的时候有个参数enableMsgTrace可以指定是否开启
4. 消费者开启消息轨迹功能,可以给单独的某一个消费者开启,在注解的enableMsgTrace属性中指定开启 enableMsgTrace = true
在rocketmq的面板中可以查看消息轨迹(通过message_id或者key查看)
默认会将消息轨迹的数据存在 RMQ_SYS_TRACE_TOPIC 主题里面
消息安全问题
RocketMQ 的消息安全问题涉及到消息的传输安全、身份认证、权限控制等方面。以下是 RocketMQ 消息安全的几个主要方面及其解决方案:
-
传输安全:
-
- 问题:在消息传输过程中,可能存在窃听、篡改等安全威胁,导致消息泄露或被篡改。
-
- 解决方案:使用 TLS/SSL 加密协议对消息进行传输加密,确保消息在网络传输过程中的机密性和完整性。
-
身份认证:
-
- 问题:在分布式系统中,需要对客户端和服务端进行身份认证,以防止未经授权的用户访问消息系统。
-
- 解决方案:采用身份认证机制,如用户名密码、JWT(JSON Web Token)等方式对客户端进行身份认证,确保只有经过授权的用户能够访问消息系统。
-
访问控制:
-
- 问题:需要对用户访问消息队列的权限进行控制,以确保用户只能访问其有权访问的队列。
-
- 解决方案:实现访问控制列表(ACL)机制,对消息队列进行权限控制,包括读取和写入权限等。只有具有相应权限的用户才能对队列进行操作。
-
数据加密:
-
- 问题:在消息存储和传输过程中,需要对消息内容进行加密,以保护消息的机密性。
-
- 解决方案:对消息内容进行加密处理,确保消息在存储和传输过程中的机密性。可以使用对称加密算法或非对称加密算法对消息内容进行加密。
-
防止重放攻击:
-
- 问题:在消息传输过程中,可能会受到重放攻击,即攻击者重复发送已经被记录的有效消息,以达到非法访问或篡改消息的目的。
-
- 解决方案:采用消息序列号、时间戳、令牌等方式对消息进行防重放处理,确保每条消息只能被处理一次。
-
日志审计:
-
- 问题:需要对消息系统的操作进行日志审计,以便追溯和监控用户的操作行为。
-
- 解决方案:记录用户对消息系统的操作日志,包括用户的登录、访问权限、消息发送和接收等操作,以便于后续审计和监控。
其中访问控制具体操作如下:
一、开启acl的控制 在broker.conf中开启aclEnable=true
二、配置账号密码 修改plain_acl.yml(跟broker.conf同级目录),文件内容如下所示
globalWhiteRemoteAddresses是全局白名单,accounts是账号,密码必须大于8位,默认给了两个案例账号,比如第一个账号,admin:false说明不是管理员账号,第二个账号是,管理员是可以随便操作的。再看第一个账号,不是管理员账号,defaultTopicPerm默认权限是拒绝,只能SUB(只读),PUB是发表,他给一些主题设置了权限,主题A不能读不能写,B能读能写,C只读。对group也设置了权限。
另外账号2我们要修改一个地方,他的白名单是192.168.1.*,比如我们的ip是192.168.206.186,那么可以修改成白名单是192.168.*.*
三、dashboard的配置文件也要改,虽然你是通过nameserver监控broker,但是broker现在有acl了,你也得认证。
修改控制面板的配置文件 (控制面板就是个jar包,改里面的application.properties),放开52/53行 ,如果想把面板的登录也弄个账号密码那就把49行改为true
把修改好的application.properties上传到服务器的jar包平级目录下即可 覆盖内部的配置文件
配置文件的加载优先级:当前项目project所在目录的/config目录下 > 当前项目project所在目录下 > classpath的/config目录 > classpath的根目录
重启服务发现面板需要登录账号密码了,面板的账号密码被定义在application.properties同级的users.properties下,管理员默认账号密码是admin和admin
生产者和消费者使用密码也很简单,注意跟dashboard面板的的账号密码不是同一个。在yml文件里配置,否则发消息会报错没有权限访问,消费者也是
rocketmq:name-server: 你的ip:9876 # rocketMq的nameServer地址consumer:access-key: rocketmq2secret-key: 12345678
rocketmq:name-server: 你的ip:9876 # rocketMq的nameServer地址producer:group: boot-producer-group # 生产者组的名字access-key: rocketmq2secret-key: 12345678
底层设计小探
官网提供源码版本的下载,以后学习某个新技术也是这个思路,先了解它是什么,然后快速上手,搭起来把简单的api过一遍,先入门再说。然后再学习它的深入特性和原理,最后是源码级别的学习。
官网提供的文档其实都很详细,也提供了源码的下载:
解压完之后是一个项目,里面是各种各样的东西的实现,比如broker的实现,nameserver的实现,源码的注释比较少,但是结构很清晰,源码的学习也是大牛的必经之路,
作为小白我们先不看它的源码,先去看它的docs,大部分的问题,其实在文档里都会有写。最重要的第一步,先去看它的总体设计,即下图的design.md,工作中更多的是根据自己的需要去查找相关的文档及源码。在这里偷懒,我就不贴阅读体会了。