运行代码
package com.by.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author lenovo* @version 0.1* @className TopicConsumer 主题交换机* @date 2024/12/24 20:15* @since jdk11*/
@Configuration
@Slf4j
public class TopicConsumer {@Bean // 创建消息队列public Queue topicQuery1() {return QueueBuilder.durable("Topic-牛牛").build();}@Bean // 创建消息队列public Queue topicQuery2() {return QueueBuilder.durable("Topic-花花").build();}@Bean // 创建消息队列public Queue topicQuery3() {return QueueBuilder.durable("Topic-峰峰").build();}@Bean //创建主题交换机 //durable 属性决定了队列是否在 RabbitMQ 服务器重启后仍然存在public Exchange topicExchange() {return ExchangeBuilder.topicExchange("Topic").durable(true).build();}@Bean //绑定队列到交换机public Binding Tbinding1() {return BindingBuilder.bind(topicQuery1()).to(topicExchange()).with("1.6.*").noargs();}@Bean //绑定队列到交换机public Binding Tbinding2() {return BindingBuilder.bind(topicQuery2()).to(topicExchange()).with("#").noargs();}@Bean //绑定队列到交换机public Binding Tbinding3() {return BindingBuilder.bind(topicQuery3()).to(topicExchange()).with("1.8.*").noargs();}@RabbitListener(queues = "Topic-牛牛") //消费者消费消息队列的信息,就不会在队列中显示public void Tconsumer1(String msg) {log.info("Topic-牛牛 收到消息:{}",msg);}@RabbitListener(queues = "Topic-花花") //消费者消费消息队列的信息,就不会在队列中显示public void Tconsumer2(String msg) {log.info("Topic-花花 收到消息:{}",msg);}@RabbitListener(queues = "Topic-峰峰") //消费者消费消息队列的信息,就不会在队列中显示public void Tconsumer3(String msg) {log.info("Topic-峰峰 收到消息:{}",msg);}
}
package com.by.provider;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;/*** @author lenovo* @version 0.1* @className FanoutProvider* @date 2024/12/24 20:01* @since jdk11*/
@Service
@Slf4j
public class TopicProvider {@AutowiredRabbitTemplate rabbitTemplate;public void send(String routingKey , String msg){//如果交换机的名称是错误的,在投递消息的时候,会包错 404 找不到 exchangerabbitTemplate.convertAndSend("Topic" , routingKey , msg);log.error("主题交换机发送成功——>{}" , msg);}
}
测试
package com.by;import com.by.provider.TopicProvider;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.io.IOException;@SpringBootTest
class TopicTests {@AutowiredTopicProvider topicProvider;@Testvoid test() throws IOException {topicProvider.send("1.6.5","张柏芝");System.in.read();}@Testvoid test2() throws IOException {topicProvider.send("1.8.8","赵丽娜");System.in.read();}@Testvoid test3() throws IOException {topicProvider.send("2.3.0","外星女人");System.in.read();}
}