您的位置:首页 > 科技 > 能源 > 网络运营的岗位职责及任职要求_网络规划设计师教程第2版pdf百度网盘_开网站需要投资多少钱_全网搜索关键词查询

网络运营的岗位职责及任职要求_网络规划设计师教程第2版pdf百度网盘_开网站需要投资多少钱_全网搜索关键词查询

2024/12/24 2:41:48 来源:https://blog.csdn.net/hzether/article/details/144592217  浏览:    关键词:网络运营的岗位职责及任职要求_网络规划设计师教程第2版pdf百度网盘_开网站需要投资多少钱_全网搜索关键词查询
网络运营的岗位职责及任职要求_网络规划设计师教程第2版pdf百度网盘_开网站需要投资多少钱_全网搜索关键词查询

Python MySQL 进阶用法详解

1. 使用连接池

使用 DBUtils 实现连接池管理:

from dbutils.pooled_db import PooledDB
import pymysqlclass DBConnectionPool:_pool = None@staticmethoddef get_pool():if DBConnectionPool._pool is None:DBConnectionPool._pool = PooledDB(creator=pymysql,        # 使用pymysql作为连接器maxconnections=10,      # 连接池最大连接数mincached=2,           # 初始化时创建的空闲连接数maxcached=5,           # 连接池最大空闲连接数maxshared=3,           # 共享连接数blocking=True,         # 连接数达到最大时是否阻塞maxusage=None,         # 一个连接最多被使用的次数setsession=[],         # 开始会话前执行的命令ping=0,                # ping MySQL服务端确保连接有效host='localhost',port=3306,user='root',password='123456',database='test',charset='utf8mb4',cursorclass=pymysql.cursors.DictCursor)return DBConnectionPool._pool@staticmethoddef get_conn():return DBConnectionPool.get_pool().connection()# 使用示例
class UserDAO:def get_user(self, user_id):with DBConnectionPool.get_conn() as conn:with conn.cursor() as cursor:sql = "SELECT * FROM users WHERE id = %s"cursor.execute(sql, (user_id,))return cursor.fetchone()

2. 实现简单的 ORM 映射

from typing import Dict, Any, Type, TypeVar
from datetime import datetimeT = TypeVar('T', bound='BaseModel')class Field:def __init__(self, field_type: str, primary_key: bool = False):self.field_type = field_typeself.primary_key = primary_keyclass BaseModel:_table_name: str = ''_fields: Dict[str, Field] = {}def __init__(self, **kwargs):for key, value in kwargs.items():setattr(self, key, value)@classmethoddef from_db_dict(cls: Type[T], db_dict: Dict[str, Any]) -> T:"""从数据库字典创建对象"""return cls(**db_dict)def to_db_dict(self) -> Dict[str, Any]:"""转换为数据库字典"""result = {}for key in self._fields:if hasattr(self, key):result[key] = getattr(self, key)return resultclass User(BaseModel):_table_name = 'users'_fields = {'id': Field('BIGINT', primary_key=True),'username': Field('VARCHAR(50)'),'password': Field('VARCHAR(100)'),'age': Field('INT'),'create_at': Field('TIMESTAMP')}def __init__(self, username: str, password: str, age: int, id: int = None, create_at: datetime = None):self.id = idself.username = usernameself.password = passwordself.age = ageself.create_at = create_atclass UserRepository:def __init__(self, db_pool):self.db_pool = db_pooldef save(self, user: User) -> User:with self.db_pool.get_conn() as conn:with conn.cursor() as cursor:if user.id is None:# Insertsql = """INSERT INTO users (username, password, age) VALUES (%s, %s, %s)"""cursor.execute(sql, (user.username, user.password, user.age))user.id = cursor.lastrowidelse:# Updatesql = """UPDATE users SET username=%s, password=%s, age=%s WHERE id=%s"""cursor.execute(sql, (user.username, user.password, user.age, user.id))conn.commit()return user# 使用示例
user = User(username="张三", password="123456", age=25)
repo = UserRepository(DBConnectionPool())
saved_user = repo.save(user)

3. 读写分离实现

from enum import Enum
from typing import List
import randomclass DBType(Enum):MASTER = "master"SLAVE = "slave"class DBConfig:def __init__(self, host: str, port: int, db_type: DBType):self.host = hostself.port = portself.db_type = db_typeclass DBRouter:def __init__(self):self.master_config = DBConfig("master.mysql", 3306, DBType.MASTER)self.slave_configs: List[DBConfig] = [DBConfig("slave1.mysql", 3306, DBType.SLAVE),DBConfig("slave2.mysql", 3306, DBType.SLAVE),]# 创建连接池self.master_pool = self._create_pool(self.master_config)self.slave_pools = [self._create_pool(cfg) for cfg in self.slave_configs]def _create_pool(self, config: DBConfig):return PooledDB(creator=pymysql,maxconnections=10,host=config.host,port=config.port,user='root',password='123456',database='test',charset='utf8mb4')def get_connection(self, for_write: bool = False):if for_write:return self.master_pool.connection()# 随机选择一个从库slave_pool = random.choice(self.slave_pools)return slave_pool.connection()class UserService:def __init__(self, db_router: DBRouter):self.db_router = db_routerdef get_user(self, user_id: int):# 读操作从从库获取with self.db_router.get_connection(for_write=False) as conn:with conn.cursor() as cursor:sql = "SELECT * FROM users WHERE id = %s"cursor.execute(sql, (user_id,))return cursor.fetchone()def create_user(self, user: User):# 写操作使用主库with self.db_router.get_connection(for_write=True) as conn:with conn.cursor() as cursor:sql = """INSERT INTO users (username, password, age) VALUES (%s, %s, %s)"""cursor.execute(sql, (user.username, user.password, user.age))conn.commit()return cursor.lastrowid

4. 分库分表实现

from hashlib import md5
from typing import Tupleclass ShardingConfig:DB_COUNT = 2      # 数据库数量TABLE_COUNT = 4   # 每个库中的表数量class ShardingRouter:@staticmethoddef get_db_table(user_id: int) -> Tuple[int, int]:"""获取分库分表位置"""# 使用用户ID做hashhash_val = int(md5(str(user_id).encode()).hexdigest(), 16)db_index = hash_val % ShardingConfig.DB_COUNTtable_index = (hash_val // ShardingConfig.DB_COUNT) % ShardingConfig.TABLE_COUNTreturn db_index, table_indexdef get_connection(self, db_index: int):"""获取指定分库的连接"""# 这里简化处理,实际应该维护多个连接池config = {'host': f'mysql{db_index}.example.com','port': 3306,'user': 'root','password': '123456','database': f'test_{db_index}'}return pymysql.connect(**config)class ShardingUserRepository:def __init__(self):self.router = ShardingRouter()def get_user(self, user_id: int) -> Optional[Dict]:db_index, table_index = self.router.get_db_table(user_id)with self.router.get_connection(db_index) as conn:with conn.cursor() as cursor:sql = f"SELECT * FROM users_{table_index} WHERE id = %s"cursor.execute(sql, (user_id,))return cursor.fetchone()def create_user(self, user: User) -> int:# 这里使用用户名作为分片键hash_val = int(md5(user.username.encode()).hexdigest(), 16)db_index = hash_val % ShardingConfig.DB_COUNTtable_index = (hash_val // ShardingConfig.DB_COUNT) % ShardingConfig.TABLE_COUNTwith self.router.get_connection(db_index) as conn:with conn.cursor() as cursor:sql = f"""INSERT INTO users_{table_index} (username, password, age) VALUES (%s, %s, %s)"""cursor.execute(sql, (user.username, user.password, user.age))conn.commit()return cursor.lastrowid

5. 主从复制配置

5.1 主库配置 (my.cnf)

[mysqld]
server-id = 1
log-bin = mysql-bin
binlog_format = ROW
sync_binlog = 1

5.2 从库配置 (my.cnf)

[mysqld]
server-id = 2
relay-log = slave-relay-bin
read_only = 1

5.3 主从复制设置

在主库执行:

-- 创建复制用户
CREATE USER 'repl'@'%' IDENTIFIED BY 'password';
GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';
FLUSH PRIVILEGES;-- 获取主库状态
SHOW MASTER STATUS;

在从库执行:

CHANGE MASTER TOMASTER_HOST='master_host',MASTER_USER='repl',MASTER_PASSWORD='password',MASTER_LOG_FILE='mysql-bin.000001',MASTER_LOG_POS=123;-- 启动从库复制
START SLAVE;-- 查看从库状态
SHOW SLAVE STATUS\G

5.4 Python 监控主从状态

class ReplicationMonitor:def __init__(self, master_pool, slave_pool):self.master_pool = master_poolself.slave_pool = slave_pooldef check_replication_status(self) -> Dict:master_status = self._get_master_status()slave_status = self._get_slave_status()return {'master': master_status,'slave': slave_status,'delay': self._calculate_delay(master_status, slave_status)}def _get_master_status(self) -> Dict:with self.master_pool.get_conn() as conn:with conn.cursor() as cursor:cursor.execute("SHOW MASTER STATUS")return cursor.fetchone()def _get_slave_status(self) -> Dict:with self.slave_pool.get_conn() as conn:with conn.cursor() as cursor:cursor.execute("SHOW SLAVE STATUS")return cursor.fetchone()def _calculate_delay(self, master_status: Dict, slave_status: Dict) -> int:# 计算主从延迟if not master_status or not slave_status:return -1return slave_status.get('Seconds_Behind_Master', -1)# 使用示例
monitor = ReplicationMonitor(master_pool, slave_pool)
status = monitor.check_replication_status()
print(f"主从延迟: {status['delay']} 秒")

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com