目录
一、介绍
二、案例演示
2.1. 创建SpringBoot工程
2.2. 父工程pom依赖
2.3. 生产者pom依赖
2.4. 生产者配置文件
2.5. 生产者代码
2.6. 消费者RabbitMQConfig
2.7. 消费者pom依赖
2.8. 消费者配置文件
2.9. 消费者核心代码
2.10. 运行效果
2.11. 取消自定义转换器效果
一、介绍
在RabbitMQ中,消息转换器(Message Converter)用于在发送消息时将Java对象转换为RabbitMQ可以理解的字节流,以及在消费者接收消息时将字节流转换回Java对象。
在实际的RabbitMQ应用中,你会配置转换器作为Spring AMQP的一部分,并在发送和接收消息时由Spring框架自动应用转换。如果需要特定的序列化行为,你可以实现自定义的转换器。
Spring对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的,而默认实现是SimpleMessageConverter。
基于JDK的ObjectOutputStream完成序列化存在下列问题:
1. JDK的序列化有安全风险
2. JDK序列化的消息太大
3. JDK序列化的消息可读性差
建议采用JSON序列化代替默认的JDK序列化,要做两件事情:
在publisher和consumer中都要引入jackson依赖:
在publisher和consumer中都要配置MessageConverter:
我们这边作为演示就直接配在启动类里,实际项目中可以单独定义转换器类
二、案例演示
案例需求:
测试利用SpringAMQP发送对象类型的消息 声明一个队列,名为object.queue
编写单元测试,向队列中直接发送一条消息,消息类型为Map
在控制台查看消息,总结你能发现的问题
2.1. 创建SpringBoot工程
完整的工程目录结构及代码文件如下:
2.2. 父工程pom依赖
引入核心依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.3.4</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>mq-demo</artifactId><version>0.0.1-SNAPSHOT</version><name>mq-demo</name><description>mq-demo</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.14.3</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
2.3. 生产者pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.3.4</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>publisher</artifactId><version>0.0.1-SNAPSHOT</version><name>publisher</name><description>publisher</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
2.4. 生产者配置文件
spring:rabbitmq:# 主机host: 127.0.0.1# 端口port: 5672# 默认用户密码是guest guest 如果需要自己创建新用户,参看我的第四章节内容username: Wangzhexiaopassword: Wangzhexiao# 默认的虚拟主机是/ 如果需要自己创建虚拟主机,参看我的第四章节内容virtual-host: /hangzhou
2.5. 生产者代码
SpringAMQP提供了RabbitTemplate工具类,方便我们发送消息。发送消息代码如下:
package com.example.publisher;import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;import java.util.HashMap;
import java.util.Map;@SpringBootTest
class PublisherApplicationTests {@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid sendMessage() throws InterruptedException {String exchangeName = "wzx.topic";Map<String,Object> msg = new HashMap<>(2);msg.put("name", "wangzhexiao");msg.put("motto", "只要学不死,就往死里学!");rabbitTemplate.convertAndSend(exchangeName,"china.news", msg);}
}
package com.example.publisher;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;@SpringBootApplication
public class PublisherApplication {public static void main(String[] args) {SpringApplication.run(PublisherApplication.class, args);}@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}
2.6. 消费者RabbitMQConfig
package com.example.consumer;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@BeanQueue topicQueue1() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable("topic.queue1").build();}@BeanQueue topicQueue2() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable("topic.queue2").build();}@BeanTopicExchange topicExchange() {return new TopicExchange("wzx.topic");}@BeanBinding binding1(Queue topicQueue1, TopicExchange topicExchange) {return BindingBuilder.bind(topicQueue1).to(topicExchange).with("china.#"); // 绑定队列和交换机}@BeanBinding binding2(Queue topicQueue2, TopicExchange topicExchange) {return BindingBuilder.bind(topicQueue2).to(topicExchange).with("#.news"); // 绑定队列和交换机}
}
2.7. 消费者pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.3.4</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>consumer</artifactId><version>0.0.1-SNAPSHOT</version><name>consumer</name><description>consumer</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.34</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
2.8. 消费者配置文件
spring:rabbitmq:# 主机host: 127.0.0.1# 端口port: 5672# 默认用户密码是guest guest 如果需要自己创建新用户,参看我的第四章节内容username: Wangzhexiaopassword: Wangzhexiao# 默认的虚拟主机是/ 如果需要自己创建虚拟主机,参看我的第四章节内容virtual-host: /hangzhou
2.9. 消费者核心代码
SpringAMQP提供声明式的消息监听,我们只需要通过注解在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前方法:
package com.example.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Slf4j
@Component
public class SimpleListener {@RabbitListener(queues = "topic.queue1")public void listener1(Map<String, Object> msg) throws InterruptedException {System.out.println("消费者1:人生是个不断攀登的过程【" + msg + "】");}@RabbitListener(queues = "topic.queue2")public void listener2(Map<String, Object> msg) throws InterruptedException {System.err.println("消费者2:人生是个不断攀登的过程【" + msg + "】");}
}
2.10. 运行效果
通过自定义的消息转换器,我们可以看到在RabbitMQ浏览器界面中消息内容为正常的中文:
2.11. 取消自定义转换器效果
package com.example.consumer;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;@SpringBootApplication
public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);}}
我们删除生产者启动类中声明的转换器代码,然后单独启动生产者并进入RabbitMQ浏览器界面,查看队列中的消息,可以看到消息内容为一串序列化后的乱码: