您的位置:首页 > 科技 > IT业 > 爱网站推广优化_什么是网络营销中的广告联盟_东莞做网站哪个公司好_网络营销平台排名

爱网站推广优化_什么是网络营销中的广告联盟_东莞做网站哪个公司好_网络营销平台排名

2025/2/24 12:23:51 来源:https://blog.csdn.net/weixin_38126537/article/details/145808637  浏览:    关键词:爱网站推广优化_什么是网络营销中的广告联盟_东莞做网站哪个公司好_网络营销平台排名
爱网站推广优化_什么是网络营销中的广告联盟_东莞做网站哪个公司好_网络营销平台排名

新建PLC类 PLC.py

import json
import time
from threading import Threadfrom HslCommunication import SiemensS7Net, SiemensPLCS
from PySide6.QtCore import QThread, Signal, QObjectfrom tdm.MsgType import MSG_TYPE_LOG, MSG_TYPE_MSGBOX# 自定义信号类,用于与主线程进行交互
class WorkSingle(QObject):msg = Signal(str)class PLC(QThread):def __init__(self, address, port):super().__init__()self.siemens = None  # 西门子对象self.connected_plc = None  # 是否连接上了PLCself.signal = WorkSingle() # 信道,与主线程通信self.address = address # PLC IP地址self.port = port # PLC 连接端口# 发送日志消息def send_log_msg(self, msg):data = {"type": MSG_TYPE_LOG,"msg": msg,}self.signal.msg.emit(json.dumps(data))# 发送弹窗类消息def send_dialog_msg(self, msg):data = {"type": MSG_TYPE_MSGBOX,"msg": msg,}self.signal.msg.emit(json.dumps(data))def init_plc_connect(self):self.siemens = SiemensS7Net(SiemensPLCS.S1200, self.address)self.siemens.port = int(self.port)connect = self.siemens.ConnectServer()if not connect.IsSuccess:self.connected_plc = False# print('初始化 PLC 连接失败: ' + connect.ToMessageShowString())self.send_log_msg('初始化 PLC 连接失败: ' + connect.ToMessageShowString())else:self.connected_plc = Trueprint("初始化 PLC 连接成功!")self.send_log_msg("初始化 PLC 连接成功!")def run(self):self.init_plc_connect()  # 初始化PLC连接if not self.connected_plc:print("PLC连接失败,不能连续读取")self.send_log_msg("PLC连接失败,不能连续读取")returnprint("TODO 开始连续读取PLC值")self.send_dialog_msg("AAAAA")Thread(target=self.read_plc_value).start()print("即将执行耗时操作,读取完毕1")self.send_log_msg("即将执行耗时操作,开启新的线程去执行")self.send_dialog_msg("CCCCC")def read_plc_value(self):time.sleep(10)print(self.port)self.send_dialog_msg("BBBBBBBB")print("耗时操作,读取完毕")self.send_log_msg("耗时操作,读取完毕")

创建 TDMSocketClient.py

import asyncio
import json
import time
import traceback
from threading import Threadimport websockets
from PyQt6.QtCore import QThread
from PySide6.QtCore import QObject, Signalfrom tdm.MsgType import MSG_TYPE_LOG# 自定义信号类,用于与主线程进行交互
class WorkSingle(QObject):msg = Signal(str)class TDMSocketClient(QThread):def __init__(self, ip, port):QThread.__init__(self)self.ip = ipself.port = portself.signal = WorkSingle()def run(self):# 启动 WebSocket 客户端连接服务器,用于接受服务器发来的试验通知Thread(target=self.runWSClient).start()def send_log_msg(self, msg):data = {"type": MSG_TYPE_LOG,"msg": msg,}self.signal.msg.emit(json.dumps(data))def runWSClient(self):# 下面的语句替换上面这一句,就不会报错啦,成功率高new_loop = asyncio.new_event_loop()asyncio.set_event_loop(new_loop)loop = asyncio.get_event_loop()task = asyncio.ensure_future(self.ws_client())loop.run_until_complete(asyncio.wait([task]))st = task.result()async def ws_client(self):try:uri = 'ws://' + self.ip + ':' + self.port + '/tdm/websocket/JZ076'async with websockets.connect(uri) as websocket:# localClient.setSuccessStatus("websocket 连接成功!")print('Connected to ' + self.ip + ':' + self.port + "success")self.send_log_msg('Connected to ' + self.ip + ':' + self.port + "success")while True:result = await websocket.recv()msg = json.loads(result)try:if msg["cmd"] == "startTest":print("开始试验")# 开始试验# localClient.startTest(msg['testInsId'])elif msg["cmd"] == "pauseTest":# 暂停试验# localClient.testPause()print("暂停试验")elif msg["cmd"] == "stopTest":print("停止试验")# 停止试验# localClient.testStop()except:# 报错的话自动重连traceback.print_exc()except:# 报错的话自动重连traceback.print_exc()time.sleep(1)await self.ws_client()

主类 TDMClient.py

'''
TDM 客户端
'''
import json
from typing import Finalfrom PyQt6.QtWidgets import QMessageBox
from PySide6.QtCore import QFile
from PySide6.QtUiTools import QUiLoader
from PySide6.QtWidgets import QApplicationfrom tdm.MsgType import MSG_TYPE_LOG, MSG_TYPE_MSGBOX
from tdm.PLC import PLC
from tdm.TDMSocketClient import TDMSocketClientPLC_ADDRESS: Final = "192.168.1.83"
PLC_PORT: Final = "102"WS_SERVICE_ADDRESS: Final = "localhost"
WS_SERVICE_PORT: Final = "8080"# 显示弹窗
def show_msg_box(msg):mb = QMessageBox()mb.setText(msg)mb.exec()class TDMClient:def __init__(self):# 获取设计文件qf = QFile('ui/TDMLocalClient.ui')qf.open(QFile.ReadOnly)qf.close()# 将设计文件加载为窗口self.ui = QUiLoader().load(qf)# PLC 读取相关self.plc = PLC(PLC_ADDRESS, PLC_PORT)self.plc.signal.msg.connect(self.receive_signal)self.ws = TDMSocketClient(WS_SERVICE_ADDRESS, WS_SERVICE_PORT)self.ws.signal.msg.connect(self.receive_signal)def run(self):self.ui.show()  # 显示客户端self.plc.run()  # 获取试验器的点位列表,并连续读取self.ws.run()  # 启动 WebSocket 客户端服务,用于接收服务器发来的 试验开始、试验暂停、试验停止等信息# 收到子线程信号是触发(唯一方法入口)def receive_signal(self, msg):msg_obj = json.loads(msg)if msg_obj['type'] == MSG_TYPE_LOG:self.ui.retView.appendPlainText(str(msg_obj['msg']))elif msg_obj['type'] == MSG_TYPE_MSGBOX:show_msg_box(str(msg_obj['msg']))app = QApplication([])
tdmClient = TDMClient()
tdmClient.run()
app.exec()

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com