一、前言
本篇围绕 Spring Boot 3.x 与 RabbitMQ 的动态配置集成展开。相比上一篇文章,本篇进一步实现了 RabbitMQ 的动态操作,例如动态新增、删除 RabbitMQ 实例,以及在动态实例中创建交换机、队列并监听队列等。
二、动态RabbitMQ实例,创建、删除
1、RabbitMQ动态实例配置
DynamicRabbitMQConfig.java
import com.chain.air.rpp.exchange.properties.RabbitInstance;
import com.chain.air.rpp.exchange.properties.RabbitProperties;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Registry of RabbitMQ instances that can be created and removed at runtime.
 *
 * <p>For every instance name this class keeps the originating {@link RabbitInstance}
 * together with the {@link ConnectionFactory}, {@link RabbitTemplate} and
 * {@link RabbitAdmin} built from it. Instances listed in {@link RabbitProperties}
 * are registered once at startup by {@link #init()}.
 *
 * <p>The registry is backed by plain {@link HashMap}s without synchronization, so
 * callers that register or remove instances from several threads must provide
 * their own coordination.
 */
@Slf4j
@Configuration
public class DynamicRabbitMQConfig {

    private final RabbitProperties rabbitProperties;

    private final Map<String, RabbitInstance> rabbitInstanceMap = new HashMap<>();

    private final Map<String, RabbitTemplate> rabbitTemplateMap = new HashMap<>();

    private final Map<String, RabbitAdmin> rabbitAdminMap = new HashMap<>();

    private final Map<String, ConnectionFactory> connectionFactoryMap = new HashMap<>();

    @Autowired
    public DynamicRabbitMQConfig(RabbitProperties rabbitProperties) {
        this.rabbitProperties = rabbitProperties;
    }

    /**
     * Registers every instance declared in the configuration properties.
     * Does nothing when no instances are configured.
     */
    @PostConstruct
    public void init() {
        if (rabbitProperties.getInstances() != null) {
            rabbitProperties.getInstances().forEach(this::createRabbitInstance);
        }
    }

    /**
     * Builds the connection factory, template and admin for {@code instance} and
     * registers them under {@code instance.getName()}.
     *
     * <p>If an instance with the same name is already registered it is removed
     * (and its connection factory destroyed) first, so the old connections are
     * not leaked when the entry is replaced.
     *
     * @param instance connection settings of the instance to register
     */
    public void createRabbitInstance(RabbitInstance instance) {
        String name = instance.getName();
        if (rabbitInstanceMap.containsKey(name)) {
            removeRabbitInstance(name);
        }

        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(instance.getHost());
        connectionFactory.setPort(instance.getPort());
        connectionFactory.setUsername(instance.getUsername());
        connectionFactory.setPassword(instance.getPassword());
        connectionFactory.setVirtualHost(instance.getVirtualHost());

        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter(new ObjectMapper()));

        RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate);

        // Register only after every component was built, so a failure above
        // cannot leave a half-registered instance behind.
        connectionFactoryMap.put(name, connectionFactory);
        rabbitTemplateMap.put(name, rabbitTemplate);
        rabbitAdminMap.put(name, rabbitAdmin);
        rabbitInstanceMap.put(name, instance);
    }

    /**
     * @param name instance name
     * @return the template registered under {@code name}, or {@code null} if unknown
     */
    public RabbitTemplate getRabbitTemplate(String name) {
        return rabbitTemplateMap.get(name);
    }

    /**
     * @param name instance name
     * @return the admin registered under {@code name}, or {@code null} if unknown
     */
    public RabbitAdmin getRabbitAdmin(String name) {
        return rabbitAdminMap.get(name);
    }

    /**
     * @param name instance name
     * @return the connection factory registered under {@code name}, or {@code null} if unknown
     */
    public ConnectionFactory getConnectionFactory(String name) {
        return connectionFactoryMap.get(name);
    }

    /**
     * Collects everything registered under {@code name} into one map with the keys
     * {@code instance}, {@code rabbitAdmin}, {@code rabbitTemplate} and
     * {@code connectionFactory}. Values are {@code null} for an unknown name.
     *
     * @param name instance name
     * @return a new mutable map holding the four registered objects
     */
    public Map<String, Object> getRabbitInstance(String name) {
        Map<String, Object> result = new HashMap<>();
        result.put("instance", rabbitInstanceMap.get(name));
        result.put("rabbitAdmin", rabbitAdminMap.get(name));
        result.put("rabbitTemplate", rabbitTemplateMap.get(name));
        result.put("connectionFactory", connectionFactoryMap.get(name));
        return result;
    }

    /**
     * @param name instance name
     * @return {@code true} when an instance is registered under {@code name}
     */
    public Boolean checkInstanceExist(String name) {
        return rabbitInstanceMap.containsKey(name);
    }

    /**
     * @return a snapshot copy of the currently registered instance names
     */
    public List<String> getDynamicInstanceNames() {
        return new ArrayList<>(rabbitInstanceMap.keySet());
    }

    /**
     * Unregisters the instance and destroys its connection factory so the broker
     * connections it holds are released. Unknown names are ignored.
     *
     * @param name instance name
     */
    public void removeRabbitInstance(String name) {
        rabbitInstanceMap.remove(name);
        rabbitAdminMap.remove(name);
        rabbitTemplateMap.remove(name);

        ConnectionFactory connectionFactory = connectionFactoryMap.remove(name);
        if (connectionFactory instanceof CachingConnectionFactory) {
            // Dropping the reference alone would leave the connections open.
            ((CachingConnectionFactory) connectionFactory).destroy();
            log.info("RabbitMQ实例对象:{},连接工厂已销毁", name);
        }
    }
}
2、RabbitMQ动态实例操作Service组件
RabbitDynamicInstanceService.java
import com.chain.air.rpp.exchange.config.rabbit.DynamicRabbitMQConfig;
import com.chain.air.rpp.exchange.properties.RabbitInstance;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.List;
@Slf4j
@Service
public class RabbitDynamicInstanceService {private final DynamicRabbitMQConfig dynamicRabbitMQConfig;@Autowiredpublic RabbitDynamicInstanceService(DynamicRabbitMQConfig dynamicRabbitMQConfig) {this.dynamicRabbitMQConfig = dynamicRabbitMQConfig;}public Boolean createRabbitInstance(RabbitInstance rabbitInstance) {try {boolean instanceExist = dynamicRabbitMQConfig.checkInstanceExist(rabbitInstance.getName());if (instanceExist) {log.warn("实例【{}】已存在,无需重复创建", rabbitInstance.getName());return true;}dynamicRabbitMQConfig.createRabbitInstance(rabbitInstance);return true;} catch (Exception e) {log.error("创建RabbitMQ实例失败,失败原因:{}", e.getMessage());return false;}}public Boolean removeRabbitInstance(String name) {try {boolean instanceExist = dynamicRabbitMQConfig.checkInstanceExist(name);if (!instanceExist) {log.warn("实例【{}】不存在,无需删除", name);return true;}dynamicRabbitMQConfig.removeRabbitInstance(name);return true;} catch (Exception e) {log.error("删除RabbitMQ实例失败,失败原因:{}", e.getMessage());return false;}}public List<String> getDynamicInstanceNames() {return dynamicRabbitMQConfig.getDynamicInstanceNames();}public Boolean appointRabbitWithExchangeSendMessage(String name, String exchange, String routingKey, String message) {try {Boolean instanceExist = dynamicRabbitMQConfig.checkInstanceExist(name);if (!instanceExist) {log.warn("实例【{}】不存在,无法发送消息", name);return false;}dynamicRabbitMQConfig.sendMessage(name, exchange, routingKey, message);return true;} catch (Exception e) {log.error("指定RabbitMQ实例发送消息失败,失败原因:{}", e.getMessage());return false;}}public Boolean appointRabbitWithQueueSendMessage(String name, String queue, String message) {try {Boolean instanceExist = dynamicRabbitMQConfig.checkInstanceExist(name);if (!instanceExist) {log.warn("实例【{}】不存在,无法发送消息", name);return false;}dynamicRabbitMQConfig.sendMessage(name, queue, message);return true;} catch (Exception e) {log.error("指定RabbitMQ实例发送消息失败,失败原因:{}", e.getMessage());return false;}}}
三、动态RabbitMQ实例中的exchange、queue动态新增及监听
1、DynamicRabbitMQConfig.java 新增操作代码
import com.chain.air.rpp.exchange.properties.RabbitInstance;
import com.chain.air.rpp.exchange.properties.RabbitProperties;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Registry of RabbitMQ instances that can be created and removed at runtime,
 * plus helpers to send messages, declare exchanges and queues, and attach queue
 * listeners on a named instance.
 *
 * <p>For every instance name this class keeps the originating {@link RabbitInstance}
 * together with the {@link ConnectionFactory}, {@link RabbitTemplate} and
 * {@link RabbitAdmin} built from it. Listener containers started through
 * {@link #createListener(ConnectionFactory, String)} are tracked per connection
 * factory so they can be stopped when the owning instance is removed.
 *
 * <p>The registry is backed by plain {@link HashMap}s without synchronization, so
 * callers that mutate it from several threads must provide their own coordination.
 */
@Slf4j
@Configuration
public class DynamicRabbitMQConfig {

    private final RabbitProperties rabbitProperties;

    private final Map<String, RabbitInstance> rabbitInstanceMap = new HashMap<>();

    private final Map<String, RabbitTemplate> rabbitTemplateMap = new HashMap<>();

    private final Map<String, RabbitAdmin> rabbitAdminMap = new HashMap<>();

    private final Map<String, ConnectionFactory> connectionFactoryMap = new HashMap<>();

    /** Listener containers started on each connection factory, kept so they can be stopped later. */
    private final Map<ConnectionFactory, List<SimpleMessageListenerContainer>> listenerContainerMap = new HashMap<>();

    @Autowired
    public DynamicRabbitMQConfig(RabbitProperties rabbitProperties) {
        this.rabbitProperties = rabbitProperties;
    }

    /**
     * Registers every instance declared in the configuration properties.
     * Does nothing when no instances are configured.
     */
    @PostConstruct
    public void init() {
        if (rabbitProperties.getInstances() != null) {
            rabbitProperties.getInstances().forEach(this::createRabbitInstance);
        }
    }

    /**
     * Builds the connection factory, template and admin for {@code instance} and
     * registers them under {@code instance.getName()}.
     *
     * <p>If an instance with the same name is already registered it is removed
     * first (stopping its listeners and destroying its connection factory), so
     * replacing an entry does not leak the old connections.
     *
     * @param instance connection settings of the instance to register
     */
    public void createRabbitInstance(RabbitInstance instance) {
        String name = instance.getName();
        if (rabbitInstanceMap.containsKey(name)) {
            removeRabbitInstance(name);
        }

        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(instance.getHost());
        connectionFactory.setPort(instance.getPort());
        connectionFactory.setUsername(instance.getUsername());
        connectionFactory.setPassword(instance.getPassword());
        connectionFactory.setVirtualHost(instance.getVirtualHost());

        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter(new ObjectMapper()));

        RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate);

        // Register only after every component was built, so a failure above
        // cannot leave a half-registered instance behind.
        connectionFactoryMap.put(name, connectionFactory);
        rabbitTemplateMap.put(name, rabbitTemplate);
        rabbitAdminMap.put(name, rabbitAdmin);
        rabbitInstanceMap.put(name, instance);
    }

    /**
     * @param name instance name
     * @return the template registered under {@code name}, or {@code null} if unknown
     */
    public RabbitTemplate getRabbitTemplate(String name) {
        return rabbitTemplateMap.get(name);
    }

    /**
     * @param name instance name
     * @return the admin registered under {@code name}, or {@code null} if unknown
     */
    public RabbitAdmin getRabbitAdmin(String name) {
        return rabbitAdminMap.get(name);
    }

    /**
     * @param name instance name
     * @return the connection factory registered under {@code name}, or {@code null} if unknown
     */
    public ConnectionFactory getConnectionFactory(String name) {
        return connectionFactoryMap.get(name);
    }

    /**
     * Collects everything registered under {@code name} into one map with the keys
     * {@code instance}, {@code rabbitAdmin}, {@code rabbitTemplate} and
     * {@code connectionFactory}. Values are {@code null} for an unknown name.
     *
     * @param name instance name
     * @return a new mutable map holding the four registered objects
     */
    public Map<String, Object> getRabbitInstance(String name) {
        Map<String, Object> result = new HashMap<>();
        result.put("instance", rabbitInstanceMap.get(name));
        result.put("rabbitAdmin", rabbitAdminMap.get(name));
        result.put("rabbitTemplate", rabbitTemplateMap.get(name));
        result.put("connectionFactory", connectionFactoryMap.get(name));
        return result;
    }

    /**
     * @param name instance name
     * @return {@code true} when an instance is registered under {@code name}
     */
    public Boolean checkInstanceExist(String name) {
        return rabbitInstanceMap.containsKey(name);
    }

    /**
     * @return a snapshot copy of the currently registered instance names
     */
    public List<String> getDynamicInstanceNames() {
        return new ArrayList<>(rabbitInstanceMap.keySet());
    }

    /**
     * Unregisters the instance, stops the listener containers that were started on
     * its connection factory and destroys the factory so its broker connections
     * are released. Unknown names are ignored.
     *
     * @param name instance name
     */
    public void removeRabbitInstance(String name) {
        rabbitInstanceMap.remove(name);
        rabbitAdminMap.remove(name);
        rabbitTemplateMap.remove(name);

        ConnectionFactory connectionFactory = connectionFactoryMap.remove(name);
        if (connectionFactory == null) {
            return;
        }

        // Listeners must go first: they hold channels on the factory being destroyed.
        List<SimpleMessageListenerContainer> containers = listenerContainerMap.remove(connectionFactory);
        if (containers != null) {
            for (SimpleMessageListenerContainer container : containers) {
                container.stop();
            }
        }

        if (connectionFactory instanceof CachingConnectionFactory) {
            // Dropping the reference alone would leave the connections open.
            ((CachingConnectionFactory) connectionFactory).destroy();
        }
        log.info("RabbitMQ实例对象:{},监听器与连接工厂已释放", name);
    }

    /**
     * Sends {@code message} to {@code exchange} with {@code routingKey} through the
     * named instance.
     *
     * <p>Failures propagate to the caller, matching
     * {@link #sendMessage(String, String, String)}; swallowing them here would make
     * every send look successful.
     *
     * @param name       instance name
     * @param exchange   target exchange
     * @param routingKey routing key
     * @param message    payload
     * @throws IllegalArgumentException if no instance is registered under {@code name}
     */
    public void sendMessage(String name, String exchange, String routingKey, String message) {
        requireRabbitTemplate(name).convertAndSend(exchange, routingKey, message);
    }

    /**
     * Sends {@code message} through the named instance using {@code queue} as the
     * routing key.
     *
     * @param name    instance name
     * @param queue   target queue
     * @param message payload
     * @throws IllegalArgumentException if no instance is registered under {@code name}
     */
    public void sendMessage(String name, String queue, String message) {
        requireRabbitTemplate(name).convertAndSend(queue, message);
    }

    /**
     * Declares a durable, non-auto-delete direct exchange on the named instance.
     *
     * @param name         instance name
     * @param exchangeName exchange to declare
     * @throws IllegalArgumentException if no instance is registered under {@code name}
     */
    public void createDirectExchange(String name, String exchangeName) {
        RabbitAdmin rabbitAdmin = requireRabbitAdmin(name);
        rabbitAdmin.declareExchange(new DirectExchange(exchangeName, true, false));
        log.info("RabbitMQ实例对象:{},Direct 交换机创建成功: {}", name, exchangeName);
    }

    /**
     * Declares a durable, non-auto-delete fanout exchange on the named instance.
     *
     * @param name         instance name
     * @param exchangeName exchange to declare
     * @throws IllegalArgumentException if no instance is registered under {@code name}
     */
    public void createFountExchange(String name, String exchangeName) {
        RabbitAdmin rabbitAdmin = requireRabbitAdmin(name);
        // Must be a FanoutExchange: declaring a DirectExchange here would create
        // the wrong exchange type while still logging "Fanout".
        rabbitAdmin.declareExchange(new FanoutExchange(exchangeName, true, false));
        log.info("RabbitMQ实例对象:{},Fanout 交换机创建成功: {}", name, exchangeName);
    }

    /**
     * Declares a durable, non-auto-delete topic exchange on the named instance.
     *
     * @param name         instance name
     * @param exchangeName exchange to declare
     * @throws IllegalArgumentException if no instance is registered under {@code name}
     */
    public void createTopicExchange(String name, String exchangeName) {
        RabbitAdmin rabbitAdmin = requireRabbitAdmin(name);
        rabbitAdmin.declareExchange(new TopicExchange(exchangeName, true, false));
        log.info("RabbitMQ实例对象:{},Topic 交换机创建成功: {}", name, exchangeName);
    }

    /**
     * Declares a durable, non-auto-delete headers exchange on the named instance.
     *
     * @param name         instance name
     * @param exchangeName exchange to declare
     * @throws IllegalArgumentException if no instance is registered under {@code name}
     */
    public void createHeadersExchange(String name, String exchangeName) {
        RabbitAdmin rabbitAdmin = requireRabbitAdmin(name);
        rabbitAdmin.declareExchange(new HeadersExchange(exchangeName, true, false));
        log.info("RabbitMQ实例对象:{},Headers 交换机创建成功: {}", name, exchangeName);
    }

    /**
     * Declares a durable, non-exclusive, non-auto-delete queue on the named instance.
     *
     * @param name      instance name
     * @param queueName queue to declare
     * @throws IllegalArgumentException if no instance is registered under {@code name}
     */
    public void createQueue(String name, String queueName) {
        createQueue(name, queueName, Boolean.FALSE);
    }

    /**
     * Deletes a queue on the named instance.
     *
     * @param name      instance name
     * @param queueName queue to delete
     * @throws IllegalArgumentException if no instance is registered under {@code name}
     */
    public void deleteQueue(String name, String queueName) {
        requireRabbitAdmin(name).deleteQueue(queueName);
        log.info("RabbitMQ实例对象:{},队列删除成功: {}", name, queueName);
    }

    /**
     * Declares a durable, non-exclusive, non-auto-delete queue on the named instance
     * and optionally starts a listener on it.
     *
     * @param name       instance name
     * @param queueName  queue to declare
     * @param isListener start a listener when {@code true}; {@code null} is treated
     *                   as {@code false}
     * @throws IllegalArgumentException if no instance is registered under {@code name}
     */
    public void createQueue(String name, String queueName, Boolean isListener) {
        RabbitAdmin rabbitAdmin = requireRabbitAdmin(name);
        rabbitAdmin.declareQueue(new Queue(queueName, true, false, false));
        log.info("RabbitMQ实例对象:{},队列创建成功: {}", name, queueName);

        // Boolean.TRUE.equals avoids an unboxing NullPointerException on a null flag.
        if (Boolean.TRUE.equals(isListener)) {
            createListener(connectionFactoryMap.get(name), queueName);
        }
    }

    /**
     * Starts a listener container on {@code queueName} that logs every received
     * message, and records the container so {@link #removeRabbitInstance(String)}
     * can stop it.
     *
     * @param connectionFactory factory the container consumes through
     * @param queueName         queue to listen on
     */
    public void createListener(ConnectionFactory connectionFactory, String queueName) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        // NOTE(review): templates created by this class publish with
        // Jackson2JsonMessageConverter, while this adapter keeps its default
        // converter. Confirm JSON payloads actually reach handleMessage(String).
        container.setMessageListener(new MessageListenerAdapter(new Object() {
            // Looked up by name by MessageListenerAdapter, hence no direct caller.
            @SuppressWarnings("unused")
            public void handleMessage(String message) {
                log.info("收到来自RabbitMQ中队列:{} 队列的消息:{}", queueName, message);
            }
        }));
        container.start();

        listenerContainerMap
                .computeIfAbsent(connectionFactory, key -> new ArrayList<>())
                .add(container);
        log.info("RabbitMQ队列监听器已启动:{}", queueName);
    }

    /** Returns the template for {@code name} or fails with a descriptive error. */
    private RabbitTemplate requireRabbitTemplate(String name) {
        RabbitTemplate rabbitTemplate = rabbitTemplateMap.get(name);
        if (rabbitTemplate == null) {
            throw new IllegalArgumentException("RabbitMQ实例【" + name + "】不存在");
        }
        return rabbitTemplate;
    }

    /** Returns the admin for {@code name} or fails with a descriptive error. */
    private RabbitAdmin requireRabbitAdmin(String name) {
        RabbitAdmin rabbitAdmin = rabbitAdminMap.get(name);
        if (rabbitAdmin == null) {
            throw new IllegalArgumentException("RabbitMQ实例【" + name + "】不存在");
        }
        return rabbitAdmin;
    }
}