开发环境
- jdk:1.8
- spring-rabbit:2.3.9
- springboot:2.4.8
项目目录
配置文件
spring:datasource:type: com.zaxxer.hikari.HikariDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/library?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=GMT%2B8&rewriteBatchedStatements=trueusername: rootpassword: 123456rabbitmq:host: localhostport: 5675publisher-confirm-type: correlated # 开启确认模式publisher-returns: true # 开启退回模式listener:simple:acknowledge-mode: manual # 手动确认concurrency: 10 # 并发数max-concurrency: 20 # 最大并发数jpa:hibernate:ddl-auto: updateproperties:hibernate:dialect: org.hibernate.dialect.MySQL8Dialectserver:port: 8112
第一种模式
/*** @Description 1. "Hello World!"* @Author WuYiLong* @Date 2024/8/14 15:34*/
@Slf4j
@Service
public class FirstModeExample {public static final String QUEUE = "firstModeQueue";public static final String EXCHANGE = "firstModeExchange";public static final String ROUTE_KEY = "firstModeRouteKey";@RabbitListener(bindings = {@QueueBinding(value = @Queue(name = QUEUE), exchange = @Exchange(name = EXCHANGE), key = {ROUTE_KEY})})public void consumer(String data, Channel channel, Message message) throws IOException {log.info("第一种模式:生产者-队列-消费者,数据:{}",data);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}
}
第二种模式
/*** @Description 2. Work Queues* @Author WuYiLong* @Date 2024/8/14 16:12*/
@Slf4j
@Service
public class SecondModeExample {public static final String QUEUE = "secondModeQueue";public static final String EXCHANGE = "secondModeExchange";public static final String ROUTE_KEY = "secondModeRouteKey";@RabbitListener(bindings = @QueueBinding(value =@Queue(name = QUEUE),exchange = @Exchange(name = EXCHANGE),key = ROUTE_KEY))public void consumer1(String data, Channel channel, Message message) throws IOException {log.info("消费者1正在消费:{}",data);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}@RabbitListener(bindings = @QueueBinding(value =@Queue(name = QUEUE),exchange = @Exchange(name = EXCHANGE),key = ROUTE_KEY))public void consumer2(String data, Channel channel, Message message) throws IOException {log.info("消费者2正在消费:{}",data);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}
第三种模式
/*** @Description 3. Publish/Subscribe* @Author WuYiLong* @Date 2024/8/14 16:37*/
@Slf4j
@Service
public class ThirdModeExample {public static final String QUEUE1 = "thirdModeQueue1";public static final String QUEUE2 = "thirdModeQueue2";public static final String EXCHANGE = "thirdModeExchange";@RabbitListener(bindings = @QueueBinding(value = @Queue(QUEUE1), exchange = @Exchange(name = EXCHANGE,type = ExchangeTypes.FANOUT)))public void consumer1(String data, Channel channel, Message message) throws Exception {log.info("consumer1: {}", data);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}@RabbitListener(bindings = @QueueBinding(value = @Queue(QUEUE2), exchange = @Exchange(name=EXCHANGE,type = ExchangeTypes.FANOUT)))public void consumer2(String data, Channel channel, Message message) throws Exception {log.info("consumer2: {}", data);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}
第四种模式
/*** @Description 4. Routing* @Author WuYiLong* @Date 2024/8/15 14:05*/
@Slf4j
@Service
public class FourModeExample {public static final String QUEUE1 = "fourModeQueue1";public static final String QUEUE2 = "fourModeQueue2";public static final String EXCHANGE = "fourModeExchange";public static final String ROUTING_KEY1 = "fourModeRoutingKey1";public static final String ROUTING_KEY2 = "fourModeRoutingKey2";public static final String ROUTING_KEY3 = "fourModeRoutingKey3";@RabbitListener(bindings = @QueueBinding(value = @Queue(name = QUEUE1),exchange = @Exchange(value = EXCHANGE),key = {ROUTING_KEY1,ROUTING_KEY2,ROUTING_KEY3}))public void consumer1(String data, Channel channel, Message message) throws Exception {log.info("consumer1->{}", data);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = QUEUE2),exchange = @Exchange(value = EXCHANGE),key = {ROUTING_KEY1}))public void consumer2(String data, Channel channel, Message message) throws Exception {log.info("consumer2->{}", data);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}
第五种模式
/*** @Description 5. Topics* @Author WuYiLong* @Date 2024/8/16 10:39*/
@Slf4j
@Service
public class FiveModeExample {public static final String QUEUE1 = "queue1";public static final String QUEUE2 = "queue2";public static final String EXCHANGE = "exchange";public static final String ROUTING_KEY1 = "*.a.*";public static final String ROUTING_KEY2 = "*.*.b";public static final String ROUTING_KEY3 = "c.#";@RabbitListener(bindings = @QueueBinding(value = @Queue(name = QUEUE1),exchange = @Exchange(name = EXCHANGE,type = ExchangeTypes.TOPIC),key = ROUTING_KEY1))public void consumer1(String data, Channel channel, Message message) throws Exception {log.info("consumer1: {}", data);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = QUEUE2),exchange = @Exchange(name = EXCHANGE,type = ExchangeTypes.TOPIC),key = {ROUTING_KEY2,ROUTING_KEY3}))public void consumer2(String data, Channel channel, Message message) throws Exception {log.info("consumer2: {}", data);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}
第六种模式
注意:客户端和服务器端不能在同一个服务,必须分开两个服务写
要不会造成不断循环
服务器端
- 配置类
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setConfirmCallback(new DefaultConfirmCallback());
rabbitTemplate.setReturnsCallback(new DefaultReturnsCallback());
rabbitTemplate.setMandatory(true); //设置为true,才会触发DefaultReturnsCallback方法
// rabbitTemplate.setChannelTransacted(true); // 支持事务,但使用RPC模式时需要关闭
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); // 消息转换器,支持转换对象
return rabbitTemplate;
}
- 队列和交换机的绑定
/*** @Description* @Author WuYiLong* @Date 2024/8/19 17:24*/
@Configuration
public class RabbitRpcConfig {public static final String REPLY_QUEUE = "sixModeReplyQueue";public static final String RPC_QUEUE = "sixModeRpcQueue";public static final String EXCHANGE = "sixModeExchange";public static final String RPC_ROUTING_KEY = "sixModeRpcRoutingKey";public static final String REPLY_ROUTING_KEY = "sixModeReplyRoutingKey";@Beanpublic Queue replyQueue() {return QueueBuilder.durable(REPLY_QUEUE).build();}@Beanpublic Exchange rpcExchange() {return ExchangeBuilder.topicExchange(EXCHANGE).build();}@Beanpublic Binding replyBinding() {return BindingBuilder.bind(replyQueue()).to(rpcExchange()).with(REPLY_ROUTING_KEY).noargs();}@Beanpublic Queue rpcQueue() {return QueueBuilder.durable(RPC_QUEUE).build();}@Beanpublic Binding rpcBinding() {return BindingBuilder.bind(rpcQueue()).to(rpcExchange()).with(RPC_ROUTING_KEY).noargs();}}
- 监听类
用于监听客户端发送过来的消息
/*** @Description rpc模式服务器端* @Author WuYiLong* @Date 2024/8/16 14:31*/
@Slf4j
@Service
public class SixModeExample {@Autowiredprivate RabbitTemplate rabbitTemplate;@RabbitListener(queues = RabbitRpcConfig.RPC_QUEUE)public void consumer(Channel channel,Message message) throws IOException {log.info("consumer->{}", StringUtils.toEncodedString(message.getBody(), StandardCharsets.UTF_8));Message build = MessageBuilder.withBody("已收到消息了".getBytes(StandardCharsets.UTF_8)).build();channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);String correlationId = message.getMessageProperties().getCorrelationId();CorrelationData correlationData = new CorrelationData(correlationId);rabbitTemplate.sendAndReceive(RabbitRpcConfig.EXCHANGE, RabbitRpcConfig.REPLY_ROUTING_KEY,build ,correlationData);}
}
客户端
- 配置类
/*** @Description* @Author WuYiLong* @Date 2024/8/19 17:20*/
@Configuration
public class RabbitClientConfig {@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();simpleMessageListenerContainer.setConnectionFactory(connectionFactory);simpleMessageListenerContainer.setQueueNames(RabbitRpcConfig.REPLY_QUEUE); // 监听的队列simpleMessageListenerContainer.setMessageListener(rabbitTemplate(connectionFactory));return simpleMessageListenerContainer;}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); // 消息转换器,支持转换对象rabbitTemplate.setReplyAddress(RabbitRpcConfig.REPLY_QUEUE); // 使用 RabbitTemplate发送和接收消息,并设置回调队列地址rabbitTemplate.setReplyTimeout(60000);return rabbitTemplate;}
}
- 客户端发送消息
/*** @Description* @Author WuYiLong* @Date 2024/8/19 17:30*/
@Api(tags = "rpc-客户端api")
@RestController
@RequestMapping(value = "rpcClient")
public class RpcClientController {@Autowiredprivate RabbitTemplate rabbitTemplate;@ApiOperation(value = "客户端发送消息")@GetMapping(value = "sendMessage")public void sendMessage(String msg) {Message message = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8)).build();Message result = rabbitTemplate.sendAndReceive(RabbitRpcConfig.EXCHANGE, RabbitRpcConfig.RPC_ROUTING_KEY, message);if(result != null) {String correlationId = message.getMessageProperties().getCorrelationId();String returnCorrelationId = result.getMessageProperties().getHeader("spring_returned_message_correlation");if(correlationId.equals(returnCorrelationId)) {System.out.println("已确认该消息由该客户端发送");}System.out.println(new String(result.getBody()));}}
}
死信队列(也可以当延迟队列使用)
- 队列和交换机的设置
/********************************测试死信队列****************************************/@Beanpublic Queue ordinaryQueue() {return QueueBuilder.durable(DeadLetterQueueExample.ORDINARY_QUEUE).ttl(5000).deadLetterExchange(DeadLetterQueueExample.DEAD_LETTER_EXCHANGE).deadLetterRoutingKey(DeadLetterQueueExample.DEAD_LETTER_ROUTING_KEY).build();}@Beanpublic Exchange ordinaryExchange() {return ExchangeBuilder.directExchange(DeadLetterQueueExample.ORDINARY_EXCHANGE).durable(true).build();}@Beanpublic Binding ordinaryBinding() {return BindingBuilder.bind(ordinaryQueue()).to(ordinaryExchange()).with(DeadLetterQueueExample.ORDINARY_ROUTING_KEY).noargs();}@Beanpublic Queue deadLetterQueue() {return QueueBuilder.durable(DeadLetterQueueExample.DEAD_LETTER_QUEUE).build();}@Beanpublic Exchange deadLetterExchange() {return ExchangeBuilder.directExchange(DeadLetterQueueExample.DEAD_LETTER_EXCHANGE).durable(true).build();}@Beanpublic Binding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(DeadLetterQueueExample.DEAD_LETTER_ROUTING_KEY).noargs();}
- 监听类
只需要监听deadLetterQueue队列的消息
/*** @Description 死信队列栗子* @Author WuYiLong* @Date 2024/8/20 9:41*/
@Slf4j
@Service
public class DeadLetterQueueExample {public static final String ORDINARY_QUEUE = "ordinaryQueue";public static final String ORDINARY_EXCHANGE = "ordinaryExchange";public static final String ORDINARY_ROUTING_KEY = "ordinaryRoutingKey";public static final String DEAD_LETTER_QUEUE = "deadLetterQueue";public static final String DEAD_LETTER_EXCHANGE = "deadLetterExchange";public static final String DEAD_LETTER_ROUTING_KEY = "deadLetterRoutingKey";@RabbitListener(queues = DEAD_LETTER_QUEUE)public void deadConsumer(Channel channel, Message message) throws IOException {log.info("deadConsumer message: {}", new String(message.getBody()));channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}
- 测试类
/*** @Description* @Author WuYiLong* @Date 2024/8/20 10:46*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class DeadLetterQueueExampleTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testDeadLetterQueueExample() throws InterruptedException {for (int i = 0; i < 10; i++) {String msg = "java是一门计算机语言"+i;
// rabbitTemplate.convertAndSend(DeadLetterQueueExample.ORDINARY_EXCHANGE,DeadLetterQueueExample.ORDINARY_ROUTING_KEY,msg);Message message = new Message(msg.getBytes(StandardCharsets.UTF_8));
// message.getMessageProperties().setExpiration("5000"); // 如果消息设置了过期时间,队列就不用设置了.ttl(5000)rabbitTemplate.send(DeadLetterQueueExample.ORDINARY_EXCHANGE,DeadLetterQueueExample.ORDINARY_ROUTING_KEY,message);}TimeUnit.SECONDS.sleep(20);}
}
仓库地址
github