您的位置:首页 > 房产 > 建筑 > windows搭建mqtt服务器,并配置DTU收集传感器数据java处理(下)

windows搭建mqtt服务器,并配置DTU收集传感器数据java处理(下)

2025/3/10 15:11:22 来源:https://blog.csdn.net/qq_34178998/article/details/140204253  浏览:    关键词:windows搭建mqtt服务器,并配置DTU收集传感器数据java处理(下)

前言:上一篇文章,我们已经搭建好MQTT服务器了,并且DTU设备。链接好传感器也能直接往MQTT消息发送收集的HEX格式的温湿度数据了。所以下一步我们需要写个程序把这个消息给接收到,然后对消息进行处理、本人使用的是java程序,对MQTT的消息进行处理

介绍:搭建使用的是若依基本的脚手架,前后端分离的版本,可自行搜索下载,这里就不在多做介绍

1、添加mqtt依赖

        <!--mqtt--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.9</version></dependency>

2、配置一下yml配置文件信息

spring:mqtt:username: MQTT2 # 用户名password: 123456 # 密码hostUrl: tcp://mqtt服务器ip:1883 # tcp://服务器ip:1883clientId: client_java # 客户端名称(名称不能命名一样的)defaultTopic: dtutopic # 订阅主题timeout: 100 # 超时时间 (单位:秒)keepalive: 60 # 心跳 (单位:秒)enabled: true # 是否使能mqtt功能millis: 300 # 间隔多长时间发布一组数据 (毫秒)

3、MqttConfing类配置信息

package com.ruoyi.framework.config;import com.ruoyi.framework.web.mqtt.MqttPushClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
@ConfigurationProperties("spring.mqtt")
public class MqttConfig {@Autowiredprivate MqttPushClient mqttPushClient;/*** 用户名*/private String username;/*** 密码*/private String password;/*** 连接地址*/private String hostUrl;/*** 客户Id*/private String clientId;/*** 默认连接话题*/private String defaultTopic;/*** 超时时间*/private int timeout;/*** 保持连接数*/private int keepalive;/*** mqtt功能使能*/@Value("${spring.mqtt.enabled}")private boolean enabled;public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}public String getHostUrl() {return hostUrl;}public void setHostUrl(String hostUrl) {this.hostUrl = hostUrl;}public String getClientId() {return clientId;}public void setClientId(String clientId) {this.clientId = clientId;}public String getDefaultTopic() {return defaultTopic;}public void setDefaultTopic(String defaultTopic) {this.defaultTopic = defaultTopic;}public int getTimeout() {return timeout;}public void setTimeout(int timeout) {this.timeout = timeout;}public int getKeepalive() {return keepalive;}public void setKeepalive(int keepalive) {this.keepalive = keepalive;}public boolean isEnabled() {return enabled;}public void setEnabled(boolean enabled) {this.enabled = enabled;}@Beanpublic MqttPushClient getMqttPushClient() {if(enabled == true){//单个主题订阅mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);//连接mqttPushClient.subscribe(defaultTopic, 0);//订阅主题}return mqttPushClient;}
}

注意:我这里订阅了单个主题,所以这个这么写的,如果订阅多个主题要对应修改

if(enabled == true){String mqtt_topic[] = StringUtils.split(defaultTopic, ",");mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);//连接for(int i=0; i<mqtt_topic.length; i++){mqttPushClient.subscribe(mqtt_topic[i], 0);//订阅主题}}

4、订阅MQTT消息的类MqttPushClient 和MqttMsgCallback

package com.ruoyi.framework.web.mqtt;import com.ruoyi.framework.web.domain.AjaxResult;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** Mqtt推送客户端*/@Component
public class MqttPushClient {private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);@Autowiredprivate MqttMsgCallback mqttMsgCallback;private static MqttClient client;private static MqttClient getClient() {return client;}private static void setClient(MqttClient client) {MqttPushClient.client = client;}/*** 客户端连接** @param host      ip+端口* @param clientID  客户端Id* @param username  用户名* @param password  密码* @param timeout   超时时间* @param keepalive 保留数*/public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {MqttClient client;try {client = new MqttClient(host, clientID, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);options.setUserName(username);options.setPassword(password.toCharArray());options.setConnectionTimeout(timeout);options.setKeepAliveInterval(keepalive);MqttPushClient.setClient(client);try {client.setCallback(mqttMsgCallback);client.connect(options);} catch (Exception e) {e.printStackTrace();}} catch (Exception e) {e.printStackTrace();}}/*** 发布** @param qos         连接方式* @param retained    是否保留* @param topic       主题* @param pushMessage 消息体*/public AjaxResult publish(int qos, boolean retained, String topic, String pushMessage) {MqttMessage message = new MqttMessage();message.setQos(qos);message.setRetained(retained);message.setPayload(pushMessage.getBytes());MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);if (null == mTopic) {logger.error("topic not exist");}MqttDeliveryToken token;try {token = mTopic.publish(message);token.waitForCompletion();return AjaxResult.success();} catch (MqttPersistenceException e) {e.printStackTrace();return AjaxResult.error();} catch (MqttException e) {e.printStackTrace();return AjaxResult.error();}}/*** 订阅某个主题** @param topic 主题* @param qos   连接方式*/public void subscribe(String topic, int qos) {logger.info("开始订阅主题:" + topic);try {MqttPushClient.getClient().subscribe(topic, qos);} catch (MqttException e) {e.printStackTrace();}}}

MqttMsgCallback

package com.ruoyi.framework.web.mqtt;import com.ruoyi.framework.config.MqttConfig;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** Mqtt订阅端*/@Component
public class MqttMsgCallback implements MqttCallback {private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);@Autowiredprivate MqttConfig mqttConfig;private static MqttClient client;@Overridepublic void connectionLost(Throwable throwable) {logger.info("连接断开,进行重连");// 连接丢失后,一般在这里面进行重连if (client == null || !client.isConnected()) {mqttConfig.getMqttPushClient();}}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {// 订阅后得到的消息会执行到这里面logger.info("接收消息主题 : " + topic);logger.info("接收消息Qos : " + mqttMessage.getQos());byte[] payload = mqttMessage.getPayload();// 打印读取的数据String data3 = bytesToHex(payload).trim();//去除空格String[] hexBytes = data3.split(" ");byte[] bytes = new byte[hexBytes.length];for (int i = 0; i < hexBytes.length; i++) {bytes[i] = (byte) Integer.parseInt(hexBytes[i], 16);}// 偏置地址为1的温度数据float temperature = bytes[3] << 8 | (bytes[4] & 0xFF); // bytes[3]是高字节,bytes[4]是低字节// 偏置地址为2的湿度数据float humidity = bytes[5] << 8 | (bytes[6] & 0xFF); // bytes[5]是高字节,bytes[6]是低字节// 输出结果logger.info("温度数据: " + temperature/10+"℃");logger.info("湿度数据: " + humidity/10+"%");}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());}// 将字节数组转换成十六进制字符串,方便查看private static String bytesToHex(byte[] bytes) {StringBuilder sb = new StringBuilder();for (byte b : bytes) {sb.append(String.format("%02X ", b));}return sb.toString();}}

之后就配置结束啦,然后开始启动若依项目,项目启动的时候,就回去订阅我们指定的topic了。

5、打开,MQTTX客户端工具,对设备进行订阅监听,过几分钟后,我们的客户端工具已经成功的收到设备发到MQTT的数据了

6、看一下MQTT主页(见上一篇博客),有几个客户端连接,通过每个客户端的链接ID(之前强调的是每个客户端连接ID不能相同),可以看到,目前三个客户端都正常链接着在。DTU设备,向主题发消息,java程序和MQTTX订阅消费这个主题消息,具体的clientId,可以看下我之前的博客,都是对应的上的

这是再看下控制台已经打印出来的了,和客户端工具相同的数据,因为客户端工具,和我们的程序是同时订阅MQTTd的dtutopic主题的,所以能收到相同的数据,说明我们的消息是通了。

之后的步骤,就自己对这些数据进行处理了,是入库还是做其他操作,都在MqttMsgCallback类中进行处理

 上篇文章:windows搭建mqtt服务器,并配置DTU收集传感器数据(上)

文章参考:springboot整合mqtt-CSDN博客

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com