RabbitMQ的简单使用
- 为什么要使用消息队列?
- RabbitMQ的组成
- 案例
- spring整合RabbitMQ
- 一个消费者实现
- 多个消费者实现
- 三种交换器
- 基于注解的形式生成队列、交换器以及绑定关系
- 消息转换器
为什么要使用消息队列?
消息队列是一种不同系统或者服务之间进行异步通信的一种方式。在很多业务中都可以使用到消息队列,例如存在订单秒杀活动,我们可以通过异步的方式,先判断库存和是否是一人一单,校验成功后生成订单id放入消息队列中,直接返回,由消息消费者监听队列获取订单信息来进行对数据库的修改库存和生成订单操作,还有微服务的多个服务之中,可以使用消息队列来进行通信,而RabbitMQ是目前最流行的消息中间件之一,我们来简单的了解它。
RabbitMQ的组成
生产者:消息的生产者
队列(queue):生产者生产消息可以直接放入队列中
交换器(Exchanger):消息的路由器,消息生产之后可以交给交换器,由交换器负责将消息路由到队列中,交换器一共有三种,分别是direct、fanout、topic
消费者:从消息队列中获取消息并且做出处理
虚拟主机:虚拟主机包含交换器和队列,多个用户可以设置不同的虚拟主机的访问,起到数据隔离的作用
模型如下图
案例
spring整合RabbitMQ
1、导入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2、配置属性文件,让生产者和消费者连接RabbitMQ
spring:rabbitmq://ip地址host: //端口号port: 5672//虚拟主机名称virtual-host://RabbitMQ用户名和密码 username: password:
一个消费者实现
生产者:
@SpringBootTest
public class SpringAmqpTest {@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid testSendMessage2Queue() {String queueName = "changedb";String message = "我爱你";rabbitTemplate.convertAndSend(queueName, message);}}
消费者:
@Component
public class RedisMQListener {
//通过注解自动监听队列消息@RabbitListener(queues = "changedb")public void listenMessage(String msg) throws InterruptedException {System.out.println("收到消息发布者的消息:"+msg);}
结果:
多个消费者实现
如果消息生产者生产消息过快,导致消费者处理不及,可以添加消费者个数,防止消息堆积
如果由多个性能不同的消费者:
队列1可以在1秒内处理五十条消息,队列2可以在一秒内处理5条消息
消费者:
@RabbitListener(queues = "changedb")public void listenMessage(String msg) throws InterruptedException {System.out.println("队列1收到消息发布者的消息:"+msg);Thread.sleep(20);}@RabbitListener(queues = "changedb")public void listenMessage2(String msg) throws InterruptedException {System.err.println("队列2收到消息发布者的消息:"+msg);Thread.sleep(200);}
生产者一秒内发送50条数据
生产者:
@Testvoid testSendMessage3Queue() throws InterruptedException {String queueName = "changedb";for (int i = 0; i < 50; i++) {rabbitTemplate.convertAndSend(queueName, i);Thread.sleep(20);}}
结果:
如上图,RabbitMQ的队列将消息轮询给每个消费者,不考虑消费者的性能,导致消费者2的消息堆积,我们可以通过配置让消费者每次只获得一个消息,等消息处理完后才可以再次获取消息
spring:rabbitmq://ip地址host: //端口号port: 5672//虚拟主机名称virtual-host://RabbitMQ用户名和密码 username: password: listener:simple:prefetch: 1
优化后的结果:
三种交换器
fanout:将收到的消息路由到每一个绑定的队列中
direct:队列通过路由键和交换器绑定,路由消息时通过路由键将消息路由到匹配的队列
topic:和direct类似,但是可以通过通配符路由多个队列,更加简单,#表示0个或多个字符串,*表示1一个字符串
基于注解的形式生成队列、交换器以及绑定关系
消费者:
@RabbitListener(bindings = @QueueBinding(//生成队列,name:队列名称,durable:是否持久化value = @Queue(name = "topic.queue1",durable = "true"),//生成交换器,anme:交换器名称,type:交换器类型exchange = @Exchange(name = "hmdp.topic",type = ExchangeTypes.TOPIC),//key:路由键匹配key = "china.*"))public void listenTopicMessage(String msg) throws InterruptedException {System.out.println("中国一条频道收到消息发布者的消息:"+msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2",durable = "true"),exchange = @Exchange(name = "hmdp.topic",type = ExchangeTypes.TOPIC),key = "china.#"))public void listenTopicMessage2(String msg) throws InterruptedException {System.out.println("中国任意频道收到消息发布者的消息:"+msg);}
生产者:
@Testvoid testSendMessage9Queue() throws InterruptedException {String exchanger = "hmdp.topic";String msg = "关于中国的新闻";String routeKey = "china.news.ee";rabbitTemplate.convertAndSend(exchanger, routeKey, msg);}
消息转换器
生产者写入一个对象
@Testvoid testSendMessageConvertQueue() throws InterruptedException {Map<String, Object> map = new HashMap<>();map.put("dashda","123");map.put("dskkadls",898);String queueName="object.queue";rabbitTemplate.convertAndSend(queueName,map);}
我们查看消息队列,发现存入的是字节,消息的可读性差,并且占据的空间极大,我们导入jackson的依赖,使其对消息进行转换
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId></dependency>
@Beanpublic MessageConverter jacksonMessageConvertor() {return new Jackson2JsonMessageConverter();}
存入的结果:
我们发现其可读性刚好,并且存入占据的空间更小