2025/3/12 13:08:23 Source: https://blog.csdn.net/u011488477/article/details/146039416

8. RabbitMQ Queues in Detail

1. Queue Properties

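  • Type

    The type of the queue;

  • Name

    The queue name, which is simply a string; almost any string will do;

  • Durability

    Whether the queue is durable, that is, whether the queue still exists after the server restarts;

  • Auto delete

    Whether the queue is deleted automatically

    If true, the queue is deleted once its last consumer disconnects; a queue that never had a consumer is not deleted;

  • Exclusive

    An exclusive queue can be used only by the connection that declared it (other connections cannot access it), and it is deleted automatically when that connection closes

    It is rarely needed; normally leave it set to false

  • Arguments: additional queue properties, for example a DLX (dead letter exchange);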

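    1. x-expires: Number

      The queue is deleted automatically if it has not been used for the given time (in milliseconds)

    2. x-message-ttl: Number

      How long (in milliseconds) a published message may stay in the queue before it is discarded, or dead-lettered if a dead letter exchange is configured

    3. x-overflow: String

      Sets the queue overflow behaviour

      This is how messages are handled once the queue has reached its maximum length

      • drop-head: drop messages from the head of the queue (the oldest ones); this is the default

      • reject-publish: once the limit is reached, newly published messages are rejected

      • reject-publish-dlx: once the limit is reached, newly published messages are rejected and routed to the dead letter exchange

      Quorum queues support only two of these: drop-head and reject-publish

    4. x-max-length: Number

      The maximum number of messages the queue can hold; with the default drop-head behaviour, the oldest messages are dropped from the head to make room for new ones

    5. x-max-length-bytes: Number

      The total body size (in bytes) of ready messages the queue may contain before it starts dropping them from the head

      Once this threshold is exceeded, messages are dropped starting from the head of the queue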

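    6. x-single-active-consumer: false by default

      Whether only one consumer at a time may consume from the queue

      When true, only one of the registered consumers receives messages; the others stay idle until the active consumer is cancelled or goes away

      When false (the default), messages are dispatched round-robin to all consumers

    7. x-dead-letter-exchange: String

      The dead letter exchange associated with the queue

      Sometimes we want messages that overflow a full queue to be kept in another queue instead of being deleted;

    8. x-dead-letter-routing-key: String

      The routing key used when a message is published to the dead letter exchange

      Usually defined together with item 7

    9. x-queue-mode: String (just be aware of it)

      x-queue-mode=lazy declares a lazy queue, which keeps as many messages as possible on disk to reduce RAM usage

      If it is not set, the queue keeps an in-memory cache to deliver messages as quickly as possible;

      If it is set, messages are stored on disk and read back from disk when consumed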

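    10. x-queue-master-locator: String (rarely used)

      In a cluster, determines which node the queue master is placed on;

      Every queue has a master node; every operation on the queue is performed on the master first and then repeated on the slaves;

      Different queues can live on different cluster nodes; if mirroring is configured, each of them has one master and several slaves

      Since essentially all operations land on the master, if the masters of many queues end up on a few nodes while the other nodes sit idle, load is not balanced and performance inevitably suffers;

      There are several strategies for placing the queue master. The strategy can be set with the x-queue-master-locator argument when the queue is declared, with queue-master-locator in a policy, or with queue_master_locator in the RabbitMQ configuration file. Three strategies are available:

    • min-masters: pick the node that hosts the fewest queue masters;

    • client-local: pick the node that the declaring client is connected to;

    • random: pick a node at random;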
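As a rough sketch of how the arguments above fit together: they are simply entries in a string-keyed map that accompanies the queue declaration. The class name, the dead letter exchange name and routing key, and the concrete values below are assumptions made for illustration; only the x-* keys come from the list above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Builds an optional-arguments map for a queue declaration (illustrative values only). */
final class QueueArgumentsSketch {

    // Hypothetical names, not taken from the article.
    static final String DLX_NAME = "exchange.dlx.demo";
    static final String DLX_ROUTING_KEY = "key.dlx.demo";

    static Map<String, Object> boundedQueueArguments() {
        Map<String, Object> args = new LinkedHashMap<>();
        args.put("x-message-ttl", 5_000);          // messages expire after 5 s in the queue
        args.put("x-expires", 60_000);             // queue is removed after 60 s without use
        args.put("x-max-length", 5);               // hold at most 5 messages
        args.put("x-overflow", "reject-publish");  // refuse new messages when full
        args.put("x-dead-letter-exchange", DLX_NAME);
        args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
        args.put("x-single-active-consumer", true);
        return args;
    }

    public static void main(String[] unused) {
        boundedQueueArguments().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

A map of this shape is what the test modules in this chapter hand to QueueBuilder through withArguments(...).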

2. Property Tests

8.2.1 Non-durable queue test

Test module: rabbitmq-08-properties-01

Add the dependencies

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

Configure MQ

server:
  port: 8080
spring:
  application:
    name: properties-learn1
  rabbitmq:
    host: 192.168.1.101
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi

Producer

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;

@Service
@Slf4j
public class SendMessageService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg() {
        {
            // Routed to the unnamed queue
            Message message = MessageBuilder.withBody("hello world1".getBytes()).build();
            rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);
            log.info("Message sent, time: {}", new Date());
        }
        {
            // Routed to the named queue
            Message message = MessageBuilder.withBody("hello world2".getBytes()).build();
            rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME2, message);
            log.info("Message sent, time: {}", new Date());
        }
    }
}

Define the constants

package com.longdidi.constants;

public class RabbitMQConstant {
    // Normal exchange
    public static final String EXCHANGE_NAME = "exchange.properties.1";
    // Queue
    public static final String QUEUE_NAME1 = "queue.properties.1";
    // Routing keys
    public static final String ROUTING_NAME1 = "key.properties.test1.1";
    public static final String ROUTING_NAME2 = "key.properties.test1.2";
}

Define the MQ queues

package com.longdidi.config;

import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    /**
     * Normal exchange.
     */
    @Bean
    public DirectExchange normalExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
    }

    /**
     * Normal queue: non-durable, no name set.
     */
    @Bean
    public Queue normalQueue1() {
        return QueueBuilder.nonDurable().build();
    }

    /**
     * Normal queue: non-durable, name set.
     */
    @Bean
    public Queue normalQueue2() {
        return QueueBuilder.nonDurable(RabbitMQConstant.QUEUE_NAME1).build();
    }

    /**
     * Binds normalQueue1 to the normal exchange.
     */
    @Bean
    public Binding bindingNormal1(DirectExchange normalExchange, Queue normalQueue1) {
        return BindingBuilder.bind(normalQueue1).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
    }

    /**
     * Binds normalQueue2 to the normal exchange.
     */
    @Bean
    public Binding bindingNormal2(DirectExchange normalExchange, Queue normalQueue2) {
        return BindingBuilder.bind(normalQueue2).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME2);
    }
}

Send the messages

package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq08Properties01Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq08Properties01Application.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * Runs as soon as the application has started.
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }
}

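Test

  • Send the messages and check the console

  • Restart the server and check whether the queues still exist

    Both queues are non-durable, so they should be gone after the restart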

8.2.2 Durable queue test

Whether the queue is durable

If true, the queue still exists after the server restarts

Test module: rabbitmq-08-properties-02

Add the dependencies

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

Configure MQ

server:
  port: 8080
spring:
  application:
    name: properties-learn2
  rabbitmq:
    host: 192.168.1.101
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi

Producer

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;

@Service
@Slf4j
public class SendMessageService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg() {
        {
            // Routed to the unnamed queue
            Message message = MessageBuilder.withBody("hello world1".getBytes()).build();
            rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);
            log.info("Message sent, time: {}", new Date());
        }
        {
            // Routed to the named queue
            Message message = MessageBuilder.withBody("hello world2".getBytes()).build();
            rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME2, message);
            log.info("Message sent, time: {}", new Date());
        }
    }
}

Define the constants

package com.longdidi.constants;

public class RabbitMQConstant {
    // Normal exchange
    public static final String EXCHANGE_NAME = "exchange.properties.2";
    // Queue
    public static final String QUEUE_NAME1 = "queue.properties.2";
    // Routing keys
    public static final String ROUTING_NAME1 = "key.properties.test1.2";
    public static final String ROUTING_NAME2 = "key.properties.test2.2";
}

Define the MQ queues

package com.longdidi.config;

import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    /**
     * Normal exchange.
     */
    @Bean
    public DirectExchange normalExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
    }

    /**
     * Normal queue: durable, no name set.
     */
    @Bean
    public Queue normalQueue1() {
        return QueueBuilder.durable().build();
    }

    /**
     * Normal queue: durable, name set.
     */
    @Bean
    public Queue normalQueue2() {
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).build();
    }

    /**
     * Binds normalQueue1 to the normal exchange.
     */
    @Bean
    public Binding bindingNormal1(DirectExchange normalExchange, Queue normalQueue1) {
        return BindingBuilder.bind(normalQueue1).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
    }

    /**
     * Binds normalQueue2 to the normal exchange.
     */
    @Bean
    public Binding bindingNormal2(DirectExchange normalExchange, Queue normalQueue2) {
        return BindingBuilder.bind(normalQueue2).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME2);
    }
}

Send the messages

package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq08Properties02Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq08Properties02Application.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * Runs as soon as the application has started.
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }
}

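Test

  • Send the messages and check the console

  • Restart the server and check the console

    Both queues are durable, so they should still be listed after the restart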

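8.2.3 Auto-delete test

If auto delete is true, the queue is deleted automatically once no consumer is connected to it any more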
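The auto-delete rule can be sketched with a small in-memory model. This is only an illustration under the assumption that deletion is triggered when the last consumer leaves a queue that has had at least one consumer; the class and method names are made up for this sketch and are not part of any RabbitMQ API.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of an auto-delete queue; it tracks consumers, not messages. */
final class AutoDeleteQueueSketch {

    private final Set<String> consumers = new HashSet<>();
    private boolean hadConsumer = false;
    private boolean deleted = false;

    void subscribe(String consumerTag) {
        if (deleted) {
            throw new IllegalStateException("queue no longer exists");
        }
        consumers.add(consumerTag);
        hadConsumer = true;
    }

    void unsubscribe(String consumerTag) {
        consumers.remove(consumerTag);
        // Deletion happens only after the last of at least one consumer has gone.
        if (hadConsumer && consumers.isEmpty()) {
            deleted = true;
        }
    }

    boolean isDeleted() {
        return deleted;
    }

    public static void main(String[] unused) {
        AutoDeleteQueueSketch queue = new AutoDeleteQueueSketch();
        System.out.println("deleted before any consumer: " + queue.isDeleted());
        queue.subscribe("c1");
        queue.subscribe("c2");
        queue.unsubscribe("c1");
        System.out.println("deleted with one consumer left: " + queue.isDeleted());
        queue.unsubscribe("c2");
        System.out.println("deleted after last consumer left: " + queue.isDeleted());
    }
}
```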

Test module: rabbitmq-08-properties-03

Add the dependencies

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

Configure MQ

server:
  port: 8080
spring:
  application:
    name: properties-learn3
  rabbitmq:
    host: 192.168.1.101
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi

Producer

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;

@Service
@Slf4j
public class SendMessageService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg() {
        {
            // Routed to the auto-delete queue
            Message message = MessageBuilder.withBody("hello world1".getBytes()).build();
            rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);
            log.info("Message sent, time: {}", new Date());
        }
        {
            // Routed to the queue without auto-delete
            Message message = MessageBuilder.withBody("hello world2".getBytes()).build();
            rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME2, message);
            log.info("Message sent, time: {}", new Date());
        }
    }
}

Consumer

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ReceivemessageService {

    // Consumes from the auto-delete queue
    @RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1})
    public void receiveMsg1(Message message) {
        log.info("1 received message: {}", new String(message.getBody()));
    }

    // Consumes from the queue without auto-delete
    @RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME2})
    public void receiveMsg2(Message message) {
        log.info("2 received message: {}", new String(message.getBody()));
    }
}

Define the constants

package com.longdidi.constants;

public class RabbitMQConstant {
    // Normal exchange
    public static final String EXCHANGE_NAME = "exchange.properties.3";
    // Queues
    public static final String QUEUE_NAME1 = "queue.properties.test1.3";
    public static final String QUEUE_NAME2 = "queue.properties.test2.3";
    // Routing keys
    public static final String ROUTING_NAME1 = "key.properties.test1.3";
    public static final String ROUTING_NAME2 = "key.properties.test2.3";
}

Define the MQ queues

package com.longdidi.config;

import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    /**
     * Normal exchange.
     */
    @Bean
    public DirectExchange normalExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
    }

    /**
     * Normal queue 1: auto-delete enabled.
     */
    @Bean
    public Queue normalQueue1() {
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1)
                .autoDelete() // enable auto-delete
                .build();
    }

    /**
     * Normal queue 2: auto-delete left at its default.
     */
    @Bean
    public Queue normalQueue2() {
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME2)
                // .autoDelete() // not called: queues are not auto-deleted by default
                .build();
    }

    /**
     * Binds normalQueue1 to the normal exchange.
     */
    @Bean
    public Binding bindingNormal1(DirectExchange normalExchange, Queue normalQueue1) {
        return BindingBuilder.bind(normalQueue1).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
    }

    /**
     * Binds normalQueue2 to the normal exchange.
     */
    @Bean
    public Binding bindingNormal2(DirectExchange normalExchange, Queue normalQueue2) {
        return BindingBuilder.bind(normalQueue2).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME2);
    }
}

Send the messages

package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq08Properties03Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq08Properties03Application.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * Runs as soon as the application has started.
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }
}

Test

  • With consumers connected

  • With no consumers connected

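8.2.4 Exclusive (visibility) test

An ordinary queue places no limit on the number of consumers; when several consumers are attached to the same queue, RabbitMQ delivers messages to them round-robin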
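A minimal sketch of round-robin delivery, assuming every consumer is always ready to take the next message (real delivery is also influenced by prefetch and acknowledgements). The class and method names are invented for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hands messages to consumers in turn: first, second, ..., then back to the first. */
final class RoundRobinDispatchSketch {

    static Map<String, List<String>> dispatch(List<String> consumers, List<String> messages) {
        Map<String, List<String>> delivered = new LinkedHashMap<>();
        for (String consumer : consumers) {
            delivered.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return delivered; // nobody to deliver to; messages would stay in the queue
        }
        for (int i = 0; i < messages.size(); i++) {
            String target = consumers.get(i % consumers.size());
            delivered.get(target).add(messages.get(i));
        }
        return delivered;
    }

    public static void main(String[] unused) {
        Map<String, List<String>> result = dispatch(
                Arrays.asList("consumer1", "consumer2"),
                Arrays.asList("m1", "m2", "m3"));
        result.forEach((consumer, got) -> System.out.println(consumer + " <- " + got));
    }
}
```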

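If a consumer needs the queue to itself, set the exclusive property to true when the queue is created.

exclusive has two effects

  • Whether the queue is deleted automatically when the connection is closed (connection.close());

  • Whether the queue is private

    If it is false, two consumers can access the same queue without any problem

    If it is true, the queue is locked: other connections cannot access it, while different channels of the same connection can. An attempt to access it anyway fails with the following exception: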

    It could be originally declared on another connection or the exclusive property value does not match that of the original declaration., class-id=60, method-id=20)
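The two effects of exclusive can be modelled in a few lines. This is an illustrative in-memory sketch, not broker code; the connection ids and the class name are assumptions made for the example.

```java
/** Toy model of an exclusive queue: owned by one connection and deleted with it. */
final class ExclusiveQueueSketch {

    private final String ownerConnectionId;
    private boolean deleted = false;

    ExclusiveQueueSketch(String ownerConnectionId) {
        this.ownerConnectionId = ownerConnectionId;
    }

    /** Any channel on the owning connection may use the queue; other connections may not. */
    boolean canAccess(String connectionId) {
        return !deleted && ownerConnectionId.equals(connectionId);
    }

    /** Closing the owning connection deletes the queue; closing any other one changes nothing. */
    void connectionClosed(String connectionId) {
        if (ownerConnectionId.equals(connectionId)) {
            deleted = true;
        }
    }

    boolean isDeleted() {
        return deleted;
    }

    public static void main(String[] unused) {
        ExclusiveQueueSketch queue = new ExclusiveQueueSketch("conn-A");
        System.out.println("conn-A may consume: " + queue.canAccess("conn-A"));
        System.out.println("conn-B may consume: " + queue.canAccess("conn-B"));
        queue.connectionClosed("conn-A");
        System.out.println("deleted after owner closed: " + queue.isDeleted());
    }
}
```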
    

Test module: rabbitmq-08-properties-04

Add the dependencies

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

Configure MQ

server:
  port: 8080
spring:
  application:
    name: properties-learn4
  rabbitmq:
    host: 192.168.1.101
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi

Producer

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;

@Service
@Slf4j
public class SendMessageService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Both messages use the same routing key and land in the same exclusive queue
    public void sendMsg() {
        {
            Message message = MessageBuilder.withBody("hello world1".getBytes()).build();
            rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);
            log.info("Message sent, time: {}", new Date());
        }
        {
            Message message = MessageBuilder.withBody("hello world2".getBytes()).build();
            rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);
            log.info("Message sent, time: {}", new Date());
        }
    }
}

Define the constants

package com.longdidi.constants;

public class RabbitMQConstant {
    // Normal exchange
    public static final String EXCHANGE_NAME = "exchange.properties.4";
    // Queue
    public static final String QUEUE_NAME1 = "queue.properties.test1.4";
    // Routing key
    public static final String ROUTING_NAME1 = "key.properties.test1.4";
}

Define the MQ queue

package com.longdidi.config;

import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    /**
     * Normal exchange.
     */
    @Bean
    public DirectExchange normalExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
    }

    /**
     * Normal queue 1: exclusive.
     */
    @Bean
    public Queue normalQueue1() {
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1)
                // Usable only by the declaring connection (other connections cannot
                // access the queue); deleted automatically when that connection closes
                .exclusive()
                // .autoDelete() // enable auto-delete
                .build();
    }

    /**
     * Binds normalQueue1 to the normal exchange.
     */
    @Bean
    public Binding bindingNormal1(DirectExchange normalExchange, Queue normalQueue1) {
        return BindingBuilder.bind(normalQueue1).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
    }
}

Consumer

The consumers defined here consume within the same Connection; they test whether several consumers on one Connection can consume from the queue

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ReceivemessageService {

    @RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1})
    public void receiveMsg1(Message message) {
        log.info("Consumer 1 received message: {}", new String(message.getBody()));
    }

    @RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1})
    public void receiveMsg2(Message message) {
        log.info("Consumer 2 received message: {}", new String(message.getBody()));
    }
}

Send the messages

Besides sending the messages, this class also opens two separate Connections to test whether a different Connection is allowed to access the queue

package com.longdidi;

import com.longdidi.constants.RabbitMQConstant;
import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.rabbitmq.client.*;

import java.io.IOException;

@SpringBootApplication
public class Rabbitmq08Properties04Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq08Properties04Application.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * Runs as soon as the application has started.
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
        // Each call opens its own connection; because the queue is exclusive,
        // consuming from these connections is expected to fail
        consumeOnNewConnection("Connection 1");
        consumeOnNewConnection("Connection 2");
    }

    private void consumeOnNewConnection(String label) throws Exception {
        // 1. Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // 2. Set the factory parameters
        factory.setHost("192.168.1.101");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("123456");
        factory.setVirtualHost("longdidi");
        Connection connection = factory.newConnection();
        // 3. Create a channel
        Channel channel = connection.createChannel();
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Callback, invoked automatically when a message is received
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(label + " received: " + new String(body));
            }
        };
        // 4. Start consuming with automatic acknowledgement
        channel.basicConsume(RabbitMQConstant.QUEUE_NAME1, true, consumer);
        // 5. Release resources
        channel.close();
        connection.close();
    }
}

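Test

  • Start the application and check the result

  • Check whether several consumers within the same Connection can connect

    Comment out the two Connection blocks in the main application, then restart and test

  • Test automatic deletion

    Stop the application and check whether the queue has been deleted automatically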

8.2.5 Arguments tests

(1) Queue expiry test

x-expires: the queue is deleted automatically if it has not been used for the given time
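The expiry rule amounts to an idle timer that is reset whenever the queue is used. The sketch below models that with explicit timestamps so it can be followed without a broker; the class name and the notion of a generic "touch" are simplifying assumptions.

```java
/** Toy idle timer for a queue declared with x-expires. */
final class QueueExpirySketch {

    private final long expiresMillis;
    private long lastUsedAt;

    QueueExpirySketch(long expiresMillis, long declaredAt) {
        this.expiresMillis = expiresMillis;
        this.lastUsedAt = declaredAt;
    }

    /** Any use of the queue (for example a consumer attaching) restarts the timer. */
    void touch(long now) {
        lastUsedAt = now;
    }

    /** True once the queue has been idle for at least the configured time. */
    boolean isExpired(long now) {
        return now - lastUsedAt >= expiresMillis;
    }

    public static void main(String[] unused) {
        QueueExpirySketch queue = new QueueExpirySketch(10_000L, 0L);
        System.out.println("expired at 9999 ms: " + queue.isExpired(9_999L));
        System.out.println("expired at 10000 ms: " + queue.isExpired(10_000L));
    }
}
```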

Test module: rabbitmq-08-properties-05

Add the dependencies

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

Configure MQ

server:
  port: 8080
spring:
  application:
    name: properties-learn5
  rabbitmq:
    host: 192.168.1.101
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi

Producer

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;

@Service
@Slf4j
public class SendMessageService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg() {
        Message message = MessageBuilder.withBody("hello world1".getBytes()).build();
        rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);
        log.info("Message sent, time: {}", new Date());
    }
}

Define the constants

package com.longdidi.constants;

public class RabbitMQConstant {
    // Normal exchange
    public static final String EXCHANGE_NAME = "exchange.properties.normal.5";
    // Queue
    public static final String QUEUE_NAME1 = "queue.properties.normal.5";
    // Routing key
    public static final String ROUTING_NAME1 = "key.properties.normal.5";
}

Define the MQ queue

package com.longdidi.config;

import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {

    /**
     * Normal exchange.
     */
    @Bean
    public DirectExchange normalExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
    }

    /**
     * Normal queue.
     */
    @Bean
    public Queue normalQueue() {
        // Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)
        Map<String, Object> arguments = new HashMap<>();
        // The queue is deleted automatically after 10 seconds without use
        arguments.put("x-expires", 10000L);
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1)
                .withArguments(arguments)
                .build();
    }

    /**
     * Binds the normal queue to the normal exchange.
     */
    @Bean
    public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
    }
}

Send the message

package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq08Properties05Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq08Properties05Application.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * Runs as soon as the application has started.
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }
}

Test

  • Start the application and check the console

  • Wait for the timeout and check the console again

(2) Setting the message TTL

x-message-ttl: how long (in milliseconds) a published message may stay in the queue before it is discarded
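A small model of per-queue message TTL, using explicit timestamps instead of a real clock. It assumes expired messages are removed from the head of the queue; the class and method names are invented for this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy queue in which every message expires ttlMillis after it was published. */
final class MessageTtlSketch {

    private static final class Entry {
        final String body;
        final long enqueuedAt;

        Entry(String body, long enqueuedAt) {
            this.body = body;
            this.enqueuedAt = enqueuedAt;
        }
    }

    private final long ttlMillis;
    private final Deque<Entry> ready = new ArrayDeque<>();

    MessageTtlSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void publish(String body, long now) {
        ready.addLast(new Entry(body, now));
    }

    /** Removes and returns the messages at the head whose TTL has elapsed. */
    List<String> expire(long now) {
        List<String> expired = new ArrayList<>();
        while (!ready.isEmpty() && now - ready.peekFirst().enqueuedAt >= ttlMillis) {
            expired.add(ready.pollFirst().body);
        }
        return expired;
    }

    int size() {
        return ready.size();
    }

    public static void main(String[] unused) {
        MessageTtlSketch queue = new MessageTtlSketch(5_000L);
        queue.publish("hello world1", 0L);
        System.out.println("expired at 4999 ms: " + queue.expire(4_999L));
        System.out.println("expired at 5000 ms: " + queue.expire(5_000L));
    }
}
```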

Test module: rabbitmq-08-properties-06

Add the dependencies

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

Configure MQ

server:
  port: 8080
spring:
  application:
    name: properties-learn6
  rabbitmq:
    host: 192.168.1.101
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi

Producer

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;

@Service
@Slf4j
public class SendMessageService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg() {
        Message message = MessageBuilder.withBody("hello world1".getBytes()).build();
        rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);
        log.info("Message sent, time: {}", new Date());
    }
}

Define the constants

package com.longdidi.constants;

public class RabbitMQConstant {
    // Normal exchange
    public static final String EXCHANGE_NAME = "exchange.properties.normal.6";
    // Queue
    public static final String QUEUE_NAME1 = "queue.properties.normal.6";
    // Routing key
    public static final String ROUTING_NAME1 = "key.properties.normal.6";
}

Define the MQ queue

package com.longdidi.config;

import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {

    /**
     * Normal exchange.
     */
    @Bean
    public DirectExchange normalExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
    }

    /**
     * Normal queue.
     */
    @Bean
    public Queue normalQueue() {
        // Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)
        Map<String, Object> arguments = new HashMap<>();
        // A published message is discarded after 5 seconds in the queue (value in milliseconds)
        arguments.put("x-message-ttl", 5000L);
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1)
                .withArguments(arguments)
                .build();
    }

    /**
     * Binds the normal queue to the normal exchange.
     */
    @Bean
    public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
    }
}

Send the message

package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq08Properties06Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq08Properties06Application.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * Runs as soon as the application has started.
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }
}

Test

  • Send the message and check the console

  • Check the console again after the timeout

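(3) Setting the queue length

x-max-length: Number

The maximum number of messages the queue can hold; once the limit is exceeded, the oldest messages at the head are dropped to make room for new ones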
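To see which messages survive, here is a small simulation of a length-limited queue with drop-head behaviour. It uses the same numbers as the test module that follows (limit 5, eight messages named "hello world1" plus the loop index); the class and method names are made up for the sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Simulates publishing into a queue limited by x-max-length with drop-head overflow. */
final class MaxLengthDropHeadSketch {

    static List<String> publishAll(int maxLength, int count) {
        Deque<String> queue = new ArrayDeque<>();
        for (int i = 0; i < count; i++) {
            queue.addLast("hello world1" + i);
            // Over the limit: discard from the head, i.e. the oldest message first.
            while (queue.size() > maxLength) {
                queue.pollFirst();
            }
        }
        return new ArrayList<>(queue);
    }

    public static void main(String[] unused) {
        // prints [hello world13, hello world14, hello world15, hello world16, hello world17]
        System.out.println(publishAll(5, 8));
    }
}
```

The first three messages (indexes 0 to 2) are the ones that get dropped, so the queue ends up holding the five newest.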

Test module: rabbitmq-08-properties-07

Add the dependencies

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

Configure MQ

server:
  port: 8080
spring:
  application:
    name: properties-learn7
  rabbitmq:
    host: 192.168.1.101
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi

Producer

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;

@Service
@Slf4j
public class SendMessageService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg() {
        // Send 8 messages, more than the queue's maximum length of 5
        for (int i = 0; i < 8; i++) {
            String str = "hello world1" + i;
            Message message = MessageBuilder.withBody(str.getBytes()).build();
            rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);
            log.info("Message sent, time: {}", new Date());
        }
    }
}

Define the constants

package com.longdidi.constants;

public class RabbitMQConstant {
    // Normal exchange
    public static final String EXCHANGE_NAME = "exchange.properties.normal.7";
    // Queue
    public static final String QUEUE_NAME1 = "queue.properties.normal.7";
    // Routing key
    public static final String ROUTING_NAME1 = "key.properties.normal.7";
}

Define the MQ queue

package com.longdidi.config;

import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {

    /**
     * Normal exchange.
     */
    @Bean
    public DirectExchange normalExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
    }

    /**
     * Normal queue.
     */
    @Bean
    public Queue normalQueue() {
        // Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)
        Map<String, Object> arguments = new HashMap<>();
        // At most 5 messages; no x-overflow is set, so overflow drops from the head
        arguments.put("x-max-length", 5);
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1)
                .withArguments(arguments)
                .build();
    }

    /**
     * Binds the normal queue to the normal exchange.
     */
    @Bean
    public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
    }
}

Send the messages

package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq08Properties07Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq08Properties07Application.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * Runs once, right after the application starts.
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }
}

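Test

  • Check the number of messages in the queue

    Eight messages are published but x-max-length is 5, so the queue should report 5 ready messages.

  • Inspect the surviving messages

    With the default drop-head behavior the three oldest messages are dropped, so the remaining bodies should be hello world13 through hello world17.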

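(4) Setting the queue overflow behavior

x-overflow:String

Determines what happens to messages once the queue reaches its maximum length. Valid values are drop-head (drop the message at the head of the queue), reject-publish (reject newly published messages), and reject-publish-dlx (reject newly published messages and route them to the dead-letter exchange).

Quorum queues support only two of these: drop-head and reject-publish.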
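The three policies can be compared with a small in-memory model. This is only a sketch of the semantics described above, not RabbitMQ's implementation: the class `OverflowSketch`, its fields, and the `hasDlx` flag are hypothetical names introduced for illustration, and the model assumes a limit of at least 1.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of x-max-length combined with x-overflow (hypothetical, for illustration only). */
final class OverflowSketch {

    enum Overflow { DROP_HEAD, REJECT_PUBLISH, REJECT_PUBLISH_DLX }

    final Deque<String> queue = new ArrayDeque<>();        // ready messages, head first
    final List<String> deadLettered = new ArrayList<>();   // what a dead-letter exchange would receive
    final List<String> rejected = new ArrayList<>();       // publishes the queue refused
    private final int maxLength;
    private final Overflow overflow;
    private final boolean hasDlx;

    OverflowSketch(int maxLength, Overflow overflow, boolean hasDlx) {
        this.maxLength = maxLength;
        this.overflow = overflow;
        this.hasDlx = hasDlx;
    }

    void publish(String body) {
        if (queue.size() < maxLength) {
            queue.addLast(body);
            return;
        }
        switch (overflow) {
            case DROP_HEAD: {
                // Make room by removing the oldest message, then accept the new one
                String dropped = queue.pollFirst();
                if (hasDlx) {
                    deadLettered.add(dropped);
                }
                queue.addLast(body);
                break;
            }
            case REJECT_PUBLISH: {
                // The queue keeps its contents; the new message is refused
                rejected.add(body);
                break;
            }
            case REJECT_PUBLISH_DLX: {
                // Refused as above, but the refused message is also dead-lettered
                rejected.add(body);
                if (hasDlx) {
                    deadLettered.add(body);
                }
                break;
            }
        }
    }

    public static void main(String[] args) {
        for (Overflow mode : Overflow.values()) {
            OverflowSketch q = new OverflowSketch(5, mode, true);
            for (int i = 0; i < 8; i++) {
                q.publish("hello world1" + i);
            }
            System.out.println(mode + " queue=" + q.queue + " deadLettered=" + q.deadLettered);
        }
    }
}
```

With a limit of 5 and 8 publishes, the model keeps messages 3 to 7 under drop-head and messages 0 to 4 under both reject modes; only reject-publish-dlx dead-letters the refused messages.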

Test module: rabbitmq-08-properties-08

Add the dependencies

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

Configure MQ

server:
  port: 8080

spring:
  application:
    name: properties-learn6
  rabbitmq:
    host: 192.168.1.101
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi

Producer

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;

@Service
@Slf4j
public class SendMessageService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg() {
        // Publish 8 messages; the queue is declared with x-max-length = 5
        for (int i = 0; i < 8; i++) {
            String str = "hello world1" + i;
            Message message = MessageBuilder.withBody(str.getBytes()).build();
            rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);
            log.info("Message sent at: {}", new Date());
        }
    }
}

Define the constants

package com.longdidi.constants;

public class RabbitMQConstant {
    // Exchanges (normal and dead-letter)
    public static final String EXCHANGE_NAME = "exchange.properties.normal.8";
    public static final String EXCHANGE_DLX_NAME = "exchange.properties.dlx.8";
    // Queues (normal and dead-letter)
    public static final String QUEUE_NAME1 = "queue.properties.normal.8";
    public static final String QUEUE_DLX_NAME1 = "queue.properties.dlx.8";
    // Routing keys (normal and dead-letter)
    public static final String ROUTING_NAME1 = "key.properties.normal.8";
    public static final String ROUTING_DLX_NAME1 = "key.properties.dlx.8";
}

Define the MQ queue

package com.longdidi.config;

import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {

    /**
     * Normal exchange
     */
    @Bean
    public DirectExchange normalExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
    }

    /**
     * Normal queue
     */
    @Bean
    public Queue normalQueue() {
        // Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)
        Map<String, Object> arguments = new HashMap<>();
        // Maximum queue length
        arguments.put("x-max-length", 5);
        // Overflow behavior: drop the head message (the default)
        // arguments.put("x-overflow", "drop-head");
        // Overflow behavior: reject the publish
        // arguments.put("x-overflow", "reject-publish");
        // Overflow behavior: reject the publish and route the rejected message to the dead-letter exchange
        arguments.put("x-overflow", "reject-publish-dlx");
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1)
                .withArguments(arguments)
                .deadLetterExchange(RabbitMQConstant.EXCHANGE_DLX_NAME)
                .deadLetterRoutingKey(RabbitMQConstant.ROUTING_DLX_NAME1)
                .build();
    }

    /**
     * Binds the normal queue to the normal exchange
     */
    @Bean
    public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
    }

    /**
     * Dead-letter exchange
     */
    @Bean
    public DirectExchange dlxExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();
    }

    /**
     * Dead-letter queue
     */
    @Bean
    public Queue dlxQueue() {
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME1).build();
    }

    /**
     * Binds the dead-letter queue to the dead-letter exchange
     */
    @Bean
    public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
        return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.ROUTING_DLX_NAME1);
    }
}

Send the messages

package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq08Properties08Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq08Properties08Application.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * Runs once, right after the application starts.
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }
}

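Test

  • With x-overflow set to drop-head

    Once the queue length is exceeded, the messages that were enqueued first are dropped first (if a dead-letter exchange is configured, they are routed to it instead of being discarded).

  • With x-overflow set to reject-publish

    Queue messages

    The queue should keep the first 5 messages; the 3 published after the limit was reached are rejected.

    View the message details

    The retained bodies should be hello world10 through hello world14.

  • With x-overflow set to reject-publish-dlx

    Messages published after the limit is reached are rejected, and the rejected messages are routed to the dead-letter exchange.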

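(5) Setting the queue's maximum size in bytes

x-max-length-bytes:Number

The total size of message bodies the queue may hold before it starts dropping ready messages from the head.

This bounds the queue by size rather than by message count; once the threshold is exceeded, messages are dropped from the head of the queue.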
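To make the byte accounting concrete, here is a minimal sketch that replays a list of message bodies against a byte limit with drop-head behavior. It is a model of the rule described above rather than RabbitMQ's code: `MaxLengthBytesSketch` and `survivors` are hypothetical names, bodies are assumed to be UTF-8 encoded, and the model assumes that a single body larger than the limit is dropped as well.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Toy model of x-max-length-bytes with drop-head overflow (hypothetical, for illustration only). */
final class MaxLengthBytesSketch {

    /** Returns the bodies still in the queue after publishing all of them in order. */
    static List<String> survivors(int maxBytes, List<String> bodies) {
        Deque<String> queue = new ArrayDeque<>();
        int totalBytes = 0;
        for (String body : bodies) {
            queue.addLast(body);
            totalBytes += body.getBytes(StandardCharsets.UTF_8).length;
            // Drop from the head until the total body size fits the limit again
            while (totalBytes > maxBytes && !queue.isEmpty()) {
                String dropped = queue.pollFirst();
                totalBytes -= dropped.getBytes(StandardCharsets.UTF_8).length;
            }
        }
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        List<String> bodies = Arrays.asList(
                "你好我好大家好0", "你好我好大家好1", "你好我好大家好2",   // 22 bytes each in UTF-8
                "hello0", "hello1", "hello2");                         // 6 bytes each
        System.out.println(survivors(30, bodies));
    }
}
```

Each Chinese body is 7 characters of 3 bytes plus one ASCII digit, so two of them never fit under a 30-byte limit together; in this model only the three 6-byte messages remain at the end.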

Test module: rabbitmq-08-properties-09

Add the dependencies

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

Configure MQ

server:
  port: 8080

spring:
  application:
    name: properties-learn9
  rabbitmq:
    host: 192.168.1.101
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi

Producer

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;

@Service
@Slf4j
public class SendMessageService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg() {
        // Three messages with multi-byte bodies (22 bytes each when the default charset is UTF-8)
        for (int i = 0; i < 3; i++) {
            String str = "你好我好大家好" + i;
            Message message = MessageBuilder.withBody(str.getBytes()).build();
            rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);
            log.info("Message sent at: {}", new Date());
        }
        // Three messages with short ASCII bodies (6 bytes each)
        for (int i = 0; i < 3; i++) {
            String str = "hello" + i;
            Message message = MessageBuilder.withBody(str.getBytes()).build();
            rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);
            log.info("Message sent at: {}", new Date());
        }
    }
}

Define the constants

package com.longdidi.constants;

public class RabbitMQConstant {
    // Normal exchange
    public static final String EXCHANGE_NAME = "exchange.properties.normal.9";
    // Queue
    public static final String QUEUE_NAME1 = "queue.properties.normal.9";
    // Routing key
    public static final String ROUTING_NAME1 = "key.properties.normal.9";
}

Define the MQ queue

package com.longdidi.config;

import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {

    /**
     * Normal exchange
     */
    @Bean
    public DirectExchange normalExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
    }

    /**
     * Normal queue
     */
    @Bean
    public Queue normalQueue() {
        // Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)
        Map<String, Object> arguments = new HashMap<>();
        // Maximum total size of the message bodies held by the queue, in bytes
        arguments.put("x-max-length-bytes", 30);
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1)
                .withArguments(arguments)
                .build();
    }

    /**
     * Binds the normal queue to the normal exchange
     */
    @Bean
    public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
    }
}

Send the messages

package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq08Properties09Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq08Properties09Application.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * Runs once, right after the application starts.
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }
}

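Test

With the 30-byte limit, the larger messages published first should be dropped from the head as later ones arrive, leaving only the short messages (hello0, hello1, hello2) in the queue.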

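(6) Setting a single active consumer

x-single-active-consumer: controls whether only one consumer at a time may consume from the queue.

When set to true, only one of the registered consumers receives messages; the others are not sent any messages while it remains active.

When set to false, messages are dispatched round-robin to all consumers (the default is false).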
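The difference between the two modes can be sketched with a small dispatcher model. This is an illustration of the behavior described above, not broker code: `SingleActiveConsumerSketch`, `register`, `cancel`, and `deliver` are hypothetical names, and the model assumes the earliest registered consumer is the active one and that the next one takes over when it is cancelled.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy dispatcher comparing single-active-consumer with round-robin (hypothetical, for illustration only). */
final class SingleActiveConsumerSketch {

    private final List<String> consumers = new ArrayList<>(); // in registration order
    private final boolean singleActive;
    private int cursor = 0; // round-robin position, used only when singleActive is false

    SingleActiveConsumerSketch(boolean singleActive) {
        this.singleActive = singleActive;
    }

    void register(String consumerTag) {
        consumers.add(consumerTag);
    }

    void cancel(String consumerTag) {
        consumers.remove(consumerTag);
    }

    /** Returns the tag of the consumer that would receive the next message, or null if none is registered. */
    String deliver() {
        if (consumers.isEmpty()) {
            return null;
        }
        if (singleActive) {
            // Only the earliest registered consumer is active; the rest wait
            return consumers.get(0);
        }
        String chosen = consumers.get(cursor % consumers.size());
        cursor++;
        return chosen;
    }

    public static void main(String[] args) {
        SingleActiveConsumerSketch sac = new SingleActiveConsumerSketch(true);
        sac.register("consumer1");
        sac.register("consumer2");
        System.out.println(sac.deliver() + ", " + sac.deliver());
        sac.cancel("consumer1");
        System.out.println("after cancel: " + sac.deliver());
    }
}
```

In single-active mode every delivery goes to the same consumer until it goes away, which is why the setting suits workloads that need messages processed in order by one consumer at a time.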

Test module: rabbitmq-08-properties-10

Add the dependencies

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

Configure MQ

server:
  port: 8080

spring:
  application:
    name: properties-learn10
  rabbitmq:
    host: 192.168.1.101
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi

Producer

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;

@Service
@Slf4j
public class SendMessageService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg() {
        // Publish a single message to the queue
        Message message = MessageBuilder.withBody("hello world1".getBytes()).build();
        rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);
        log.info("Message sent at: {}", new Date());
    }
}

Consumers

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ReceivemessageService {

    // Two listeners on the same queue, to show which one the broker delivers to

    @RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1})
    public void receiveMsg1(Message message) {
        log.info("Consumer 1 received: {}", new String(message.getBody()));
    }

    @RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1})
    public void receiveMsg2(Message message) {
        log.info("Consumer 2 received: {}", new String(message.getBody()));
    }
}

Define the constants

package com.longdidi.constants;

public class RabbitMQConstant {
    // Normal exchange
    public static final String EXCHANGE_NAME = "exchange.properties.normal.10";
    // Queue
    public static final String QUEUE_NAME1 = "queue.properties.normal.10";
    // Routing key
    public static final String ROUTING_NAME1 = "key.properties.normal.10";
}

Define the MQ queue

package com.longdidi.config;

import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {

    /**
     * Normal exchange
     */
    @Bean
    public DirectExchange normalExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
    }

    /**
     * Normal queue
     */
    @Bean
    public Queue normalQueue() {
        // Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)
        Map<String, Object> arguments = new HashMap<>();
        // Allow only one active consumer on this queue at a time
        arguments.put("x-single-active-consumer", true);
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1)
                .withArguments(arguments)
                .build();
    }

    /**
     * Binds the normal queue to the normal exchange
     */
    @Bean
    public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
    }
}

Send the messages

package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq08Properties10Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq08Properties10Application.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * Runs once, right after the application starts.
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }
}

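Test

With x-single-active-consumer enabled, only one of the two listeners should log the message; the other stays idle.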

(7) Setting the dead-letter exchange

x-dead-letter-exchange names the dead-letter exchange associated with the queue, and x-dead-letter-routing-key sets the routing key used when a message is dead-lettered. The two are usually defined together.

Test module: rabbitmq-08-properties-11

Add the dependencies

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

Configure MQ

server:
  port: 8080

spring:
  application:
    name: properties-learn11
  rabbitmq:
    host: 192.168.1.101
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi

Producer

package com.longdidi.service;

import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;

@Service
@Slf4j
public class SendMessageService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg() {
        // Publish a single message; nothing consumes it, so it expires after the queue TTL
        Message message = MessageBuilder.withBody("hello world1".getBytes()).build();
        rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);
        log.info("Message sent at: {}", new Date());
    }
}

Define the constants

package com.longdidi.constants;

public class RabbitMQConstant {
    // Exchanges (normal and dead-letter)
    public static final String EXCHANGE_NAME = "exchange.properties.normal.11";
    public static final String EXCHANGE_DLX_NAME = "exchange.properties.dlx.11";
    // Queues (normal and dead-letter)
    public static final String QUEUE_NAME1 = "queue.properties.normal.11";
    public static final String QUEUE_DLX_NAME1 = "queue.properties.dlx.11";
    // Routing keys (normal and dead-letter)
    public static final String ROUTING_NAME1 = "key.properties.normal.11";
    public static final String ROUTING_DLX_NAME1 = "key.properties.dlx.11";
}

Define the MQ queue

package com.longdidi.config;

import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {

    /**
     * Normal exchange
     */
    @Bean
    public DirectExchange normalExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
    }

    /**
     * Normal queue
     */
    @Bean
    public Queue normalQueue() {
        // Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)
        Map<String, Object> arguments = new HashMap<>();
        // Dead-letter exchange
        arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);
        // Dead-letter routing key
        arguments.put("x-dead-letter-routing-key", RabbitMQConstant.ROUTING_DLX_NAME1);
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1)
                .ttl(5000) // message TTL in milliseconds
                .withArguments(arguments)
                .build();
    }

    /**
     * Binds the normal queue to the normal exchange
     */
    @Bean
    public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
    }

    /**
     * Dead-letter exchange
     */
    @Bean
    public DirectExchange dlxExchange() {
        return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();
    }

    /**
     * Dead-letter queue
     */
    @Bean
    public Queue dlxQueue() {
        return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME1).build();
    }

    /**
     * Binds the dead-letter queue to the dead-letter exchange
     */
    @Bean
    public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
        return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.ROUTING_DLX_NAME1);
    }
}

Send the messages

package com.longdidi;

import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq08Properties11Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(Rabbitmq08Properties11Application.class, args);
    }

    @Resource
    private SendMessageService sendMessageService;

    /**
     * Runs once, right after the application starts.
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sendMessageService.sendMsg();
    }
}

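Test

The message first sits in queue.properties.normal.11. Since nothing consumes it, it should expire after the 5-second TTL and be routed through the dead-letter exchange into queue.properties.dlx.11.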
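The flow this module exercises, where a message expires on the normal queue and is then moved to the dead-letter queue, can be sketched without a broker. The model below is a simplification under stated assumptions: `TtlDeadLetterSketch` and its methods are hypothetical names, time is passed in explicitly rather than read from a clock, and exchange routing is collapsed into a direct move to a single dead-letter list.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of a queue with a message TTL and a dead-letter target (hypothetical, for illustration only). */
final class TtlDeadLetterSketch {

    private static final class Entry {
        final String body;
        final long enqueuedAtMillis;

        Entry(String body, long enqueuedAtMillis) {
            this.body = body;
            this.enqueuedAtMillis = enqueuedAtMillis;
        }
    }

    private final Deque<Entry> queue = new ArrayDeque<>();
    final List<String> deadLetterQueue = new ArrayList<>();
    private final long ttlMillis;

    TtlDeadLetterSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void publish(String body, long nowMillis) {
        queue.addLast(new Entry(body, nowMillis));
    }

    /** Moves every message that has waited at least ttlMillis from the head to the dead-letter queue. */
    void expire(long nowMillis) {
        while (!queue.isEmpty() && nowMillis - queue.peekFirst().enqueuedAtMillis >= ttlMillis) {
            deadLetterQueue.add(queue.pollFirst().body);
        }
    }

    int ready() {
        return queue.size();
    }

    public static void main(String[] args) {
        TtlDeadLetterSketch q = new TtlDeadLetterSketch(5000);
        q.publish("hello world1", 0);
        q.expire(4000);
        System.out.println("t=4000 ready=" + q.ready() + " deadLettered=" + q.deadLetterQueue);
        q.expire(6000);
        System.out.println("t=6000 ready=" + q.ready() + " deadLettered=" + q.deadLetterQueue);
    }
}
```

Because every message on the queue shares the same TTL, checking only the head is enough in this model: messages behind it were enqueued later and cannot expire earlier.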
