原组件(见下方链接)单次只能订阅一个主题,比较局限,因此在该组件的基础上升级成了本组件,支持同时订阅多个主题。
vue连接mqtt实现收发消息组件超级详细_vue mqtt-CSDN博客
<!--
* @Author: mhf
* @Date: 2024/10/14 15:18
-->
<template>
  <div>
    <!--
      Child MQTT client. The parent calls its methods through the `mqttManyComp` ref
      and receives decoded messages / periodic `mqtt-close` notifications via events.
    -->
    <mqttManyComp
      ref="mqttManyComp"
      :topics="topicArr"
      :mqttUrl="mqttUrl"
      :mqttOpts="mqttOpts"
      @message-received="getMqttMessage"
      @mqtt-close="mqttClose"
    />
  </div>
</template>
<script>
import mqttManyComp from "@/views/cockpit/modules/information/mqttManyComp.vue";

/**
 * drawCar — example host component for `mqttManyComp`.
 *
 * It passes the topic list and connection settings down as props, starts the
 * connection through the child ref, and publishes `switch: "on"` / `"off"`
 * commands to `smart_trans/<code>/down/track` for every code in `assetCodeArr`.
 *
 * NOTE(review): `topicArr` and `assetCodeArr` are never populated in this file;
 * the code that fills them appears to have been removed — confirm with the caller.
 */
export default {
  name: "drawCar",
  components: { mqttManyComp },
  mixins: [],
  props: {},
  computed: {},
  watch: {},
  filters: {},
  data() {
    return {
      topicArr: [], // topics handed to mqttManyComp
      // Asset codes used to build the publish topics. Declared here so that
      // mqttClose()/mqttEnd() iterate an empty list instead of throwing on `undefined`.
      assetCodeArr: [],
      mqttUrl: {
        head: "ws",
        host: "", // TODO: broker host
        port: null, // TODO: broker WebSocket port (was left blank, which is a syntax error)
        tailPath: "mqtt",
      },
      mqttOpts: {
        keepalive: 60,
        clientId: `clientId-${Math.random().toString(16).slice(2, 10)}`,
        username: "",
        password: "",
        connectTimeout: 10 * 1000,
      },
      radarList: [],
      radarInViewport: [],
      timer: null,
    };
  },
  methods: {
    /**
     * Start the MQTT connection and then send the initial "on" command.
     *
     * The child connects 500 ms from now; mqttClose() runs 500 ms after that.
     * mqttPublish() in the child refuses to send while the client is not connected,
     * so the second call is a no-op if the connection is not up yet.
     */
    subscribeTopics() {
      setTimeout(() => {
        this.$refs.mqttManyComp.initMqtt();
      }, 500);
      setTimeout(() => {
        this.mqttClose();
      }, 1000);
    },

    /**
     * Handler for the child's `message-received` event.
     *
     * @param {{topic: string, message: *}} message - Topic plus the JSON-decoded payload.
     */
    getMqttMessage(message) {
      setTimeout(() => {
        // TODO: handle the received data here
      }, 500);
    },

    /**
     * Publish `{ id, switch: "on" }` for every asset code.
     *
     * Despite its name this sends "on": it is bound to the child's `mqtt-close`
     * event (emitted every 25 s and before the child is destroyed) and to the bus
     * `reconnection` event. NOTE(review): presumably a keep-alive for the track
     * stream — confirm against the device protocol.
     */
    mqttClose() {
      this.publishTrackSwitch("on");
    },

    /**
     * Stop the track stream and close the connection: publish "off" after 1 s,
     * then (at 1.5 s) notify the bus and end the child's MQTT client.
     */
    mqttEnd() {
      setTimeout(() => {
        this.publishTrackSwitch("off");
      }, 1000);
      setTimeout(() => {
        this.bus.$emit("information-remove-car-marker-drawCar");
        this.$refs.mqttManyComp.handleEnd();
      }, 1500);
    },

    /**
     * Publish a switch command to `smart_trans/<code>/down/track` for each asset code.
     *
     * @param {"on"|"off"} state - Value sent in the `switch` field.
     */
    publishTrackSwitch(state) {
      this.assetCodeArr.forEach((code) => {
        this.$refs.mqttManyComp.mqttPublish(`smart_trans/${code}/down/track`, {
          id: code,
          switch: state,
        });
      });
    },

    /**
     * Re-send the "on" command whenever the bus fires `reconnection`.
     * The bound method itself is registered so that destroyed() can remove exactly
     * this listener.
     */
    listenFlowCanvasNewClose() {
      this.bus.$on("reconnection", this.mqttClose);
    },
  },
  created() {},
  mounted() {
    this.listenFlowCanvasNewClose();
  },
  destroyed() {
    // Remove the bus listener; otherwise it outlives the component and calls
    // into `$refs` that no longer exist.
    this.bus.$off("reconnection", this.mqttClose);
  },
};
</script><style lang="scss" scoped></style>
使用方式
<!-- mqttManyComp: the template renders only an empty <div>; all MQTT logic lives in the script block below. -->
<template><div></div>
</template><script>
import mqtt from "mqtt";
import { refreshByTime } from "@/utils";

/** Reconnect attempts allowed in a row before the client gives up. */
const MAX_RECONNECT_ATTEMPTS = 10;

/** Period, in milliseconds, between two `mqtt-close` notifications. */
const CLOSE_NOTIFY_INTERVAL = 1000 * 25;

/**
 * mqttManyComp — MQTT-over-WebSocket client that subscribes to several topics at once.
 *
 * Props
 *   topics   {string[]} topics to subscribe to
 *   mqttUrl  {Object}   { head, host, port, tailPath } → `${head}://${host}:${port}/${tailPath}`
 *   mqttOpts {Object}   options passed to `mqtt.connect`
 *
 * Events
 *   message-received  { topic, message } — `message` is the JSON-decoded payload
 *   mqtt-close        emitted every 25 s and once right before the component is destroyed
 *
 * The connection is NOT opened on mount; the parent must call `initMqtt()`.
 */
export default {
  name: "mqttManyComp",
  props: {
    // Topics to subscribe to.
    topics: {
      type: Array,
      default: () => [],
    },
    // Broker address.
    mqttUrl: {
      type: Object,
      default: () => ({
        head: "", // must be "ws" or "wss"
        host: "", // broker IP address
        port: null, // broker port (was left blank, which is a syntax error)
        tailPath: "", // service path
      }),
    },
    // Connection options.
    mqttOpts: {
      type: Object,
      default: () => ({
        keepalive: 60,
        clientId: `clientId-${Math.random().toString(16).slice(2, 10)}`,
        username: "你的用户名",
        password: "你的密码",
        connectTimeout: 10 * 1000,
      }),
    },
  },
  data() {
    return {
      client: null, // mqtt client; null until initMqtt() runs
      clientCount: 0, // reconnect attempts since the last successful connect
      receivedMessage: null, // last decoded message
      refreshTimer: null, // stop function returned by refreshByTime
    };
  },
  watch: {
    /**
     * Keep the broker subscriptions in step with the `topics` prop:
     * drop topics that were removed, then (re)subscribe to the current list.
     */
    topics(newTopics, oldTopics) {
      if (!this.client) {
        return;
      }
      const next = newTopics ?? [];
      const stale = (oldTopics ?? []).filter((topic) => !next.includes(topic));
      stale.forEach((topic) => {
        this.client.unsubscribe(topic);
      });
      next.forEach((topic) => {
        this.client.subscribe(topic);
      });
    },
  },
  methods: {
    /**
     * Open the MQTT connection and register the event handlers.
     * Calling it again force-closes the previous client first, so two clients
     * never run with the same clientId.
     */
    async initMqtt() {
      console.log(this.topics, "mqttManyComp组件接收到的topics");
      if (this.client) {
        this.client.end(true);
      }
      this.clientCount = 0;
      // Plain copy so the mqtt library does not receive Vue's reactive object.
      const opts = JSON.parse(JSON.stringify(this.mqttOpts));
      const { head, host, port, tailPath } = this.mqttUrl;
      this.client = mqtt.connect(`${head}://${host}:${port}/${tailPath}`, opts);
      this.client.on("connect", this.handleConnect);
      this.client.on("message", this.handleMessage);
      this.client.on("reconnect", this.handleReconnect);
      this.client.on("error", this.handleError);
    },

    /** On (re)connect: reset the retry counter and subscribe to every topic. */
    handleConnect() {
      this.clientCount = 0;
      this.topics.forEach((topic) => {
        console.log("mqtt_连接成功", topic);
        this.client.subscribe(topic);
      });
    },

    /**
     * Decode an incoming payload as JSON and forward it to the parent.
     * An empty payload decodes to `{}`; a payload that is not valid JSON is
     * logged and dropped instead of throwing inside the event handler.
     *
     * @param {string} topic - Topic the message arrived on.
     * @param {*} message - Raw payload; converted with `toString()` before parsing.
     */
    handleMessage(topic, message) {
      const text = message?.toString() || "{}";
      let payload;
      try {
        payload = JSON.parse(text);
      } catch (error) {
        console.error("MQTT message is not valid JSON", topic, error);
        return;
      }
      this.receivedMessage = payload;
      this.$emit("message-received", { topic, message: this.receivedMessage });
    },

    /** Count reconnect attempts and close the client once the limit is reached. */
    handleReconnect() {
      this.clientCount++;
      if (this.clientCount >= MAX_RECONNECT_ATTEMPTS) {
        this.client.end();
      }
    },

    /** Report client errors instead of discarding them. */
    handleError(error) {
      console.error("MQTT client error", error);
    },

    /**
     * Publish `message` (JSON-encoded) to `topic`. Does nothing but log an error
     * when the client does not exist or is not connected.
     *
     * @param {string} topic
     * @param {*} message - Any JSON-serialisable value.
     */
    mqttPublish(topic, message) {
      if (!this.client?.connected) {
        console.error("MQTT client is not connected");
        return;
      }
      console.log("mqtt_发送消息", topic, message);
      this.client.publish(topic, JSON.stringify(message));
    },

    /**
     * (Re)start the periodic `mqtt-close` notification. Any interval started by
     * an earlier call is stopped first so only one is ever running.
     */
    refresh() {
      this.refreshTimer?.();
      this.refreshTimer = refreshByTime(
        () => {
          this.$emit("mqtt-close");
        },
        false,
        CLOSE_NOTIFY_INTERVAL,
      );
    },

    /** Force-close the connection, if one was opened. */
    handleEnd() {
      if (!this.client) {
        return;
      }
      this.client.end(true, {}, () => {});
    },
  },
  mounted() {
    this.refresh();
    // The connection is opened by the parent through initMqtt().
  },
  beforeDestroy() {
    // Stop the periodic notification. Calling refresh() here would start a
    // second interval instead of stopping the first.
    this.refreshTimer?.();
    this.refreshTimer = null;

    this.$emit("mqtt-close"); // lets the parent send its final command before the socket closes
    const client = this.client;
    if (!client) {
      return;
    }
    // Short delay so the parent's publish can go out before the forced close.
    setTimeout(() => {
      client.end(true, {}, () => {
        console.log("MQTT连接已成功关闭");
      });
    }, 100);
  },
};
</script><style lang="scss" scoped></style>
refreshByTime
/**
 * Run `callback` repeatedly on a fixed interval.
 *
 * @param {function} callback - Function invoked on every tick, and once immediately when `isFirst` is true.
 * @param {boolean} isFirst - Whether to invoke `callback` once, synchronously, before the interval starts.
 * @param {number} time - Interval period in milliseconds (default: 5 minutes).
 * @returns {function(): void} Stop function that clears the interval; calling it again is a no-op.
 */
export function refreshByTime(
  callback = () => {},
  isFirst = true,
  time = 1000 * 60 * 5,
) {
  if (isFirst) {
    callback();
  }
  // `timer` is declared before it is logged: reading it ahead of its `let`
  // declaration throws a ReferenceError (temporal dead zone).
  let timer = setInterval(callback, time);
  console.error("refreshByTime 运行", timer);
  return () => {
    if (timer) {
      console.error("refreshByTime 移除", timer);
      clearInterval(timer);
      timer = null;
    }
  };
}