Python MySQL 进阶用法详解
1. 使用连接池
使用 DBUtils 实现连接池管理:
from dbutils.pooled_db import PooledDB
import pymysql


class DBConnectionPool:
    """Class-level holder for a single, lazily created DBUtils ``PooledDB``.

    The pool is built on the first ``get_pool()`` call and stored on the
    class, so every caller shares the same pool object.
    """

    # Shared PooledDB instance; stays None until get_pool() is first called.
    _pool = None

    @staticmethod
    def get_pool():
        """Return the shared pool, creating it on first use."""
        if DBConnectionPool._pool is None:
            # NOTE: host and credentials below are the tutorial's demo values.
            DBConnectionPool._pool = PooledDB(
                creator=pymysql,    # DB-API module used to open connections
                maxconnections=10,  # upper bound on connections in the pool
                mincached=2,        # idle connections opened at start-up
                maxcached=5,        # upper bound on idle connections kept
                maxshared=3,        # upper bound on shared connections
                blocking=True,      # wait instead of failing when the pool is full
                maxusage=None,      # no limit on reuse of a single connection
                setsession=[],      # SQL commands run when a session starts
                # The original passed ping=0, which disables the liveness
                # check altogether even though its comment said the server
                # is pinged.  1 checks a connection whenever it is fetched
                # from the pool.
                ping=1,
                host='localhost',
                port=3306,
                user='root',
                password='123456',
                database='test',
                charset='utf8mb4',
                cursorclass=pymysql.cursors.DictCursor,
            )
        return DBConnectionPool._pool

    @staticmethod
    def get_conn():
        """Return a connection obtained from the shared pool."""
        return DBConnectionPool.get_pool().connection()


# Usage example
class UserDAO:
    """Read access to the ``users`` table through ``DBConnectionPool``."""

    def get_user(self, user_id):
        """Return the row of ``users`` whose ``id`` equals ``user_id``.

        Returns whatever ``cursor.fetchone()`` yields: the first matching
        row, or ``None`` when no row matches.
        """
        query = "SELECT * FROM users WHERE id = %s"
        with DBConnectionPool.get_conn() as conn:
            with conn.cursor() as cursor:
                # user_id is bound as a parameter, never formatted into the SQL.
                cursor.execute(query, (user_id,))
                return cursor.fetchone()
2. 实现简单的 ORM 映射
from typing import Dict, Any, Type, TypeVar
from datetime import datetime
from typing import Optional

T = TypeVar('T', bound='BaseModel')


class Field:
    """Column description: an SQL type string plus a primary-key flag."""

    def __init__(self, field_type: str, primary_key: bool = False):
        self.field_type = field_type
        self.primary_key = primary_key


class BaseModel:
    """Minimal base class mapping attribute names to table columns.

    Subclasses override ``_table_name`` and ``_fields``; ``_fields`` maps a
    column name to its ``Field`` description.
    """

    _table_name: str = ''
    _fields: Dict[str, Field] = {}

    def __init__(self, **kwargs):
        """Store every keyword argument as an instance attribute."""
        for key, value in kwargs.items():
            setattr(self, key, value)

    @classmethod
    def from_db_dict(cls: Type[T], db_dict: Dict[str, Any]) -> T:
        """Build an instance by passing ``db_dict`` as keyword arguments."""
        return cls(**db_dict)

    def to_db_dict(self) -> Dict[str, Any]:
        """Return ``{column: value}`` for each declared field set on this object."""
        return {
            key: getattr(self, key)
            for key in self._fields
            if hasattr(self, key)
        }


class User(BaseModel):
    """Model for the ``users`` table."""

    _table_name = 'users'
    _fields = {
        'id': Field('BIGINT', primary_key=True),
        'username': Field('VARCHAR(50)'),
        'password': Field('VARCHAR(100)'),
        'age': Field('INT'),
        'create_at': Field('TIMESTAMP'),
    }

    def __init__(self, username: str, password: str, age: int,
                 id: Optional[int] = None,
                 create_at: Optional[datetime] = None):
        """Create a user; ``id`` stays ``None`` until the row has been inserted."""
        # Delegate to BaseModel so attribute assignment lives in one place.
        super().__init__(
            id=id,
            username=username,
            password=password,
            age=age,
            create_at=create_at,
        )


class UserRepository:
    """Persists ``User`` objects through a pool object exposing ``get_conn()``."""

    def __init__(self, db_pool):
        self.db_pool = db_pool

    def save(self, user: User) -> User:
        """Insert ``user`` when it has no id, otherwise update the row with that id.

        On insert, ``user.id`` is set from ``cursor.lastrowid``. The same
        ``user`` object is returned after the transaction is committed.
        """
        with self.db_pool.get_conn() as conn:
            with conn.cursor() as cursor:
                if user.id is None:
                    # Insert
                    sql = (
                        "INSERT INTO users (username, password, age) "
                        "VALUES (%s, %s, %s)"
                    )
                    cursor.execute(sql, (user.username, user.password, user.age))
                    user.id = cursor.lastrowid
                else:
                    # Update
                    sql = (
                        "UPDATE users SET username=%s, password=%s, age=%s "
                        "WHERE id=%s"
                    )
                    cursor.execute(
                        sql,
                        (user.username, user.password, user.age, user.id),
                    )
            conn.commit()
        return user


# Usage example
# Build a user without an id, so save() below takes the INSERT branch.
user = User(username="张三", password="123456", age=25)
# UserRepository only calls get_conn() on the object it is given;
# DBConnectionPool provides that as a static method.
repo = UserRepository(DBConnectionPool())
# save() returns the same object, with id filled in from cursor.lastrowid.
saved_user = repo.save(user)
3. 读写分离实现
from enum import Enum
from typing import List
import random


class DBType(Enum):
    """Role of a database server in the read/write split."""

    MASTER = "master"
    SLAVE = "slave"


class DBConfig:
    """Address and role of one database server."""

    def __init__(self, host: str, port: int, db_type: DBType):
        self.host = host
        self.port = port
        self.db_type = db_type


class DBRouter:
    """Hands out connections: the master pool for writes, a replica pool for reads."""

    def __init__(self):
        self.master_config = DBConfig("master.mysql", 3306, DBType.MASTER)
        self.slave_configs: List[DBConfig] = [
            DBConfig("slave1.mysql", 3306, DBType.SLAVE),
            DBConfig("slave2.mysql", 3306, DBType.SLAVE),
        ]
        # Create one connection pool per server.
        self.master_pool = self._create_pool(self.master_config)
        self.slave_pools = [self._create_pool(cfg) for cfg in self.slave_configs]

    def _create_pool(self, config: DBConfig):
        """Build a ``PooledDB`` for the server described by ``config``."""
        # NOTE: credentials below are the tutorial's demo values.
        return PooledDB(
            creator=pymysql,
            maxconnections=10,
            host=config.host,
            port=config.port,
            user='root',
            password='123456',
            database='test',
            charset='utf8mb4',
        )

    def get_connection(self, for_write: bool = False):
        """Return a pooled connection.

        Writes always use the master pool. Reads use a replica pool chosen
        at random, and fall back to the master pool when no replica pool is
        configured (``random.choice`` would raise ``IndexError`` on an empty
        list).
        """
        if for_write or not self.slave_pools:
            return self.master_pool.connection()
        # Pick one replica at random.
        return random.choice(self.slave_pools).connection()


class UserService:
    """User operations routed through a ``DBRouter``."""

    def __init__(self, db_router: DBRouter):
        self.db_router = db_router

    def get_user(self, user_id: int):
        """Read the ``users`` row with the given id from a replica.

        Returns the result of ``cursor.fetchone()`` (``None`` when no row
        matches).
        """
        sql = "SELECT * FROM users WHERE id = %s"
        # Reads go to a replica.
        with self.db_router.get_connection(for_write=False) as conn:
            with conn.cursor() as cursor:
                cursor.execute(sql, (user_id,))
                return cursor.fetchone()

    def create_user(self, user: "User"):
        """Insert ``user`` on the master and return ``cursor.lastrowid``."""
        sql = (
            "INSERT INTO users (username, password, age) "
            "VALUES (%s, %s, %s)"
        )
        # Writes go to the master.
        with self.db_router.get_connection(for_write=True) as conn:
            with conn.cursor() as cursor:
                cursor.execute(sql, (user.username, user.password, user.age))
                new_id = cursor.lastrowid
            conn.commit()
        return new_id
4. 分库分表实现
from hashlib import md5
from typing import Optional, Tuple, Union


class ShardingConfig:
    """Shard layout constants."""

    DB_COUNT = 2     # number of databases
    TABLE_COUNT = 4  # number of tables inside each database


class ShardingRouter:
    """Maps a shard key to a (database, table) pair and opens connections."""

    @staticmethod
    def get_db_table(user_id: Union[int, str]) -> Tuple[int, int]:
        """Return ``(db_index, table_index)`` for a shard key.

        The key is converted with ``str()`` and hashed with MD5; the
        database index is the hash modulo ``DB_COUNT`` and the table index
        is taken from the remaining quotient modulo ``TABLE_COUNT``.
        """
        hash_val = int(md5(str(user_id).encode()).hexdigest(), 16)
        db_index = hash_val % ShardingConfig.DB_COUNT
        table_index = (hash_val // ShardingConfig.DB_COUNT) % ShardingConfig.TABLE_COUNT
        return db_index, table_index

    def get_connection(self, db_index: int):
        """Open a new connection to the database shard ``db_index``."""
        # Simplified: a real deployment should keep one connection pool per shard.
        # NOTE: credentials below are the tutorial's demo values.
        config = {
            'host': f'mysql{db_index}.example.com',
            'port': 3306,
            'user': 'root',
            'password': '123456',
            'database': f'test_{db_index}',
            # Same charset and dict rows as the pooled connections in the
            # earlier sections; get_user() is annotated as returning a Dict.
            'charset': 'utf8mb4',
            'cursorclass': pymysql.cursors.DictCursor,
        }
        return pymysql.connect(**config)


class ShardingUserRepository:
    """User storage spread over several databases and tables."""

    def __init__(self):
        self.router = ShardingRouter()

    def get_user(self, user_id: int) -> Optional[Dict]:
        """Look up a user by id in the shard derived from ``user_id``."""
        db_index, table_index = self.router.get_db_table(user_id)
        # table_index is a computed integer, so formatting it into the table
        # name is safe; the id itself is still bound as a parameter.
        sql = f"SELECT * FROM users_{table_index} WHERE id = %s"
        with self.router.get_connection(db_index) as conn:
            with conn.cursor() as cursor:
                cursor.execute(sql, (user_id,))
                return cursor.fetchone()

    def create_user(self, user: "User") -> int:
        """Insert ``user`` into the shard derived from its username.

        Returns ``cursor.lastrowid`` of the insert.
        """
        # The username is the shard key here.
        # NOTE(review): get_user() derives the shard from the id instead, so
        # a row written here is generally not in the shard get_user() reads.
        # Confirm which key is intended before relying on both methods.
        db_index, table_index = self.router.get_db_table(user.username)
        sql = (
            f"INSERT INTO users_{table_index} (username, password, age) "
            "VALUES (%s, %s, %s)"
        )
        with self.router.get_connection(db_index) as conn:
            with conn.cursor() as cursor:
                cursor.execute(sql, (user.username, user.password, user.age))
                new_id = cursor.lastrowid
            conn.commit()
        return new_id
5. 主从复制配置
5.1 主库配置 (my.cnf)
[mysqld]
server-id = 1
log-bin = mysql-bin
binlog_format = ROW
sync_binlog = 1
5.2 从库配置 (my.cnf)
[mysqld]
server-id = 2
relay-log = slave-relay-bin
read_only = 1
5.3 主从复制设置
在主库执行:
-- Create the replication user
CREATE USER 'repl'@'%' IDENTIFIED BY 'password';
GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';
FLUSH PRIVILEGES;

-- Show the master's status
SHOW MASTER STATUS;
在从库执行:
-- Point the replica at the master.
-- NOTE(review): the log file and position below look like placeholders,
-- presumably to be replaced with the values SHOW MASTER STATUS reports
-- on the master; confirm before running.
CHANGE MASTER TO
    MASTER_HOST='master_host',
    MASTER_USER='repl',
    MASTER_PASSWORD='password',
    MASTER_LOG_FILE='mysql-bin.000001',
    MASTER_LOG_POS=123;

-- Start replication on the replica
START SLAVE;

-- Show the replica's status
SHOW SLAVE STATUS\G
5.4 Python 监控主从状态
class ReplicationMonitor:
    """Reports master/replica status using two pool objects exposing ``get_conn()``."""

    def __init__(self, master_pool, slave_pool):
        self.master_pool = master_pool
        self.slave_pool = slave_pool

    def check_replication_status(self) -> Dict:
        """Query both servers and return their status rows plus the delay.

        The result has the keys ``'master'``, ``'slave'`` and ``'delay'``.
        """
        master_status = self._get_master_status()
        slave_status = self._get_slave_status()
        return {
            'master': master_status,
            'slave': slave_status,
            'delay': self._calculate_delay(master_status, slave_status),
        }

    def _get_master_status(self) -> Dict:
        """Return the first row of ``SHOW MASTER STATUS`` (``None`` if there is none)."""
        with self.master_pool.get_conn() as conn:
            with conn.cursor() as cursor:
                cursor.execute("SHOW MASTER STATUS")
                return cursor.fetchone()

    def _get_slave_status(self) -> Dict:
        """Return the first row of ``SHOW SLAVE STATUS`` (``None`` if there is none)."""
        with self.slave_pool.get_conn() as conn:
            with conn.cursor() as cursor:
                cursor.execute("SHOW SLAVE STATUS")
                return cursor.fetchone()

    def _calculate_delay(self, master_status: Dict, slave_status: Dict) -> int:
        """Return the replica's ``Seconds_Behind_Master``, or ``-1`` when unknown.

        ``-1`` is returned when either status row is missing or empty, when
        the column is absent, and when its value is ``None``.  Reading the
        value with ``.get`` requires dict-like rows.
        """
        if not master_status or not slave_status:
            return -1
        delay = slave_status.get('Seconds_Behind_Master')
        # The column can hold NULL; previously that leaked out as None
        # instead of the -1 "unknown" value used everywhere else.
        return -1 if delay is None else delay


# Usage example
# NOTE(review): master_pool and slave_pool are not defined in this snippet.
# ReplicationMonitor calls .get_conn() on each, so both must expose that method.
monitor = ReplicationMonitor(master_pool, slave_pool)
# Returns a dict with the keys 'master', 'slave' and 'delay'.
status = monitor.check_replication_status()
# 'delay' is -1 when either status row is missing.
print(f"主从延迟: {status['delay']} 秒")