文章目录
- 前言
- 创建 Spring 项目
- 添加配置
- 工作队列模式
- Publisher/Subscribe 发布订阅模式
- Routing 路由模式
- Topics 通配符模式
- 如果发送的消息是对象类型该怎么做
- 结论
前言
前面我们学习了 RabbitMQ 官方提供的 Java API 实现了 RabbitMQ 的其中工作模式,那么既然说到 Java,肯定不可避免的提到 Spring,Spring 对原生的 RabbitMQ 进行了封装,并且集成到了 SpringBoot 中,那么这篇文章我们将来学习 SpringBoot 整合 RabbitMQ。
这里是 Spring AMQP 官方文档 Spring AMQP
创建 Spring 项目
我们在创建 Spring 项目的时候,在添加依赖的时候将 message -> spring for rabbitmq 给勾上:
或者我们也可以直接去 maven 中央仓库中将 spring for rabbitmq 依赖手动添加进去:
添加配置
当添加完成 RabbitMQ 依赖之后,我们需要在 application.properties 或者 application.yml 文件中添加 RabbitMQ 相关的依赖,也就类似于 JDBC 或者 mybatis 的配置:
yml 文件配置:
spring:rabbitmq:host: x.x.x.xport: 5672 //RabbitMQ连接使用的默认端口username: adminpassword: ***virtual-host: test #默认值为 /
或者这样配置:
#amqp://username:password@Ip:port/virtual-host
spring:rabbitmq:addresses: amqp://admin:***@x.x.x.x:5672/test
当引入完成 RabbitMQ 依赖以及进行了相关的配置了之后,我们就来通过 SpringBoot 来实现 RabbitMQ 中的工作队列模式,广播模式,路由模式和通配符模式。
工作队列模式
编写生产者代码:
//工作模式队列
public static final String WORK_QUEUE = "work.queue";
application.yml 配置文件中的配置就相当于上一篇文章我们使用原生 API 中的建立连接和开启信道,下一步对于我们生产者来说就是声明队列和交换器了,工作队列模式的话我们就使用默认的交换器,所以也就没必要显式的声明交换器。
@Configuration
public class RabbitMQConfig {@Bean("workQueue")public Queue workQueue() {return QueueBuilder.durable(Constants.WORK_QUEUE).build();}
}
注意:这里的 Queue 不是 java.util 下的 Queue,而是 org.springframework.amqp.core.Queue
下的 Queue,SpringBoot 整合的 RabbitMQ 中大部分导入的包都是 springframework 下的包。
durable 和 nonDurable 表示创建持久化的队列还是非持久化的队列。
生产者生产消息。SpringBoot 整合的 RabbitMQ 中生产者发送消息依赖于 RabbitTemplate
类,所以我们将这个类给注入进来。
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/producer")
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/work")public String work() {for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE,"spring amqp work" + i);}return "发送成功";}
}
编写消费者代码:
与 RabbitMQ 建立连接在配置文件中配置了,声明队列也在生产者声明的时候声明了,那么我们消费者要做的就是接收和处理消息了:
@Component
public class WorkListener {@RabbitListener(queues = Constants.WORK_QUEUE)public void queueListener1(Message message) {System.out.println("listener 1 [" + Constants.WORK_QUEUE + "] 接收到消息" + message);}@RabbitListener(queues = Constants.WORK_QUEUE)public void queueListener2(Message message) {System.out.println("listener 2 [" + Constants.WORK_QUEUE + "] 接收到消息" + message);}
}
@RabbitListener 是 Spring 框架中用于监听 RabbitMQ 队列的注解,通过使用这个注解,可以定义一个方法,以便从 RabbitMQ 队列中接收消息,该注解支持多种参数类型,这些参数类型代表了从 RabbitMQ 接收到的消息的相关信息。
@RabbitListener 注解中常用的参数类型有:
- String 消息的内容
- Message(import org.springframework.amqp.core.Message):Spring AMQP的 Message 类,返回原始的消息体以及消息的属性,如消息ID,内容,队列信息等
- Channel(com.rabbitmq.client.Channel):RabbitMQ 的通道对象,可以用于进行高级的操作,如手动确认消息
我们将参数类型从 Message 变为 String,并且加上 Channel 看看里面的内容是什么:
@RabbitListener(queues = Constants.WORK_QUEUE)
public void queueListener2(String message, Channel channel) {System.out.println("listener 2 [" + Constants.WORK_QUEUE + "] 接收到消息" + message + channel);
}
@RabbitMQListener 是方法注解也是类注解,也就是说这个注解是可以注解在类上的,如果注解在这个类上就说名这个类监听的队列都是同一个队列。
@RabbitListener(queues = Constants.WORK_QUEUE)
@Component
public class WorkListener {public void queueListener1(Message message) {System.out.println("listener 1 [" + Constants.WORK_QUEUE + "] 接收到消息" + message);}public void queueListener2(String message, Channel channel) {System.out.println("listener 2 [" + Constants.WORK_QUEUE + "] 接收到消息" + message + channel);}
}
当我们将这个注解注解在类上的时候,然后启动项目并且访问我们这个项目的时候发现,队列中的消息并没有被消费:
这是为什么呢?这是因为当 @RabbitMQListener 注解在类上的时候,类中的方法也需要一个 @RabbitMQHandler
注解。
- @RabbitMQHandler 注解是一个方法级别的注解,当使用 @RabbitMQHandler 注解时,这个方法将被调用处理特定的信息
@RabbitListener(queues = Constants.WORK_QUEUE)
@Component
public class WorkListener {@RabbitHandlerpublic void queueListener1(Message message) {System.out.println("listener 1 [" + Constants.WORK_QUEUE + "] 接收到消息" + message);}@RabbitHandlerpublic void queueListener2(String message, Channel channel) {System.out.println("listener 2 [" + Constants.WORK_QUEUE + "] 接收到消息" + message + channel);}
}
在方法上添加注解之后,队列中的消息就能够被消费了:
Publisher/Subscribe 发布订阅模式
相较于工作队列模式,发布订阅模式需要声明交换器,并且绑定交换器与队列:
public static final String FANOUT_EXCHANGE = "fanout.exchange";
public static final String FANOUT_QUEUE1 = "fanout.queue1";
public static final String FANOUT_QUEUE2 = "fanout.queue2";
声明队列和交换器,并且绑定交换器和队列:
@Bean("fanoutQueue1")
public Queue fanoutQueue1() {return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();
}@Bean("fanoutQueue2")
public Queue fanoutQueue2() {return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();
}@Bean("fanoutExchange")
public FanoutExchange fanoutExchange() {return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).durable(true).build();//绑定交换器和队列
@Bean("fanoutQueueBinding1")
public Binding fanoutQueueBinding1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue1") Queue queue) {return BindingBuilder.bind(queue).to(fanoutExchange);
}@Bean("fanoutQueueBinding2")
public Binding fanoutQueueBinding2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue2") Queue queue) {return BindingBuilder.bind(queue).to(fanoutExchange);
}
当我们将自己构造的对象使用 @Bean
注解交给 Spring 管理的时候,Spring 是根据对象的类型来进行管理的,上面我们使用 @Bean 注解管理了多个返回类型为 Queue 类型的方法,那么当我们要使用这些管理的对象的时候,为了告诉 Spring 我们具体使用的是哪个对象就需要使用 @Qualifier(bean名)
注解来将 bean 注入到我们的方法中。bean 名默认是方法名的小驼峰命名模式,如果方法名第一个字母为大写,则 bean 名就是方法名本身。
生产者代码:
@RequestMapping("/fanout")
public String fanout() {for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE, "","spring amqp work" + i);}return "发送成功";
}
消费者代码:
@Component
public class FanoutListener {@RabbitListener(queues = Constants.FANOUT_QUEUE1)public void queueListener1(String message) {System.out.println("队列["+Constants.FANOUT_QUEUE1+"] 接收到消息:" +message);}@RabbitListener(queues = Constants.FANOUT_QUEUE2)public void queueListener2(String message) {System.out.println("队列["+Constants.FANOUT_QUEUE2+"] 接收到消息:" +message);}
}
Routing 路由模式
当交换机的类型是 Direct 类型的时候,交换器会将消息交给符合指定 Routing Key 的队列,这里路由模式具体是什么我这里就不多说了,大家要是还是不理解,可以去看看我的上一篇文章。
public static final String DIRECT_EXCHANGE = "direct.exchange";
public static final String DIRECT_QUEUE1 = "direct.queue1";
public static final String DIRECT_QUEUE2 = "direct.queue2";
声明队列、交换机和绑定交换机、队列:
//声明队列
@Bean("directQueue1")
public Queue directQueue1() {return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();
}@Bean("directQueue2")
public Queue directQueue2() {return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();
}//声明交换机
@Bean("directExchange")
public DirectExchange directExchange() {return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();
}//绑定交换机和队列
@Bean("directQueueBinding1")
public Binding directQueueBinding1(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue1") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with("orange");
}@Bean("directQueueBinding2")
public Binding directQueueBinding2(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with("black");
}@Bean("directQueueBinding3")
public Binding directQueueBinding3(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with("orange");
}
生产者代码:
@RequestMapping("/direct")
public String direct() {rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE,"orange","direct exchange orange");rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE,"black","direct exchange black");return "发送成功";
}
消费者代码:
@Component
public class DirectListener {@RabbitListener(queues = Constants.DIRECT_QUEUE1)public void queueListener1(String message, Channel channel) {System.out.println("队列["+ Constants.DIRECT_QUEUE1+"] 接收到消息:" + message + channel);}@RabbitListener(queues = Constants.DIRECT_QUEUE2)public void queueListener2(String message, Channel channel) {System.out.println("队列["+ Constants.DIRECT_QUEUE2+"] 接收到消息:" + message + channel);}
}
Topics 通配符模式
通配符模式,在交换机和队列绑定的时候使用的 Binding Key 可以是通配符字符串,但是生产者发送来的 Routing Key 必须是确定的字符串。
public static final String TOPICS_EXCHANGE = "topics.exchange";
public static final String TOPICS_QUEUE1 = "topics.queue1";
public static final String TOPICS_QUEUE2 = "topics.queue2";
声明队列、交换机以及绑定交换机和队列:
//声明队列
@Bean("topicsQueue1")
public Queue topicsQueue1() {return QueueBuilder.durable(Constants.TOPICS_QUEUE1).build();
}@Bean("topicsQueue2")
public Queue topicsQueue2() {return QueueBuilder.durable(Constants.TOPICS_QUEUE2).build();
}//声明交换机
@Bean("topicsExchange")
public TopicExchange topicExchange() {return ExchangeBuilder.topicExchange(Constants.TOPICS_EXCHANGE).durable(true).build();
}//绑定交换机和队列
@Bean("topicsQueueBinding1")
public Binding topicsQueueBinding1(@Qualifier("topicsExchange") TopicExchange topicExchange,@Qualifier("topicsQueue1") Queue queue) {return BindingBuilder.bind(queue).to(topicExchange).with("*.orange.*");
}@Bean("topicsQueueBinding2")
public Binding topicsQueueBinding2(@Qualifier("topicsExchange") TopicExchange topicExchange,@Qualifier("topicsQueue2") Queue queue) {return BindingBuilder.bind(queue).to(topicExchange).with("*.*.rabbit");
}@Bean("topicsQueueBinding3")
public Binding topicsQueueBinding3(@Qualifier("topicsExchange") TopicExchange topicExchange,@Qualifier("topicsQueue2") Queue queue) {return BindingBuilder.bind(queue).to(topicExchange).with("lazy.#");
}
生产者代码:
@RequestMapping("/topics")
public String topics() {rabbitTemplate.convertAndSend(Constants.TOPICS_EXCHANGE,"red.orange.blue","direct exchange orange");rabbitTemplate.convertAndSend(Constants.TOPICS_EXCHANGE,"dog.cat.rabbit","direct exchange black");rabbitTemplate.convertAndSend(Constants.TOPICS_EXCHANGE,"lazy.outgoing.happy","direct exchange black");return "发送成功";
}
消费者代码:
@Component
public class TopicsListener {@RabbitListener(queues = Constants.TOPICS_QUEUE1)public void queueListener1(String message, Channel channel) {System.out.println("队列["+ Constants.TOPICS_QUEUE1+"] 接收到消息:" + message + channel);}@RabbitListener(queues = Constants.TOPICS_QUEUE2)public void queueListener2(String message, Channel channel) {System.out.println("队列["+ Constants.TOPICS_QUEUE2+"] 接收到消息:" + message + channel);}
}
如果发送的消息是对象类型该怎么做
前面我们发送的消息的类型都是字符串类型,那么如果我们想要发送对象类型的消息该怎么办呢?
我们可以将我们的对象实现 Serializable
接口:
@Data
public class MessageInfo implements Serializable {private Integer ID;
}
声明队列,交换机就使用默认的交换机:
@Bean("hello")
public Queue hello() {return QueueBuilder.durable("hello").build();
}
生产者代码:
@RequestMapping("/test")
public String test() {MessageInfo messageInfo = new MessageInfo();messageInfo.setID(123);rabbitTemplate.convertAndSend("","hello",messageInfo);return "发送成功";
}
消费者代码:
@Component
public class TestListener {@RabbitListener(queues = "hello")public void queueListener(MessageInfo messageInfo) {System.out.println("接收到消息:" + messageInfo);}
}
这是一种方法,Spring AMQP 推荐使用 JSON 序列化,Spring AMQP 提供了 Jackson2JsonMessageConverter
和 MappingJackson2MessageConverter
等转换器,我们需要把一个 MessageConverter
设置到 RabbitTemplate
中。
@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {return new Jackson2JsonMessageConverter();
}@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());return rabbitTemplate;
}
然后我们把 MessageInfo 这个类中的实现 Serializable
接口给去掉,再来试试:
结论
感谢各位朋友们能够看到结尾,找不到工作?某直聘已读不回?不妨看看这里超快回复,助力每一位程序员早日找到理想的工作