目录
基础功能
简单封装
源码等资料获取方法
基础功能
import pymysql
from pymysql.cursors import DictCursor # 导入字典类型的游标对象# 连接数据库
db = pymysql.connect(host='192.168.3.109', # 数据库IP地址port=3306, # 数据库端口号user='root', # 数据库用户名password='123456', # 数据库用户密码db='test') # 连接的数据库名称# cursor() 方法可以创建一个游标对象
# 如果不指定游标返回的数据,则返回的数据默认为元组类型
# cursor = db.cursor()
# 指定为dict类型游标,则返回的数据为字典类型
cursor = db.cursor(DictCursor)# execute() 方法可以执行所有的原生SQL,成功执行则结果返回数字
# 比如:创建表格,则返回0
ret = cursor.execute("CREATE TABLE testsheet (name VARCHAR(20), gender CHAR(1),age int(2))") # 创建表格
print(F"创建表格:{ret}")# 比如:插入数据,则返回插入记录数
ret = cursor.execute("INSERT INTO testsheet (name,gender,age) VALUES('张三', '男', 22),('三二一', '女', 22)")
print(F"插入数据:{ret}")# 比如:查询数据,则返回记录数
ret = cursor.execute("select * from testsheet")
print(F"记录数:{ret}")# 比如:删除表格,则返回0
ret = cursor.execute("DROP TABLE testsheet")
print(F"删除表格:{ret}")# 使用fetchall()和fetchone()可以提取查询的数据,提取后数据删除
# fetchall() 提取所有数据,如果查询的数据为空,则返空,比如(),[];一次查询,多次提取,则返空
cursor.execute("select * from info")
ret = cursor.fetchall()
print(F"fetchall提取数据:{ret}")
ret = cursor.fetchall()
print(F"fetchall再次提取数据:{ret}")# fetchone() 提取第一条数据,如果查询的数据为空,则返回None;一次查询,多次提取,则返回提取后的第一条数据,直到提取完,则返None。
cursor.execute("select * from info")
ret = cursor.fetchone()
print(ret)
ret = cursor.fetchone()
print(ret)# 强调:使用execute()执行有参数的sql语句时,如果sql语句直接使用字符串初始化的方法存在SQL注入风险;需要使用execute(sql, 参数)的方式执行,防止SQL注入
# 比如查询ID为1的数据
id = 1
sql = "select * from info where id='%s'" % (id,)
cursor.execute(sql)
ret = cursor.fetchall()
print(F"查询ID为1的数据结果:{ret}")# 如果传入的值被恶意修改成下面的值,那么数据就存在脱库的安全问题
id = "1' or '1=1"
sql = "select * from info where id='%s'" % (id,)
cursor.execute(sql)
ret = cursor.fetchall()
print(F"SQL注入的查询结果:{ret}")# 防SQL注入的执行方式,如果使用上面的id参数,则会报SQL语句错误
sql = "select * from info where id='%s'"
try:cursor.execute(sql, (id,))ret = cursor.fetchall()
except:ret = "执行报错"
print(F"防SQL注入的查询结果:{ret}")# rollback() 回滚数据。只要没有commit,可以使用 rollback 回滚所有对数据库进行了修改的操作(增、删、改)
# db.rollback()# 提交游标的操作到数据库。
db.commit()# 关闭游标
cursor.close()
# 关闭数据库连接
db.close()
简单封装
import pymysql
from pymysql.cursors import DictCursor # 导入游标类型对象class Mysql(object):def __init__(self, host, user, password, port, db):# 连接数据库self.database = pymysql.connect(host=host, user=user, password=password, port=port, db=db)print("连接数据库")# 获取游标对象self.cursor = self.database.cursor() # 使用该游标返回的数据为元组类型print(self.cursor)self.cursor = self.database.cursor(DictCursor) # 使用该游标返回的数据为字典类型print(self.cursor)def __del__(self):"""对象销毁进行资源回收"""# 关闭游标self.cursor.close()# 关闭数据库连接self.database.close()print("__del__被执行")def execute(self, query, args=None, force=None):"""执行SQL:param query: sql语句:param args: sql语句中的参数:param force: 是否跳过对sql语句"where"的检查:return:返回游标对象"""# 对query传入的sql语句校验是否包含"where",若包含,args又没传参数则抛异常。校验的目的是防SQL注入if force is None:query = query.lower()if "where" in query and args is None:raise("检查到SQL中包含条件语句,但args参数为空,存在SQL注入的风险,若仍要执行,请将参数force的值修改为True")self.cursor.execute(query, args)# 返回游标对象就可以采用 execute().fetchone()的方式获取值return self.cursordef commit(self):"""提交数据,提交失败则回滚"""try:self.database.commit()return 0except Exception as e:self.database.rollback()return -1if __name__ == '__main__':db = Mysql(host="192.168.0.58", user='root', password='123456', port=3306, db="test")_id = "1 or 1=1"code = "000036"# SQL注入执行方式sql = "select * from info WHERE id=%s or code=%s" % (_id, code)# ret = db.execute(sql).fetchall()ret = db.execute(sql, force=True).fetchall()print(ret)print(F"SQL注入获取的数据数:{len(ret)}")# 防SQL注入的执行方式sql = "select * from info where id=%s or code=%s"ret = db.execute(sql, (_id, code)).fetchall()print(ret)print(F"防SQL注入获取的数据数:{len(ret)}")print("="*50)# 更新数据_id = 1code = "000011"sql = "UPDATE `d`.`info` SET `code` = %s WHERE `id` = %s"db.execute(sql, (code, _id))ret = db.commit()if ret == 0:print("数据更新成功")else:print("数据更新失败")# 获取指定字段值_id = 1sql = "select * from info where id=%s"ret = db.execute(sql, (_id,)).fetchone()print(f"获取id为{_id}的code字段值:{ret.get('code')}")
执行结果
源码等资料获取方法
各位想获取源码等教程资料的朋友请点赞 + 评论 + 收藏,三连!
三连之后我会在评论区挨个私信发给你们~