您的位置:首页 > 房产 > 建筑 > 哈尔滨安康养老院收费标准_深圳外贸网站怎么建_网站关键词排名优化系统_谷歌推广开户多少费用

哈尔滨安康养老院收费标准_深圳外贸网站怎么建_网站关键词排名优化系统_谷歌推广开户多少费用

2024/12/27 19:10:25 来源:https://blog.csdn.net/qq_43347021/article/details/144053361  浏览:    关键词:哈尔滨安康养老院收费标准_深圳外贸网站怎么建_网站关键词排名优化系统_谷歌推广开户多少费用
哈尔滨安康养老院收费标准_深圳外贸网站怎么建_网站关键词排名优化系统_谷歌推广开户多少费用

FastAPI 是一个现代、快速(高性能)的 Web 框架,能够与异步数据库库(如 SQLAlchemy)完美结合。本篇文章将通过一个 MySQL 数据库的示例,介绍如何在 FastAPI 中配置数据库连接、定义数据库模型、管理数据库会话,并演示如何进行数据库的增删查改和事务操作。

环境准备

首先,我们需要安装 FastAPI、SQLAlchemy 和异步数据库支持库。假设我们使用 MySQL 作为数据库,所需的依赖库如下:

pip install fastapi
pip install uvicorn
pip install sqlalchemy
pip install mysql-connector-python
pip install asyncmy  # MySQL 的异步驱动

1. 数据库配置

数据库配置部分与前面相同,保持不变:

DATABASE_URL = "mysql+asyncmy://user:password@localhost/db_name"

2. 引擎和会话的配置

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker# 创建异步引擎
engine = create_async_engine(DATABASE_URL, echo=True)# 创建异步 Session 类
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False
)

3. 定义数据库模型

继续使用 SQLAlchemy ORM 定义 Item 模型:

from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base# 基类
Base = declarative_base()# 定义 Item 模型
class Item(Base):__tablename__ = "items"id = Column(Integer, primary_key=True, index=True)name = Column(String, index=True)description = Column(String)

4. 获取数据库会话

from fastapi import Depends# 获取数据库会话
async def get_db():async with AsyncSessionLocal() as session:yield session

5. 数据库初始化

在应用启动时初始化数据库,确保表格的创建:

from fastapi import FastAPIapp = FastAPI()# 初始化数据库
@app.on_event("startup")
async def on_startup():async with engine.begin() as conn:await conn.run_sync(Base.metadata.create_all)

6. 数据库的增删查改(CRUD)

创建数据

将原始的 SQL 查询方式改为 SQLAlchemy ORM 的增操作:

from fastapi import HTTPException@app.post("/items/")
async def create_item(item: Item, db: AsyncSession = Depends(get_db)):db.add(item)  # 添加物品await db.commit()  # 提交事务await db.refresh(item)  # 刷新数据return item

查询数据

改用 SQLAlchemy ORM 的查询方法(filter)来替代 SQL 查询语句:

@app.get("/items/{item_id}")
async def get_item(item_id: int, db: AsyncSession = Depends(get_db)):# 使用 SQLAlchemy ORM 的 filter 查询item = await db.execute(select(Item).filter(Item.id == item_id))item = item.scalars().first()if item is None:raise HTTPException(status_code=404, detail="Item not found")return item
  • select(Item):查询 Item 表。
  • filter(Item.id == item_id):根据 item_id 查找特定物品。
  • scalars().first():获取查询结果中的第一个对象。

更新数据

使用 SQLAlchemy ORM 完成数据的更新:

from sqlalchemy.future import select@app.put("/items/{item_id}")
async def update_item(item_id: int, updated_item: Item, db: AsyncSession = Depends(get_db)):# 使用 SQLAlchemy ORM 的 filter 查询result = await db.execute(select(Item).filter(Item.id == item_id))item = result.scalars().first()if item is None:raise HTTPException(status_code=404, detail="Item not found")# 更新字段item.name = updated_item.nameitem.description = updated_item.descriptionawait db.commit()  # 提交更新await db.refresh(item)  # 刷新数据return item
  • 通过 select(Item).filter(Item.id == item_id) 查询物品。
  • 修改查询结果的字段后,通过 db.commit() 提交更新。

删除数据

将 SQL 查询语句改为使用 SQLAlchemy ORM 的 delete 方法:

@app.delete("/items/{item_id}")
async def delete_item(item_id: int, db: AsyncSession = Depends(get_db)):result = await db.execute(select(Item).filter(Item.id == item_id))item = result.scalars().first()if item is None:raise HTTPException(status_code=404, detail="Item not found")# 删除物品await db.delete(item)await db.commit()  # 提交删除return {"message": "Item deleted successfully"}

7. 事务的使用

使用 SQLAlchemy 的事务管理功能来确保多个数据库操作的一致性:

@app.post("/create_multiple_items/")
async def create_multiple_items(items: list[Item], db: AsyncSession = Depends(get_db)):async with db.begin():  # 使用事务for item in items:db.add(item)await db.commit()return {"message": "Items created successfully"}
  • async with db.begin() 启动一个事务,所有的数据库操作都将在该事务内进行,要么全部成功,要么全部失败。

完整代码示例

from fastapi import FastAPI, HTTPException, Depends
from sqlalchemy import Column, Integer, String, select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_baseDATABASE_URL = "mysql+asyncmy://user:password@localhost/db_name"# 创建异步引擎
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)Base = declarative_base()# 定义 Item 模型
class Item(Base):__tablename__ = "items"id = Column(Integer, primary_key=True, index=True)name = Column(String, index=True)description = Column(String)# FastAPI 实例
app = FastAPI()# 获取数据库会话
async def get_db():async with AsyncSessionLocal() as session:yield session# 初始化数据库
@app.on_event("startup")
async def on_startup():async with engine.begin() as conn:await conn.run_sync(Base.metadata.create_all)# 增加数据
@app.post("/items/")
async def create_item(item: Item, db: AsyncSession = Depends(get_db)):db.add(item)await db.commit()await db.refresh(item)return item# 查询数据
@app.get("/items/{item_id}")
async def get_item(item_id: int, db: AsyncSession = Depends(get_db)):result = await db.execute(select(Item).filter(Item.id == item_id))item = result.scalars().first()if item is None:raise HTTPException(status_code=404, detail="Item not found")return item# 更新数据
@app.put("/items/{item_id}")
async def update_item(item_id: int, updated_item: Item, db: AsyncSession = Depends(get_db)):result = await db.execute(select(Item).filter(Item.id == item_id))item = result.scalars().first()if item is None:raise HTTPException(status_code=404, detail="Item not found")item.name = updated_item.nameitem.description = updated_item.descriptionawait db.commit()await db.refresh(item)return item# 删除数据
@app.delete("/items/{item_id}")
async def delete_item(item_id: int, db: AsyncSession = Depends(get_db)):result = await db.execute(select(Item).filter(Item.id == item_id))item = result.scalars().first()if item is None:raise HTTPException(status_code=404, detail="Item not found")await db.delete(item)await db.commit()return {"message": "Item deleted successfully"}# 批量创建数据
@app.post("/create_multiple_items/")
async def create_multiple_items(items: list[Item], db: AsyncSession = Depends(get_db)):async with db.begin():  # 使用事务for item in items:db.add(item)await db.commit()return {"message": "Items created successfully"}

总结

本文详细讲解了如何在 FastAPI 中使用 SQLAlchemy ORM 来完成 MySQL 数据库的增删查改操作,利用异步的数据库引擎和会话机制,让应用在高并发场景下保持高效。通过 SQLAlchemy ORM,我们不仅可以简化数据库操作,还能提高代码的可读性和维护性。在实际开发中,采用 ORM 方式进行数据访问,是开发者处理数据库交互的常见做法。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com