八、RabbitMQ队列详解
1、队列属性
-
Type
设置队列的队列类型;
-
Name
队列名称,就是一个字符串,随便一个字符串就可以;
-
Durability
声明队列是否持久化,代表队列在服务器重启后是否还存在;
-
Auto delete:
是否自动删除
如果为true,当没有消费者连接到这个队列的时候,队列会自动删除;
-
Exclusive
exclusive属性的队列只对第一个连接它的消费者可见(之后其它消费者无法访问该队列),并且在连接断开时自动删除
基本上不设置它,设置成false
-
Arguments:队列的其他属性,例如指定DLX(死信交换机等);
-
x-expires:Number
当Queue(队列)在指定的时间未被访问则队列将被自动删除
-
x-message-ttl:Number
发布的消息在队列中存在多长时间后被取消(单位毫秒)
-
x-overflow:String
设置队列溢出行为
当达到队列的最大长度时消息的处理方式
-
drop-head:删除头部消息
-
reject-publish:超过队列长度后,后面发布的消息会被拒绝接收
-
reject-publish-dlx:超过队列长度后,后面发布的消息会被拒绝接收并发布到死信交换机
仲裁队列类型仅支持:drop-head and reject-publish两种
-
-
x-max-length:Number
队列所能容下消息的最大长度,当超出长度后新消息将会覆盖最前面的消息
-
x-max-length-bytes:Number
队列在开始从头部删除就绪消息之前可以包含的总正文大小
受限于内存大小,超过该阈值则从队列头部开始删除消息
-
x-single-active-consumer:默认为false
表示队列是否是只能有一个消费者
设置为true时注册的消费组内只有一个消费者消费消息,其他被忽略
设置为false时消息循环分发给所有消费者(默认false)
-
x-dead-letter-exchange:String
指定队列关联的死信交换机
有时候我们希望当队列的消息达到上限后溢出的消息不会被删除掉,而是走到另一个队列中保存起来;
-
x-dead-letter-routing-key:String
指定死信交换机的路由键
一般和7一起定义
-
x-queue-mode:String(理解下即可)
队列类型x-queue-mode=lazy懒队列,在磁盘上尽可能多地保留消息以减少RAM使用
如果未设置则队列将保留内存缓存以尽可能快地传递消息;
如果设置则队列消息将保存在磁盘上,消费时从磁盘上读取消费
-
x-queue-master-locator:String(用的较少)
在集群模式下设置队列分配到的主节点位置信息;
每个queue都有一个master节点,所有对于queue的操作都是事先在master上完成,之后再slave上进行相同的操作;
每个不同的queue可以坐落在不同的集群节点上,这些queue如果配置了镜像队列,那么会有1个master和多个slave
基本上所有的操作都落在master上,那么如果这些queues的master都落在个别的服务节点上,而其他的节点又很空闲,这样就无法做到负载均衡,那么势必会影响性能;
关于master queue host 的分配有几种策略,可以在queue声明的时候使用x-queue-master-locator参数,或者在policy上设置queue-master-locator,或者直接在rabbitmq的配置文件中定义queue_master_locator有三种可供选择的策略
-
min-masters:选择master queue数最少的那个服务节点host;
-
client-local:选择与client相连接的那个服务节点host;
-
random:随机分配;
-
2、属性测试
8.2.1、非持久化属性测试
测试模块rabbitmq-08-properties-01
引入依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
配置MQ
server:port: 8080spring:application:name: properties-learn1rabbitmq:host: 192.168.1.101port: 5672username: adminpassword: 123456virtual-host: longdidi
生产者
package com.longdidi.service;import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Date;@Service
@Slf4j
public class SendMessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg() {{Message message = MessageBuilder.withBody("hello world1".getBytes()).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);log.info("消息发送完毕,发送时间为:{}", new Date());}{Message message = MessageBuilder.withBody("hello world2".getBytes()).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME2, message);log.info("消息发送完毕,发送时间为:{}", new Date());}}
}
定义常量
package com.longdidi.constants;public class RabbitMQConstant {// 正常交换机public static final String EXCHANGE_NAME = "exchange.properties.1";// 队列public static final String QUEUE_NAME1 = "queue.properties.1";// 路由keypublic static final String ROUTING_NAME1 = "key.properties.test1.1";public static final String ROUTING_NAME2 = "key.properties.test1.2";}
定义MQ队列
package com.longdidi.config;import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {/*** 正常交换机** @return*/@Beanpublic DirectExchange normalExchange() {return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();}/*** 正常队列* 不持久化、不设置名字** @return*/@Beanpublic Queue normalQueue1() {return QueueBuilder.nonDurable().build();}/*** 正常队列* 不持久化、设置名字** @return*/@Beanpublic Queue normalQueue2() {return QueueBuilder.nonDurable(RabbitMQConstant.QUEUE_NAME1).build();}/*** 正常交换机和正常队列绑定** @param normalExchange* @param normalQueue1* @return*/@Beanpublic Binding bindingNormal1(DirectExchange normalExchange, Queue normalQueue1) {return BindingBuilder.bind(normalQueue1).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);}/*** 正常交换机和正常队列绑定** @param normalExchange* @param normalQueue2* @return*/@Beanpublic Binding bindingNormal2(DirectExchange normalExchange, Queue normalQueue2) {return BindingBuilder.bind(normalQueue2).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME2);}}
发送消息
package com.longdidi;import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class Rabbitmq08Properties01Application implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(Rabbitmq08Properties01Application.class, args);}@Resourceprivate SendMessageService sendMessageService;/*** 程序一启动就会运行该方法** @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception {sendMessageService.sendMsg();}
}
测试
-
发行消息查看控制台
-
重启服务器查看队列是否存在
8.2.2、持久化测试
是否自动删除
如果为true,当没有消费者连接到这个队列的时候,队列会自动删除
测试模块rabbitmq-08-properties-02
引入依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
配置MQ
server:port: 8080spring:application:name: properties-learn2rabbitmq:host: 192.168.1.101port: 5672username: adminpassword: 123456virtual-host: longdidi
生产者
package com.longdidi.service;import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Date;@Service
@Slf4j
public class SendMessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg() {{Message message = MessageBuilder.withBody("hello world1".getBytes()).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);log.info("消息发送完毕,发送时间为:{}", new Date());}{Message message = MessageBuilder.withBody("hello world2".getBytes()).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME2, message);log.info("消息发送完毕,发送时间为:{}", new Date());}}
}
定义常量
package com.longdidi.constants;public class RabbitMQConstant {// 正常交换机public static final String EXCHANGE_NAME = "exchange.properties.2";// 队列public static final String QUEUE_NAME1 = "queue.properties.2";// 路由keypublic static final String ROUTING_NAME1 = "key.properties.test1.2";public static final String ROUTING_NAME2 = "key.properties.test2.2";}
定义MQ队列
package com.longdidi.config;import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {/*** 正常交换机** @return*/@Beanpublic DirectExchange normalExchange() {return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();}/*** 正常队列* 持久化、不设置名字** @return*/@Beanpublic Queue normalQueue1() {return QueueBuilder.durable().build();}/*** 正常队列* 持久化、设置名字** @return*/@Beanpublic Queue normalQueue2() {return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).build();}/*** 正常交换机和正常队列绑定** @param normalExchange* @param normalQueue1* @return*/@Beanpublic Binding bindingNormal1(DirectExchange normalExchange, Queue normalQueue1) {return BindingBuilder.bind(normalQueue1).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);}/*** 正常交换机和正常队列绑定** @param normalExchange* @param normalQueue2* @return*/@Beanpublic Binding bindingNormal2(DirectExchange normalExchange, Queue normalQueue2) {return BindingBuilder.bind(normalQueue2).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME2);}}
发送消息
package com.longdidi;import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class Rabbitmq08Properties02Application implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(Rabbitmq08Properties02Application.class, args);}@Resourceprivate SendMessageService sendMessageService;/*** 程序一启动就会运行该方法** @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception {sendMessageService.sendMsg();}
}
测试
-
发送消息查看控制台
-
重启服务器查看控制台
8.2.3、自动删除测试
测试模块rabbit-08-properties-03
引入依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
配置MQ
server:port: 8080spring:application:name: properties-learn3rabbitmq:host: 192.168.1.101port: 5672username: adminpassword: 123456virtual-host: longdidi
生产者
package com.longdidi.service;import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Date;@Service
@Slf4j
public class SendMessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg() {{Message message = MessageBuilder.withBody("hello world1".getBytes()).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);log.info("消息发送完毕,发送时间为:{}", new Date());}{Message message = MessageBuilder.withBody("hello world2".getBytes()).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME2, message);log.info("消息发送完毕,发送时间为:{}", new Date());}}
}
消费者
package com.longdidi.service;import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ReceivemessageService {@RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1})public void receiveMsg1(Message message) {log.info("1接收到的消息为:{}", new String(message.getBody()));}@RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME2})public void receiveMsg2(Message message) {log.info("2接收到的消息为:{}", new String(message.getBody()));}
}
定义常量
package com.longdidi.constants;public class RabbitMQConstant {// 正常交换机public static final String EXCHANGE_NAME = "exchange.properties.3";// 队列public static final String QUEUE_NAME1 = "queue.properties.test1.3";public static final String QUEUE_NAME2 = "queue.properties.test2.3";// 路由keypublic static final String ROUTING_NAME1 = "key.properties.test1.3";public static final String ROUTING_NAME2 = "key.properties.test2.3";}
定义MQ队列
package com.longdidi.config;import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {/*** 正常交换机** @return*/@Beanpublic DirectExchange normalExchange() {return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();}/*** 正常队列1** @return*/@Beanpublic Queue normalQueue1() {return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).autoDelete()// 设置自动删除.build();}/*** 正常队列2** @return*/@Beanpublic Queue normalQueue2() {return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME2)// .autoDelete()// 设置不自动删除,默认就是不自动删除.build();}/*** 正常交换机和正常队列绑定** @param normalExchange* @param normalQueue1* @return*/@Beanpublic Binding bindingNormal1(DirectExchange normalExchange, Queue normalQueue1) {return BindingBuilder.bind(normalQueue1).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);}/*** 正常交换机和正常队列绑定** @param normalExchange* @param normalQueue2* @return*/@Beanpublic Binding bindingNormal2(DirectExchange normalExchange, Queue normalQueue2) {return BindingBuilder.bind(normalQueue2).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME2);}}
发送消息
package com.longdidi;import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class Rabbitmq08Properties03Application implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(Rabbitmq08Properties03Application.class, args);}@Resourceprivate SendMessageService sendMessageService;/*** 程序一启动就会运行该方法** @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception {sendMessageService.sendMsg();}
}
测试
-
有消费者连接时
-
无消费连接时
8.2.4、可见性测试
普通队列允许的消费者没有限制,多个消费者绑定到同一个队列时,RabbitMQ会采用轮询进行投递
如果需要消费者独占队列,在队列创建的时候,设定属性exclusive为true。
exclusive有两个作用
-
当连接关闭时connection.close()该队列是否会自动删除;
-
该队列是否是私有的private
如果设置为false则可以使用两个消费者都访问同一个队列,没有任何问题
如果设置为true则会对当前队列加锁,其他连接connection是不能访问的,同一个连接的不同channel是可以访问的。如果强制访问会报如下异常:
It could be originally declared on another connection or the exclusive property value does not match that of the original declaration., class-id=60, method-id=20)
测试模块rabbit-08-properties-04
引入依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
配置MQ
server:port: 8080spring:application:name: properties-learn4rabbitmq:host: 192.168.1.101port: 5672username: adminpassword: 123456virtual-host: longdidi
生产者
package com.longdidi.service;import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Date;@Service
@Slf4j
public class SendMessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg() {{Message message = MessageBuilder.withBody("hello world1".getBytes()).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);log.info("消息发送完毕,发送时间为:{}", new Date());}{Message message = MessageBuilder.withBody("hello world2".getBytes()).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);log.info("消息发送完毕,发送时间为:{}", new Date());}}
}
定义常量
package com.longdidi.constants;public class RabbitMQConstant {// 正常交换机public static final String EXCHANGE_NAME = "exchange.properties.4";// 队列public static final String QUEUE_NAME1 = "queue.properties.test1.4";// 路由keypublic static final String ROUTING_NAME1 = "key.properties.test1.4";
}
定义MQ队列
package com.longdidi.config;import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {/*** 正常交换机** @return*/@Beanpublic DirectExchange normalExchange() {return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();}/*** 正常队列1** @return*/@Beanpublic Queue normalQueue1() {return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).exclusive()//声明只有第一个连接的消费者可见,(之后其它消费者无法访问该队列),并且在连接断开时自动删除//.autoDelete()// 设置自动删除.build();}/*** 正常交换机和正常队列绑定** @param normalExchange* @param normalQueue1* @return*/@Beanpublic Binding bindingNormal1(DirectExchange normalExchange, Queue normalQueue1) {return BindingBuilder.bind(normalQueue1).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);}}
消费者
这里定义的消费者表示是在同一个Connection中消费消息的多个消费者,用来测试是否同一个Connection中的多个消费者可以消费
package com.longdidi.service;import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ReceivemessageService {@RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1})public void receiveMsg1(Message message) {log.info("消费者1接收到的消息为:{}", new String(message.getBody()));}@RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1})public void receiveMsg2(Message message) {log.info("消费者2接收到的消息为:{}", new String(message.getBody()));}
}
发送消息
这里面除了发送消息,也定义两个Connection连接测试是否允许不同Connection的连接访问
package com.longdidi;import com.longdidi.constants.RabbitMQConstant;
import com.longdidi.service.SendMessageService;
import com.rabbitmq.client.DefaultConsumer;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.rabbitmq.client.*;import java.io.IOException;@SpringBootApplication
public class Rabbitmq08Properties04Application implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(Rabbitmq08Properties04Application.class, args);}@Resourceprivate SendMessageService sendMessageService;/*** 程序一启动就会运行该方法** @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception {sendMessageService.sendMsg();{//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置工厂参数factory.setHost("192.168.1.101");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("123456");factory.setVirtualHost("longdidi");Connection connection = factory.newConnection();//3.创建channelChannel channel = connection.createChannel();DefaultConsumer consumer = new DefaultConsumer(channel) {/*回调方法 当收到信息 自动执行该方法consumerTag*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("连接1接收到信息:" + new String(body));}};channel.basicConsume(RabbitMQConstant.QUEUE_NAME1, true, consumer);// 6.释放资源channel.close();connection.close();}{//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置工厂参数factory.setHost("192.168.1.101");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("123456");factory.setVirtualHost("longdidi");Connection connection = factory.newConnection();//3.创建channelChannel channel = connection.createChannel();DefaultConsumer consumer = new DefaultConsumer(channel) {/*回调方法 当收到信息 自动执行该方法consumerTag*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("连接2接收到信息:" + new String(body));}};channel.basicConsume(RabbitMQConstant.QUEUE_NAME1, true, consumer);// 6.释放资源channel.close();connection.close();}}
}
测试
-
启动程序查看
-
查看同一Connection内的多个消费者是否可以连接
注释掉主程序中的两个Connection连接代码重启测试
-
测试自动删除
停止程序运行查看队列是否自动删除
8.2.5、Arguments测试
(1)、删除属性测试
x-expires属性:当Queue(队列)在指定的时间未被访问则队列将被自动删除
测试模块rabbitmq-08-properties-05
引入依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
配置MQ
server:port: 8080spring:application:name: properties-learn5rabbitmq:host: 192.168.1.101port: 5672username: adminpassword: 123456virtual-host: longdidi
生产者
package com.longdidi.service;import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Date;@Service
@Slf4j
public class SendMessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg() {{Message message = MessageBuilder.withBody("hello world1".getBytes()).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);log.info("消息发送完毕,发送时间为:{}", new Date());}}
}
定义常量
package com.longdidi.constants;public class RabbitMQConstant {// 正常交换机public static final String EXCHANGE_NAME = "exchange.properties.normal.5";// 队列public static final String QUEUE_NAME1 = "queue.properties.normal.5";// 路由keypublic static final String ROUTING_NAME1 = "key.properties.normal.5";
}
定义MQ队列
package com.longdidi.config;import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {/*** 正常交换机** @return*/@Beanpublic DirectExchange normalExchange() {return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();}/*** 正常队列** @return*/@Beanpublic Queue normalQueue() {//Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)Map<String, Object> arguments = new HashMap<>();arguments.put("x-expires", 10000L); //当Queue(队列)在指定的时间未被访问则队列将被自动删除return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).withArguments(arguments).build();}/*** 正常交换机和正常队列绑定** @param normalExchange* @param normalQueue* @return*/@Beanpublic Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);}}
发送消息
package com.longdidi;import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class Rabbitmq08Properties05Application implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(Rabbitmq08Properties05Application.class, args);}@Resourceprivate SendMessageService sendMessageService;/*** 程序一启动就会运行该方法** @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception {sendMessageService.sendMsg();}
}
测试
-
启动应用查看控制台
-
等待超时后查看控制台
(2)、设置队列过期时间
发布的消息在队列中存在多长时间后被取消(单位毫秒)
测试模块rabbitmq-08-properties-06
引入依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
配置MQ
server:port: 8080spring:application:name: properties-learn6rabbitmq:host: 192.168.1.101port: 5672username: adminpassword: 123456virtual-host: longdidi
生产者
package com.longdidi.service;import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Date;@Service
@Slf4j
public class SendMessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg() {{Message message = MessageBuilder.withBody("hello world1".getBytes()).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);log.info("消息发送完毕,发送时间为:{}", new Date());}}
}
定义常量
package com.longdidi.constants;public class RabbitMQConstant {// 正常交换机public static final String EXCHANGE_NAME = "exchange.properties.normal.6";// 队列public static final String QUEUE_NAME1 = "queue.properties.normal.6";// 路由keypublic static final String ROUTING_NAME1 = "key.properties.normal.6";
}
定义MQ队列
package com.longdidi.config;import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {/*** 正常交换机** @return*/@Beanpublic DirectExchange normalExchange() {return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();}/*** 正常队列** @return*/@Beanpublic Queue normalQueue() {//Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)Map<String, Object> arguments = new HashMap<>();arguments.put("x-message-ttl", 5000L);//发布的消息在队列中存在多长时间后被取消(单位毫秒)return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).withArguments(arguments).build();}/*** 正常交换机和正常队列绑定** @param normalExchange* @param normalQueue* @return*/@Beanpublic Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);}}
发送消息
package com.longdidi;import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class Rabbitmq08Properties06Application implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(Rabbitmq08Properties06Application.class, args);}@Resourceprivate SendMessageService sendMessageService;/*** 程序一启动就会运行该方法** @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception {sendMessageService.sendMsg();}
}
测试
-
发送消息查看控制台
-
超时后查看控制台
(3)、设置队列长度
x-max-length:Number
队列所能容下消息的最大长度,当超出长度后新消息将会覆盖最前面的消息
测试模块rabbitmq-08-properties-07
引入依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
配置MQ
server:port: 8080spring:application:name: properties-learn7rabbitmq:host: 192.168.1.101port: 5672username: adminpassword: 123456virtual-host: longdidi
生产者
package com.longdidi.service;import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Date;@Service
@Slf4j
public class SendMessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg() {for (int i = 0; i < 8; i++) {String str = "hello world1" + i;Message message = MessageBuilder.withBody(str.getBytes()).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);log.info("消息发送完毕,发送时间为:{}", new Date());}}
}
定义常量
package com.longdidi.constants;public class RabbitMQConstant {// 正常交换机public static final String EXCHANGE_NAME = "exchange.properties.normal.7";// 队列public static final String QUEUE_NAME1 = "queue.properties.normal.7";// 路由keypublic static final String ROUTING_NAME1 = "key.properties.normal.7";
}
定义MQ队列
package com.longdidi.config;import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {/*** 正常交换机** @return*/@Beanpublic DirectExchange normalExchange() {return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();}/*** 正常队列** @return*/@Beanpublic Queue normalQueue() {//Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)Map<String, Object> arguments = new HashMap<>();arguments.put("x-max-length", 5);// 队列的溢出行为,删除头部return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).withArguments(arguments).build();}/*** 正常交换机和正常队列绑定** @param normalExchange* @param normalQueue* @return*/@Beanpublic Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);}}
发送消息
package com.longdidi;import com.longdidi.constants.RabbitMQConstant;
import com.longdidi.service.SendMessageService;
import com.rabbitmq.client.DefaultConsumer;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.rabbitmq.client.*;import java.io.IOException;@SpringBootApplication
public class Rabbitmq08Properties07Application implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(Rabbitmq08Properties07Application.class, args);}@Resourceprivate SendMessageService sendMessageService;/*** 程序一启动就会运行该方法** @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception {sendMessageService.sendMsg();}
}
测试
- 查看队列消息长度
-
查看存活消息
(4)、设置队列溢出行为
设置队列溢出行为
当达到队列的最大长度时消息的处理方式:有效值为drop-head(删除头部消息)、reject-publish(拒绝发布)或reject-publish-dlx(拒绝发布到死信交换机)
仲裁队列类型仅支持:drop-head and reject-publish两种
测试模块rabbitmq-08-properties-08
引入依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
配置MQ
server:port: 8080spring:application:name: properties-learn6rabbitmq:host: 192.168.1.101port: 5672username: adminpassword: 123456virtual-host: longdidi
生产者
package com.longdidi.service;import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Date;@Service
@Slf4j
public class SendMessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg() {for (int i = 0; i < 8; i++) {String str = "hello world1" + i;Message message = MessageBuilder.withBody(str.getBytes()).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);log.info("消息发送完毕,发送时间为:{}", new Date());}}
}
定义常量
package com.longdidi.constants;public class RabbitMQConstant {// 正常交换机public static final String EXCHANGE_NAME = "exchange.properties.normal.8";public static final String EXCHANGE_DLX_NAME = "exchange.properties.dlx.8";// 队列public static final String QUEUE_NAME1 = "queue.properties.normal.8";public static final String QUEUE_DLX_NAME1 = "queue.properties.dlx.8";// 路由keypublic static final String ROUTING_NAME1 = "key.properties.normal.8";public static final String ROUTING_DLX_NAME1 = "key.properties.dlx.8";
}
定义MQ队列
package com.longdidi.config;import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {/*** 正常交换机** @return*/@Beanpublic DirectExchange normalExchange() {return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();}/*** 正常队列** @return*/@Beanpublic Queue normalQueue() {//Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)Map<String, Object> arguments = new HashMap<>();arguments.put("x-max-length", 5);// 队列的溢出行为,删除头部// arguments.put("x-overflow", "drop-head");// 队列的溢出行为,删除头部(默认行为)//arguments.put("x-overflow", "reject-publish");//队列的溢出行为,拒绝发布arguments.put("x-overflow", "reject-publish-dlx");//队列的溢出行为,拒绝接收消息,超过长度的消息会被发送到死信交换机而不是拒绝接收return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).withArguments(arguments).deadLetterExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).deadLetterRoutingKey(RabbitMQConstant.ROUTING_DLX_NAME1).build();}/*** 正常交换机和正常队列绑定** @param normalExchange* @param normalQueue* @return*/@Beanpublic Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);}/*** 死信交换机** @return*/@Beanpublic DirectExchange dlxExchange() {return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();}/*** 死信队列** @return*/@Beanpublic Queue dlxQueue() {return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME1).build();}/*** 死信交换机和死信队列绑定** @param dlxExchange* @param dlxQueue* @return*/@Beanpublic Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.ROUTING_DLX_NAME1);}
}
发送消息
package com.longdidi;import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class Rabbitmq08Properties08Application implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(Rabbitmq08Properties08Application.class, args);}@Resourceprivate SendMessageService sendMessageService;/*** 程序一启动就会运行该方法** @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception {sendMessageService.sendMsg();}
}
测试
-
当属性设置为drop-head时
如果超过队列长度,先入队的消息会被先删除(如果配置了死信交换机则会移至死信交换机)
-
当属性设置为reject-publish时
队列消息
常看消息详情
-
当属性设置为reject-publish-dlx时
拒绝接收后面的消息并将拒绝的消息放到死信交换机中
(5)、设置队列内存大小
x-max-length-bytes:Number
队列在开始从头部删除就绪消息之前可以包含的总正文大小
受限于内存大小,超过该阈值则从队列头部开始删除消息
测试模块rabbitmq-08-properties-09
引入依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
配置MQ
server:port: 8080spring:application:name: properties-learn9rabbitmq:host: 192.168.1.101port: 5672username: adminpassword: 123456virtual-host: longdidi
生产者
package com.longdidi.service;import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Date;@Service
@Slf4j
public class SendMessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg() {for (int i = 0; i < 3; i++) {String str = "你好我好大家好" + i;Message message = MessageBuilder.withBody(str.getBytes()).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);log.info("消息发送完毕,发送时间为:{}", new Date());}for (int i = 0; i < 3; i++) {String str = "hello" + i;Message message = MessageBuilder.withBody(str.getBytes()).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);log.info("消息发送完毕,发送时间为:{}", new Date());}}
}
定义常量
package com.longdidi.constants;public class RabbitMQConstant {// 正常交换机public static final String EXCHANGE_NAME = "exchange.properties.normal.9";// 队列public static final String QUEUE_NAME1 = "queue.properties.normal.9";// 路由keypublic static final String ROUTING_NAME1 = "key.properties.normal.9";
}
定义MQ队列
package com.longdidi.config;import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {/*** 正常交换机** @return*/@Beanpublic DirectExchange normalExchange() {return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();}/*** 正常队列** @return*/@Beanpublic Queue normalQueue() {//Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)Map<String, Object> arguments = new HashMap<>();arguments.put("x-max-length-bytes", 30);//队列的内存大小return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).withArguments(arguments).build();}/*** 正常交换机和正常队列绑定** @param normalExchange* @param normalQueue* @return*/@Beanpublic Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);}}
发送消息
package com.longdidi;import com.longdidi.constants.RabbitMQConstant;
import com.longdidi.service.SendMessageService;
import com.rabbitmq.client.DefaultConsumer;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.rabbitmq.client.*;import java.io.IOException;@SpringBootApplication
public class Rabbitmq08Properties09Application implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(Rabbitmq08Properties09Application.class, args);}@Resourceprivate SendMessageService sendMessageService;/*** 程序一启动就会运行该方法** @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception {sendMessageService.sendMsg();}
}
测试
(6)、设置单一消费者
表示队列是否是只能有一个消费者
设置为true时注册的消费组内只有一个消费者消费消息,其他被忽略
设置为false时消息循环分发给所有消费者(默认false)
测试模块rabbitmq-08-properties-10
引入依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
配置MQ
server:port: 8080spring:application:name: properties-learn10rabbitmq:host: 192.168.1.101port: 5672username: adminpassword: 123456virtual-host: longdidi
生产者
package com.longdidi.service;import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Date;@Service
@Slf4j
public class SendMessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg() {{Message message = MessageBuilder.withBody("hello world1".getBytes()).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);log.info("消息发送完毕,发送时间为:{}", new Date());}}
}
消费者
package com.longdidi.service;import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ReceivemessageService {@RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1})public void receiveMsg1(Message message) {log.info("消费者1接收到的消息为:{}", new String(message.getBody()));}@RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1})public void receiveMsg2(Message message) {log.info("消费者2接收到的消息为:{}", new String(message.getBody()));}
}
定义常量
package com.longdidi.constants;public class RabbitMQConstant {// 正常交换机public static final String EXCHANGE_NAME = "exchange.properties.normal.10";// 队列public static final String QUEUE_NAME1 = "queue.properties.normal.10";// 路由keypublic static final String ROUTING_NAME1 = "key.properties.normal.10";
}
定义MQ队列
package com.longdidi.config;import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {/*** 正常交换机** @return*/@Beanpublic DirectExchange normalExchange() {return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();}/*** 正常队列** @return*/@Beanpublic Queue normalQueue() {//Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)Map<String, Object> arguments = new HashMap<>();arguments.put("x-single-active-consumer", true);//队列的最大长度return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).withArguments(arguments).build();}/*** 正常交换机和正常队列绑定** @param normalExchange* @param normalQueue* @return*/@Beanpublic Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);}}
发送消息
package com.longdidi;import com.longdidi.constants.RabbitMQConstant;
import com.longdidi.service.SendMessageService;
import com.rabbitmq.client.DefaultConsumer;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.rabbitmq.client.*;import java.io.IOException;@SpringBootApplication
public class Rabbitmq08Properties10Application implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(Rabbitmq08Properties10Application.class, args);}@Resourceprivate SendMessageService sendMessageService;/*** 程序一启动就会运行该方法** @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception {sendMessageService.sendMsg();}
}
测试
(7)、设置死信交换机
测试模块rabbitmq-08-properties-11
引入依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
配置MQ
server:port: 8080spring:application:name: properties-learn11rabbitmq:host: 192.168.1.101port: 5672username: adminpassword: 123456virtual-host: longdidi
生产者
package com.longdidi.service;import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Date;@Service
@Slf4j
public class SendMessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg() {{Message message = MessageBuilder.withBody("hello world1".getBytes()).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);log.info("消息发送完毕,发送时间为:{}", new Date());}}
}
定义常量
package com.longdidi.constants;public class RabbitMQConstant {// 正常交换机public static final String EXCHANGE_NAME = "exchange.properties.normal.11";public static final String EXCHANGE_DLX_NAME = "exchange.properties.dlx.11";// 队列public static final String QUEUE_NAME1 = "queue.properties.normal.11";public static final String QUEUE_DLX_NAME1 = "queue.properties.dlx.11";// 路由keypublic static final String ROUTING_NAME1 = "key.properties.normal.11";public static final String ROUTING_DLX_NAME1 = "key.properties.dlx.11";
}
定义MQ队列
package com.longdidi.config;import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {/*** 正常交换机** @return*/@Beanpublic DirectExchange normalExchange() {return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();}/*** 正常队列** @return*/@Beanpublic Queue normalQueue() {//Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)Map<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);//死信交换机arguments.put("x-dead-letter-routing-key", RabbitMQConstant.ROUTING_DLX_NAME1);//死信路由keyreturn QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).ttl(5000)// 设置超时时间.withArguments(arguments).build();}/*** 正常交换机和正常队列绑定** @param normalExchange* @param normalQueue* @return*/@Beanpublic Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);}/*** 死信交换机** @return*/@Beanpublic DirectExchange dlxExchange() {return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();}/*** 死信队列** @return*/@Beanpublic Queue dlxQueue() {return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME1).build();}/*** 死信交换机和死信队列绑定** @param dlxExchange* @param dlxQueue* @return*/@Beanpublic Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.ROUTING_DLX_NAME1);}
}
发送消息
package com.longdidi;import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class Rabbitmq08Properties11Application implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(Rabbitmq08Properties11Application.class, args);}@Resourceprivate SendMessageService sendMessageService;/*** 程序一启动就会运行该方法** @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception {sendMessageService.sendMsg();}
}
测试