在Python中对MySQL的操作通常通过mysql-connector-python
或PyMySQL
库进行,以下是一个简单的对MySQL操作的封装示例,使用mysql-connector-python
。你可以根据实际需求选择适合的库。
1. 安装 mysql-connector-python
库
pip install mysql-connector-python
2. MySQL操作工具类封装
import mysql.connector
from mysql.connector import Error
from mysql.connector import poolingclass MySQLDatabase:def __init__(self, host, user, password, database, pool_size=5):"""初始化 MySQL 数据库操作类:param host: 数据库地址:param user: 用户名:param password: 密码:param database: 数据库名:param pool_size: 连接池大小"""self.host = hostself.user = userself.password = passwordself.database = databaseself.pool_size = pool_size# 初始化连接池self.pool = pooling.MySQLConnectionPool(pool_name="mysql_pool",pool_size=self.pool_size,host=self.host,user=self.user,password=self.password,database=self.database)def _get_connection(self):"""获取连接池中的连接"""return self.pool.get_connection()def _execute_query(self, query, params=None, fetchone=False):"""执行查询操作(查询、插入、更新等)"""connection = Nonecursor = Nonetry:connection = self._get_connection()cursor = connection.cursor(dictionary=True)cursor.execute(query, params)connection.commit() # 提交事务if fetchone:return cursor.fetchone() # 获取单条记录return cursor.fetchall() # 获取所有记录except Error as e:print(f"Error: {e}")return Nonefinally:if cursor:cursor.close()if connection:connection.close()def query(self, query, params=None):"""执行查询操作,返回查询结果"""return self._execute_query(query, params)def insert(self, query, params):"""执行插入操作"""return self._execute_query(query, params)def update(self, query, params):"""执行更新操作"""return self._execute_query(query, params)def delete(self, query, params):"""执行删除操作"""return self._execute_query(query, params)def execute(self, query, params=None):"""执行任意SQL语句"""return self._execute_query(query, params)
3. 使用示例
if __name__ == "__main__":# 创建MySQL数据库操作类的实例db = MySQLDatabase(host="localhost", user="root", password="password", database="testdb")# 执行查询操作query = "SELECT * FROM users WHERE age = %s"params = (25,)results = db.query(query, params)print("Query Result:", results)# 执行插入操作insert_query = "INSERT INTO users (name, age) VALUES (%s, %s)"insert_params = ("Alice", 30)db.insert(insert_query, insert_params)print("Inserted successfully!")# 执行更新操作update_query = "UPDATE users SET age = %s WHERE name = %s"update_params = (31, "Alice")db.update(update_query, update_params)print("Updated successfully!")# 执行删除操作delete_query = "DELETE FROM users WHERE name = %s"delete_params = ("Alice",)db.delete(delete_query, delete_params)print("Deleted successfully!")
4. 代码说明
-
MySQLDatabase 类:
- 该类封装了数据库连接池和常用的操作(查询、插入、更新、删除)方法。
- 通过
mysql.connector.pooling.MySQLConnectionPool
创建连接池,使得多个数据库连接能有效复用,提高性能。
-
_get_connection 方法:
- 获取连接池中的一个连接对象,每次进行数据库操作时都通过连接池获取一个连接,避免了频繁地打开和关闭连接,减少了连接的开销。
-
_execute_query 方法:
- 用于执行SQL查询、插入、更新、删除等操作。支持事务提交。
- 可以通过
fetchone
参数控制是否只获取单条记录。
-
query, insert, update, delete 方法:
- 这些方法分别对应不同类型的SQL操作:
query
:执行SELECT查询。insert
:执行INSERT语句。update
:执行UPDATE语句。delete
:执行DELETE语句。
- 这些方法分别对应不同类型的SQL操作:
-
execute 方法:
- 这是一个通用的执行方法,支持任何类型的SQL操作。
5. 连接池
连接池是为了提高数据库连接的重用率,避免了每次操作都创建和销毁数据库连接的性能问题。你可以在初始化MySQLDatabase
类时指定pool_size
来控制连接池的大小。
6. 异常处理
- 使用
try-except
块捕获数据库操作中的错误(如连接问题、SQL语法错误等),并打印出相应的错误信息。 - 数据库连接和游标都通过
finally
语句确保被关闭,避免连接泄漏。
7. 扩展功能
你可以根据需要扩展更多功能,例如:
- 事务支持:在多个操作中保持一致性,添加事务的开始、提交、回滚操作。
- 数据库连接池参数:根据负载情况可以动态调整连接池大小。
- 日志记录:为每个操作添加日志记录,方便追踪SQL执行情况。
例如,如果需要支持事务,可以在insert
、update
、delete
方法中添加事务的管理:
def begin_transaction(self):connection = self._get_connection()connection.start_transaction()return connectiondef commit_transaction(self, connection):connection.commit()def rollback_transaction(self, connection):connection.rollback()
通过这种封装方式,数据库操作变得更加简单、规范,并且能有效提高性能。