您的位置:首页 > 科技 > IT业 > 专业的网页制作服务好_谷歌网站提交入口_中国四大软件外包公司_天津seo管理平台

专业的网页制作服务好_谷歌网站提交入口_中国四大软件外包公司_天津seo管理平台

2024/10/5 11:21:35 来源:https://blog.csdn.net/qq_43155814/article/details/142605160  浏览:    关键词:专业的网页制作服务好_谷歌网站提交入口_中国四大软件外包公司_天津seo管理平台
专业的网页制作服务好_谷歌网站提交入口_中国四大软件外包公司_天津seo管理平台

发送 TCP/UDP/MULTICAST 数据并接收响应。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import socketclass ClientSocket:def __init__(self, *, protocol: str, ip: str, port: int, recv_timeout: float = 1.5):"""客户端套接字发送 TCP/UDP/MULTICAST 数据并接收响应。Args:protocol (str): 协议ip (str): ip 地址port (int): 端口号recv_timeout (float, optional): 接收超时时间. Defaults to 1.5.Raises:ValueError: 无效的端口号, 应为 [1-65535]ValueError: 无效的协议类型, 应为 [TCP, UDP, MULTICAST]"""if port < 1 or port > 65535:raise ValueError(f'ServerSocket 无效的端口号 "{port}"')if protocol not in ['TCP', 'UDP', 'MULTICAST']:raise ValueError(f'ServerSocket 无效的协议类型 "{protocol}"')self.protocol = protocolself.ip = ipself.port = portself.recv_timeout = recv_timeoutself.sock: socket.socket | None = Noneself.__connected = Falsedef __str__(self) -> str:return f"ClientSocket({self.ip}:{self.port})"def connect(self) -> bool:if not self.__connected:try:match self.protocol:case 'TCP':self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.sock.connect((self.ip, self.port))case 'UDP':self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)if self.ip.endswith('.255'): # 设置 SO_BROADCAST 为 1, 允许发送广播数据包self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)case 'MULTICAST':self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 1)self.sock.bind(('', self.port))self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, socket.inet_aton(self.ip) + socket.inet_aton('0.0.0.0'))self.sock.settimeout(self.recv_timeout)self.__connected = Trueexcept ConnectionRefusedError:print(f'{self} 无法连接: {self.ip}:{self.port}')except Exception as e:print(f'{self} 创建失败: {e}')return self.__connecteddef send(self, data: bytes) -> None:try:print(f'{self} 发送数据: {data}')if self.protocol == 'TCP':self.sock.sendall(data)else:self.sock.sendto(data, (self.ip, self.port))except OSError as e:if e.errno == 10057:print(f'{self} 发送失败连接未建立')else:print(f'{self} 发送失败: {e}')def recv(self) -> bytes | None:try:data, _ = self.sock.recvfrom(1024)print(f'{self} 收到数据: {data}')return dataexcept TimeoutError:return Noneexcept OSError as e:if e.errno == 10057:print(f'{self} 接收失败连接未建立')else:print(f'{self} 接收失败: {e}')def close(self) -> bool:if self.__connected:try:self.sock.shutdown(socket.SHUT_RDWR)self.sock.close()self.__connected = Falseexcept Exception as e:print(f'{self} 关闭失败: {e}')return not self.__connectedif __name__ == '__main__':from time import sleep# client = ClientSocket(protocol='TCP', ip='127.0.0.1', port=60000)# client = ClientSocket(protocol='UDP', ip='127.0.0.1', port=60000)client = ClientSocket(protocol='MULTICAST', ip='224.1.1.1', port=65000)client.connect()times = 3while times > 0:times -= 1client.send(b'hello')client.recv()sleep(1)client.send(b'q')client.recv()client.close()

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com