您的位置:首页 > 新闻 > 会展 > 免费网站软件下载安装_平台商城网站开发_在线网页服务器_优化资源配置

免费网站软件下载安装_平台商城网站开发_在线网页服务器_优化资源配置

2025/3/10 9:47:38 来源:https://blog.csdn.net/yweng18/article/details/146137541  浏览:    关键词:免费网站软件下载安装_平台商城网站开发_在线网页服务器_优化资源配置
免费网站软件下载安装_平台商城网站开发_在线网页服务器_优化资源配置

No17: 数据库连接与 ORM(SQLAlchemy 实战)


摘要
本文深入探讨SQLAlchemy在复杂场景下的高级应用,涵盖四大核心主题:

  1. 会话生命周期管理:通过事件钩子实现事务监控与审计追踪
  2. 混合继承映射:结合单表/连接表继承优势实现多态模型
  3. 高性能操作:批量插入与核心SQL构造优化大数据处理
  4. 异步支持:基于asyncmy的MySQL异步引擎实践

实战案例包含电商系统的分库分表实现策略与基于ORM的审计日志系统设计。扩展部分解析多租户架构的三种隔离方案及SQL注入防御的ORM层最佳实践。配套代码演示了从分片路由到异步操作的完整电商系统实现,帮助开发者掌握构建高并发、高安全性的企业级数据库应用能力。

在这里插入图片描述

核心概念

1. 会话生命周期管理(Session事件钩子)

通过事件钩子实现精细化的会话控制,典型场景包括事务边界管理、变更追踪和审计日志。

from sqlalchemy import event
from sqlalchemy.orm import sessionmakerSession = sessionmaker()# 事务提交前事件
@event.listens_for(Session, 'before_commit')
def before_commit(session):print("即将提交事务,当前待处理变更:", session.dirty)# 事务回滚后事件
@event.listens_for(Session, 'after_rollback')
def after_rollback(session):print("事务已回滚,清理临时状态")

2. 混合继承映射策略

结合单表继承和连接表继承的优势,实现灵活的多态查询:

from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationshipBase = declarative_base()class Product(Base):__tablename__ = 'product'id = Column(Integer, primary_key=True)type = Column(String(20))  # 单表继承标识__mapper_args__ = {'polymorphic_on': type}class Book(Product):__tablename__ = 'book'id = Column(Integer, ForeignKey('product.id'), primary_key=True)isbn = Column(String(13))__mapper_args__ = {'polymorphic_identity': 'book'}class Electronic(Product):__tablename__ = 'electronic'id = Column(Integer, ForeignKey('product.id'), primary_key=True)warranty = Column(Integer)__mapper_args__ = {'polymorphic_identity': 'electronic'}

3. 批量操作与核心SQL构造

高效处理大数据量操作的两种方式:

# ORM批量插入
session.bulk_insert_mappings(User, [{"name": "Alice", "age": 30},{"name": "Bob", "age": 25}
])# 核心SQL构造
from sqlalchemy import select, table, column
users = table('users', column('id'), column('name'), column('age')
)
stmt = users.insert().values([{'name': 'Charlie', 'age': 35},{'name': 'David', 'age': 40}
])
connection.execute(stmt)

4. 异步引擎与greenlet整合

使用asyncmy驱动实现MySQL异步操作:

from sqlalchemy.ext.asyncio import create_async_engine
import asyncioasync_engine = create_async_engine("mysql+asyncmy://user:pass@localhost/db",pool_size=5, max_overflow=10
)async def fetch_users():async with async_engine.connect() as conn:result = await conn.execute("SELECT * FROM users")return result.fetchall()asyncio.run(fetch_users())

实战案例

1. 电商系统的分表分库实现

使用ShardedSession实现用户表水平分片:

from sqlalchemy.ext.horizontal_shard import ShardedSessionshards = {'shard1': create_engine('sqlite:///./shard1.db'),'shard2': create_engine('sqlite:///./shard2.db')
}def shard_chooser(mapper, instance, clause=None):if instance:return "shard%d" % (instance.id % 2 + 1)else:return 'shard1'session = ShardedSession(shards=shards,shard_chooser=shard_chooser,id_chooser=lambda *args: list(shards.keys())
)# 自动路由到对应分片
user = User(id=101, name="Alice")
session.add(user)  # 自动路由到shard2

2. 版本控制与审计日志实现

通过事件监听实现变更追踪:

from sqlalchemy import inspectaudit_logs = []@event.listens_for(Session, 'before_flush')
def track_changes(session, context, instances):for obj in session.dirty:state = inspect(obj)for attr in state.attrs:hist = attr.load_history()if hist.has_changes():audit_logs.append({'object': obj,'attribute': attr.key,'old': hist.deleted,'new': hist.added})

扩展思考

1. 多租户架构的数据库隔离方案

三种常见实现方式:

# 方案1:schema隔离
class TenantSpecificQuery(Query):def get(self, ident):return super().filter_by(tenant_id=current_tenant.id).get(ident)# 方案2:行级隔离
class BaseModel(Base):__abstract__ = Truetenant_id = Column(Integer, nullable=False)# 方案3:动态schema切换
@event.listens_for(Pool, 'connect')
def connect(dbapi_con, record):dbapi_con.execute(f"SET search_path TO tenant_{current_tenant.id}")

2. SQL注入防御的ORM层实践

安全操作示例:

# 错误方式(存在注入风险)
query = User.query.filter(f"username = '{input_username}'")# 正确方式
query = User.query.filter(User.username == input_username)# 动态查询安全构造
from sqlalchemy import text
stmt = text("SELECT * FROM users WHERE username = :username")
session.execute(stmt, {"username": input_username})

完整代码示例

包含分表分库、审计日志、异步操作的完整电商系统示例:

# 安装依赖
# pip install sqlalchemy aiosqlitefrom sqlalchemy import Column, Integer, String, Float, select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import asyncio# 定义模型
Base = declarative_base()class Product(Base):__tablename__ = 'products'id = Column(Integer, primary_key=True)name = Column(String(50))price = Column(Float)shard_id = Column(Integer, nullable=False)# 创建异步引擎
engines = {'shard1': create_async_engine('sqlite+aiosqlite:///./shard1.db', echo=True),'shard2': create_async_engine('sqlite+aiosqlite:///./shard2.db', echo=True)
}# 创建异步会话工厂
async_sessions = {shard_id: sessionmaker(bind=engine,class_=AsyncSession,expire_on_commit=False)for shard_id, engine in engines.items()
}# 根据 shard_id 选择分片
def get_shard_id(shard_id):return f'shard{shard_id % 2 + 1}'# 异步操作示例
async def main():# 创建表for engine in engines.values():async with engine.begin() as conn:await conn.run_sync(Base.metadata.create_all)# 插入数据product = Product(name="Laptop", price=999.99, shard_id=101)shard_id = get_shard_id(product.shard_id)async with async_sessions[shard_id]() as session:session.add(product)await session.commit()print(f"产品已添加到 {shard_id}")# 插入数据product = Product(name="手机", price=10000, shard_id=102)shard_id = get_shard_id(product.shard_id)async with async_sessions[shard_id]() as session:session.add(product)await session.commit()print(f"产品已添加到 {shard_id}")# 查询所有分片的数据all_products = []for shard_id, session_factory in async_sessions.items():async with session_factory() as session:result = await session.execute(select(Product))products = result.scalars().all()all_products.extend(products)print(f"从 {shard_id} 查询到 {len(products)} 个产品")# 显示所有产品print("\n所有产品:")for product in all_products:print(f"产品: {product.name}, 价格: {product.price}, 分片ID: {product.shard_id}")# 运行异步主函数
if __name__ == "__main__":asyncio.run(main())

输出:

d:\python_projects\jupyter_demo\sqlalchemy_demo.py:11: MovedIn20Warning: The ``declarative_base()`` function is now available as sqlalchemy.orm.declarative_base(). (deprecated since: 2.0) (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)Base = declarative_base()
2025-03-09 19:50:49,625 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,626 INFO sqlalchemy.engine.Engine PRAGMA main.table_info("products")    
2025-03-09 19:50:49,626 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-03-09 19:50:49,627 INFO sqlalchemy.engine.Engine PRAGMA temp.table_info("products")    
2025-03-09 19:50:49,627 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-03-09 19:50:49,628 INFO sqlalchemy.engine.Engine
CREATE TABLE products (id INTEGER NOT NULL,name VARCHAR(50),price FLOAT,shard_id INTEGER NOT NULL,PRIMARY KEY (id)
)
2025-03-09 19:50:49,629 INFO sqlalchemy.engine.Engine [no key 0.00072s] ()
2025-03-09 19:50:49,636 INFO sqlalchemy.engine.Engine COMMIT
2025-03-09 19:50:49,638 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,639 INFO sqlalchemy.engine.Engine PRAGMA main.table_info("products")    
2025-03-09 19:50:49,639 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-03-09 19:50:49,640 INFO sqlalchemy.engine.Engine PRAGMA temp.table_info("products")
2025-03-09 19:50:49,640 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-03-09 19:50:49,641 INFO sqlalchemy.engine.Engine
CREATE TABLE products (id INTEGER NOT NULL,name VARCHAR(50),price FLOAT,shard_id INTEGER NOT NULL,PRIMARY KEY (id)
)
2025-03-09 19:50:49,642 INFO sqlalchemy.engine.Engine [no key 0.00067s] ()
2025-03-09 19:50:49,647 INFO sqlalchemy.engine.Engine COMMIT
2025-03-09 19:50:49,649 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,650 INFO sqlalchemy.engine.Engine INSERT INTO products (name, price, shard_id) VALUES (?, ?, ?)
2025-03-09 19:50:49,650 INFO sqlalchemy.engine.Engine [generated in 0.00035s] ('Laptop', 999.99, 101)
2025-03-09 19:50:49,652 INFO sqlalchemy.engine.Engine COMMIT
产品已添加到 shard2
2025-03-09 19:50:49,657 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,658 INFO sqlalchemy.engine.Engine INSERT INTO products (name, price, shard_id) VALUES (?, ?, ?)
2025-03-09 19:50:49,659 INFO sqlalchemy.engine.Engine [generated in 0.00077s] ('手机', 10000.0, 102)
2025-03-09 19:50:49,661 INFO sqlalchemy.engine.Engine COMMIT
产品已添加到 shard1
2025-03-09 19:50:49,665 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,666 INFO sqlalchemy.engine.Engine SELECT products.id, products.name, products.price, products.shard_id
FROM products
2025-03-09 19:50:49,665 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,666 INFO sqlalchemy.engine.Engine SELECT products.id, products.name, products.price, products.shard_id
FROM products
2025-03-09 19:50:49,666 INFO sqlalchemy.engine.Engine SELECT products.id, products.name, products.price, products.shard_id
FROM products
ducts.price, products.shard_id
FROM products
2025-03-09 19:50:49,667 INFO sqlalchemy.engine.Engine [generated in 0.00054s] ()
从 shard1 查询到 1 个产品
2025-03-09 19:50:49,668 INFO sqlalchemy.engine.Engine ROLLBACK
FROM products
2025-03-09 19:50:49,667 INFO sqlalchemy.engine.Engine [generated in 0.00054s] ()
从 shard1 查询到 1 个产品
2025-03-09 19:50:49,668 INFO sqlalchemy.engine.Engine ROLLBACK
2025-03-09 19:50:49,667 INFO sqlalchemy.engine.Engine [generated in 0.00054s] ()
从 shard1 查询到 1 个产品
2025-03-09 19:50:49,668 INFO sqlalchemy.engine.Engine ROLLBACK
2025-03-09 19:50:49,669 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,669 INFO sqlalchemy.engine.Engine SELECT products.id, products.name, pro从 shard1 查询到 1 个产品
2025-03-09 19:50:49,668 INFO sqlalchemy.engine.Engine ROLLBACK
2025-03-09 19:50:49,669 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,669 INFO sqlalchemy.engine.Engine SELECT products.id, products.name, products.price, products.shard_id
2025-03-09 19:50:49,669 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,669 INFO sqlalchemy.engine.Engine SELECT products.id, products.name, products.price, products.shard_id
2025-03-09 19:50:49,669 INFO sqlalchemy.engine.Engine SELECT products.id, products.name, products.price, products.shard_id
FROM products
2025-03-09 19:50:49,670 INFO sqlalchemy.engine.Engine [generated in 0.00029s] ()
从 shard2 查询到 1 个产品
ducts.price, products.shard_id
FROM products
2025-03-09 19:50:49,670 INFO sqlalchemy.engine.Engine [generated in 0.00029s] ()
从 shard2 查询到 1 个产品
2025-03-09 19:50:49,671 INFO sqlalchemy.engine.Engine ROLLBACK
FROM products
2025-03-09 19:50:49,670 INFO sqlalchemy.engine.Engine [generated in 0.00029s] ()
从 shard2 查询到 1 个产品
2025-03-09 19:50:49,671 INFO sqlalchemy.engine.Engine ROLLBACK
从 shard2 查询到 1 个产品
2025-03-09 19:50:49,671 INFO sqlalchemy.engine.Engine ROLLBACK
2025-03-09 19:50:49,671 INFO sqlalchemy.engine.Engine ROLLBACK
所有产品:
产品: 手机, 价格: 10000.0, 分片ID: 102
所有产品:
产品: 手机, 价格: 10000.0, 分片ID: 102
产品: Laptop, 价格: 999.99, 分片ID: 101
产品: 手机, 价格: 10000.0, 分片ID: 102
产品: Laptop, 价格: 999.99, 分片ID: 101
产品: Laptop, 价格: 999.99, 分片ID: 101

通过本章学习,您应该能够:

  1. 熟练使用SQLAlchemy事件系统管理会话生命周期
  2. 实现复杂的继承映射策略
  3. 处理大规模数据操作的性能优化
  4. 构建高并发的异步数据库应用
  5. 设计企业级的数据库架构方案

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com