在 FastAPI 中使用 SQL 数据库可以使用多个 ORM 工具,例如 SQLAlchemy、Tortoise ORM 等,类似 Java 的 Mybatis 。这些 ORM 工具可以帮助我们方便地与关系型数据库进行交互,如 MySQL 、PostgreSQL等。本篇文章将介绍如何使用 SQLAlchemy 来完成数据库操作,以便让我们在 FastAPI 项目中方便地进行数据存储和查询。
目录
1 介绍
1.1 SQLAlchemy
1.2 文件结构
2 数据库连接
2.1 安装 mysqlclient
2.2 SQLAlchemy 使用
3 创建模型
3.1 数据库模型
3.2 Pydantic 模型
4 数据库操作 CRUD
4.1 查—读取数据
4.2 增—创建数据
4.3 改—修改数据
4.4 删—删除数据
5 接口创建及运行
5.1 接口创建
5.2 项目运行
📌 源码地址:
https://gitee.com/yinyuu/fast-api_study_yinyu
简单来说,SQLAlchemy 就是一个 ORM 工具,提供了灵活的数据模型定义和查询语法,支持多种数据库后端,比如:
在 FastAPI 中使用 SQLAlchemy,我们可以通过安装 SQLAlchemy 和相应的数据库驱动程序(如 mysqlclient,psycopg2 等)来连接到数据库,然后使用 SQLAlchemy 提供的模型类定义数据表和字段,以及使用查询语法进行数据操作。
本篇文章中,我将以 MySQL 为例,实现 SQLAlchemy 的数据库连接及操作。
ORM 具有在代码和数据库表中的对象之间转换的工具,简单来说就是将该数据表映射到项目代码中,然后你通常在 SQL 数据库中创建一个代表映射的类,该类的每个属性代表一个列,具有名称和类型。
项目中包含子目录 sql_app,本篇文章的文件结构如下:
. └── sql_app ├── __init__.py ├── crud.py ├── database.py ├── main.py ├── models.py └── schemas.py
文件 __init__.py 是一个空文件,不过它告诉 Python 其中 sql_app 的所有模块(Python 文件)都是一个包,可以拿来调用。
接下来,本文将以 database.py -> models.py -> schemas.py -> crud.py -> main.py 的顺序开始讲述~
涉及到文件 sql_app/database.py,数据库操作的第一步便是连接数据库。
因为需求连接到 mysql 数据库,因此需要预先安装 mysql 驱动,可直接使用如下命令,简单来说就是安装个 MySQL 第三方库:
pip install PyMySQL
具体代码如下 👇
#1.导入 SQLAlchemy 部件 from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker #2.为 SQLAlchemy 定义数据库 URL地址 SQLALCHEMY_DATABASE_URL = "mysql+pymysql://user:password@ip地址:端口/数据表名?charset=utf8mb4" #3.创建 SQLAlchemy 引擎 engine = create_engine( SQLALCHEMY_DATABASE_URL ) #4.创建一个SessionLocal 数据库会话 SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) #5.创建一个Base类 Base = declarative_base()
📌 URL 地址
如果你想使用其他数据库,那么就需要对 SQLALCHEMY_DATABASE_URL 的值进行变更。
比如使用的是 PostgreSQL 数据库:"postgresql://user:password@postgresserver/db"。
📌 SessionLocal 类
它是一个本地线程存储(thread-local storage)的单例类,用来创建数据库会话。
简单来说,SessionLocal 类的主要作用是为每个请求创建一个数据库会话,并且确保这个会话在整个请求期间都是唯一的。这样,我们就可以在不同的函数中使用同一个会话,从而避免了在不同函数中反复创建会话的麻烦。
📌 declarative_base()
declarative_base() 是 SQLAlchemy 中提供的一个函数,用于创建一个基类,然后通过继承这个基类来定义数据表模型。它可以让我们更加方便地定义数据表模型,而不需要关注底层的SQL语句。具体作用:
接下来便是创建和数据表映射的数据库模型以及 Pydantic 模型,数据库模型用以对接数据表,Pydantic 模型则用来作为响应模型(response_model)及请求体。
以下是 DDL 语句,一共两个数据表,可直接拿来建表:
-- yinyu_db.items definition CREATE TABLE `items` ( `id` int(11) NOT NULL AUTO_INCREMENT, `title` varchar(64) DEFAULT NULL, `description` varchar(64) DEFAULT NULL, `owner_id` int(11) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; -- yinyu_db.users definition CREATE TABLE `users` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键', `phone` varchar(64) DEFAULT NULL, `hashed_password` varchar(64) DEFAULT NULL, `is_active` tinyint(1) DEFAULT '1', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
涉及到文件 sql_app/models.py
具体代码如下 👇
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String from sqlalchemy.orm import relationship #1.用Base类来创建 SQLAlchemy 模型 from .database import Base class User(Base): __tablename__ = "users" #2.创建模型属性/列 id = Column(Integer, primary_key=True, index=True) phone = Column(String, unique=True, index=True) hashed_password = Column(String) is_active = Column(Boolean, default=True) #3.创建关系 items = relationship("Item", back_populates="owner") class Item(Base): __tablename__ = "items" id = Column(Integer, primary_key=True, index=True) title = Column(String, index=True) description = Column(String, index=True) owner_id = Column(Integer, ForeignKey("users.id")) owner = relationship("User", back_populates="items")
📌 用 Base 类来创建 SQLAlchemy 模型
Base 类也就是数据库连接时的 declarative_base(),从 database(来自上面的 database.py 文件)导入 Base,那么它将自动映射数据表和类属性(原因在前边)。
📌 __tablename__
该属性是给模型映射的数据表的名称,比如 User 类的__tablename__ 为 users,那么它映射的数据表名即为 users。
📌 模型属性/列
比如:
id = Column(Integer, primary_key=True, index=True) phone = Column(String, unique=True, index=True) hashed_password = Column(String) is_active = Column(Boolean, default=True)
Column 表示这些属性中的每一个都代表其相应数据库表中的一列,Column 中的第一个参数,如Integer、String 和 Boolean,它定义了数据库中的类型。
📌 relationship 关系
class User(Base): __tablename__ = "users" ... items = relationship("Item", back_populates="owner") class Item(Base): __tablename__ = "items" ... owner_id = Column(Integer, ForeignKey("users.id")) owner = relationship("User", back_populates="items")
这在 User 模型和 Item 模型之间定义了一个关系,使用 back_populates 参数建立了双向关系。具体来说,是在 User 模型中定义了一个名为 items 的属性,并在 Item 模型中定义了一个名为owner 的属性,这两个属性都与对方的模型相关联。
比如访问 User 中的属性 items 时,它将指向一个 Item 的 SQLAlchemy 模型列表(来自 items表),同时 Item 使用 ForeignKey("users.id") 来标记 owner_id 列为外键,并指定它与 User 模型中的 id 列相关联。
在使用 back_populates 参数时,需要注意以下几点:
涉及到文件 sql_app/schemas.py
具体代码如下 👇
from typing import List, Union from pydantic import BaseModel #1.创建一个 ItemBase 和 UserBase 的 Pydantic模型(或者我们说“schema”) class ItemBase(BaseModel): title: str description: Union[str, None] = None #2.ItemCreate 继承自 ItemBase,他们在创建或读取数据时具有共同的属性。 class ItemCreate(ItemBase): pass #3.Item 继承自 ItemCreate,增加 id 和 owner_id 字段 class Item(ItemCreate): id: int owner_id: int class Config: orm_mode = True #使其包含关系字段 class UserBase(BaseModel): phone: str #为了安全起见,password 不会出现在其他同类 Pydantic模型中,例如用户请求时不应该从 API 返回响应中包含它。 class UserCreate(UserBase): #字段名对不上会报错,所以单独搞个 password: str class User(UserBase): id: int is_active: bool items: List[Item] = [] class Config: orm_mode = True
注意,SQLAlchemy 模型和 Pydantic 声明属性的方式不一样,前者是 = ,而后者是 :。
📌 orm_mode
此类 Config 用于为 Pydantic 提供配置。
class Item(ItemCreate): id: int owner_id: int class Config: orm_mode = True #使其包含关系字段
Pydanticorm_mode 将告诉 Pydantic 模型读取数据,即它不是一个 dict,而是一个 ORM 模型。这样该 Pydantic 模型就会尝试从属性中获取它,如 id = data.id。
有了这个,Pydantic 模型与 ORM 兼容,您只需在路径操作 response_model 的参数中声明它,即可返回一个数据库模型,并从中读取数据。
SQLAlchemy 和许多其他默认情况下是“延迟加载”。这意味着,除非您尝试访问包含该数据的属性,否则它们不会从数据库中获取关系数据。
涉及到文件 sql_app/crud.py,在此文件中,我们将编写可重用的函数用来与数据库中的数据进行交互。
CRUD 分别为:增加、查询、更改和删除,即增删改查。
首先从 sqlalchemy.orm 中导入 Session,这将允许您声明 db 参数的类型,并在您的函数中进行更好的类型检查和完成。 然后导入之前的 models(SQLAlchemy 模型)和 schemas(Pydantic模型/模式)。
创建一些实用函数来完成:
from sqlalchemy.orm import Session from . import models, schemas #通过 ID 查询单个用户。 def get_user(db: Session, user_id: int): return db.query(models.User).filter(models.User.id == user_id).first() #通过电子邮件查询单个用户。 def get_user_by_email(db: Session, email: str): return db.query(models.User).filter(models.User.email == email).first() #查询多个用户 def get_users(db: Session, skip: int = 0, limit: int = 100): return db.query(models.User).offset(skip).limit(limit).all() #查询多个项目 def get_items(db: Session, skip: int = 0, limit: int = 100): return db.query(models.Item).offset(skip).limit(limit).all()
现在通过函数来创建数据,它的步骤是:
def create_user(db: Session, user: schemas.UserCreate): fake_hashed_password = user.password + "notreallyhashed" db_user = models.User(email=user.email, hashed_password=fake_hashed_password) db.add(db_user) db.commit() db.refresh(db_user) return db_user def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int): db_item = models.Item(**item.dict(), owner_id=user_id) db.add(db_item) db.commit() db.refresh(db_item) return db_item
此示例不安全,因为密码未经过哈希处理,可以查看 Security 登录认证 内容进行完善。
现在通过函数来修改数据,比如修改描述,它的步骤是:
def update_item_desc_by_id(db: Session, id: int, desc: str): db_item = db.query(models.Item).filter_by(id=id).first() db_item.description = desc db.commit() db.refresh(db_item) return db_item
现在通过函数来删除数据:
# 批量删除1 def delete_item_by_ownerId1(db: Session, owner_id: int): db.query(models.Item).filter_by(owner_id=owner_id).delete(synchronize_session=False) db.commit() return True # 批量删除2 def delete_item_by_ownerId2(db: Session, owner_id: int): db_items = db.query(models.Item).filter_by(owner_id=owner_id).all() [db.delete(item) for item in db_items] db.commit() return True
最后一步便是创建接口了,涉及到文件 sql_app/main.py,让我们集成和使用我们之前创建的所有其他部分:
from typing import List from fastapi import Depends, FastAPI, HTTPException from sqlalchemy.orm import Session from sql_app import crud, models, schemas from sql_app.database import SessionLocal, engine #预先创建数据表 models.Base.metadata.create_all(bind=engine) app = FastAPI() # Dependency def get_db(): db = SessionLocal() try: yield db finally: db.close() @app.post("/users/", response_model=schemas.User) def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)): db_user = crud.get_user_by_email(db, phone=user.phone) if db_user: raise HTTPException(status_code=400, detail="Email already registered") return crud.create_user(db=db, user=user) @app.get("/users/", response_model=List[schemas.User]) def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): users = crud.get_users(db, skip=skip, limit=limit) return users @app.get("/users/{user_id}", response_model=schemas.User) def read_user(user_id: int, db: Session = Depends(get_db)): db_user = crud.get_user_by_id(db, user_id=user_id) if db_user is None: raise HTTPException(status_code=404, detail="User not found") return db_user @app.post("/users/{user_id}/items/", response_model=schemas.Item) def create_item_for_user( user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db) ): return crud.create_user_item(db=db, item=item, user_id=user_id) @app.get("/items/", response_model=List[schemas.Item]) def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): items = crud.get_items(db, skip=skip, limit=limit) return items @app.put("/update_item/{item_id}/") def update_item(item_id: int,desc: str, db: Session = Depends(get_db)): db_item = crud.update_item_desc_by_id(db, id=item_id, desc=desc) return db_item @app.delete("/delete_item/{owner_id}/") def delete_item(owner_id: int, db: Session = Depends(get_db)): result = crud.delete_item_by_ownerId2(db, owner_id=owner_id) return result if __name__ == '__main__': import uvicorn uvicorn.run(app, host="127.0.0.1", port=8080)
这样,我们就可以直接从路径操作函数内部调用,如 crud.get_user 并使用该会话,来进行对数据库操作。
📌 get_db 创建依赖项
def get_db(): db = SessionLocal() try: yield db finally: db.close()
使用我们在 sql_app/database.py 文件中创建的 SessionLocal 来创建依赖项。 yield 的作用如该链接:tutorial/dependencies/dependencies-with-yield/
我们将 SessionLocal() 请求的创建和处理放在一个 try 块中。 然后我们在 finally 块中关闭它。 通过这种方式,我们确保数据库会话在请求后始终关闭。即使在处理请求时出现异常。
📌 Session
然后,当在路径操作函数中使用依赖项时,我们使用 Session,直接从 SQLAlchemy 导入的类型声明它。如:
db: Session = Depends(get_db)
这将为我们在路径操作函数中提供更好的编辑器支持,因为编辑器将知道 db 参数的类型 Session
一开始,编辑器并不真正知道提供了哪些方法。 但是通过将类型声明为Session,编辑器现在可以知道可用的方法(.add()、.query()、.commit()等)并且可以提供更好的支持。
📌 def 与 async def
本实例未使用 async def 异步,如需使用请参考:FastApi+sqlalchemy异步操作mysql
此时项目已经构建完成了,我们只需要在 main.py 文件中运行即可,我是使用 main 方式启动,也可采用命令行的方式启动项目。
if __name__ == '__main__': import uvicorn uvicorn.run(app, host="127.0.0.1", port=8000)
启动成功 👇
打开浏览器进入 http://127.0.0.1:8080/docs#/ 👇
这样的话你可以直接与你的 FastAPI 应用程序交互,从真实数据库中读取数据:
📌 建表脚本
因为你需要与数据库进行交互,那么就要创建相应的数据表,以下是对应的建表脚本:
CREATE TABLE `items` ( `id` int(11) NOT NULL AUTO_INCREMENT, `title` varchar(64) DEFAULT NULL, `description` varchar(64) DEFAULT NULL, `owner_id` int(11) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8
CREATE TABLE `users` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键', `phone` varchar(64) DEFAULT NULL, `hashed_password` varchar(64) DEFAULT NULL, `is_active` tinyint(1) DEFAULT '1', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=latin1
📌 创建中间件
可以添加中间件(只是一个函数)将为每个请求创建一个新的 SQLAlchemy SessionLocal,将其添加到请求中,然后在请求完成后关闭它。
@app.middleware("http") async def db_session_middleware(request: Request, call_next): response = Response("Internal server error", status_code=500) try: request.state.db = SessionLocal() response = await call_next(request) finally: request.state.db.close() return response # Dependency def get_db(request: Request): return request.state.db
request.state 是每个 Request 对象的属性。它用于存储附加到请求本身的任意对象,例如本例中的数据库会话。对于这种情况下,它帮助我们确保在所有请求中使用单个数据库会话,然后关闭。
使用 yield 依赖项与使用中间件,虽然效果类似,但也有一些区别:
yield 依赖项足以满足用例时,使用 yield 依赖项方法会更好。