server端
from quart import Quart, websocketapp = Quart(__name__)@app.websocket('/test_ws')
async def ws():while True:data = await websocket.receive()await websocket.send(f"server has received: {data}")if __name__ == '__main__':app.run(debug=True, port=8000)
proxy端
from quart import Quart, websocket
import websockets
import asyncioapp = Quart(__name__)async def connect_true_server():conn = await websockets.connect('ws://localhost:8000/test_ws')return connasync def client2server(client_conn, server_conn):while True:data = await client_conn.receive()await server_conn.send(f"before PROXY:\n{data}\n")async def server2client(client_conn, server_conn):while True:data = await server_conn.recv()await client_conn.send(f"after PROXY:\n{data}\n")@app.websocket('/test_proxy')
async def test_proxy():try:server_conn = await connect_true_server()except Exception as e:print(f"error while connect to server: {e}")returntry:await asyncio.gather(client2server(websocket, server_conn),server2client(websocket, server_conn))except Exception as e:print(f"wrror while pass info: {e}")if __name__ == '__main__':app.run(debug=True, port=8901)
client端
import asyncio
import websocketsasync def websocket_client():# 连接到服务器async with websockets.connect('ws://localhost:8901/test_proxy') as websocket:# 发送消息await websocket.send("Hello, I'm Caillou!")# 接收并打印来自服务器的消息response = await websocket.recv()print(f"Received from server: {response}")# 启动客户端
asyncio.run(websocket_client())
压测proxy
import asyncio
import websockets
import time# WebSocket 服务器地址
SERVER_URI = "ws://localhost:8901/test_proxy" # 替换为你的服务器地址
fail_num = 0# 定义客户端连接和消息发送的异步任务
async def websocket_client(client_id):try:async with websockets.connect(SERVER_URI) as websocket:# 模拟发送和接收消息message = f"Hello from client {client_id}"await websocket.send(message)response = await websocket.recv()print(f"Client {client_id} received: {response}")except Exception as e:print(f"Client {client_id} failed: {e}")# 启动多个 WebSocket 客户端来模拟压力
async def run_stress_test(num_clients):tasks = []for i in range(num_clients):tasks.append(websocket_client(i))start_time = time.time()await asyncio.gather(*tasks)end_time = time.time()print(f"Test completed in {end_time - start_time:.2f} seconds for {num_clients} clients.")# 运行测试,模拟 100 个客户端连接
asyncio.run(run_stress_test(100))