您的位置:首页 > 文旅 > 美景 > 抖音的商业营销手段_石家庄飞数科技_网页设计用什么软件_武汉网站推广优化

抖音的商业营销手段_石家庄飞数科技_网页设计用什么软件_武汉网站推广优化

2025/2/24 7:55:14 来源:https://blog.csdn.net/mengmengz07/article/details/144721759  浏览:    关键词:抖音的商业营销手段_石家庄飞数科技_网页设计用什么软件_武汉网站推广优化
抖音的商业营销手段_石家庄飞数科技_网页设计用什么软件_武汉网站推广优化

server端

from quart import Quart, websocketapp = Quart(__name__)@app.websocket('/test_ws')
async def ws():while True:data = await websocket.receive()await websocket.send(f"server has received: {data}")if __name__ == '__main__':app.run(debug=True, port=8000)

proxy端

from quart import Quart, websocket
import websockets
import asyncioapp = Quart(__name__)async def connect_true_server():conn = await websockets.connect('ws://localhost:8000/test_ws')return connasync def client2server(client_conn, server_conn):while True:data = await client_conn.receive()await server_conn.send(f"before PROXY:\n{data}\n")async def server2client(client_conn, server_conn):while True:data = await server_conn.recv()await client_conn.send(f"after PROXY:\n{data}\n")@app.websocket('/test_proxy')
async def test_proxy():try:server_conn = await connect_true_server()except Exception as e:print(f"error while connect to server: {e}")returntry:await asyncio.gather(client2server(websocket, server_conn),server2client(websocket, server_conn))except Exception as e:print(f"wrror while pass info: {e}")if __name__ == '__main__':app.run(debug=True, port=8901)

client端

import asyncio
import websocketsasync def websocket_client():# 连接到服务器async with websockets.connect('ws://localhost:8901/test_proxy') as websocket:# 发送消息await websocket.send("Hello, I'm Caillou!")# 接收并打印来自服务器的消息response = await websocket.recv()print(f"Received from server: {response}")# 启动客户端
asyncio.run(websocket_client())

压测proxy

import asyncio
import websockets
import time# WebSocket 服务器地址
SERVER_URI = "ws://localhost:8901/test_proxy"  # 替换为你的服务器地址
fail_num = 0# 定义客户端连接和消息发送的异步任务
async def websocket_client(client_id):try:async with websockets.connect(SERVER_URI) as websocket:# 模拟发送和接收消息message = f"Hello from client {client_id}"await websocket.send(message)response = await websocket.recv()print(f"Client {client_id} received: {response}")except Exception as e:print(f"Client {client_id} failed: {e}")# 启动多个 WebSocket 客户端来模拟压力
async def run_stress_test(num_clients):tasks = []for i in range(num_clients):tasks.append(websocket_client(i))start_time = time.time()await asyncio.gather(*tasks)end_time = time.time()print(f"Test completed in {end_time - start_time:.2f} seconds for {num_clients} clients.")# 运行测试,模拟 100 个客户端连接
asyncio.run(run_stress_test(100))

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com