您的位置:首页 > 娱乐 > 明星 > 全部免费网站软件_软件开发文档规范_合肥百度推广优化排名_企业网站营销的优缺点

全部免费网站软件_软件开发文档规范_合肥百度推广优化排名_企业网站营销的优缺点

2024/12/22 15:14:41 来源:https://blog.csdn.net/weixin_44990255/article/details/144081807  浏览:    关键词:全部免费网站软件_软件开发文档规范_合肥百度推广优化排名_企业网站营销的优缺点
全部免费网站软件_软件开发文档规范_合肥百度推广优化排名_企业网站营销的优缺点

1、引入依赖

  <!--MQTT start--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.4.4</version></dependency><!--MQTT end--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency>

2、增加yml配置

  spring:mqtt:username: testpassword: testurl: tcp://127.0.0.1:8080subClientId: singo_sub_client_id_888 #订阅 客户端idpubClientId: singo_pub_client_id_888 #发布 客户端idconnectionTimeout: 30keepAlive: 60

3、资源配置类

@Data
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttConfigurationProperties {private String username;private String password;private String url;private String subClientId;private String pubClientId;private int connectionTimeout;private int keepAlive;
}

注意启动类需要增加注解

@EnableConfigurationProperties(MqttConfigurationProperties.class)

4、MQTT配置类


@Configuration
public class MqttConfig {@Autowiredprivate MqttConfigurationProperties mqttConfigurationProperties;/*** 连接参数** @return*/@Beanpublic MqttConnectOptions mqttConnectOptions() {MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttConfigurationProperties.getUsername());options.setPassword(mqttConfigurationProperties.getPassword().toCharArray());options.setServerURIs(new String[]{mqttConfigurationProperties.getUrl()});options.setConnectionTimeout(mqttConfigurationProperties.getConnectionTimeout());options.setKeepAliveInterval(mqttConfigurationProperties.getKeepAlive());options.setCleanSession(true); // 设置为false以便断线重连后恢复会话options.setAutomaticReconnect(true);return options;}/*** 连接工厂** @param options* @return*/@Beanpublic MqttPahoClientFactory mqttClientFactory(MqttConnectOptions options) {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(options);return factory;}/*** 消息输入通道* 每次只有一个消息处理器可以消费消息。* 当前消息的处理完成之前,新消息需要排队等待,无法并行处理。* 默认是:单线程、顺序执行的* @return*/// @Bean// public DirectChannel mqttInputChannel() {//     return new DirectChannel();// }/*** 支持多线程并发处理消息的输入通道** @return*/@Beanpublic ExecutorChannel mqttInputChannel() {return new ExecutorChannel(Executors.newFixedThreadPool(10)); // 线程池大小可以调整}/*** 配置入站适配器** @param mqttClientFactory* @return*/@Beanpublic MqttPahoMessageDrivenChannelAdapter messageDrivenChannelAdapter(MqttPahoClientFactory mqttClientFactory) {MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfigurationProperties.getSubClientId(), mqttClientFactory);// adapter.addTopic("pub/300119110099"); 订阅主题,也可以放在初始化动态配置adapter.setOutputChannel(mqttInputChannel());return adapter;}/*** 配置消息处理器** @return*/@Bean@ServiceActivator(inputChannel = "mqttInputChannel") // 指定通道public MessageHandler messageHandler() {return new MqttReceiverMessageHandler();}}

5、消息处理器配置

@Slf4j
@Component
public class MqttReceiverMessageHandler implements MessageHandler {@Autowiredprivate MqttMessageProcessingService mqttMessageProcessingService;@Overridepublic void handleMessage(Message<?> message) throws MessagingException {MessageHeaders headers = message.getHeaders();log.info("线程名称:{},收到消息,主题:{},消息:{}", Thread.currentThread().getName(), headers.get("mqtt_receivedTopic").toString(), message.getPayload());// log.info("收到消息主题:{}", headers.get("mqtt_receivedTopic").toString());// log.info("收到消息:{}", message.getPayload());// 消息保存到内存队列里面,定时批量入库,也可以在这里直接入库mqttMessageProcessingService.addMessage(message.getPayload().toString());}
}

6、消息主题缓存对象

@Component
public class MqttTopicStore {private final ConcurrentHashMap<String, String> topics = new ConcurrentHashMap<>();public ConcurrentHashMap<String, String> getTopics() {return topics;}
}

7、动态订阅数据库主题配置

@Slf4j
@Component
public class MqttInit {@Autowiredprivate MqttPahoMessageDrivenChannelAdapter messageDrivenChannelAdapter;@Autowiredprivate MqttTopicStore mqttTopicStore;@PostConstructpublic void init() {subscribeAllTopics();}public void subscribeAllTopics() {// List<MqttTopicConfig> topics = topicConfigMapper.findAllEnabled();// for (MqttTopicConfig topic : topics) {//     subscribeTopic(topic);// }log.info("===================>从数据库里获取并初始化订阅所有主题");List<String> topics = ListUtil.list(false, "pub/300119110099", "pub1/3010230209810018992", "pub1/30102302098100");topics.stream().forEach(t -> {messageDrivenChannelAdapter.addTopic(t);// 同时往MqttTopicStore.topics中增加一条记录用于缓存});}}

8、消息处理服务

@Service
public class MqttMessageProcessingService {@Autowiredprivate MqttPahoMessageDrivenChannelAdapter messageDrivenChannelAdapter;@Autowiredprivate MqttTopicStore mqttTopicStore;// 内存队列,用于暂存消息private final BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();// 添加消息到队列public void addMessage(String message) {messageQueue.add(message);}/*** 可以放到定时任务里面去,注入后取队列方便维护* 定时任务,每5秒执行一次 ,建议2分钟一次 理想的触发间隔应略小于数据到达间隔,以确保及时处理和插入* 如果每 5 分钟收到一条数据,可以设置任务执行周期为4 分钟或更短,以便任务有足够的时间处理数据,同时减少积压的可能性。*/@Scheduled(fixedRate = 1 * 60 * 1000)public void batchInsertToDatabase() {System.out.println("定时任务执行中,当前队列大小:" + messageQueue.size());List<String> batch = new ArrayList<>();messageQueue.drainTo(batch, 500); // 一次性取最多500条消息if (!batch.isEmpty()) {// 批量插入数据库saveMessagesToDatabase(batch);}}private void saveMessagesToDatabase(List<String> messages) {// 假设这是批量插入逻辑System.out.println("批量插入数据库,条数:" + messages.size());for (String message : messages) {System.out.println("插入消息:" + message);}// 实际数据库操作代码}/*** 订阅与取消订阅定时任务*/public void subscribeAndUnsubscribeTask() {// 从数据库获取所有主题,正常状态、删除状态// 正常状态:判断mqttTopicStore.topics中是否存在,不存在则订阅,并在mqttTopicStore.topics中增加// 删除状态: 判断mqttTopicStore.topics中是否存在,存在则取消订阅,并在mqttTopicStore.topics中删除// messageDrivenChannelAdapter.addTopic(t);}
}

以上是简单的对接步骤,部分类、方法可以根据实际情况进行合并处理!!!!

9、定时任务

@Slf4j
@Configuration
@EnableScheduling
public class MqttJob {@Value("${schedule.enable}")private boolean enable;@Autowiredprivate MqttMessageProcessingService mqttMessageProcessingService;/*** 定时订阅与取消订阅主题,从共享主题对象MqttTopicStore里面取出主题列表,然后进行订阅或取消订阅* 每分钟一次*/public void subscribeAndUnsubscribe() {if (!enable) return;mqttMessageProcessingService.subscribeAndUnsubscribeTask();}/*** 定时处理队列里面的订阅消息,会有丢失风险,宕机时会丢失队列里面的消息* 每分钟一次 要考虑一次消息处理的时间;也可先不使用队列,每次收到消息直接实时入库,有性能问题时在启用*/public void batchSaveSubscribeMessage() {}}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com