SQLAlchemy 使用封装实例

类封装

database.py

#! /usr/bin/env python
# -*- coding: utf-8 -*-import sys
import json
import logging
from datetime import datetimefrom core.utils import classlock, parse_bool
from core.config import (MYSQL_HOST,MYSQL_PORT,MYSQL_USER,MYSQL_PASS,MYSQL_DATABASE,MYSQL_TIMEOUT
)from sqlalchemy import create_engine, Column, desc, not_, func
from sqlalchemy import Integer, String, Boolean, DateTime, Text, Enum     # Text存储大不固定长的字符串
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.exc import SQLAlchemyErrorBase = declarative_base()log = logging.getLogger("log")SCHEMA_VERSION = "1.0.0"class User(Base):__tablename__ = "user"id = Column(Integer(), primary_key=True)file_size = Column(Integer(), nullable=False)      # nullable 不可为空md5 = Column(String(32), nullable=False)crc32 = Column(String(8), nullable=False)sha1 = Column(String(40), nullable=False)sha256 = Column(String(64), nullable=False)sha512 = Column(String(128), nullable=False)memory = Column(Boolean, nullable=False, default=False)ssdeep = Column(String(255), nullable=True)start_time = Column(DateTime(timezone=False), nullable=True, default=datetime.now)def __repr__(self):   # 查询返回的结果return "<User('{0}','{1}')>".format(self.id, self.sha256)def to_dict(self):"""将对象转换为dict.@return: dict"""d = {}for column in self.__table__.columns:d[column.name] = getattr(self, column.name)return ddef to_json(self):"""将对象转换为JSON.@return: JSON data"""return json.dumps(self.to_dict())class Version(Base):"""用于确定实际数据库架构发布的表."""__tablename__ = "version"version_num = Column(String(32), nullable=False, primary_key=True)class Database(object):"""分析队列数据库此类处理为内部队列创建数据库用户经营它还提供了一些与之交互的功能"""def __init__(self, schema_check=True, echo=False):"""@param dsn: 数据库连接字符串.@param schema_check: 禁用或启用数据库架构版本检查.@param echo: echo sql 查询."""self._lock = Noneself.schema_check = schema_checkself.echo = echodef connect(self, schema_check=None, dsn=None, create=True):"""连接到数据库后端."""if schema_check is not None:self.schema_check = schema_checkif not dsn:dsn = "mysql://{0}:{1}@{2}:{3}/{4}".format(MYSQL_USER, MYSQL_PASS, MYSQL_HOST, MYSQL_PORT, MYSQL_DATABASE)#dsn = "mysql://{username}:{password}@{hostname}:{port}/{database}"self._connect_database(dsn)# 禁用SQL日志记录。打开它进行调试.self.engine.echo = self.echo# 连接超时.self.engine.pool_timeout = MYSQL_TIMEOUT# 获取数据库会话.self.Session = sessionmaker(bind=self.engine)if create:self._create_tables()def _create_tables(self):"""创建所有数据库表等."""try:Base.metadata.create_all(self.engine)except SQLAlchemyError as e:raise ("无法创建或连接到数据库: %s" % e)# 处理架构版本控制.# TODO: it's a little bit dirty, needs refactoring.tmp_session = self.Session()if not tmp_session.query(Version).count():# 设置数据库架构版本.tmp_session.add(Version(version_num=SCHEMA_VERSION))try:tmp_session.commit()except SQLAlchemyError as e:raise ("无法设置架构版本: %s" % e)tmp_session.rollback()finally:tmp_session.close()else:# 检查数据库版本是否为预期版本.last = tmp_session.query(Version).first()tmp_session.close()if last.version_num != SCHEMA_VERSION and self.schema_check:log.warning("数据库架构版本不匹配:找到 %s,应为 %s.",last.version_num, SCHEMA_VERSION)log.error("(可选)进行备份,然后通过运行migrate应用最新的数据库迁移。")sys.exit(1)def __del__(self):"""断开连接池."""self.engine.dispose()def _connect_database(self, connection_string):"""连接到数据库.@param connection_string: 指定数据库的连接字符串"""try:if connection_string.startswith("sqlite"):# 使用“check_same_thread”在多个线程上禁用sqlite安全检查.self.engine = create_engine(connection_string, connect_args={"check_same_thread": False})elif connection_string.startswith("postgres"):# 禁用SSL模式以避免使用sqlalchemy和多进程时出现一些错误.# See: http://www.postgresql.org/docs/9.0/static/libpq-ssl.html#LIBPQ-SSL-SSLMODE-STATEMENTS# TODO 检查这是否仍然相关。特别是假设我们不再使用多处理.self.engine = create_engine(connection_string, connect_args={"sslmode": "disable"})else:self.engine = create_engine(connection_string)except ImportError as e:lib = str(e).split()[-1].strip("'")if lib == "MySQLdb":log.error("缺少MySQL数据库驱动程序(在Linux上使用 `pip install mysql-python` 安装,或在Windows上使用 `pip-install mysqlclient`)")if lib == "psycopg2":log.error("缺少PostgreSQL数据库驱动程序 (使用 `pip install psycopg2`)")log.error("缺少未知的数据库驱动程序,无法导入 %s" % lib)sys.exit(-1)@classlockdef add_user(self, file_size, md5, crc32, sha1, sha256, sha512, memory, ssdeep=None):session = self.Session()# 将空字符串和None值转换为有效的int# if not timeout:#     timeout = 0# if not priority:#     priority = 1#try:memory = parse_bool(memory)except ValueError:memory = False## try:#     enforce_timeout = parse_bool(enforce_timeout)# except ValueError:#     enforce_timeout = Falseuser = User()user.file_size = file_sizeuser.md5 = md5user.crc32 = crc32user.sha1 = sha1user.sha256 = sha256user.sha512 = sha512user.memory = memoryuser.ssdeep = ssdeepsession.add(user)try:session.commit()except SQLAlchemyError as e:log.error("数据库添加 user 错误: {0}".format(e))return Falsefinally:session.close()return True@classlockdef select_user(self, id=None):session = self.Session()try:search = session.query(User)if id:search = search.filter_by(id=id)# 排序# search = search.order_by(id)# search = search.order_by(desc(User.id))  倒叙tasks = search.all()return tasksexcept SQLAlchemyError as e:log.error("数据库查看所有 user 错误: {0}".format(e))return []finally:session.close()@classlockdef update_user(self, id, new_file_size):session = self.Session()try:search = session.query(User).filter(User.id == id).first()search.file_size = new_file_size# session.query(User).filter(User.id == 1).update({'file_size': 'new_file_size'})session.commit()except SQLAlchemyError as e:log.error("数据库更新 user 错误: {0}".format(e))return Falsefinally:session.close()return True@classlockdef delete_user(self, id):session = self.Session()try:search = session.query(User).filter(User.id == id).first()if search:session.delete(search)session.commit()except SQLAlchemyError as e:log.error("数据库删除 user 错误: {0}".format(e))return Falsefinally:session.close()return True

调用运行

merage.py

#! /usr/bin/env python
# -*- coding: utf-8 -*-import time
import loggingfrom core.database import Databaselog = logging.getLogger("log")class Merge(object):def __init__(self):db = Database()db.connect()res = db.add_user(32, "11", "22", "33", "44", "55", "off")if not res:print("添加错误")print(db.select_user())# res = db.update_user(1, 50)# if not res:#     print("更新错误")# res = db.delete_user(2)# if not res:#     print("删除错误")if __name__ == '__main__':merge = Merge()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/133078.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

黑马JVM总结(三十二)

&#xff08;1&#xff09;类加载器-线程上下文1 使用的应用程序类加载器来完成类的加载&#xff0c;不是用的启动类加载器&#xff0c;jdk在某些情况下要打破&#xff0c;双亲委派的模式&#xff0c;有时候需要调用应用程序类加载器来完成类的加载&#xff0c;否则有些类它是找…

从读不完一篇文章,到啃下20万字巨著,大模型公司卷起“长文本”

点击关注 文丨郝 鑫 编丨刘雨琦 4000到40万token&#xff0c;大模型正在以“肉眼可见”的速度越变越“长”。 长文本能力似乎成为象征着大模型厂商出手的又一新“标配”。 国外&#xff0c;OpenAI经过三次升级&#xff0c;GPT-3.5上下文输入长度从4千增长至1.6万token&…

黑马JVM总结(三十一)

&#xff08;1&#xff09;类加载器-概述 启动类加载器-扩展类类加载器-应用程序类加载器 双亲委派模式&#xff1a; 类加载器&#xff0c;加载类的顺序是先依次请问父级有没有加载&#xff0c;没有加载自己才加载&#xff0c;扩展类加载器在getParent的时候为null 以为Boots…

STM32 CubeMX PWM三种模式(互补,死区互补,普通)(HAL库)

STM32 CubeMX PWM两种模式&#xff08;HAL库&#xff09; STM32 CubeMX STM32 CubeMX PWM两种模式&#xff08;HAL库&#xff09;一、互补对称输出STM32 CubeMX设置代码部分 二、带死区互补模式STM32 CubeMX设置代码 三、普通模式STM32 CubeMX设置代码部分 总结 一、互补对称输…

运维大数据平台的建设与实践探索

随着企业数字化转型的推进&#xff0c;运维管理面临着前所未有的挑战和机遇。为应对日益复杂且严峻的挑战&#xff0c;数字免疫系统和智能运维等概念应运而生。数字免疫系统和智能运维作为新兴技术&#xff0c;正引领着运维管理的新趋势。数字免疫系统和智能运维都借助大数据运…

基本微信小程序的购物商城系统

项目介绍 随着互联网的趋势的到来&#xff0c;各行各业都在考虑利用互联网将自己的信息推广出去&#xff0c;最好方式就是建立自己的平台信息&#xff0c;并对其进行管理&#xff0c;随着现在智能手机的普及&#xff0c;人们对于智能手机里面的应用购物平台小程序也在不断的使…

vscode 资源管理器移动到右边

目录 vscode 资源管理器移动到右边 vscode 资源管理器移动到右边 点击 文件》首选项》设置》工作台》外观》 找到这个配置下拉选择左右

神秘的锦衣卫

在看明朝电视剧经常听到的一句台词&#xff1a;锦衣卫办案&#xff0c;闲杂人等速速离开。锦衣卫是明朝特务机构&#xff0c;直接听命于皇帝&#xff0c;是亲军卫之一&#xff0c;也是最重要的一卫。 1、卫所制 卫所制是明代最主要的军事制度&#xff0c;其目标是寓兵于农、屯…

Java数据结构第十七章、手撕位图

给40亿个不重复的无符号整数&#xff0c;没排过序。给一个无符号整数&#xff0c;如何快速判断一个数是否在这40亿个数中。【腾讯】 1. 遍历&#xff0c;时间复杂度O(N) 2. 排序(O(NlogN))&#xff0c;利用二分查找: logN 3. 位图解决 数据是否在给定的整形数据中&#xff0c;结…

C++学习——“面向对象编程”的涵义

以下内容源于C语言中文网的学习与整理&#xff0c;非原创&#xff0c;如有侵权请告知删除。 类是一个通用的概念&#xff0c;C、Java、C#、PHP 等很多编程语言中都支持类&#xff0c;都可以通过类创建对象。我们可以将类看做是结构体的升级版&#xff0c;C语言的晚辈们看到了C…

《理解深度学习》2023最新版本+习题答案册pdf

刚入门深度学习或者觉得学起来很困难的同学看过来了&#xff0c;今天分享的这本深度学习教科书绝对适合你。 就是这本已在外网获13.1万次下载的宝藏教科书《理解深度学习》。本书由巴斯大学计算机科学教授Simon J.D. Prince撰写&#xff0c;全书共541页&#xff0c;目前共有21…

天锐绿盾加密软件——企业数据防泄密-CAD图纸、文档、源代码加密管理系统@德人合科技

天锐绿盾是一款专门为企业提供数据防泄密和文档加密管理的软件。该软件通过加密技术保护企业的核心数据&#xff0c;防止数据泄露和侵权行为&#xff0c;同时提供了全方位的文档加密管理系统&#xff0c;实现了对企业数据的安全保障和有效管理。 PC访问地址&#xff1a; isite…