参考官方文档:https://passlib.readthedocs.io/en/stable/
基本用法
from passlib.hash import pbkdf2_sha256

# Hash a password. The result is kept in `hashed` rather than `hash` so the
# builtin hash() is not shadowed.
hashed = pbkdf2_sha256.hash("toomanysecrets")

# Verify the matching password against the stored hash.
print(pbkdf2_sha256.verify("toomanysecrets", hashed))
# Verify a different password against the same hash.
print(pbkdf2_sha256.verify("joshua", hashed))
基本用法的封装
from passlib.hash import pbkdf2_sha256


def hash_256(secret_str: str) -> str:
    """Return the passlib PBKDF2-SHA256 hash of ``secret_str``."""
    return pbkdf2_sha256.hash(secret_str)


def verify_256(secret_str: str, hash_str: str) -> bool:
    """Return whether ``secret_str`` verifies against ``hash_str``.

    ``hash_str`` is expected to be a value previously returned by
    :func:`hash_256`.
    """
    return pbkdf2_sha256.verify(secret_str, hash_str)


if __name__ == '__main__':
    secret_str = "toomanysecrets"
    hash_str = hash_256(secret_str)
    print(hash_str)
    # Check the original secret, then a different one (the secret doubled).
    print(verify_256(secret_str, hash_str))
    print(verify_256(secret_str * 2, hash_str))
实战:改造 FastAPI 的登录注册接口案例
import time
import fastzdp_login
from fastapi import FastAPI, Depends, HTTPException, status, Body
from sqlalchemy.orm import Session as SASession
from sqlmodel import SQLModel, Field, Session, create_engine, select
from typing import Optional


class User(SQLModel, table=True):
    """Table model for a user account: id, username and password."""

    # Primary key; left as None when the object is constructed without an id.
    id: Optional[int] = Field(default=None, primary_key=True)
    # Indexed column, used for lookups by username.
    username: str = Field(index=True)
    # The /register endpoint in this file stores a password hash here,
    # not the plain-text password.
    password: str


# Create the database engine
# NOTE(review): despite the variable name, this URL targets MySQL through the
# pymysql driver, and it embeds the database password in source code.
sqlite_url = "mysql+pymysql://root:zhangdapeng520@127.0.0.1:3306/zdppy_demo?charset=utf8mb4"
engine = create_engine(sqlite_url, echo=True)

# Make sure the tables exist.
# NOTE(review): drop_all runs first, so the mapped tables are dropped and
# recreated every time this module is loaded -- confirm this is intended.
SQLModel.metadata.drop_all(engine)
SQLModel.metadata.create_all(engine)

app = FastAPI()

# Placeholder secret key; in real use it should be stored securely.
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"

# Token lifetime, in minutes
ACCESS_TOKEN_EXPIRE_MINUTES = 30


# Assume you have a function for fetching a user
def get_user(db: Session, username: str) -> Optional[User]:
    """Return the first ``User`` row whose username equals ``username``.

    Args:
        db: An open SQLModel session. It is annotated with SQLModel's
            ``Session`` because ``exec`` is called on it.
        username: The username to look up.

    Returns:
        The first matching ``User``, or ``None`` when no row matches.
    """
    statement = select(User).where(User.username == username)
    return db.exec(statement).first()


# Dependency that provides a database session for each request
def get_db():db = Session(engine)try:yield dbfinally:db.close()@app.post("/register", status_code=status.HTTP_201_CREATED)
def register_user(
    # `...` marks each body field as required; the length limits are
    # enforced by FastAPI before this function runs.
    username: str = Body(..., min_length=2, max_length=36),
    password: str = Body(..., min_length=6, max_length=128),
    db: Session = Depends(get_db),
):
    """Register a new user and return its id.

    Args:
        username: Requested username (2-36 characters).
        password: Plain-text password (6-128 characters); only its hash is
            stored.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with a success ``message`` and the new ``user_id``.

    Raises:
        HTTPException: 400 if the username is already taken, or if saving
            the new row fails.
    """
    # Check whether the username already exists
    if get_user(db, username):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Username already exists")

    # Create the new user, storing the password hash rather than the password
    new_user = User(username=username, password=fastzdp_login.hash_256(password))
    db.add(new_user)
    try:
        db.commit()
        db.refresh(new_user)
    except Exception as e:
        # Endpoint boundary: report the error, undo the pending insert and
        # answer with a generic message while keeping the original cause.
        print(e)
        db.rollback()
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Operation failed") from e
    return {"message": "User registered successfully", "user_id": new_user.id}


# Login endpoint
@app.post("/login", status_code=status.HTTP_200_OK)
async def login_for_access_token(
    # `...` marks each body field as required; the length limits are
    # enforced by FastAPI before this function runs.
    username: str = Body(..., min_length=2, max_length=36),
    password: str = Body(..., min_length=6, max_length=128),
    db: Session = Depends(get_db),
):
    """Check the credentials and return a bearer access token.

    NOTE(review): this handler is ``async`` but the database query and the
    password verification it calls are synchronous -- confirm that blocking
    the event loop here is acceptable.

    Args:
        username: Username to log in with (2-36 characters).
        password: Plain-text password (6-128 characters).
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with the ``access_token`` and ``token_type`` ``"bearer"``.

    Raises:
        HTTPException: 400 when the user does not exist or the password does
            not verify against the stored hash.
    """
    user = get_user(db, username)
    # Unknown user and wrong password produce the same response.
    if not user or not fastzdp_login.verify_256(password, user.password):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password")

    # Token payload: who the user is, when the token was issued, and its
    # lifetime in seconds.
    data = {"username": user.username, "id": user.id, "time": time.time(), "expired": ACCESS_TOKEN_EXPIRE_MINUTES * 60}
    access_token = fastzdp_login.get_jwt(data, SECRET_KEY, ALGORITHM)
    return {"access_token": access_token, "token_type": "bearer"}


if __name__ == '__main__':
    import uvicorn

    uvicorn.run(app, host='0.0.0.0', port=8000)