SQLAIchemy 异步DBManager封装-01入门理解

前言

SQLAlchemy 是一个强大的 Python SQL 工具包和对象关系映射(ORM)系统,是业内比较流行的ORM,设计非常优雅。随着其2.0版本的发布,SQLAlchemy 引入了原生的异步支持,这极大地增强了其在处理高并发和异步I/O场景下的能力。通过结合像greenlet、gevent这样的协程库,SQLAlchemy 使得异步数据库操作成为可能,从而提高了应用程序的性能和响应速度。

这里我将基于SQLAlchemy的异步支持,封装一些常用的增删改查(CRUD)操作到 https://github.com/HuiDBK/py-tools 中,以便在项目开发中更加便捷地使用。

Github: https://github.com/sqlalchemy/sqlalchemy

2.0文档:https://docs.sqlalchemy.org/en/20/index.html

简单使用

封装前,先简单介绍下如何使用 SQLAIchemy。

具体细节可以参考官网文档:https://docs.sqlalchemy.org/en/20/orm/quickstart.html

安装依赖

pip install sqlalchemy[asyncio]==2.0.20
pip install aiomysql==0.2.0

这里安装了 sqlalchemy 2.0版本,以及 aiomysql 异步数据库驱动,进行演示。

创建异步数据库引擎

from sqlalchemy.ext.asyncio import create_async_engine  # db_uri = "{protocol}://{user}:{password}@{host}:{port}/{db}"db_engine = create_async_engine("mysql+aiomysql://root:123456@127.0.0.1:3306/demo")

声明数据库表映射模型

from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_columnclass BaseOrmTable(DeclarativeBase):"""SQLAlchemy Base ORM Model"""__abstract__ = Trueid: Mapped[int] = mapped_column(primary_key=True, autoincrement=True, comment="主键ID")class UserTable(BaseOrmTable):"""用户表"""__tablename__ = "user"username: Mapped[str] = mapped_column(String(30), default="", comment="用户昵称")password: Mapped[str] = mapped_column(String(30), default="", comment="用户密码")phone: Mapped[str] = mapped_column(String(11), default="", comment="手机号")email: Mapped[str] = mapped_column(String(30), default="", comment="邮箱")avatar: Mapped[str] = mapped_column(String(100), default="", comment="头像")

简单db操作


from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column# db_uri = "{protocol}://{user}:{password}@{host}:{port}/{db}"db_engine = create_async_engine("mysql+aiomysql://root:123456@127.0.0.1:3306/hui-demo")Session = async_sessionmaker(db_engine)async def create_tables():# 根据映射创建库表async with db_engine.begin() as conn:await conn.run_sync(BaseOrmTable.metadata.create_all)async def main():await create_tables()async with Session.begin() as session:# 添加用户new_user = UserTable(username='hui', email='huidbk@163.com')session.add(new_user)await session.flush()   # 刷新table 对象属性,获取新增的idprint(new_user.id)print("add user", new_user.__dict__)# 获取用户user = await session.get(UserTable, new_user.id)print("get user", user.__dict__)# 更新用户user.email = 'hui@163.com'await session.merge(user)print("updated user", user.__dict__)# 删除用户await session.delete(user)if __name__ == '__main__':# 运行主函数asyncio.run(main())

常用DB操作封装

SQLAlchemyManager

class SQLAlchemyManager(metaclass=SingletonMetaCls):DB_URL_TEMPLATE = "{protocol}://{user}:{password}@{host}:{port}/{db}"def __init__(self,host: str = "localhost",port: int = 3306,user: str = "",password: str = "",db_name: str = "",pool_size: int = 30,pool_pre_ping: bool = True,pool_recycle: int = 600,log: Union[logging.Logger] = None,):self.host = hostself.port = portself.user = userself.password = passwordself.db_name = db_nameself.pool_size = pool_sizeself.pool_pre_ping = pool_pre_pingself.pool_recycle = pool_recycleself.log = log or loggerself.db_engine: AsyncEngine = Noneself.async_session_maker: async_sessionmaker = Nonedef get_db_url(self, protocol: str = "mysql+aiomysql"):db_url = self.DB_URL_TEMPLATE.format(protocol=protocol, user=self.user, password=self.password, host=self.host, port=self.port, db=self.db_name)return db_urldef init_db_engine(self, protocol: str):"""初始化db引擎Args:protocol: 驱动协议类型Returns:self.db_engine"""db_url = self.get_db_url(protocol=protocol)self.log.info(f"init_db_engine => {db_url}")self.db_engine = create_async_engine(url=db_url, pool_size=self.pool_size, pool_pre_ping=self.pool_pre_ping, pool_recycle=self.pool_recycle)self.async_session_maker = async_sessionmaker(bind=self.db_engine, expire_on_commit=False)return self.db_enginedef init_mysql_engine(self, protocol: str = "mysql+aiomysql"):"""初始化mysql引擎Args:protocol: 驱动协议类型Returns:self.db_engine"""return self.init_db_engine(protocol=protocol) 

SQLAlchemyManager 主要封装一些数据库账户配置信息、连接池信息。

pool_size(连接池大小): 指定连接池中允许保持的最大连接数。当应用程序需要访问数据库时,连接池会维护一定数量的数据库连接,以便快速地响应请求。通常情况下,pool_size 的值应该根据应用程序的并发访问量和数据库的性能来进行调整。

pool_pre_ping(预检查连接): 指定是否在数据库连接被使用前对连接进行预检查。预检查可以确保连接处于活动状态,并且可以自动重新连接到数据库服务器,以防止连接由于长时间空闲而失效。启用预检查可以提高应用程序对数据库的可靠性和稳定性。

pool_recycle(连接回收时间): 指定数据库连接在被重新使用之前的最大空闲时间。当连接空闲时间超过 pool_recycle 设置的值时,连接将被关闭并重新创建,以防止连接长时间处于空闲状态而导致的连接问题。pool_recycle 的值通常设置为一个较小的时间间隔,以确保连接能够及时地得到回收和重建,从而提高连接的健壮性和性能。

init_db_engine 方法则是初始化数据库引擎,内部根据数据库配置信息

  • 构造异步的数据库引擎 db_engine
  • 维护一个 async_session_maker 数据库会话工厂

BaseORMTable 映射库表封装

from datetime import datetime
from sqlalchemy import func
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_columnclass BaseOrmTable(AsyncAttrs, DeclarativeBase):"""SQLAlchemy Base ORM Model"""__abstract__ = Trueid: Mapped[int] = mapped_column(primary_key=True, comment="主键ID")def __repr__(self):return str(self.to_dict())def to_dict(self, alias_dict: dict = None, exclude_none=True) -> dict:"""数据库模型转成字典Args:alias_dict: 字段别名字典 eg: {"id": "user_id"}, 把id名称替换成 user_idexclude_none: 默认排查None值Returns: dict"""alias_dict = alias_dict or {}if exclude_none:return {alias_dict.get(c.name, c.name): getattr(self, c.name)for c in self.__table__.columns if getattr(self, c.name) is not None}else:return {alias_dict.get(c.name, c.name): getattr(self, c.name, None)for c in self.__table__.columns}class TimestampColumns(AsyncAttrs, DeclarativeBase):"""时间戳相关列"""__abstract__ = Truecreated_at: Mapped[datetime] = mapped_column(default=datetime.now, comment="创建时间")updated_at: Mapped[datetime] = mapped_column(default=datetime.now, onupdate=datetime.now, comment="更新时间")deleted_at: Mapped[datetime] = mapped_column(nullable=True, comment="删除时间")class BaseOrmTableWithTS(BaseOrmTable, TimestampColumns):__abstract__ = True

创建一些基础的 ORM 类,以便后续的映射类可以继承并且共享一些公有属性和方法。

  1. BaseOrmTable 类:

    1. 定义了一个基础的 ORM 模型类,继承了 AsyncAttrsDeclarativeBase。这样做使得 BaseOrmTable 类具有了异步属性访问的能力,为异步编程提供便利,特别是在异步环境中访问具有延迟加载或者异步加载特性的属性。
    2. 提供了一个 to_dict 方法,用于将数据库模型转换为字典。它支持通过参数 alias_dict 指定字段别名,并且可以选择是否排除值为 None 的属性。
  2. TimestampColumns 类:

    1. 定义了一个包含时间戳相关列的抽象基类。这些列通常在很多数据库表中都会有,用于记录数据的创建时间、更新时间和删除时间。
    2. 这些列被设置为默认值,比如 created_atupdated_at 默认使用 datetime.now 函数来自动记录当前时间,deleted_at 则允许为空,用于标记数据的删除时间(可用作于逻辑删除)
  3. BaseOrmTableWithTS 类:

    1. 继承了 BaseOrmTableTimestampColumns,实际上是一个组合类,集成了基础的 ORM 功能和时间戳相关的列。
    2. 这个类进一步封装了 BaseOrmTableTimestampColumns,使得后续的映射类只需要继承这个类,就能够拥有基础的 ORM 功能和时间戳相关的列。

通过这种封装,你可以在后续的数据库映射类中更加专注于业务逻辑的实现,而不需要重复编写基础的 ORM 功能和时间戳相关的列,提高了代码的重用性和可维护性。

DBManager 数据库通用操作封装

前置封装说明

from typing import Any, List, Type, TypeVar, Union
from py_tools.connections.db.mysql import BaseOrmTable
from py_tools.meta_cls import SingletonMetaCls# 泛指 BaseOrmTable 所有子类实例对象类型   
T_BaseOrmTable = TypeVar("T_BaseOrmTable", bound=BaseOrmTable)
T_Hints = TypeVar("T_Hints")  # 用于修复被装饰的函数参数提示,让IDE有类型提示def with_session(method) -> T_Hints:"""兼容事务会话Args:method: orm 的 crudNotes:方法中没有带事务连接则,则构造Returns:"""@functools.wraps(method)async def wrapper(db_manager, *args, **kwargs):session = kwargs.get("session") or Noneif session:return await method(db_manager, *args, **kwargs)else:async with db_manager.transaction() as session:kwargs["session"] = sessionreturn await method(db_manager, *args, **kwargs)return wrapper

这里我提供了一个 with_session 装饰器,用于在需要数据库会话(事务)的数据库操作方法中自动开启事务,由于 sqlaichemy 官方推荐每个数据库操作都手动开启事务会话(自动提交),装饰器的设计没有时则构造,有则共享,这样不但可以减少冗余 async with db_manager.transaction() as session 的代码,也可以兼容多个操作共享同一个 session 有问题时进行事务回滚。

由于给方法加了通用的装饰器导致一些版本的IDE无法识别方法真实的签名,使用时会出现不知道方法的入参是什么,对于开发者来说是极其不方便的。

使用 typing 的 TypeVar 自定义类型来构造一个通用的泛型来当作函数返回的类型,进而修复。

from typing import TypeVar
T_Hints = TypeVar("T_Hints")  # 用于修复被装饰的函数参数提示,让IDE有类型提示def with_session(method) -> T_Hints:...

这里PyCharm 2023.2.4 版本升级到 2024.1 就有提示了,IDE修复了,可以不用 T_Hints 了。

一些旧版本构造 sqlaichemy 的库表对象时也会出现不知道类对象属性入参提示,升级到最新版本都解决了。

from contextlib import asynccontextmanagerclass DBManager(metaclass=SingletonMetaCls):DB_CLIENT: SQLAlchemyManager = Noneorm_table: Type[BaseOrmTable] = None@classmethoddef init_db_client(cls, db_client: SQLAlchemyManager):cls.DB_CLIENT = db_clientreturn cls.DB_CLIENT@classmethod@asynccontextmanagerasync def transaction(cls):"""事务上下文管理器"""async with cls.DB_CLIENT.async_session_maker.begin() as session:yield session@classmethod@asynccontextmanagerasync def connection(cls) -> AsyncIterator[AsyncConnection]:"""数据库引擎连接上下文管理器"""async with cls.DB_CLIENT.db_engine.begin() as conn:yield conn
  • init_db_client 方法用于初始化数据库客户端(引擎)。
  • transaction 则是简单的通过 contextlib 中 asynccontextmanager 封装一个异步的上下文管理器方便简洁的开启一个数据库会话(事务)进行数据库相关操作。
  • connection 数据库引擎连接上下文管理器。
  • orm_table 是具体继承 DBManager 的子类进行指定的,用于操作具体的库表(orm_table)。
  • DBManager 通过 SingletonMetaCls 元类实现单例模式。具体单例模式可以了解 https://juejin.cn/post/7272006755265380367 这篇文章有详细的介绍。

DB添加操作封装

    
class DBManager(metaclass=SingletonMetaCls):DB_CLIENT: SQLAlchemyManager = Noneorm_table: Type[BaseOrmTable] = None@with_sessionasync def bulk_add(self,table_objs: List[Union[T_BaseOrmTable, dict]],*,orm_table: Type[BaseOrmTable] = None,flush: bool = False,session: AsyncSession = None) -> List[T_BaseOrmTable]:"""批量插入Args:table_objs: orm映射类实例列表eg.[UserTable(username="hui", age=18), ...] or [{"username": "hui", "age": 18}, ...]orm_table: orm表映射类flush: 刷新对象状态,默认不刷新session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务Returns:成功插入的对象列表"""orm_table = orm_table or self.orm_tableif all(isinstance(table_obj, dict) for table_obj in table_objs):# 字典列表转成orm映射类实例列表处理table_objs = [orm_table(**table_obj) for table_obj in table_objs]session.add_all(table_objs)if flush:await session.flush(table_objs)return table_objs@with_sessionasync def add(self,table_obj: [T_BaseOrmTable, dict],*,orm_table: Type[BaseOrmTable] = None,session: AsyncSession = None) -> int:"""插入一条数据Args:table_obj: orm映射类实例对象, eg. UserTable(username="hui", age=18) or {"username": "hui", "age": 18}orm_table: orm表映射类session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务Returns: 新增的idtable_obj.id"""orm_table = orm_table or self.orm_tableif isinstance(table_obj, dict):table_obj = orm_table(**table_obj)session.add(table_obj)await session.flush(objects=[table_obj])  # 刷新对象状态,获取新增的idreturn table_obj.id

这里就是用 session.add 与 add_all 方法封装了数据库添加、批量添加的操作,封装的点主要在于除了 orm_table 实例对象入参还支持字典入参,内部还是转换成库表映射类实例来操作,最后通过 session.flush 方法,单个添加返回新增的主键id,批量添加则是返回实例对象列表。

设计的方法中有一个 * 号是参数的分隔符,它的作用是将其前面的参数声明为位置参数,而将 * 后面的参数声明为关键字参数,* 号后面的参数入参只能使用关键字形式的入参,我在很多的开源库中都看到了这样的设计,可以把一些函数语义连贯、常用必传的参数设置为位置参数,其他的则是关键字参数。这样可以明确参数的作用、提高函数的可读性、防止参数错误等。

具体看下使用案例:

import asynciofrom sqlalchemy import String
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_columnfrom py_tools.connections.db.mysql import BaseOrmTableWithTS, BaseOrmTable, DBManager, SQLAlchemyManagerclass UserTable(BaseOrmTableWithTS):"""用户表"""__tablename__ = "user"username: Mapped[str] = mapped_column(String(30), default="", comment="用户昵称")password: Mapped[str] = mapped_column(String(30), default="", comment="用户密码")phone: Mapped[str] = mapped_column(String(11), default="", comment="手机号")email: Mapped[str] = mapped_column(String(30), default="", comment="邮箱")avatar: Mapped[str] = mapped_column(String(100), default="", comment="头像")async def create_tables():# 根据映射创建库表(异步)# async with db_engine.begin() as conn:#    await conn.run_sync(BaseOrmTable.metadata.create_all)async with DBManager.connection() as conn:await conn.run_sync(BaseOrmTable.metadata.create_all)async def init_orm_manager():db_client = SQLAlchemyManager(host="127.0.0.1",port=3306,user="root",password="123456",db_name="hui-demo",)db_client.init_mysql_engine()DBManager().init_db_client(db_client)async def manager_crud():user = {"username": "hui", "email": "huidbk.163.com"}user_id = await DBManager().add(table_obj=user, orm_table=UserTable)print("user_id", user_id)users = [{"username": "zack", "email": "zack.163.com"},{"username": "wang", "email": "wang.163.com"}]add_users = await DBManager().bulk_add(table_objs=users, orm_table=UserTable)add_user_ids = [user.id for user in add_users]print("add_user_ids", add_user_ids)async def main():await create_tables()# await normal_crud()await init_orm_manager()await manager_crud()if __name__ == '__main__':# 运行主函数asyncio.run(main())

在程序启动时初始化好DBManager 的 DB_CLIENT 就可以直接使用封装的方法,主要就是 DB_CLIENT 作为类属性,后面DBManager 实例与子类实例对象都可以共享这个数据库引擎。但我这里还是不推荐上面的写法,DBManager 是一些通用的DB操作,而具体一些业务操作还是单独封装一些DB业务Manager类来进行会比较好,更利于扩展维护与复用。


class UserManager(DBManager):orm_table = UserTableasync def get_name_by_email(self, email):username = await self.query_one(cols=["username"], conds=[self.orm_table.email == email], flat=True)return usernameasync def manager_crud():# demo 2 (推荐)user = UserTable(username="hui-test01", email="hui-test01.163.com")user_id = await UserManager().add(table_obj=user)print("user_id", user_id)users = [UserTable(username="hui-test02", email="hui-test02.163.com"),UserTable(username="hui-test03", email="hui-test03.163.com"),]add_users = await UserManager().bulk_add(table_objs=users)add_user_ids = [user.id for user in add_users]print("add_user_ids", add_user_ids)username = await UserManager().get_name_by_email(email="huidbk.163.com")print("username", username)>>> out
user_id 4
add_user_ids [5, 6]
username hui

这里 UserManager 单独封装的 get_name_by_email 的方法就是业务中常用查询操作通过邮件获取用户名称,这里就是举一个简单的例子,具体DB业务具体封装而不是全部写在逻辑层,这样别人要用的时候就不用重新组织条件参数、上下文,而是简单传递业务参数进行复用获取数据。

UserManager 调用 add、bulk_add 等方法时也不用像 DBManager 指定 orm_table 参数,使用起来更简洁。具体是因为 UserManager 类指定了 类属性 orm_table = UserTable,再封装时有一句 orm_table = orm_table or self.orm_table 意思就是优先选择入参的orm_table,没有则是 self.orm_table (具体实例对象的orm_table)。这样写也体现出 封装、继承的灵活性。

这里也引出了另一个封装方法 query_one 查询单条数据。由于介绍了一些Demo如果把所有的封装方法混合到一起篇幅就太长,故而我准备分成三篇进行分别介绍,这样也更好阅读。

  1. SQLAIchemy 异步DBManager封装-01入门理解
  2. SQLAIchemy 异步DBManager封装-02熟悉掌握
  3. SQLAIchemy 异步DBManager封装-03得心应手

Github源代码

源代码已上传到了Github,里面也有具体的使用Demo,欢迎大家一起体验、贡献。

HuiDBK/py-tools: 打造 Python 开发常用的工具,让Coding变得更简单 (github.com)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/636835.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AD高速板设计-DDR(笔记)

【一】二极管 最高工作频率: 定义:二极管的最高工作频率,即二极管在电路中能够正常工作的最高频率。常见的硅二极管的最高工作频率通常在几十MHz到几百MHz之间。在高频下,二极管可能无法有效地阻止反向电流,但也不会…

入门1-顺序结构

Hello,World! #include<iostream> using namespace std; int main(){ cout<<"Hello,World!"; return 0; }输出字符菱形 #include<iostream> using namespace std; int main() {// 上半部分菱形for(int i 1; i < 3; i){for(int j 1; j < 3…

智慧煤矿/智慧矿区视频汇聚存储与安全风险智能分析平台建设思路

一、建设背景 目前我国非常重视煤矿安全生产&#xff0c;并投入大量资金用于煤矿安全综合远程监控系统的研发。视频监控系统作为实现煤矿智能化无人开采的关键系统与煤矿安全生产的多系统协同分析与处理的关键信息源&#xff0c;在智慧矿山管控平台的建设中发挥着重要的作用。…

Linux学习之路 -- 进程篇 -- 环境变量

本文将介绍环境变量的相关内容&#xff0c;以及相关操作 目录 一、main 二、环境变量 1.为什么要有环境变量&#xff1f; 2.环境变量的特性以及命令行操作 <1>命令行操作&#xff1a; <2>特性&#xff1a; 一、main 介绍一下main函数的参数。 在一些教材上…

17. map和set的基本使用

1. 关联式容器 在初阶阶段&#xff0c;我们已经接触过STL中的部分容器&#xff0c;比如&#xff1a;vector、list、deque等&#xff0c;这些容器统称为序列式容器&#xff0c;因为其底层为线性序列的数据结构&#xff0c;里面存储的是元素本身。 那什么是关联式容器&#xff…

数据输入输出流(I/O)

文章目录 前言一、数据输入输出流是什么&#xff1f;二、使用方法 1.DataInputStream类2.DataOutoutStream类3.实操展示总结 前言 数据输入输出流也是将文件输入输出流打包后使用的对象。相比于文件输入输出流&#xff0c;数据输入输出流提供了简单易用的方法去操作不同类型的数…

Text2sql的一些技巧

最近看到了一篇关于text2sql的文章&#xff0c;以及一些论文。对使用模型做text2sql给了一些不错的建议。 参考文章&#xff1a;24年大模型潜力方向&#xff1a;大浪淘沙后的Text-to-SQL和Agent - 知乎 论文&#xff1a;https://arxiv.org/pdf/2403.09732.pdf 关于模型的建议 …

红外接收器的原理以及在STM32和51单片机中的应用

基本介绍&#xff1a; 红外接收器是一种用于接收红外线信号的装置&#xff0c;常见于各种电子设备中&#xff0c;如电视遥控器、空调遥控器等。它能够接收来自发射器发送的红外信号&#xff0c;并将其转换成电信号&#xff0c;以便设备进行相应的操作。红外接收器通常包含红外光…

AI大模型日报#0420:开源模型击败GPT-4、西湖大学蛋白质通用大模型、GPT的七条经验

导读&#xff1a; 欢迎阅读《AI大模型日报》&#xff0c;内容基于Python爬虫和LLM自动生成。目前采用“文心一言”生成了每条资讯的摘要。 标题: 开源模型打败GPT-4&#xff01;LLM竞技场最新战报&#xff0c;Cohere Command R上线 摘要: GPT-4在LLM竞技场被开源模型Cohere的…

9.Godot数组|遍历|静态变量|对象|调试

数组和字典的遍历 数组的概念 数组是一组数据的集合。在程序中负责批量处理数据。数组中的元素可以包括各个类型的数据&#xff0c;也可以对数组内数据类型进行限定。可以通过 数组名【数字】 的形式来访问数组元素&#xff0c;数字 0 代表数组的第一个元素。数组可以通过调用…

GPT状态和原理 - 解密OpenAI模型训练

目录 1 如何训练 GPT 助手 1.1 第一阶段 Pretraining 预训练 1.2 第二阶段&#xff1a;Supervised Finetuning有监督微调 1.3 第三阶段 Reward Modeling 奖励建模 1.4 第四阶段 Reinforcement Learning 强化学习 1.5 总结 2 第二部分&#xff1a;如何有效的应用在您的应…

Linux——匿名管道

为什么要有进程间通信&#xff1f; 在操作系统中&#xff0c;进程是独立运行的程序&#xff0c;多个进程之间要想互相协作完成任务&#xff0c;就需要进程间通信。 什么是进程间通信&#xff1f; 数据传输&#xff1a;一个进程需要将它的数据发送给另一个进程资源共享&#…