一、安装依赖
pip install pycryptodome
pip install requests
二、公钥读取
def public_key(key):"""处理公钥公钥格式key,处理成以-----BEGIN PUBLIC KEY-----开头,-----END PUBLIC KEY-----结尾的格式: param key:pem格式的公钥,无-----BEGIN PUBLIC KEY-----开头,-----END PUBLIC KEY-----结尾: return:"""start = '-----BEGIN PUBLIC KEY-----\n'end = '-----END PUBLIC KEY-----'result = ''# 分割key,每64位长度换一行divide = int(len(key) / 64)divide = divide if (divide > 0) else divide + 1line = divide if (len(key) % 64 == 0) else divide + 1for i in range(line):result += key[i * 64:(i + 1) * 64] + '\n'result = start + result + endreturn result
三、私钥读取
def private_pem(pem):"""处理私钥私钥格式pem,处理成以-----BEGIN PRIVATE KEY-----开头,-----END PRIVATE KEY-----结尾的格式: param key:pem格式的公钥,无-----BEGIN PUBLIC KEY-----开头,-----END PUBLIC KEY-----结尾: return:"""start = '-----BEGIN PRIVATE KEY-----\n'end = '-----END PRIVATE KEY-----'result = ''# 分割key,每64位长度换一行divide = int(len(pem) / 64)divide = divide if (divide > 0) else divide + 1line = divide if (len(pem) % 64 == 0) else divide + 1for i in range(line):result += pem[i * 64:(i + 1) * 64] + '\n'result = start + result + endreturn result
四、普通加密
def token_encrypt(key, content):"""ras 加密[公钥加密]: param key: 无BEGIN PUBLIC KEY头END PUBLIC KEY尾的pem格式key: param content:待加密内容: return:"""pub_key = public_key(key)pub = RSA.import_key(pub_key)cipher = PKCS1_v1_5.new(pub)encrypt_bytes = cipher.encrypt(content.encode(encoding='utf-8'))result = base64.b64encode(encrypt_bytes)result = str(result, encoding='utf-8')return result
五、普通解密
def token_decrypt(pem, content):"""rsa 解密[私钥解密]: param pem:: param content:: return:"""pri_pem = private_pem(pem)pri = RSA.import_key(pri_pem)cipher = PKCS1_v1_5.new(pri)random_generator = Random.new().readtext = cipher.decrypt(base64.b64decode(content), random_generator)return text.decode('utf8')
六、分段式加密
def resource_decrypt(pem, content):"""rsa 解密[私钥解密--分段式]: param pem:: param content:: return:"""msg = base64.b64decode(content)length = len(msg)default_length = 128pri_pem = private_pem(pem)pri = RSA.import_key(pri_pem)cipher = PKCS1_v1_5.new(pri)if length < default_length:return b''.join(cipher.decrypt(msg, b'xyz'))# 进行解密offset = 0res = []while length - offset > 0:if length - offset > default_length:res.append(cipher.decrypt(msg[offset:offset + default_length], b'xyz'))else:res.append(cipher.decrypt(msg[offset:], b'xyz'))offset += default_lengthreturn json.loads(b''.join(res).decode('utf8'))
七、分段式解密
def resource_encrypt(key, content):"""rsa 加密[公钥--分段式]: param key:: param content:: return:"""content = content.encode('utf-8')length = len(content)default_length = 117pub_key = public_key(key)pub = RSA.import_key(pub_key)cipher = PKCS1_v1_5.new(pub)# 长度不用分段if length < default_length:return base64.b64encode(cipher.encrypt(content))# 需要分段offset = 0res = []while length - offset > 0:if length - offset > default_length:res.append(cipher.encrypt(content[offset:offset + default_length]))else:res.append(cipher.encrypt(content[offset:]))offset += default_lengthbyte_data = b''.join(res)data = base64.b64encode(byte_data)return str(data, encoding='utf-8')
八、POST请求
def post_url(url,authorization,data):"""post 请求: param url: 请求地址: param auth: 授权加密值: param data: 发送数据: return:"""# 设置授权头信息headers = {'User-Agent': 'Mozilla/5.0', # 设置User-Agent信息'Content-Type': 'application/json','Authorization': authorization # 设置授权信息}# 发送请求return requests.post(url, data=data, headers=headers)
九、测试
if __name__ == '__main__':# 公钥数据public_data = "您的公钥内容"# 未加密Authorization数据Authorization_data = "98798c31-15da-4abf-9b2f-887808fff4d1"# 针对Authorization数据加密Authorization_encrypt = token_encrypt(public_data, Authorization_data)# 组合要发送的数据data = {'phone': ['15000000000', '150000000001', '15000000002'], # 需要检测的手机号'callback_url': 'http://xxx.xxx.com/verify.phone/get_increase' # 回调地址}# 将要发送的数据进行分段式加密sendData = resource_encrypt("您的公钥内容",json.dumps(data))# 调用函数发送请求response = post_url("http://xxx.xxx.com/verify.phone/get_increase", Authorization_encrypt,json.dumps({"data":sendData}))# 处理请求响应res = response.json()# 解密返回数据resdata = resource_decrypt("您的私钥内容",res)if resdata['code'] == 20000:print("服务接口:", resdata['data'])else:print("本次提交的错误", resdata['message'])