用于服务端和客户端通信,服务端主动给客户端发送消息
前提:
确保安装了socket库:
pip install flask-socketio python-socketio
服务端代码
from flask import Flask
from flask_socketio import SocketIO
import threading
import timeapp = Flask(__name__)
socketio = SocketIO(app)# 全局变量
t = 0
list230 = []def read230():global tglobal list230while True:try:list230 = [1, 1, 1, 0, 0]socketio.emit("update_data", {"data": list230})except Exception as exc:list230 = [0, 0, 0, 100, 100]time.sleep(1)time.sleep(0.1)if __name__ == "__main__":t1 = threading.Thread(target=read230, name="read230")t1.start()socketio.run(app, host="0.0.0.0", port=5000)
客户端是另一个程序,你可以使用 SocketIO 客户端库来接收数据
import socketio# 创建一个 SocketIO 客户端
sio = socketio.Client()@sio.event
def connect():print("连接成功")@sio.event
def update_data(msg):print("接收到数据:", msg['data'])@sio.event
def disconnect():print("断开连接")if __name__ == '__main__':# 连接到服务器sio.connect('http://127.0.0.1:5000')try:# 保持程序运行while True:passexcept KeyboardInterrupt:sio.disconnect()
结果