- 官方文档:https://sqlmodel.tiangolo.com/
- FastAPI文档(SQLModel): https://fastapi.tiangolo.com/zh/tutorial/sql-databases/
起步
安装
pip install sqlmodel
# MySQL driver
pip install pymysql
# PostgreSQL driver
pip install psycopg2-binary
连接引擎
from sqlmodel import SQLModel, create_engine

# The db_url assignments below are alternatives, one per backend. As written
# they run in sequence, so the last assignment (PostgreSQL) is the one that
# reaches create_engine; keep only the line that matches your database.

# In-memory SQLite
db_url = "sqlite+pysqlite:///:memory:"
# SQLite file
db_url = "sqlite+pysqlite:///db_demo.sqlite"
# MySQL
db_url = "mysql+pymysql://username:password@127.0.0.1:3306/db_name"
# PostgreSQL
db_url = "postgresql://username:password@127.0.0.1:5432/db_name"

# Create the engine
engine = create_engine(db_url, echo=True)

# Create the tables
SQLModel.metadata.create_all(engine)
数据表
import time
from sqlmodel import SQLModel, create_engine
from sqlmodel import Field, BigInteger, Integer, String, Boolean# 直接创建
class User(SQLModel, table=True):
    """Table model declared directly on SQLModel and mapped to the ``user`` table."""

    __tablename__ = "user"

    # BIGINT primary key; defaults to None.
    id: int | None = Field(sa_type=BigInteger, default=None, primary_key=True)
    # String(32) column; the same text is used as the field description and
    # as the database column comment.
    name: str = Field(
        sa_type=String(32),
        description="用户名",
        sa_column_kwargs={"comment": "用户名"},
    )


# Using a base class
class BaseTable(SQLModel):
    """Shared columns for table models; declared without ``table=True``."""

    # BIGINT primary key; defaults to None.
    id: int | None = Field(sa_type=BigInteger, default=None, primary_key=True)
    # Creation time in milliseconds (nanosecond clock divided by 1,000,000).
    # A lambda has to be passed through default_factory rather than default.
    create_timestamp: int = Field(
        sa_type=BigInteger,
        default_factory=lambda: time.time_ns() // 1_000_000,
        description="创建时间戳",
        sa_column_kwargs={"comment": "创建时间戳"},
    )


class UseBaseUser(BaseTable, table=True):
    """Table ``use_base_user``; adds name and age to the BaseTable columns."""

    __tablename__ = "use_base_user"

    name: str = Field(
        sa_type=String(32),
        sa_column_kwargs={"comment": "用户名"},
    )
    # ge / le constrain the accepted range to 0..200.
    age: int = Field(
        sa_type=Integer,
        default=0,
        ge=0,
        le=200,
        sa_column_kwargs={"comment": "年龄"},
    )
字段 | 作用 | 示例 | 备注 |
---|---|---|---|
__tablename__ | 自定义表名 | "user" | 可省略 |
sa_type | 数据表类型 | BigInteger | sa即SqlAlchemy |
sa_column_kwargs | 数据表参数 | {"comment": "用户名"} | sa即SqlAlchemy,可填写数据库的字段备注 文档:https://github.com/fastapi/sqlmodel/issues/492 |
default | 默认值 | None | |
default_factory | 默认值工厂 | lambda: ... | |
primary_key | 主键 | True | |
title | 标题 | 用于Swagger等 | |
description | 描述 | 用于Swagger等 | |
gt/ge/lt/le | 范围 | ge=0, le=200 | pydantic的校验 |
JSON数据
- sa_type为JSON,保存和获取都能使用对应的类型
from sqlmodel import SQLModel
from sqlmodel import Field, BigInteger, Integer, String, Boolean, JSON# 直接创建
class User(SQLModel, table=True):
    """``user`` table with two JSON columns."""

    __tablename__ = "user"

    id: int | None = Field(sa_type=BigInteger, default=None, primary_key=True)
    name: str = Field(
        sa_type=String(32),
        description="用户名",
        sa_column_kwargs={"comment": "用户名"},
    )
    # JSON columns: stored as JSON in the table, written and read back as the
    # corresponding Python type (list, dict).
    addrs: list[str] = Field(
        sa_type=JSON,
        default=[],
        sa_column_kwargs={"comment": "地址列表"},
    )
    other: dict = Field(
        sa_type=JSON,
        default={},
        sa_column_kwargs={"comment": "其他信息"},
    )


# No special handling is needed when inserting into the database
from sqlmodel import Session

# Lists and dicts are passed to the JSON fields as plain Python values.
user = User(name="李四", addrs=["福州"], other={"a": 1, "b": 2})
with Session(engine) as session:
    session.add(user)
    session.commit()
    # Reload the committed row into `user`, the instance that was added.
    session.refresh(user)

# Results are converted back automatically when read
from sqlmodel import Session, select

# Read every User row and show that the JSON columns come back as list / dict.
with Session(engine) as session:
    statement = select(User)
    result = session.exec(statement).all()
    for row in result:
        print(type(row.addrs), row.addrs)
        # <class 'list'> ['福州']
        print(type(row.other), row.other)
        # <class 'dict'> {'a': 1, 'b': 2}
枚举类
- 当前版本枚举类较为鸡肋,无法自动转换,需要转换成字符串,否则报错
- 而且不会校验传入的字符串是否是合法的枚举值
from enum import Enum
from sqlmodel import SQLModel
from sqlmodel import Field, BigInteger, String# 枚举类型
class SexEnum(Enum):
    """Allowed values for the ``sex`` column; each value is its own name as a string."""

    MALE = "MALE"
    FEMALE = "FEMALE"
    UNKNOWN = "UNKNOWN"


class User(SQLModel, table=True):
    """``user`` table whose ``sex`` field is annotated as an enum but stored as a string."""

    __tablename__ = "user"

    id: int | None = Field(sa_type=BigInteger, default=None, primary_key=True)
    name: str = Field(sa_type=String(32), default="")
    # Enum field: the database type is a string. There is no automatic
    # conversion, so values passed in (including the default) must use
    # `.value` to become strings.
    sex: SexEnum = Field(sa_type=String(32), default=SexEnum.UNKNOWN.value)


# The sex argument has to be converted to a string.
# Only fields declared on this model are passed (it has no `age` field).
user = User(name="zhangsan", sex=SexEnum.FEMALE.value)
SQL语句
- User实体类
from sqlmodel import SQLModel, Field, BigInteger, String, Integer


class User(SQLModel, table=True):
    """User entity used by the SQL and CRUD examples (id, name, age)."""

    # BIGINT primary key; defaults to None.
    id: int | None = Field(
        sa_type=BigInteger,
        default=None,
        primary_key=True,
        sa_column_kwargs={"comment": "自增id"},
    )
    name: str = Field(
        sa_type=String,
        sa_column_kwargs={"comment": "用户名"},
    )
    age: int = Field(
        sa_type=Integer,
        default=0,
        sa_column_kwargs={"comment": "年龄"},
    )
sql语句
- 执行语句后,通过 .all() / .one() / .one_or_none() / .first() 获取结果(见下方代码末尾)
from sqlmodel import Session, text# 使用sql语句
def sql():
    """Run a raw SQL query against the ``user`` table and return every row."""
    query = text('SELECT * FROM "user";')
    with Session(engine) as db:
        # .all() fetches multiple rows
        return db.exec(query).all()


# With parameters
def sql_with_params():
    """Run a raw SQL query with a bound ``:id`` parameter and return the single match."""
    query = text('SELECT * FROM "user" WHERE id=:id;')
    with Session(engine) as db:
        # .one() fetches exactly one row
        return db.exec(query, params={"id": 2}).one()


# Ways to read a result (illustrative; `session` and `statement` as inside
# the function bodies):
session.exec(statement).all()  # returns multiple rows
session.exec(statement).one()  # returns exactly one row (raises on zero rows or on more than one)
session.exec(statement).one_or_none()  # returns one row or None
session.exec(statement).first()  # returns one row or None (slow: the statement selects everything, then the first row is returned)
- 下面的语句实际都可以转换成sql
from sqlmodel import Session, select


def to_sql():
    """Print the SQL generated for ``select(User)``, then run it and return all rows."""
    with Session(engine) as session:
        statement = select(User)
        # Printing a statement shows the SQL it compiles to.
        print(statement)
        # SELECT "user".id, "user".name, "user".age FROM "user"
        result = session.exec(statement).all()
        return result
CRUD
新增
- 最佳实践
from sqlmodel import Session


def insert_entity(entity: User) -> None:
    """Insert ``entity`` and refresh it so the caller can read the stored values.

    Args:
        entity: the User instance to persist; it is updated in place.
    """
    with Session(engine) as db:
        db.add(entity)
        db.commit()
        # Re-bind the entity after the commit
        db.refresh(entity)


user = User(name="张三", age=18)
insert_entity(user)
print(user)  # the complete data is available, including id
- 其他写法
from sqlmodel import Session, insert


def insert_entity_1() -> None:
    """Insert a row using an ``insert`` statement with keyword values."""
    stmt = insert(User).values(name="李四", age=10)
    with Session(engine) as db:
        db.exec(stmt)
        db.commit()


def insert_entity_2() -> None:
    """Insert a row using an ``insert`` statement with a dict of values."""
    stmt = insert(User).values({"name": "王五", "age": 20})
    with Session(engine) as db:
        db.exec(stmt)
        db.commit()


def insert_entity_3(entity: User) -> None:
    """Insert a row built from an entity's dumped fields.

    Args:
        entity: the User whose fields are inserted.
    """
    # Use pydantic to convert the entity to a dict; None values are excluded
    # so that id is not passed as None, which would raise an error.
    values = entity.model_dump(exclude_none=True)
    stmt = insert(User).values(values)
    with Session(engine) as db:
        db.exec(stmt)
        db.commit()
更新
- 最佳实践
from sqlmodel import Session


def update_entity(entity: User) -> User:
    """Load the row matching ``entity.id``, copy ``entity``'s fields onto it and save.

    Args:
        entity: carries the id of the row to update and the new field values.

    Returns:
        The refreshed database entity after the commit.

    Raises:
        ValueError: if no row with ``entity.id`` exists.
    """
    with Session(engine) as session:
        db_entity = session.get(User, entity.id)
        # session.get returns None for a missing row; fail with a clear
        # message instead of an AttributeError on None.
        if db_entity is None:
            raise ValueError(f"User with id={entity.id!r} does not exist")
        db_entity.sqlmodel_update(entity.model_dump())
        session.add(db_entity)
        session.commit()
        # Re-bind the entity after the commit
        session.refresh(db_entity)
        return db_entity


user = User(id=3, name="李四", age=20)
update_entity(user)
- 其他更新语句
- 无法返回更新后真正结果
from sqlmodel import Session, update# 配合where语句可用于批量更新
def update_with_args() -> None:
    """Update the name of the row with id 2 using keyword values."""
    stmt = update(User).where(User.id == 2).values(name="args")
    with Session(engine) as db:
        db.exec(stmt)
        db.commit()


def update_with_kwargs() -> None:
    """Update the name of the row with id 2 using a dict of values."""
    stmt = update(User).where(User.id == 2).values({"name": "test"})
    with Session(engine) as db:
        db.exec(stmt)
        db.commit()


def update_entity(user: User) -> None:
    """Update the row whose id matches ``user.id`` with all of ``user``'s dumped fields.

    Args:
        user: supplies both the id to match and the values to write.
    """
    new_values = user.model_dump()
    stmt = update(User).where(User.id == user.id).values(new_values)
    with Session(engine) as db:
        db.exec(stmt)
        db.commit()
删除
- 最佳实践
from sqlmodel import Session, delete


def delete_by_id(id: int) -> None:
    """Delete the User row with the given id using a ``delete`` statement.

    Args:
        id: primary key of the row to delete.
    """
    stmt = delete(User).where(User.id == id)
    with Session(engine) as db:
        db.exec(stmt)
        db.commit()
- 其他用法
from sqlmodel import Session


def delete_by_id(id: int) -> None:
    """Load the User with the given id and delete it through the session.

    Does nothing when no such row exists.

    Args:
        id: primary key of the row to delete.
    """
    with Session(engine) as session:
        db_entity = session.get(User, id)
        # session.get returns None for a missing row, and session.delete(None)
        # would raise; skip the delete in that case.
        if db_entity is None:
            return
        session.delete(db_entity)
        session.commit()
简单查询
- 使用id查询
from sqlmodel import Session


def select_by_id(id: int) -> User | None:
    """Look up a User by primary key.

    Args:
        id: primary key to look up.

    Returns:
        Whatever ``session.get`` yields for that key (annotated ``User | None``).
    """
    with Session(engine) as db:
        return db.get(User, id)
- 使用where语句
from sqlmodel import Session, select# 根据条件查询
def select_by_where(name: str, age: int) -> list[User]:
    """Return the users whose name and age both equal the given values.

    Args:
        name: exact name to match.
        age: exact age to match.
    """
    query = select(User).where(User.name == name, User.age == age)
    with Session(engine) as db:
        return db.exec(query).all()


# Query by optional conditions
def select_by_condition(name: str | None = None, age: int | None = None) -> list[User]:
    """Return users filtered by whichever of ``name`` / ``age`` is not None.

    Args:
        name: exact name to match; no name filter when None.
        age: exact age to match; no age filter when None.
    """
    with Session(engine) as db:
        query = select(User)
        # Add one equality filter per argument that was actually supplied.
        for column, wanted in ((User.name, name), (User.age, age)):
            if wanted is not None:
                query = query.where(column == wanted)
        return db.exec(query).all()
- like
from sqlmodel import Session, select# like语句
def select_by_like(name: str) -> list[User]:
    """Return users whose name contains ``name`` (SQL LIKE with ``%`` on both sides).

    Args:
        name: substring to search for; it is inserted into the pattern as-is.
    """
    pattern = f"%{name}%"
    query = select(User).where(User.name.like(pattern))
    with Session(engine) as db:
        return db.exec(query).all()
复杂查询
- 获取数量
from sqlmodel import Session, select, func# 数量
def select_count():
    """Return the number of rows in the User table."""
    count_query = select(func.count()).select_from(User)
    # To add a filter:
    # count_query = count_query.where(User.age > 18)
    with Session(engine) as db:
        return db.exec(count_query).one()
- 分页
from sqlmodel import Session, and_, func, select


def select_page() -> list[User]:
    """Return the first page of users: offset 0, at most 3 rows."""
    with Session(engine) as session:
        statement = select(User).offset(0).limit(3)
        result = session.exec(statement).all()
        return result


# Paged query that also returns the total count
def select_page_with_count(
    offset: int = 0,
    limit: int = 10,
    name: str | None = None,
    age: int | None = None,
):
    """Return one page of users plus the total number of rows matching the filters.

    Args:
        offset: number of matching rows to skip.
        limit: maximum number of rows in the page.
        name: substring filter on the name (LIKE ``%name%``); ignored when None.
        age: exact age filter; ignored when None.

    Returns:
        A dict with ``"count"`` (result of the count query) and ``"list"``
        (the rows of the requested page).
    """
    with Session(engine) as session:
        conditions = []
        if name is not None:
            conditions.append(User.name.like(f"%{name}%"))
        if age is not None:
            conditions.append(User.age == age)
        # Combine the conditions; the leading True keeps and_() valid when
        # no condition was supplied.
        whereclause = and_(True, *conditions)
        # The same where-clause drives both the count and the page query.
        count_statement = select(func.count()).select_from(User).where(whereclause)
        list_statement = select(User).where(whereclause).offset(offset).limit(limit)
        return {
            "count": session.exec(count_statement).one(),
            "list": session.exec(list_statement).all(),
        }
- 排序
from sqlmodel import Session, select, asc, desc


# Ordering
def select_by_order():
    """Return all users ordered by age, descending."""
    by_age_desc = select(User).order_by(desc(User.age))
    with Session(engine) as db:
        return db.exec(by_age_desc).all()
FastAPI
- response_model中如果设置了UserPageResp
- data的list[Any]会自动转成list[UserItem]
# Paged table response
class TablePageResp(PageResp):
    """Generic paged response: a total count plus an untyped list of rows."""

    total: int = Field(default=0, title="总数量")
    # Rows are typed as Any here.
    data: list[Any] = Field(default=[], title="数据")


# Response result
class UserPageResp(PageResp):
    """Paged response whose rows are typed as ``UserItem``."""

    total: int = Field(default=0, title="总数量")
    # Rows are typed as UserItem here.
    data: list[UserItem] = Field(default=[], title="数据")