import tkinter as tk
from tkinter import ttk
import threading
import queue
import json
import time
import websocket
from SignUtil import SignUtil # 请确保SignUtil.py在同一目录
class WebSocketClient:
    """Background WebSocket worker that subscribes to topics and relays messages.

    The worker builds a signed connection URL with ``SignUtil``, sends a
    subscribe command for the configured topics, and then loops receiving
    messages. Everything it wants to report is put on ``msg_queue`` as a
    ``(level, text)`` tuple where ``level`` is ``"info"`` or ``"error"``.
    """

    def __init__(self, params, msg_queue, stop_event):
        """Store the connection settings and the shared communication objects.

        Args:
            params: Mapping read by ``run`` with the keys ``"url"``,
                ``"system_id"``, ``"system_key"`` and ``"topics"``
                (a comma-separated string).
            msg_queue: Queue that receives ``(level, text)`` tuples.
            stop_event: Event polled by ``run``; the loop exits once it is
                set, and ``run`` always sets it on the way out.
        """
        self.params = params
        self.msg_queue = msg_queue
        self.stop_event = stop_event
        self.ws = None
        # Seconds between keep-alive frames.
        self.heartbeat_interval = 30
        # Upper bound, in seconds, on how long a single recv() may block
        # before the loop re-checks the stop flag and the heartbeat deadline.
        self.recv_timeout = 1.0

    def on_message(self, message):
        """Queue a received message as an ``"info"`` log entry."""
        self.msg_queue.put(("info", f"接收消息: {message}"))

    def run(self):
        """Connect, subscribe, and pump messages until stopped or disconnected.

        Failures are never raised to the caller: they are reported on
        ``msg_queue`` as ``"error"`` entries. On exit the socket (if any) is
        closed and ``stop_event`` is set.
        """
        try:
            # Build the signed connection URL.
            sign_util = SignUtil()
            timestamp = str(int(time.time() * 1000))
            sign = sign_util.get_sign(
                self.params["system_id"],
                self.params["system_key"],
                timestamp,
            )
            # The base URL is concatenated as-is, so it must already end with
            # the query separator.
            url = (
                f"{self.params['url']}systemId={self.params['system_id']}"
                f"&timestamp={timestamp}&sign={sign}"
            )
            self.ws = websocket.create_connection(url)
            # Without a timeout recv() blocks until the server sends data,
            # which would starve both the stop check and the heartbeat.
            self.ws.settimeout(self.recv_timeout)

            # Send the subscription request.
            topics = [t.strip() for t in self.params["topics"].split(",")]
            subscription = json.dumps({"cmd": "subscribe", "topics": topics})
            self.ws.send(subscription)
            self.msg_queue.put(("info", "订阅请求已发送"))

            # Main receive loop.
            last_heartbeat = time.monotonic()
            while not self.stop_event.is_set():
                try:
                    try:
                        message = self.ws.recv()
                    except websocket.WebSocketTimeoutException:
                        # Idle period: nothing to report, fall through to the
                        # heartbeat check and the next stop-flag poll.
                        pass
                    else:
                        self.on_message(message)

                    # Heartbeat handling.
                    if time.monotonic() - last_heartbeat > self.heartbeat_interval:
                        self.ws.send(
                            json.dumps({"cmd": "keepAlive", "data": "heartbeat"})
                        )
                        last_heartbeat = time.monotonic()
                except websocket.WebSocketConnectionClosedException:
                    self.msg_queue.put(("error", "连接已关闭"))
                    break
                except Exception as e:
                    self.msg_queue.put(("error", f"接收错误: {str(e)}"))
                    break
        except Exception as e:
            # Top-level boundary of the worker thread: report, do not raise.
            self.msg_queue.put(("error", f"连接失败: {str(e)}"))
        finally:
            if self.ws:
                self.ws.close()
            self.stop_event.set()
class WebSocketGUI:
    """Tkinter front end for starting and stopping a ``WebSocketClient``.

    The window offers four entry fields (URL, system id, system key, topics),
    start/stop buttons, and a read-only log. The worker thread never touches
    widgets; it posts ``(level, text)`` tuples to ``msg_queue`` and the GUI
    polls that queue every 100 ms from the Tk event loop.
    """

    def __init__(self, root):
        """Build the window contents and start polling the message queue.

        Args:
            root: The Tk root window that hosts the widgets.
        """
        self.root = root
        self.root.title("WebSocket 自动化平台")

        # Control state.
        self.is_connected = False
        self.stop_event = threading.Event()
        self.msg_queue = queue.Queue()
        self.client_thread = None

        # Build the UI.
        self.create_widgets()

        # Start the periodic queue check.
        self.root.after(100, self.process_messages)

    def create_widgets(self):
        """Create the parameter form, the start/stop buttons and the log pane."""
        # Connection parameter frame.
        input_frame = ttk.LabelFrame(self.root, text="连接参数")
        input_frame.pack(padx=10, pady=5, fill=tk.X)

        # URL
        ttk.Label(input_frame, text="URL:").grid(row=0, column=0, sticky=tk.W)
        self.url_entry = ttk.Entry(input_frame, width=50)
        self.url_entry.grid(row=0, column=1, padx=5, pady=2)

        # System ID
        ttk.Label(input_frame, text="System ID:").grid(
            row=1, column=0, sticky=tk.W
        )
        self.system_id_entry = ttk.Entry(input_frame, width=50)
        self.system_id_entry.grid(row=1, column=1, padx=5, pady=2)

        # System Key
        ttk.Label(input_frame, text="System Key:").grid(
            row=2, column=0, sticky=tk.W
        )
        self.system_key_entry = ttk.Entry(input_frame, width=50)
        self.system_key_entry.grid(row=2, column=1, padx=5, pady=2)

        # Topics
        ttk.Label(input_frame, text="Topics (逗号分隔):").grid(
            row=3, column=0, sticky=tk.W
        )
        self.topics_entry = ttk.Entry(input_frame, width=50)
        self.topics_entry.grid(row=3, column=1, padx=5, pady=2)

        # Button frame.
        btn_frame = ttk.Frame(self.root)
        btn_frame.pack(pady=5)
        self.start_btn = ttk.Button(
            btn_frame, text="开始", command=self.start_connection
        )
        self.start_btn.pack(side=tk.LEFT, padx=5)
        self.stop_btn = ttk.Button(
            btn_frame,
            text="停止",
            command=self.stop_connection,
            state=tk.DISABLED,
        )
        self.stop_btn.pack(side=tk.LEFT, padx=5)

        # Log output.
        log_frame = ttk.LabelFrame(self.root, text="日志输出")
        log_frame.pack(padx=10, pady=5, fill=tk.BOTH, expand=True)
        self.log_text = tk.Text(log_frame, height=15, state=tk.DISABLED)
        self.log_text.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)

        # Tag styles are static, so configure them once here instead of on
        # every logged line.
        self.log_text.tag_config("error", foreground="red")
        self.log_text.tag_config("info", foreground="black")

    def start_connection(self):
        """Validate the form and launch a worker thread for a new connection.

        If any field is empty an error line is logged and nothing is started.
        """
        params = {
            "url": self.url_entry.get(),
            "system_id": self.system_id_entry.get(),
            "system_key": self.system_key_entry.get(),
            "topics": self.topics_entry.get(),
        }
        if not all(params.values()):
            self.display_message("error", "所有字段必须填写")
            return

        # Give every connection its own Event. Clearing a shared one could
        # un-stop a previous worker that has not finished shutting down yet.
        self.stop_event = threading.Event()
        client = WebSocketClient(params, self.msg_queue, self.stop_event)
        self.client_thread = threading.Thread(target=client.run, daemon=True)
        self.client_thread.start()

        self.is_connected = True
        self.start_btn.config(state=tk.DISABLED)
        self.stop_btn.config(state=tk.NORMAL)
        self.display_message("info", "正在建立连接...")

    def stop_connection(self):
        """Ask the current worker to stop and re-enable the start button."""
        self.stop_event.set()
        self.is_connected = False
        self.start_btn.config(state=tk.NORMAL)
        self.stop_btn.config(state=tk.DISABLED)
        self.display_message("info", "正在断开连接...")

    def process_messages(self):
        """Drain queued worker messages into the log, then reschedule itself."""
        while True:
            try:
                msg_type, message = self.msg_queue.get_nowait()
            except queue.Empty:
                break
            self.display_message(msg_type, message)

        # If the worker ended by itself (for example the connection failed),
        # put the buttons back so a new connection can be started.
        if (
            self.is_connected
            and self.client_thread is not None
            and not self.client_thread.is_alive()
        ):
            self.is_connected = False
            self.start_btn.config(state=tk.NORMAL)
            self.stop_btn.config(state=tk.DISABLED)

        self.root.after(100, self.process_messages)

    def display_message(self, msg_type, message):
        """Append one line to the log pane.

        Args:
            msg_type: ``"error"`` selects the red style; any other value is
                shown with the ``"info"`` style.
            message: Text of the line; a newline is appended.
        """
        # The Text widget is kept disabled so the user cannot edit the log;
        # enable it only for the duration of the insert.
        self.log_text.config(state=tk.NORMAL)
        tag = "error" if msg_type == "error" else "info"
        self.log_text.insert(tk.END, message + "\n", tag)
        self.log_text.see(tk.END)
        self.log_text.config(state=tk.DISABLED)
if __name__ == "__main__":
    # Script entry point: create the main window, attach the GUI, set the
    # initial window size and hand control to the Tk event loop.
    root = tk.Tk()
    app = WebSocketGUI(root)
    root.geometry("800x600")
    root.mainloop()
# Packaging command: pyinstaller --onefile --windowed WebsocketTest.py