Flask框架
简介
特点:
- 微框架,间接,给开发者提供很大的扩展性
- Flask和相应的插件写得很好,用起来很爽。
- 开发效率非常高,比如使用 SQLAlchemy 的 ORM 操作数据库可以节省开发者大量书写 sql 的时 间。
文档地址
-
中文文档(http://docs.jinkan.org/docs/flask/)
-
英文文档(http://flask.pocoo.org/docs/1.0/)
Flask本身相当于一个内核,其他几乎所有的功能都要用到扩展(邮件扩展Flask-Mail
,用户认证Flask-Login
,数据库Flask-SQLAIchemy
)都需要用第三方插件实现。
扩展列表
- Flask-SQLaIchemy: 操作数据库
- Flask-script:插入脚本
- Flask-migrate:管理迁移数据库
- Flask-Session:Session存储方式指定
- Flask-WTF:表单
- Flask-Mail:邮件
- Flask-Bable:提供国际化和本地化支持,翻译
- Flask-Login:认证用户状态
- Flask-OpenID:认证
- Flask-RESTful:开发REST API的工具
- Flask-Bootstrap:集成前端Twitter Bootstrap框架
- Flask-Moment:本地化日期和时间
- Flask-Admin:简单而可扩展的管理接口的框架
第一个Flask程序
安装Flask
pip install flask
#从flask包中导入Flask类
from flask import Flask
#创建一个Flask对象
app = Flask(__name__)
#@app.route:是一个装饰器
#@app.route('/')就是将url中 / 映射到hello_world
设个视图函数上面
#以后你访问我这个网站的 / 目录的时候 会执行
hello_world这个函数,然后将这个函数的返回值返回给浏
览器
@app.route('/')
def hello_world():return '尚学堂'
#启动这个WEB服务
if __name__ == '__main__':#默认为5000端口app.run() #app.run(port=8000)
启动运行
python helloworld.py
通过对象运行
运行程序时,可以指定运行的主机ip地址,端口
app.run(host='0.0.0.0',port=5000)
参数解释
host
- 主机IP地址,可以不传
- 默认localhost
port
- 端口号,可以不传
- 默认5000
Debug模式与配置参数加载
好处
- 可以热加载
- 可以将错误信息显示在控制台
# 引入flask应用
from flask import Flask# 创建对象
app = Flask(__name__)# 路由地址
@app.route('/')
def index():# return 代表将数据返回给浏览器return 'world 'if __name__ == '__main__':# 默认端口是5000app.run(debug=True)
开启debug模式
-
运行时传递参数
app.run(debug=True)
-
通过app.debug参数配置
app.debug = True
app.run()
通过修改配置参数
app.config.update(DEBUG = True)
app.config['DEBUG'] = True
app.run()
通过mapping加载
app.config.from_mapping({'DEBUG':True})
app.run()
通过配置对象设置config
class Config:
DEBUG = True
app.config.from_object(config)
app.run()
通过配置文件设置config
config.py
DEBUG = True
config.json
app.py
app.config.from_pyfile('config.py')
app.config.from_json('config.json')
通过环境变量
DEBUG = True
app.config.from_envvar('DEBUG')
URL 与函数的映射(动态路由)
URL路径参数
比如,有一个请求访问的接口地址为 /users/11001 ,其中 11001 实际上为 具体的请求参数,表明请求 11001 号用户的信息。
@app.route('/users/<user_id>')
def user_info(user_id):print(type(user_id))return 'hello user{}'.format(user_id)
其中 ,尖括号是固定写法,语法为 , variable 默认的 数据类型是字符串。
如需要指定类型,则要写成converter:variable,其中converter就是类型名称,可以有以下几种
- string:如果没有指定具体的数据类型,那么默认就是使用 string 数据类型。
- int:数据类型只能传递 int 类型。
- float:数据类型只能传递 float 类型。
- path:数据类型和 string 有点类似,都是可以接收任意的字符串,但是 path 可以接收路径,也就 是说可以包含斜杠。
- uuid:数据类型只能接收符合 uuid 的字符串。 uuid 是一个全宇宙都唯一的字符串,一般可以用来 作为表的主键。
- any:数据类型可以在一个 url 中指定多个路径。例如:
将上面的例子以整型匹配数据,可以如下使用:
@app.route('/users/<int:user_id>')
def user_info(user_id):print(type(user_id))return f'正在获取 ID {user_id} 的用户信息'@app.route('/users/<int(min=1):user_id>')
def user_info(user_id):print(type(user_id))return f'hello user {user_id}'
注意:
若是数据与设置的类型不能匹配,则会返回Not Found
自定义装换器
- 创建转换器类,保存匹配是的正则表达式
from werkzeug.routing import BaseConverterclass MobileConverter(BaseConverter):""" 手机号格式"""regex = r'1[3-9]\d{9}'# 注意 regex 名字是固定的
- 将自定义的转换器告知Flask应用
app = Flask(__name__)
# 将自定义的转换器添加到转换器字典中,并指定转换器使用时名字为:mobile
if __name__ == '__main__':# 将自定义转换器添加到转换器字典中,并指定转换器使用名字:mobileapp.url_map.converters['mobile'] = MobileConverter
- 在使用转换器的地方定义使用
请求
查询参数获取
# 获取请求参数
@app.route('/')
def index():uname = request.args.get('uname')pwd = request.args.get('pwd')pwd2 = request.values.get('pwd')return '{}:{}'.format(uname,pwd)
请求体参数
# 接收表单参数
@app.route('/', methods=['POST'])
def index2():uname = request.form.get('uname')pwd = request.values.get('pwd')age = request.form.get('age')return f"Hello! {uname} == {pwd} =={age}"
上传文件
# 上传文件
@app.route('/upload', methods=['POST'])
def upload():f = request.files['pic']# 获取文件名fname = f.filename# with open('./demo.png', 'wb') as new_file:# new_file.write(f.read())f.save('./demo.png')return '上传成功'
其他参数
@app.route('/args')
def args():cookies = request.cookies.get('uid')headers = request.headers.get('Content_Type')url = request.urlmethod = request.methodreturn f'上传成功!! {cookies} =={headers} =={url} == {method}'
url_for函数
根据函数,获取对应url,url_for 函数可以实现这个功能
注意
url_for 函数可以接收1个及以上的参数,他接收函数名作为第 一个参数
如果还出现其他的参数,则会添加到 URL 的后面作为查询参 数。
@app.route('/post/list/<page>/')
def my_list(page):return 'my list'@app.route('/')
def hello_world():return url_for('my_list',page=2,num=8)# return "/post/list/2?num=8"
使用url_for函数的原因
- 将来如果修改了url,但没有修改该URL对应的函数名,就不用到处去替换URL了
- url_for()函数会转义一些特殊字符和unicode字符串,这些事情url_for会自动的帮我们
@app.route('/')
def hello_world():return url_for('login', next='/')# /login/?next=/# 会自动的将/编码,不需要手动去处理。# url=/login/?next=%2F
技巧
在定义url的时候,一定要记得在最后面加一个斜杠
- 如果不加斜杠,那么在浏览器中访问这个url的时候,如果最 后加了斜杠,那么就访问不到。这样用户体验不太好。
- 搜索引擎会将不加斜杠的和加斜杠的视为两个不同的url。而 其实加和不加斜杠的都是同一个url,那么就会给搜索引擎造成 一个误解。加了斜杠,就不会出现没有斜杠的情况。
响应
响应_重定向
-
永久性重定向
http的状态码是301,多用于旧网址被废弃要转到一个新的网址确保用户的访问
-
暂时重定向
http的状态码是302,表示页面的暂时性跳转
比如:访问一个需要权限的网址,如果用户没有登录,应该重定向到登录页面
flask中重定向
重定向是通过 redirect(location,code=302) 这个函数来实现的, location表示 需要重定向到的 URL, 应该配合之前讲的 url_for() 函数来使用, code 表示采用哪个重定向,默认是 302 也即 暂时性重定向, 可以 修改成 301 来实现永久性重定向
from flask import
Flask,request,url_for,redirect
app = Flask(__name__)
@app.route('/')
def hello_world():return 'Hello World!'
@app.route('/login/')
def login():return '这是登录页面'
#falsk中重定向
@app.route('/profile/')
def proflie():if request.args.get('name'):return '个人中心页面'else:# return redirect(url_for('login'))returnredirect(url_for('login'),code=302)
if __name__ == '__main__':app.run(debug=True)
响应_响应内容
返回字符串
from flask import redirectd
@app.route('/return_str')
def return_str():return "你好,少年"
返回json
from flask import jsonifyapp.config['JSON_AS_ASCII'] = False@app.route('/return_json1')
def return_json1():json_dict = {"msg_int": 10,"msg_str": "你好,少年"}return jsonify(json_dict)
@app.route('/return_json2')
def return_json2():json_dict = {"msg_int": 10,"msg_str": "你好,少年"}return json_dict
元组方式
可以返回一个元组,元组中必须至少包含一个项目,且项目应当由 (response, status) 、 (response, headers) 或者 (response, status, headers) 组成。 status 的值会重载状态代码, headers 是一个由额外头部值组成的列表 或字 典
status 值会覆盖状态代码, headers 可以是一个列表或字典,作 为额外的消息标头值。
@app.route('/demo1')
def demo1():# return '状态码为 666', 666# return '状态码为 666', 666,
[('itbaizhan', 'Python')]return '状态码为 666', 666, {'itbaizhan':
'Python'}
响应_自定义响应
创建response
from flask import Response
@app.route('/return_str')
def return_str():return Response("你好,少年")
make_response方式
@app.route('/demo2')
def demo2():resp = make_response('make response测试')resp.headers['itbaizhan'] = 'Python'resp.status = '404 not found'return resp
模板
模板Template
MVT 设计模式中的 T , Template
M全拼为Model,与MVC中的M功能相同,负责和数据库交 互,进行数据处理。
V全拼为View,与MVC中的C功能相同,接收请求,进行业务处 理,返回应答。
T全拼为Template,与MVC中的V功能相同,负责封装构造要返 回的html。
模板的使用
在 Flask中,配套的模板是 Jinja2,Jinja2的作者也是Flask的作者。 这个模板非常的强大,并且执行效率高。
使用步骤:
- 创建模板
- 在应用
同级目录
下创建模板文件夹templates
,文件名夹名称固定写法 - 在
templates
文件夹下,创建应用
同名文件夹. 例, Book - 在
应用
同名文件夹下创建网页模板
文件. 例 : index.htm
- 在应用
- 设置模板查找路径
- 模板处理数据
from flask import Flask, render_templateapp = Flask(__name__,render_template('templates'))
# 默认使用templates
# 如果想要修改模板的目录,可以设置template_folder参数@app.route('/')
def index():return render_template('index_03.html')if __name__ == '__main__':app.run(debug=True)
模板_传参
在使用render_template
渲染模版的时候,可以传递关键字参数(命名参数)。
from flask import Flask,render_template
app = Flask(__name__)
@app.route('/')
def hello_world():return
render_template('index.html',uname='sxt')
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>SXT</title>
</head>
<body>从模版中渲染的数据<br>{{ uname}}
</body>
</html>
小技巧
如果你的参数项过多,那么可以将所有的参数放到一个字典中,
然后在传这个字典参数的时候,使用两个星号,将字典打散成关键字参数(也叫命名参数)
@app.route('/')
def hello_world():context = {'uname': 'momo','age': 18,'country': 'china','childrens': {'name': 'mjz','height': '62cm'}}return
render_template('index.html',**context)
获取方式是: {{childrens.name}}
或者 {{childrens['name']}}
模板使用url_for函数
提示
在模板中使用函数,需要在函数左右两边加上2个{{}}
例如: {{ url_for(func) }}
@app.route('/accounts/login/<name>/')
def login(name):print(name)return '通过URL_FOR定位过来的!!!'
<a href="{{url_for('login',p1='abc',p2='ddd',name='尚学堂') }}">登录</a>
注意
无论是 路径参数 还是查询参数 都可以直接传递
过滤器介绍
在模版中,过滤器相当于是一个函数,把当前的变量传入到过滤器 中,然后过滤器根据自己的功能,再返回相应的值,之后再将结果渲染到页面中
@app.route('/')
def hello_world():return render_template('index.html',postion=-1)
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>SXT</title>
</head>
<body><h3>过滤器的基本使用</h3><p>位置的绝对值为[未使用过滤器]:{{ postion}}</p><p>位置的绝对值为[使用过滤器]:{{postion|abs}}</p>
</body>
</html>
Jinja模板自带过滤器
过滤器是通过管道符号|
使用的,例如: { name|length }} 将返回name的 长度。
过滤器相当于是一个函数,把当前的变量传入到过滤器中,然后过滤器根据自己的功能,再返回相应的值,之后再将结果渲染到页面中。
Jinja2中内置了许多过滤器
https://jinja.palletsprojects.com/en/3.0.x/templates/#filters
defalut过滤器
<body><h1>default过滤器</h1>过滤前的昵称数据是:{{nick_name}}<br>过滤后的昵称数据是:{{nick_name | default('用户1',boolean=true)}}<br>过滤后的昵称数据是:{{nick_name or '用户2'}}
<br>
</body>
转义字符
<body><h1>转义字符过滤器</h1><!-- 模板中默认 做了转义字符的效果 -->转义前的数据是:{{ info | safe }} <!-- 不转义:不将特殊字符转换成 <类似的数据 -->{% autoescape true %} <!-- false代表不再转义特殊字符 / true 转义特殊字符 <-->{{info }} <!-- 转义:将特殊字符转换成 <类似的数据 -->{% endautoescape %}
</body>
其他过滤器
<body><h1>其它过滤器</h1>绝对值:{{ -6 | abs }}<br>小数: {{ 6 | float }}<br>字符串:{{ 6 | string }}<br>格式化:{{'%s--%s' | format('我','你')}}<br>长度:{{'我是九,你是三,除了你,还是你'|length}}<br>最后一个:{{'我是九,你是三,除了你,还是你'|last}}<br>第一个:{{'我是九,你是三,除了你,还是你'|first}}<br>统计次数: {{'我是九,你是三,除了你,还是你' |wordcount }}<br>替换:{{'===我是九,你是三,除了你,还是你====' |replace('我是九,你是三,除了你,还是你','拿着,这个无限额度的黑卡,随便刷')}}
</body>
小提示
jinja2模板 默认全局开启了自动转义功能
- safe 过滤器:可以关闭一个字符串的自动转义
- escape 过滤器:对某一个字符串进行转义
- autoescape 标签,可以对他包含的代码块关闭或开启自动转义
- {% autoescape true/false %} 代码块
自定义过滤器
如果在模版中调用这个过滤器,那么就会将这个变量的值作为第一 个参数传给过滤器这个函数,然后函数的返回值会作为这个过滤器的返回值。需要使用到一个装饰器: @app.template_filter('过滤器名称')
自定义数据替换过滤器
例如:将新闻中出现的 所有“ 我是九你是三,除了你还是你” 换成 "你不用多好,我喜欢就好
#将模版设置为自动加载模式
app.config['TEMPLATES_AUTO_RELOAD']=True
@app.template_filter('cut')
def cut(value):value=value.replace("我是九你是三,除了你还是你",'你不用多好,我喜欢就好')return value
<p>使用自定义过滤器:{{新闻内容值|cut}}</p>
自定义时间过滤器
例如:操作发布新闻 与现在的时间间隔
# 需求:操作发布新闻 与现在的时间间隔
@app.template_filter('handle_time')
def handle_time(time):"""time距离现在的时间间隔1. 如果时间间隔小于1分钟以内,那么就显示“刚刚”2. 如果是大于1分钟小于1小时,那么就显示“xx分钟前”3. 如果是大于1小时小于24小时,那么就显示“xx小时前”4. 如果是大于24小时小于30天以内,那么就显示“xx天前”5. 否则就是显示具体的时间 2030/10/2016:15"""if isinstance(time, datetime):now = datetime.now()timestamp = (now - time).total_seconds()if timestamp < 60:return "刚刚"elif timestamp >= 60 and timestamp < 60 * 60:minutes = timestamp / 60return "%s分钟前" % int(minutes)elif timestamp >= 60 * 60 and timestamp < 60 * 60 * 24:hours = timestamp / (60 * 60)return '%s小时前' % int(hours)elif timestamp >= 60 * 60 * 24 and timestamp < 60 * 60 * 24 * 30:days = timestamp / (60 * 60 * 24)return "%s天前" % int(days)else:return time.strftime('%Y/%m/%d %H:%M')else:return time
(其他逻辑语法知识未整理)
静态文件
静态文件:css文件 js文件 图片文件等文件
加载静态文件使用的是 url_for 函数。然后第一个参数为 static ,第二个参数为一个关键字参数filename='路径' 。
语法
{{ url_for("static",filename='xxx') }}
注意
路径查找,要以当前项目的static目录作为根目录
视图
add_url_rule 与 app.route
-
add_url_rule
add_url_rule(rule,endpoint=None,view_func=None)
:这个方法用来添加url与视图函数的映射。如果没有填写
endpoint
,那么默认会使用view_func
的名字作为endpoint
。 以后在使用url_for
的时候,就要看在映射的时候有没有传递endpoint
参数,如果传递了,那么就应该使用endpoint
指定的字符串,如果没有传递,那么就应该使用view_func
的名字。def my_list():return "我是列表页" app.add_url_rule('/list/',endpoint='sxt',view_func=my_list)
-
app_route原理剖析
这个装饰器底层,其实也是使用
add_url_rule
来实现url与函数映射的from flask import Flask,url_forapp = Flask(__name__) @app.route('/',endpoint='index') def index():print(url_for('show'))print(url_for('index'))return "Hello" def show_me():return "这个介绍信息!!"# endpoint 没有设置,url_for中就写函数的名字,如果设置了,就写endpoint的值 app.add_url_rule('/show_me',view_func=show_me,endpoint='show') # @app.route 底层就是使用的 add_url_rule if __name__ =='__main__':app.run(debug=True)
类视图
类视图的好处是支持继承,但类视图不能和函数视图一样,写完类视图还需要通过
app.add_url_rule(url_rule,view_func)
来进行注册
标准类视图使用步骤
-
必须继承自
flask.views.View
-
必须实现
dispatch_request
方法,以后请求过来后,都会执行这个方法这个方法的返回值就相当于是之前的视图函数一样。也必须返回
Response
或者子类的对象,或者是字符串,或者是元组。 -
必须通过
app.add_url_rule(rule,endpoint,view_func)
来做url与视图的映射。view_func
这个参数,需要使用类视图下的as_view
类方法类转换: ListView.as_view('list') 。 -
如果指定了
endpoint
,那么在使用url_for
反转的时候就必须使用endpoint
指定的那个值。如果没有指定endpoint
,那么就可以使用as_view(视图名字)
中指定的视图名字来作为反转。from flask import Flask,url_for from flask.views import View app= Flask(__name__) @app.route('/') def index():# print(url_for('mylist'))print(url_for('my'))return 'Hello' class ListView(View):def dispatch_request(self):return '返回了一个List的内容!!' # app.add_url_rule('/list',view_func=ListView.as_view('mylist'))app.add_url_rule('/list',endpoint='my',view_func=ListView.as_view('mylist'))# 用于测试 with app.test_request_context():print(url_for('my'))if __name__ =='__main__':app.run(debug=True)
装饰器
python装饰器就是用于拓展原来函数功能的一种函数,这个函数的特殊之处在于它的返回值也是一个函数,
- 在视图函数中使用自定义装饰器,那么自定义装饰器必须放在
app.rute
下面,否则这个装饰器就起不到任何作用
定义装饰器
def login_required(func):@wraps(func)def wrapper(*arg,**kwargs):uname = request.args.get('uname')pwd = request.args.get('pwd')if uname == 'zs' and pwd == '123':logging.info(f'{uname}:登录成功')return func(*arg,**kwargs)else:logging.info(f'{uname}:尝试登录,但没成功')return '请先登录'return wrapper
使用装饰器
@app.route('/settings/')
@login_requierd
def settings(): return '这是设置界面'
- 在类视图中使用装饰器,需要重写类视图的一个类属性
decorators
这个类属性是一个列表或元组都可以,里面装的就是所有的装饰器
class ProfileView(views.View): decorators = [login_requierd] def dispatch_request(self): return '这是个人中心界面'
app.add_url_rule('/profile/',view_func=ProfileView.as_view('profile'))
蓝图Blueprint
可以理解为是存储一组视图方法的容器对象,其具有如下特点:
- 一个应用具有多个Blueprint
- 可以将一个Blueprint注册到任何一个未使用的URL下比如 “/user” 、 “/goods”
- Blueprint可以单独具有自己的模板、静态文件或者其它的通用操作方法,它并不是必须要实现应用的视图和函数的
- 在一个应用初始化时,就应该要注册需要使用的Blueprint
注意
Blueprint并不是一个完整的应用,它不能独立与应用运行,而必须要注册到一个应用中
使用方式
-
创建一个蓝图对象
user_bp=Blueprint('user',__name__)
-
在这个蓝图对象上
@user_bp.route('/') def user_profile():return 'user_profile'
-
在应用对象上注册这个蓝图对象
app.register_blueprint(user_bp)
单文件蓝图
可以将创建蓝图与定义视图放到一个文件中
import logging
from flask.blueprints import Blueprint
from flask import Flaskapp = Flask(__name__)logging.basicConfig(level=logging.INFO)@app.route('/')
def index():logging.info('输出了Hello!!')return 'Hello'user = Blueprint('user', __name__)@user.route('/user')
def index():return '用户模板'app.register_blueprint(user)if __name__ =='__main__':app.run(debug=True)
指定蓝图url前缀
app.register_blueprint(user_bp,url_prefix='/user')
app.register_blueprint(goods_bp,url_prefix='/goods')
蓝图的目录结构
根据功能模块
对于一个打算包含多个文件的蓝图,通常将创建蓝图对象放到 Python包的 init.py 文件中
--------- project # 工程目录|------ main.py # 启动文件|------ user #用户蓝图| |--- __init__.py # 此处创建蓝图对象| |--- view.py | |--- ...|------ goods # 商品蓝图| |--- __init__.py| |--- ...|...
根据技术模块
--------- project # 工程目录|------ main.py # 启动文件|------ view #用户蓝图| |--- user.py # 此处创建蓝图对象| |--- item.py | |--- view.py| |--- ...|...
例:
# main.py
from flask import Flask
import logging
app = Flask(__name__)logging.basicConfig(level=logging.INFO)@app.route('/')
def index():logging.info('输出了Hello!!')return 'Hello'from user import userapp.register_blueprint(user)if __name__ =='__main__':app.run(debug=True)# __init__.py
from flask.blueprints import Blueprint
user = Blueprint('user', __name__)
from user import view# view.py
from user import user
@user.route('/user')
def index():return '用户模板'
蓝图中模板文件
寻找规则
- 如果项目中的templates文件夹中有相应的模版文件,就直接使用了。
- 如果项目中的templates文件夹中没有相应的模版文件,那么就到在定义蓝图的时候指定的路径中寻找。
- 并且蓝图中指定的路径可以为相对路径,相对的是当前这个蓝图文件所在的目录
因为这个蓝图文件是在user/view.py,那么就会到blueprints这个 文件夹下的user_page文件夹中寻找模版文件。
小总结:
常规:蓝图文件在查找模版文件时,会以templates为根目录进行查找
user_bp = Blueprint('user',__name__,url_prefix='/user',template_folder='user_page')
Flask高级部分
Cookie
Flask设置Cookie
设置cookie是在response对象上设置
flask.Response 对象有一个 set_cookie 方法,可以通过这个方法来设置 cookie 信息。
from flask import Flask, make_response
from flask import requestapp = Flask(__name__)
@app.route('/cookie')
def set_cookie():resp = make_response('set cookie ok')resp.set_cookie('uname', 'itbaizhan')return resp# 查看cookie
@app.route('/get_cookie')
def get_cookie():resp = request.cookies.get('uname')return resp# 删除cookie
@app.route('/delete_cookie')
def delete_cookie():response = make_response('helloworld')response.delete_cookie('uname')return response
cookie过期时间
- 默认的过期时间:如果没有显示的指定过期时间,那么这个cookie 将会在浏览器关闭后过期。
- max_age:以秒为单位,距离现在多少秒后cookie会过期。
- expires:为datetime类型。这个时间需要设置为格林尼治时间, 相对北京时间来说会自动+8小时
- 如果max_age和expires都设置了,那么这时候以max_age为标准。
from flask import Flask,Responseapp = Flask(__name__)@app.route('/')
def index():return 'Hello!!'@app.route('/create_cookie/defualt/')
def create_cookie1():resp = Response('通过默认值,设置cookie有效期')# 如果没有设置有效期,默认会在浏览器关闭的时候,让cookie过期resp.set_cookie('uname','sxt')return resp@app.route('/create_cookie/max_age/')
def create_cookie2():resp = Response('通过max_age,设置cookie有效期')# max_age以秒为单位设置cookie的有效期age = 60*60*2resp.set_cookie('uname','itbaizhan',max_age=age)return respfrom datetime import datetime@app.route('/create_cookie/expires/')
def create_cookie3():resp = Response('通过expires,设置cookie有效期')# expires 以指定时间为cookie的有效期# 16+8 == 24tmp_time = datetime(2021, 11,11,hour=18,minute=0,second=0)resp.set_cookie('uname','python',expires=tmp_time)return respfrom datetime import timedelta
@app.route('/create_cookie/expires2/')
def create_cookie4():resp = Response('通过expires,设置cookie有效期')# expires 以指定时间为cookie的有效期tmp_time = datetime.now() +
timedelta(days=2)resp.set_cookie('uname','python_sql',expires=tmp_time)return resp
@app.route('/create_cookie/exp_max/')
def create_cookie5():resp = Response('通过expires与max_age,设置cookie有效期')# expires 与max_age同时设置了,会以max_age为准tmp_time = datetime.now() + timedelta(days=2)resp.set_cookie('uname','python_sql',expires=tmp_time,max_age = 60*60*2)return respif __name__ == '__main__':app.run(debug=True)
Session
flask中使用Session
需要设置SECRET_KEY
class DefaultConfig(object):SECRET_KEY = 'fih9fh9eh9gh2'
app.config.from_object(DefaultConfig)# 或者直接设置
app.secret_key='xihwidfw9efw'
设置,修改
from flask import session@app.route('/set_session/')
def set_session():session['username'] = 'itbaizhan'return 'set session ok'
读取
@app.route('/get_session/')
def get_session():username = session.get('username')return 'get session username{}'.format(username)
删除
@app.route('/del_session/')
def delete_session():#删除指定的key的session# session.pop('uname')#删除session中的所有的key 【删除所有】session.clear()return '删除成功'
Flask设置Session的有效期
如果没有设置session的有效期。那么默认就是浏览器关闭后过期。
如果设置session.permanent=True,那么就会默认在31天后过期。
- session.permanent=True
- 可以设置
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(hour=2)
在两个小时后过期。
from flask import Flask,session
from datetime import timedeltaapp = Flask(__name__)
app.secret_key = 'sdfdfdsfsss'
app.config['PERMANENT_SESSION_LIFETIME'] =timedelta(days=2)@app.route('/')
def index():return 'Hello!!'@app.route('/set_session/')
def set_session():# 设置session的持久化,默认是增加了31天session.permanent = Truesession['uname'] = '10001'return '设置一个Session的信息'@app.route('/get_session/')
def get_session():# 如果服务器关闭掉了,session的有效期,依然是之前系统保存日期# 如果secret_key设置是一个固定的值,那么服务器重启不会影响session的有效器# 如果secret_key设置不是一个固定的值,那么服务器之前设置的session将全部过期return session.get('uname')if __name__ == '__main__':app.run(debug=True)
Local对象
需求:
- 要实现并发效果, 每一个请求进来的时候我们都开启一个进程, 这显然是不合理的, 于是就可以使用线程
- 那么线程中数据互相不隔离,存在修改数据的时候数据不安全的问题
在Flask中,类似于 request 对象,其实是绑定到了一个 werkzeug.local.Local
对象上。
这样,即使是同一个对象,那么在多个线程中都是隔离的。类似的对象还有 session 对象。
from werkzeug.local import Local
#flask=werkzeug + sqlalchemy + jinja2
ThreadLocal变量
Python提供了ThreadLocal变量,它本身是一个全局变量,但是每个线程却可以利用它来保存属于自己的私有数据,这些私有数据对其他线程也是不可见的。
from threading import Thread,locallocal =local()
local.request = '具体用户的请求对象'
class MyThread(Thread):def run(self):local.request = 'sxt'print('子线程:',local.request)
mythread = MyThread()mythread.start()
mythread.join()print('主线程:',local.request)
from werkzeug.local import Local
local = Local()
local.request = '具体用户的请求对象'
class MyThread(Thread):def run(self):local.request = 'sxt'print('子线程:',local.request)
mythread = MyThread()
mythread.start()
mythread.join()
print('主线程:',local.request)
总结
只要满足绑定到"local"或"Local"对象上的属性,在每个线程中都是 隔离的,那么他就叫做 ThreadLocal 对象,也叫'ThreadLocal'变量。
Flask_app上下文
注意
在视图函数中,不用担心应用上下文的问题。因为视图函数要执行,那么肯定是通过访问url的方式执行的, 那么这种情况下,Flask底层就已经自动的帮我们把应用上下文都推入到了相应的栈中。
如果想要在视图函数外面执行相关的操作,
比如: 获取当前的app名称,那么就必须要手动推入应用上下文
第一种方式: 便于理解的写法
from flask import Flask,current_appapp = Flask(__name__)#app上下文
app_context = app.app_context()
app_context.push()print(current_app.name)
@app.route('/')
def hello_world():print(current_app.name) #获取应用的名称return 'Hello World!'if __name__ == '__main__':app.run(debug=True)
第二种方式: 用with语句
from flask import Flask,current_app
app = Flask(__name__)#app上下文
#换一种写法
with app.app_context():print(current_app.name)@app.route('/')
def hello_world():print(current_app.name) #获取应用的名称return 'Hello World!'if __name__ == '__main__':app.run(debug=True)
Flask_request上下文详解
注意
在视图函数中,不用担心请求上下文的问题。因为视图函数要执行,那么肯定是通过访问url的方式执行的,那么这种情况下,Flask底层就已经自动的帮我们把应用上下文和请求上下文都推入到了相应的栈中。
注意
如果想要在视图函数外面执行相关的操作,
比如反转url,那么就必须要手动推入请求上下文:
底层代码执行说明:
- 推入请求上下文到栈中,会首先判断有没有应用上下文
- 如果没有那么就会先推入应用上下文到栈中
- 然后再推入请求上下文到栈中
from flask import Flask,url_for
app = Flask(__name__)
@app.route('/')
def index():url = url_for('test_url')return f'Hello!==={url}'@app.route('/test/')
def test_url():return '这个是为了测试请求上下文'# RuntimeError: Attempted to generate a URL without the application context being pushed.
# This has to be executed when application context is available.
# with app.app_context():
# url = url_for('test_url')
# print(url)
# RuntimeError: Application was not able to create a URL adapter for request independent URL generation.
# You might be able to fix this by setting the SERVER_NAME config variable.with app.test_request_context():url = url_for('test_url')print(url)if __name__ == '__main__':app.run(debug = True)
Flask_线程隔离的g对象
保存为全局对象g对象的好处:
g对象是在整个Flask应用运行期间都是可以使用的。
并且也跟request一样,是线程隔离的。
这个对象是专门用来存储开发者自己定义的一些数据,方便在整个 Flask程序中都可以使用。
g对象使用场景:有一个工具类utils.py 和 用户办理业务:
def funa(uname):print(f'funa {uname}')
def funb(uname):print(f'funb {uname}')
def func(uname):print(f'func {uname}')
用户办理业务
from flask import Flask,request
from utils import funa,funb,func
app = Flask(__name__)
#Flask_线程隔离的g对象使用详解
@app.route("/profile/")
def my_profile():#从url中取参uname = request.args.get('uname')#调用功能函数办理业务funa(uname)funb(uname)func(uname)#每次都得传参 麻烦,引入g对象进行优化return "办理业务成功"if __name__ == '__main__':app.run(debug=True)
优化utils工具
from flask import g
def funa():print(f'funa {g.uname}')
def funb():print(f'funb {g.uname}')
def func():print(f'func {g.uname}')
优化用户办理业务
from flask import Flask,request,g
from utils import funa,funb,func
app = Flask(__name__)
#Flask_线程隔离的g对象使用详解@app.route("/profile/")
def my_profile():#从url中取参uname = request.args.get('uname')#调用功能函数办理业务# funa(uname)# funb(uname)# func(uname)#每次都得传参 麻烦,引入g对象进行优化g.uname = unamefuna()from flask import Flask,request,g
from utils import funa,funb,func
app = Flask(__name__)
#Flask_线程隔离的g对象使用详解
@app.route("/profile/")
def my_profile():#从url中取参uname = request.args.get('uname')#调用功能函数办理业务# funa(uname)# funb(uname)# func(uname)#每次都得传参 麻烦,引入g对象进行优化g.uname = unamefuna()funb()func()return "办理业务成功"
if __name__ == '__main__':app.run(debug=True)
Flask_钩子函数介绍
常见钩子函数
-
before_first_request:处理项目的第一次请求之前执行。
@app.before_first_request def first_request(): print('first time request')
-
before_request 每次请求之前执行
@app.before_request def before_request(): if not hasattr(g,'glo1'): setattr(g,'glo1','想要设置的')
-
teardown_appcontext 不管是否有异常,注册的函数都会在每次请求之后执行。
@app.teardown_appcontext def teardown(exc=None): if exc is None: db.session.commit() else: db.session.rollback() db.session.remove()
-
template_filter 在使用Jinja2模板的时候自定义过滤器。
@app.template_filter("upper") def upper_filter(s): return s.upper()
-
context_processor 上下文处理器。使用这个钩子函数,必须返回一个字典。这个字典中的值在所有模版中都可以使用。这个钩子函数的函数是,如果一些在很多模版中都要用到的变量,那么就可以使用这个钩子函数来返回,而不用在每个视图函数中 的 render_template 中去写,这样可以让代码更加简洁和好维护。
@app.context_processor def context_processor():if hasattr(g,'user'):return {"current_user":g.user}else:return {}
-
errorhandler errorhandler接收状态码,可以自定义返回这 种状态码的响应的处理方法。在发生一些异常的时候,比如404 错误,比如500错误,那么如果想要优雅的处理这些错误,就可以 使用 errorhandler 来出来。
@app.errorhandler(404) def page_not_found(error): return 'This page does not exist',404
Flask_信号机制
pip install blinker
自定义信号机制步骤
-
创建一个信号: 定义信号需要使用到blinker这个包的Namespace类来创建一个命名空间。比如定义一 个在访问了某个视图函数的时候的信号。示例代码如下:
# Namespace的作用:为了防止多人开发的时候,信号名字 冲突的问题 from blinker import Namespace mysignal = Namespace() signal1 = mysignal.signal('信号名称')
-
监听一个信号: 监听信号使用signal1对象的connect方法,在这个方法中需要传递一个函数,用来监听 到这个信号后做该做的事情。示例代码如下:
def func1(sender,uname):print(sender)print(uname) signal1.connect(func1)
-
发送一个信号: 发送信号使用signal1对象的send方法,这个方法可以传递一些其他参数过去。示例代 码如下:
signal1.send(uname='momo')
代码演示:
from flask import Flask
from blinker import Namespace
app = Flask(__name__)#【1】信号机制 3步走
# Namespace:命名空间#1.定义信号
sSpace = Namespace()
fire_signal = sSpace.signal('发送信号火箭')#2.监听信号
def fire_play(sender):print(sender)print("start play")
fire_signal.connect(fire_play)#3.发送一个信号
fire_signal.send()if __name__ == '__main__':app.run(debug=True)
Flask信号使用场景_存储用户登录日志
定义一个登录的信号,以后用户登录进来以后,就发送一个登录信号,然后能够监听这个信号,在监听到这个信号以后,就记录当前这个用户登录的信息 用信号的方式,记录用户的登录信息即登录日志
编写一个signals.py文件创建登录信号
from blinker import Namespace
from datetime import datetime
from flask import request,gnamespace = Namespace()#创建登录信号
login_signal = namespace.signal('login')
def login_log(sender):# 用户名 登录时间 ip地址now = datetime.now()ip = request.remote_addrlog_data = "{uname}*{now}*{ip}".format(uname=g.uname, now=now, ip=ip)with open('login_log.txt','a') as f:f.write(log_data + "\n")f.close()
#监听信号
login_signal.connect(login_log)
使用信号存储用户登录日志
from flask import Flask,request,g
from signals import login_signalapp = Flask(__name__)@app.route('/login/')
def login():# 通过查询字符串的形式来传递uname这个参数uname = request.args.get('uname')if uname:g.uname = uname# 发送信号login_signal.send()return '登录成功!'else:return '请输入用户名!'if __name__ == '__main__':app.run(debug=True)
Flask_内置信号
Flask内置了10个常用的信号
- template_rendered:模版渲染完成后的信号。
- **before_render_template:模版渲染之前的信号。 **
- **request_started:请求开始之前,在到达视图函数之前发送信号。 **
- **request_finished:请求结束时,在响应发送给客户端之前发送信号。 **
- **request_tearing_down:请求对象被销毁时发送的信号,即使在请求过程中发生异常也会发送信 号。 **
- got_request_exception:在请求过程中抛出异常时发送信号,异常本身会通过exception传递到订 阅(监听)的函数中。一般可以监听这个信号,来记录网站异常信息。
- appcontext_tearing_down:应用上下文被销毁时发送的信号。
- appcontext_pushed:应用上下文被推入到栈上时发送的信号。
- appcontext_popped:应用上下文被推出栈时发送的信号。
- message_flashed:调用了Flask的 flash 方法时发送的信号。
WTForms_表单验证/模板渲染
pip install flask-wtf
WTForms表单验证的基本使用
- 自定义一个表单类,继承自wtforms.Form类。
- 定义好需要验证的字段,字段的名字必须和模版中那些需要验证的input标签的name属性值保持一 致。
- 在需要验证的字段上,需要指定好具体的数据类型。
- 在相关的字段上,指定验证器。
- 以后在视图函数中,只需要使用这个表单类的对象,并且把需要验证的数据,也就是request.form 传给这个表单类,再调用表单类对象.validate()方法进行,如果返回True,那么代表用户输入的数 据都是符合格式要求的,Flase则代表用户输入的数据是有问题的。如果验证失败了,那么可以通 过表单类对象.errors来获取具体的错误信息。
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>某系统注册页面</title>
</head>
<body>
<form action="/register/" method="post"><table><tr><th>用户名:</th><td><input type="text"
name="uname"></td></tr><tr><th>密码:</th><td><input type="password"
name="pwd"></td></tr><tr><th>确认密码:</th><td><input type="password"
name="pwd2"></td></tr><tr><td></td><td><input type="submit"
value="注册"></td></tr></table>
</form>
</body>
</html>
from flask import
Flask,render_template,request
from wtforms import Form,StringField
from wtforms.validators import
Length,EqualTo
app = Flask(__name__)
@app.route('/')
def index():return 'Hello! '
class RegisterForm(Form):uname = StringField(validators= [Length(min=2,max=10,message='用户名长度2-10之间')])pwd = StringField(validators=[Length(min=2,max=10)])pwd2 = StringField(validators=[Length(min=2,max=10),EqualTo('pwd',message='2次密码不一致')])@app.route('/register/', methods=['GET','POST'])
def register():if request.method == 'GET':return render_template('register.html')else:form = RegisterForm(request.form)if form.validate(): # 验证成功:True,失败:Falsereturn '验证成功!'else:return f'验证失败!{form.errors}'if __name__ == '__main__':app.run(debug=True)
WTForms常用验证器
-
Length:字符串长度限制,有min和max两个值进行限制。
username = StringField(validators=[Length(min=3,max=10,message="用户名长度必须在3到10位之间")])
-
EqualTo:验证数据是否和另外一个字段相等,常用的就是密码和确认密码两个字段是否相等。
password_repeat = StringField(validators= [Length(min=6,max=10),EqualTo("password")])
-
Email: 验证上传的数据是否是邮箱格式数据
email = StringField(validators=[Email()])
-
InputRequired:验证该项数据为必填项,即要求该项非空
username = StringField(validators= [input_required()])
-
NumberRange:数值的区间,有min和max两个值限制,如果 处在这两个数字之间则满足。
age = IntegerField(validators= [NumberRange(12,18)])
-
Regexp:定义正则表达式进行验证,如验证手机号码。
phone = StringField(validators= [Regexp(r'1[34578]\d{9}')])
-
URL:必须是URL的形式 如http://www.bjsxt.com。
home_page = StringField(validators=[URL()])
-
UUID:验证数据是UUID类型。
uuid = StringField(validators=[UUID()])
formscheck.py表单验证工具类文件
from wtforms import
Form,StringField,IntegerField
from wtforms.validators import
Length,EqualTo,Email,InputRequired,NumberRange,Regexp,URL,UUIDclass RegisterForm(Form):uname =StringField(validators= [Length(min=2,max=15,message='用户名长度必须在2-15之间')])pwd = StringField(validators= [Length(min=6,max=12)]) pwd2 = StringField(validators=[Length(min=6,max=12),EqualTo("pwd")])class RegisterForm2(Form):email = StringField(validators=[Email()])uname = StringField(validators=[InputRequired()])age = IntegerField(validators=[NumberRange(18,40)])phone = StringField(validators=[Regexp(r'1[34578]\d{9}')])phomepage = StringField(validators=[URL()])uuid = StringField(validators=[UUID()])
WTForms自定义验证器
自定义验证器步骤如下
-
定义一个方法,方法的名字规则是: validate_字段名(self,field) 。
-
在方法中,使用 field.data 可以获取到这个字段的具体的值。
-
验证时,如果数据满足条件,那么可以什么都不做。如果验证失败,
那么应该抛出一个 wtforms.validators.ValidationError 的异常,并且把验证失败的信息传到这个异常类中。
Flask安全上传文件_访问文件
上传文件步骤:
- 在模版html中,表单需要指定 enctype='multipart/form-data' 才能上传文 件。
- 在后台如果想要获取上传的文件,那么应该使用 request.files.get('文件 名') 来获取。
- 保存文件之前,先要使用 werkzeug.utils.secure_filename 来对上传上来的文 件名进行一个过滤。能保证不会有安全问题。
- 获取到上传上来的文件后,使用 文件对象.save(路径) 方法来保存文件。 路径=完整路径=路径名+文件名
from flask import
Flask,request,render_template
import os
from werkzeug.utils import secure_filename
app = Flask(__name__)
UPLOAD_PATH =
os.path.join(os.path.dirname(__file__),'imag
es')
@app.route('/upload/',methods=
['GET','POST'])
def upload():if request.method == 'GET':return render_template('upload.html')else:desc = request.form.get("desc")pichead = request.files.get("pichead")filename = secure_filename(pichead.filename) #包装一下 保证文件安全#pichead.save(os.path.join(UPLOAD_PATH,pichead.filename)) #可优化pichead.save(os.path.join(UPLOAD_PATH,filename)) #已优化print(desc)return '文件上传成功'if __name__ == '__main__':app.run(debug=True)
Restful
pip install flask-restful
基本使用
定义Restful的类视图:
-
从 flask_restful 中导入 Api ,来创建一个 api 对象。
-
写一个类视图,让他继承自 Resource 类,然后在这个里面,使用 你想要的请求方式来定义相应的方法,比如你想要将这个类视图只 能采用 post 请求,那么就定义一个 post 方法。
-
使用 api.add_resource 来添加类视图与 url 。
from flask import Flask,url_for # pip install flask-restful from flask_restful import Resource,Api app = Flask(__name__)# 建立Api对象,并绑定应用APP api = Api(app) class LoginView(Resource):def get(self):return {"flag":True}def post(self):return {"flag":False}# 建立路由映射 # api.add_resource(LoginView,'/login/') api.add_resource(LoginView,'/login/','/login2/',endpoint='login') with app.test_request_context():# werkzeug.routing.BuildError: Could notbuild url for endpoint 'LoginView'.# Did you mean 'loginview' instead?# 默认没有写endpoint反向url_for函数通过小写函数名# 如果有多个url,会返回第1个URL# print(url_for('loginview'))print(url_for('login'))if __name__ == '__main__':app.run(debug=True)
SQLAlchemy
SQLAlchemy是一个ORM框架
关系对象映射: 对象模型与数据库表的映射
class Person:name = 'xx'age = 18country ='xx'# Person类 -> 数据库中的一张表
# Person类中的属性 -> 数据库中一张表字段
# Person类的一个对象 -> 数据库中表的一条数据
# p = Person('xx',xx)
# p.save()
# insert into table values ('xx',xx)
在操作数据库之前,选确保已经安装了以下软件:
-
mysql
-
pymysql
pip install pymysql
-
SQLAlchemy
pip install SQLAlchemy
操作数据库
-
连接数据库
from sqlalchemy import create_engine def conn_db1():# 数据库的变量HOST = '192.168.30.151' # 127.0.0.1/localhostPORT = 3306DATA_BASE = 'flask_db'USER = 'root'PWD = '123'# DB_URI = f'数据库的名+驱动名://{USER}:{PWD}@{HOST}:{PORT}/{DATA_BASE}'DB_URI = f'mysql+pymysql://{USER}:{PWD}@{HOST}:{PORT}/{DATA_BASE}'engine = create_engine(DB_URI)# 执行一个SQLsql = 'select 2;'conn = engine.connect()rs = conn.execute(sql)print(rs.fetchone())
-
执行原生sql
def conn_db2():# 数据库的变量HOST = '192.168.30.151' # 127.0.0.1/localhostPORT = 3306DATA_BASE = 'flask_db'USER = 'root'PWD = '123'# DB_URI = f'数据库的名+驱动名://{USER}:{PWD}@{HOST}:{PORT}/{DATA_BASE}'DB_URI = f'mysql+pymysql://{USER}:{PWD}@{HOST}:{PORT}/{DATA_BASE}''''# 创建一个引擎,专门链接数据库用的engine = create_engine(DB_URI)sql = 'create table t_user(id int primary key auto_increment, name varchar(32));'# 链接数据库conn = engine.connect()# 执行SQL即可conn.execute(sql)''' def conn_db3():# 数据库的变量HOST = '192.168.30.151' #127.0.0.1/localhostPORT = 3306DATA_BASE = 'flask_db'USER = 'root'PWD = '123'# DB_URI = f'数据库的名+驱动名://{USER}:{PWD}@{HOST}:{PORT}/{DATA_BASE}'DB_URI = f'mysql+pymysql://{USER}:{PWD}@{HOST}:{PORT}/{DATA_BASE}'# 创建一个引擎,专门链接数据库用的engine = create_engine(DB_URI)sql = 'create table t_user1(id int primary key auto_increment, name varchar(32));'# 链接数据库with engine.connect() as conn:# 执行SQL即可conn.execute(sql)
-
ORM模型映射到数据库中
-
用
declarative_base
根据engine
创建一个ORM基类from sqlalchemy.ext.declarative import declarative_baseengine = create_engine(DB_URI) Base = declarative_base(engine)
-
用这个
Base
类作为基类来写自己的ORM类。要定义__tablename__
类 属性,来指定这个模型映射到数据库中的表名class Person(Base):__tablename__ ='t_person'
-
创建属性来映射到表中的字段,所有需要映射到表中的属性都应 该为Column类型
class Person(Base):__tablename__ ='t_person'# 在这个ORM模型中创建一些属性,来跟表中的字段进行一一映射。# 这些属性必须是sqlalchemy给我们提供好的数据类型id = Column(Integer,primary_key=True,autoincrement=True)name = Column(String(50))age = Column(Integer)country = Column(String(50))
-
-
使用 Base.metadata.create_all() 来将模型映射到数据库中
Base.metadata.create_all()
注意
一旦使用了
Base.metadata.create_all()
将模型映射到数据库中后,即使改变了模型的字段,也不会重新映射了
对数据的crud
构建session对象
所有和数据库的ORM操作都必须通过一个叫做session的会话对象来实现,通过以下代码来获取会话对象
from sqlalchemy.orm import sessionmakerengine = create_engine(DB_URI)
Base = declarative_base(engine)
session = sessionmaker(engine)()
添加对象
def create_data_one():with Session() as session:p1 = Person(name ='百战',age = 6 ,country='北京')session.add(p1)session.commit()def create_data_many():with Session() as session:p2 = Person(name ='吕布',age = 19 ,country='北京')p3 = Person(name ='貂蝉',age = 18 ,country='北京')session.add_all([p2,p3])session.commit()
查找对象
def query_data_all():with Session() as session:all_person =
session.query(Person).all()for p in all_person:print(p.name)def query_data_one():with Session() as session:p1 = session.query(Person).first()print(p1.name)def query_data_by_params():with Session() as session:# p1 = session.query(Person).filter_by(name='吕布').first()p1 = session.query(Person).filter(Person.name =='吕布').first()print(p1.age)
修改对象
def update_data():with Session() as session:p1 = session.query(Person).filter(Person.name == '吕布').first()p1.age = 20# 提交事务session.commit()
删除对象
将需要删除的对象从数据库中查出来,然后使用session.delete方法将这条数据从session中删除,最后commit就行了
def delete_data():with Session() as session:p1 = session.query(Person).filter(Person.name =='貂蝉').first()session.delete(p1)session.commit()
常用数据类型
-
Integer:整形,映射到数据库中是int类型。
-
Float:浮点类型,映射到数据库中是float类型。他占据的32 位。
-
Double:双精度浮点类型,映射到数据库中是double类型,占据64位 (SQLALCHEMY中没有)。
-
String:可变字符类型,映射到数据库中是varchar类型.
-
Boolean:布尔类型,映射到数据库中的是tinyint类型。
-
DECIMAL:定点类型。是专门为了解决浮点类型精度丢失的问题的。在存储钱相关的字段的时候建议大家都使用这个数据类型。
这个类型使用的时候需要传递两个参数,第一个参数是用来标记这个字段总能能存储多少个数字,第二个参数表示小数点后有多少位。
-
Enum:枚举类型。指定某个字段只能是枚举中指定的几个值,不能为其他值。在ORM模型中,使用Enum来作为枚举,示例代码如下:
class News(Base):__tablename__ = 't_news'tag = Column(Enum("python",'flask','django'))
在python3中已经内置了enum这个枚举的模块,我们也可以使用这个模块去定义相关的字段
class TagEnum(enum.Enum):python = "python"flask = "flask"django = "django"class News(Base):__tablename__ = 't_news'id =Column(Integer,primary_key=True,autoincrement=True)tag = Column(Enum(TagEnum))news = News(tag=TagEnum.flask)
-
Date:存储时间,只能存储年月日。映射到数据库中是date类型。在Python代码中,可以使用 datetime.date 来指定
-
DateTime:存储时间,可以存储年月日时分秒毫秒等。映射到数据库中也是datetime类型。在Python代码中,可以使用
datetime.datetime
来指定。 -
Time:存储时间,可以存储时分秒。映射到数据库中也是time 类型。在Python代码中,可以使用 datetime.time 来至此那个。
class News(Base):__tablename__ = 't_news'create_time = Column(Time)news =News(create_time=time(hour=11,minute=11,second=11))
-
Text:存储长字符串。一般可以存储6W多个字符。如果超出了这个范围,可以使用LONGTEXT类型。映射到数据库中就是text 类型。
-
LONGTEXT:长文本类型,映射到数据库中是longtext类型。
代码演示:
from sqlalchemy import create_engine,Column,Integer,String,Float,Enum,Boolean,DECIMAL,Text,Date,DateTime,Time
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.dialects.mysql import LONGTEXT
from sqlalchemy.orm import sessionmakerimport enum
from datetime import date
from datetime import datetime
from datetime import time
#准备数据库的一堆信息 ip port user pwd
数据库的名称 按要求组织格式
HOSTNAME = '127.0.0.1'
PORT = '3306'
DATABASE = 'first_sqlalchemy'
USERNAME = 'root'
PASSWORD = 'root'
#dialect+driver://username:password@host:port/database?charset=utf8
#按照上述的格式来 组织数据库信息
DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE)#创建数据库引擎
engine = create_engine(DB_URI)
#创建会话对象
session = sessionmaker(engine)()
#定义一个枚举类
class TagEnum(enum.Enum):python="PYHTON2"flask="FLASK2"django ="DJANGO"
#创建一个ORM模型 说明基于sqlalchemy 映射到mysql数据库的常用字段类型有哪些?
Base = declarative_base(engine)class News(Base):__tablename__='news'id = Column(Integer,primary_key=True,autoincrement=True)price1 = Column(Float) #存储数据时存在精度丢失问题price2 = Column(DECIMAL(10,4))title = Column(String(50))is_delete =Column(Boolean)tag1 =Column(Enum('PYTHON','FLASK','DJANGO')) #枚举常规写法tag2 =Column(Enum(TagEnum)) #枚举另一种写法create_time1=Column(Date)create_time2=Column(DateTime)create_time3=Column(Time)content1 =Column(Text)content2 =Column(LONGTEXT)
# Base.metadata.drop_all()
# Base.metadata.create_all()
#新增数据到表news中
# a1 = News(price1=1000.0078,price2=1000.0078,title='测试数据',is_delete=True,tag1="PYTHON",tag2=TagEnum.flask,
# create_time1=date(2018,12,12),create_time2=datetime(2019,2,20,12,12,30),create_time3=time(hour=11,minute=12,second=13),
#content1="hello",content2="hello hi nihao")a1 =News(price1=1000.0078,price2=1000.0078,title='测试数据',is_delete=False,tag1="PYTHON",tag2=TagEnum.python,create_time1=date(2018,12,12),create_time2=datetime(2019,2,20,12,12,30),create_time3=time(hour=11,minute=12,second=13),content1="hello",content2="hello hi nihao")
session.add(a1)
session.commit()
Column常用参数
- primary_key:True设置某个字段为主键。
- autoincrement:True设置这个字段为自动增长的。
- default:设置某个字段的默认值。在发表时间这些字段上面经常用。
- nullable:指定某个字段是否为空。默认值是True,就是可以为 空。
- unique:指定某个字段的值是否唯一。默认是False。
- onupdate:在数据更新的时候会调用这个参数指定的值或者函 数。在第一次插入这条数据的时候,不会用onupdate的值,只 会使用default的值。常用于是 update_time 字段(每次更新数据的时候都要更新该字段值)。
- name:指定ORM模型中某个属性映射到表中的字段名。如果不 指定,那么会使用这个属性的名字来作为字段名。如果指定了, 就会使用指定的这个值作为表字段名。这个参数也可以当作位置参数,在第1个参数来指定。
title = Column(String(50),name='title',nullable=False)
title = Column('my_title',String(50),nullable=False)
from datetime import datetime
from sqlalchemy import
Column,Integer,DateTime,String
from db_util import Base,Session
class News(Base):__tablename__ = 't_news2'id = Column(Integer,primary_key = True,autoincrement = True)phone = Column(String(11),unique = True)title = Column(String(32),nullable =False)read_count = Column(Integer,default=1)create_time = Column(DateTime,default =datetime.now)update_time = Column(DateTime,default =datetime.now, onupdate =datetime.now ) # 当数据更新后,参数的内容才会更改def create_data():new1 = News(phone='16866666666',title='测试列参数')with Session() as session:session.add(new1)session.commit()def create_data2():# new1 = News(phone='16866666666',title='测试列参数') #不允许重复# new1 = News(phone='16866666668') #title不能为空# with Session() as session:# session.add(new1)# session.commit()with Session() as session:new1 = session.query(News).first() new1.read_count = 2session.commit()if __name__ == '__main__':# Base.metadata.create_all()# create_data()create_data2()
query函数的使用
-
模型名。指定查找这个模型中所有的属性(对应查询表为全表查询)
-
模型中的属性。可以指定只查找某个模型的其中几个属性
-
聚合函数
- func.count:统计行的数量。
- func.avg:求平均值。
- func.max:求最大值。
- func.min:求最小值。
- func.sum:求和.
提示
func
上,其实没有任何聚合函数,但是因为他底层做了一些魔术,只要mysql中有的聚合函数,都可以通过func调用from random import randint from sqlalchemy import Column,Integer,String,func from db_util import Base,Sessionclass Item(Base):__tablename__ = 't_item'id = Column(Integer,primary_key = True,autoincrement = True)title = Column(String(32))price = Column(Integer)def create_data():with Session() as ses:for i in range(10):item = Item(title = f'产品:{i+1}',price=randint(1,100))ses.add(item)ses.commit()def query_model_name():# 获取所有的字段with Session() as ses:rs = ses.query(Item).all()for r in rs:print(r.title)def query_model_attr():# 获取指定的字段with Session() as ses:rs = ses.query(Item.title,Item.price).all()for r in rs:print(r.price)def query_by_func():# 统计指定的列数据with Session() as ses:# rs = ses.query(func.count(Item.id)).first()# rs = ses.query(func.max(Item.price)).first()# rs = ses.query(func.avg(Item.price)).first()rs = ses.query(func.sum(Item.price)).first()print(rs)if __name__ =='__main__':# Base.metadata.create_all()# create_data()# query_model_name()# query_model_attr()query_by_func()
filter过滤数据
-
equals
news= session.query(News).filter(News.title == "title1").first()
-
not equals
query(User).filter(User.name != 'ed')
-
like & like [不区分大小写]
query(User).filter(User.name.like('%ed%'))
-
in
query(User).filter(User.name.in_(['ed','wendy','jack']))
-
not in
query(User).filter(~User.name.in_(['ed','wendy','jack']))
-
is null
query(User).filter(User.name==None) # 或者是 query(User).filter(User.name.is_(None))
-
is not null
query(User).filter(User.name != None) # 或者是 query(User).filter(User.name.isnot(None))
-
and
query(User).filter(and_(User.name=='ed',User.fullname=='Ed Jones')) # 或者是传递多个参数 query(User).filter(User.name=='ed',User.fullname=='Ed Jones') # 或者是通过多次filter操作 query(User).filter(User.name=='ed').filter(User.fullname=='Ed Jones')
-
or
query(User).filter(or_(User.name=='ed',User.name=='wendy'))
表关系
外键
使用SQLAlchemy创建外键非常简单。在从表中增加一个字段,指定这个字段外键的是哪个表的哪个字段就可以了。从表中外键的字段,必须和主表的主键字段类型保持一致。
class User(Base):__tablename__ = 't_user'id = Column(Integer,primary_key=True,autoincrement=True)uname =Column(String(50),nullable=False,name='name')class News(Base):__tablename__ = 't_news'id = Column(Integer,primary_key=True,autoincrement=True)title = Column(String(50),nullable=False)content = Column(Text,nullable=False)uid = Column(Integer,ForeignKey('t_user.id',)
外键约束
RESTRICT:若子表中有父表对应的关联数据,删除父表对应数据,会阻止删除。默认项
NO ACTION:在MySQL中,同RESTRICT。
CASCADE:级联删除。
SET NULL:父表对应数据被删除,子表对应数据项会设置为NULL。
from sqlalchemy import Column,Integer,String,Text,ForeignKey
from db_util import Base,Session class User(Base):__tablename__ = 't_user'id = Column(Integer,primary_key=True,autoincrement=True)uname =Column(String(50),nullable=False,name='name')class News(Base):__tablename__ = 't_news'id = Column(Integer,primary_key=True,autoincrement=True)title = Column(String(50),nullable=False)content = Column(Text,nullable=False)# uid =Column(Integer,ForeignKey('t_user.id')) #默认不让删主表数据# uid =Column(Integer,ForeignKey('t_user.id',ondelete = 'RESTRICT')) # 默认的策略# uid =Column(Integer,ForeignKey('t_user.id',ondelete = 'NO ACTION')) # 默认的策略# uid =Column(Integer,ForeignKey('t_user.id',ondelete = 'CASCADE')) # 级联删除,发主表的数据被删除,子表的里数据也会删除uid =Column(Integer,ForeignKey('t_user.id',ondelete = 'SET NULL')) # 发现主表数据被删除时,子表的数据列会清空
一对多
SQLAlchemy提供了一个 relationship ,这个类可以定义属性,以后在访问相关联的表的时候就直接可以通过属性访问的方式就可以访问得 到了。
另外,可以通过 backref 来指定反向访问的属性名称。newss是指有多 篇新闻。他们之间的关系是一个“一对多”的关系
from sqlalchemy import Column,Integer,String,Text,ForeignKey
from sqlalchemy.orm import relationship
from db_util import Base,Sessionclass User(Base):__tablename__ = 't_user'id = Column(Integer,primary_key=True,autoincrement=True)uname = Column(String(50),nullable=False,name='name')# news = relationship('News') # 不友好def __repr__(self):return f'<User: id={self.id} uname={self.uname}>'# 1对多 ForeignKey的关键字要建立在 多一边
class News(Base):__tablename__ = 't_news'id = Column(Integer,primary_key=True,autoincrement=True)title = Column(String(50),nullable=False)content = Column(Text,nullable=False)uid = Column(Integer,ForeignKey('t_user.id'))user = relationship('User',backref='news') # 将主表的数据注入到这个字段def __repr__(self):return f'<News: id={self.id} title={self.title} content={self.content} uid=
{self.uid}>'def create_data():user = User(uname = 'sxt')news1 = News(title='Python',content='flask',uid = 1)news2 = News(title='MySQL',content='SQL',uid = 1)with Session() as ses:ses.add(user)ses.commit()with Session() as ses:ses.add(news1)ses.add(news2)ses.commit()def query_data():with Session() as ses:# news1 = ses.query(News).first()# print(news1)# select u.id u.uname from t_news n left join t_user u n.uid = u.id where n.id =1;news1 = ses.query(News).first()uid = news1.uiduser = ses.query(User).first()print(user)def query_data2():# 通地子表查询主表的数据with Session() as ses:news1 = ses.query(News).first()print(news1.user)def query_data3():# 通地主表查找子表的数据with Session() as ses:user1 = ses.query(User).first()print(user1.news)if __name__ == '__main__':# Base.metadata.create_all()# create_data()# query_data()# query_query_data3()data2()query_data3()
一对一
在sqlalchemy中,如果想要将两个模型映射成一对一的关系,那么 应该在父模型中,指定引用的时候,要传递一个 uselist=False 这个参数进去。
就是告诉父模型,以后引用这个从模型的时候,不再是一个列表了,而是一个对象了
方法1
class LoginUser(Base):__tablename__ = 't_user_login'id = Column(Integer,primary_key=True,autoincrement=True)uname = Column(String(32),nullable=False)passwd = Column(String(32),nullable=False)user = relationship('User',uselist=False) # 不友好,总有警告class User(Base):__tablename__ = 't_user'id = Column(Integer,primary_key=True,autoincrement=True)name = Column(String(32),nullable=False,name='name')gender = Column(String(1))address = Column(String(64))login_id = Column(Integer,ForeignKey('t_user_login.id'))login_user = relationship('LoginUser')
方法2
也可以借助 sqlalchemy.orm.backref 来简化代码
class LoginUser(Base):__tablename__ = 't_user_login'id = Column(Integer,primary_key=True,autoincrement=True)uname = Column(String(32),nullable=False)passwd =Column(String(32),nullable=False)# 创建1对1的关系, 创建一个字段来做别一个表的标识(外键)
class User(Base):__tablename__ = 't_user'id =Column(Integer,primary_key=True,autoincrement=True)name =Column(String(32),nullable=False,name='name')gender = Column(String(1))address = Column(String(64))login_id = Column(Integer,ForeignKey('t_user_login.id'))login_user = relationship('LoginUser',backref=backref('user',uselist=False))
示例:
from sqlalchemy import Column,Integer,String,Text,ForeignKey
from sqlalchemy.orm import relationship,backref
from db_util import Base,Sessionclass LoginUser(Base):__tablename__ = 't_user_login'id = Column(Integer,primary_key=True,autoincrement=True)uname = Column(String(32),nullable=False)passwd = Column(String(32),nullable=False)# user = relationship('User',uselist=False) # 不友好,总有警告def __repr__(self): return f'<User: id={self.id} uname={self.uname} passwd={self.passwd}># 创建1对1的关系, 创建一个字段来做别一个表的标识(外键)
class User(Base):__tablename__ = 't_user'id = Column(Integer,primary_key=True,autoincrement=True)name = Column(String(32),nullable=False,name='name')gender = Column(String(1))address = Column(String(64))login_id = Column(Integer,ForeignKey('t_user_login.id'))login_user = relationship('LoginUser',backref=backref('user',uselist=False))def __repr__(self):return f'<User: id={self.id} name={self.name} gender={self.gender} address=
{self.address}>'def create_data():login = LoginUser(uname = 'baizhan',passwd = '123')user = User(name='百战',gender ='女',address ='北京')# login.user = user # 建立关联关系user.login_user = loginwith Session() as ses:ses.add(user)ses.commit()def query_data():# with Session() as ses:# login =ses.query(LoginUser).first()# print(login.user)with Session() as ses:user = ses.query(User).first()print(user.login_user)if __name__ == '__main__':# Base.metadata.create_all()# create_data()query_data()
多对多
- 多对多的关系需要通过一张中间表来绑定他们之间的关系。
- 先把两个需要做多对多的模型定义出来
- 使用Table定义一个中间表,中间表一般就是包含两个模型的外键字段就可以了,并且让他们两个来作为一个“复合主键”
- 在两个需要做多对多的模型中随便选择一个模型,定义一个 relationship属性,来绑定三者之间的关系,在使用relationship 的时候,需要传入一个secondary=中间表对象名
from sqlalchemy import Column,Integer,String,ForeignKey
from sqlalchemy import Table
from sqlalchemy.orm import relationship,backref
from db_util import Base,Session# 创建第3张表,来建立多对多关系
# 放到2个模型之上
news_tag = Table('t_news_tag',Base.metadata,Column('news_id',Integer,ForeignKey('t_news.id'),primary_key = True),Column('tag_id',Integer,ForeignKey('t_tag.id'),primary_key = True),)
class News(Base):__tablename__ = 't_news'id = Column(Integer,primary_key=True,autoincrement=True)title = Column(String(32),nullable=False)tags = relationship('Tag',backref='newss',secondary= news_tag)def __repr__(self): return f'<News: id={self.id} title={self.title}>'class Tag(Base):__tablename__ = 't_tag'id = Column(Integer,primary_key=True,autoincrement=True)name = Column(String(32),nullable=False)# news = relationship('News',backref='tags',secondary= news_tag)def __repr__(self): return f'<Tag: id={self.id} name={self.name}>'def create_data():news1 = News(title = 'Python更新了!')news2 = News(title = 'SQLAlchemy功能又强大了!')tag1 = Tag(name = 'IT新闻')tag2 = Tag(name ='科学技术')news1.tags.append(tag1)news1.tags.append(tag2)news2.tags.append(tag1)news2.tags.append(tag2)with Session() as ses:ses.add(news1)ses.add(news2)ses.commit()def query_data():with Session() as ses:news = ses.query(News).first()print(news.tags)if __name__ == '__main__':# Base.metadata.create_all()# create_data()query_data()
ORM层面删除数据注意事项
ORM层面删除数据,会无视mysql级别的外键约束
直接会将对应的数据删除,然后将从表中的那个外键设置为NULL, 也就是数据库的 SET NULL 。
如果想要避免这种行为,应该将从表中的外键的 nullable=False 。
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship
from db_util import Base, Sessionclass User(Base):__tablename__ = 't_user'id = Column(Integer, primary_key=True,autoincrement=True)name = Column(String(32))class Article(Base):__tablename__ = 't_article'id = Column(Integer, primary_key=True,autoincrement=True)title = Column(String(32))uid = Column(Integer,ForeignKey("t_user.id"))# uid = Column(Integer,ForeignKey("t_user.id"),nullable = False)user =relationship('User',backref='articles')def create_data():Base.metadata.drop_all() # 删除已有的表Base.metadata.create_all() # 创建表# 初始化数据user = User(name='SXT')art1 = Article(title='Python', uid=1)art2 = Article(title='MySQL', uid=1)user.articles.append(art1)user.articles.append(art2)with Session() as ses:ses.add(user)ses.commit()def delete_data():# 默认删除主表数据时,会将子表的引用主表数据的外键设置Nullwith Session() as ses:user = ses.query(User).first()ses.delete(user)ses.commit()if __name__ == '__main__':# create_data()delete_data()
ORM层面的relationship方法中cascade
cascade属性值为:
- save-update:默认选项。在添加一条数据的时候,会把其他和他 相关联的数据都添加到数据库中。这种行为就是save-update属性 影响的。
- delete:表示当删除某一个模型中的数据的时候,是否也删掉使用 relationship和他关联的数据。
- delete-orphan:表示当对一个ORM对象解除了父表中的关联对象 的时候,自己便会被删除掉。当然如果父表中的数据被删除,自己 也会被删除。这个选项只能用在一对多上,并且还需要在子模型中 的relationship中,增加一个single_parent=True的参数。
- merge:默认选项。当在使用session.merge,合并一个对象的时 候,会将使用了relationship相关联的对象也进行merge操作。
- expunge:移除操作的时候,会将相关联的对象也进行移除。这个 操作只是从session中移除,并不会真正的从数据库中删除。
- all:是对save-update, merge, refresh-expire, expunge, delete 几种的缩写。
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship,backref
from db_util import Base, Sessionclass User(Base):__tablename__ = 't_user'id = Column(Integer, primary_key=True,autoincrement=True)name = Column(String(32))# articles =relationship('Article',backref='user',cascade='')# articles =relationship('Article',backref='user',cascade='save-update') # 默认cascade的值是saveupdate# articles =relationship('Article',backref='user',cascade='save-update,delete') # delete可以帮助删除关联表的数据# articles = relationship('Article',backref='user',cascade='save-update,delete,deleteorphan',single_parent=True) # 当关联关系被解除时,子表数据会被清空
class Article(Base):__tablename__ = 't_article'id = Column(Integer, primary_key=True,autoincrement=True)title = Column(String(32))uid = Column(Integer,ForeignKey("t_user.id"))# user =relationship('User',backref='articles',cascade='save-update,delete') # 会把主表的数据删除user = relationship('User',backref=backref('articles',cascade='save-update,delete,deleteorphan'))def create_data():Base.metadata.drop_all() # 删除已有的表Base.metadata.create_all() # 创建表# 初始化数据user = User(name='SXT')art1 = Article(title='Python', uid=1)art2 = Article(title='MySQL', uid=1)user.articles.append(art1)user.articles.append(art2)# 保存数据with Session() as ses:ses.add(user)ses.commit()def delete_data():with Session() as ses:user = ses.query(User).first()ses.delete(user)ses.commit()
def delete_art():with Session() as ses:art = ses.query(Article).first()ses.delete(art)ses.commit()def update_data():with Session() as ses:user = ses.query(User).first()user.articles = []ses.commit()if __name__ == '__main__':# create_data()# delete_data()# update_data()delete_art()
排序
order_by方法排序:可以指定根据模型中某个属性进行排序,"模型名.属性名.desc()"代表的是降序排序。
relationship的方法中order_by属性:在指定relationship方法的时候,添加order_by属性来指定排序的字段。
方法1: order_by方法指定
# 升序
users = ses.query(User).order_by(User.age).all()
# 降序
users = ses.query(User).order_by(User.age.desc()).all()
方法2:涉及两表时,定义模型时,用relationship方法中的order_by属性指定排序方式
from random import randint
from sqlalchemy import Column,Integer,String,ForeignKey
from sqlalchemy.orm import relationship,backref
from db_util import Base,Sessionclass User(Base):__tablename__ = 't_user'id = Column(Integer, primary_key=True,autoincrement=True)name = Column(String(32))age = Column(Integer)def __repr__(self):return f'<User: id={self.id} name={self.name} age={self.age}>' class News(Base):__tablename__ = 't_news'id = Column(Integer,primary_key=True,autoincrement=True)title = Column(String(32),nullable=False)content = Column(String(32),nullable=False)read_count = Column(Integer)uid = Column(Integer,ForeignKey('t_user.id'))user = relationship('User',backref=backref('newss',order_by=read_count))def __repr__(self):return f'<User: id={self.id} title={self.title} content={self.content}
read_count={self.read_count}>' def create_user():with Session() as ses:for i in range(10):user = User(name = f'用户{i}',age= randint(6,20))ses.add(user)for i in range(10):news = News(title = f'新闻{i}',content ='新闻',read_count =randint(1,1000))user.newss.append(news)ses.commit()def query_user():with Session() as ses:users = ses.query(User).all()for i in users[-1].newss:print(i)if __name__ == '__main__':# Base.metadata.drop_all()# Base.metadata.create_all()# create_user()query_user()
注意
__mapper_args__
参数的1.1版本已被抛弃
limit、offset、slice使用
- limit:可以限制查询的时候只查询前几条数据。 属top-N查询
- offset:可以限制查找数据的时候过滤掉前面多少条。可指定开 始查询时的偏移量。
- 切片:可以对Query对象使用切片操作,来获取想要的数据。
- 可以使用 slice(start,stop) 方法来做切片操作。
- 也可以使用 [start:stop] 的方式来进行切片操作。
- 一般在实际开发中,中括号的形式是用得比较多的。
from random import randint
from sqlalchemy import Column,Integer,String
from db_util import Base,Sessionclass News(Base):__tablename__ = 't_news'id =Column(Integer,primary_key=True,autoincrement=True)title =Column(String(32),nullable=False)content =Column(String(32),nullable=False)read_count = Column(Integer)def __repr__(self):return f'<User: id={self.id} title={self.title} content={self.content}
read_count={self.read_count}>' def create_data():Base.metadata.drop_all()Base.metadata.create_all()with Session() as ses:for i in range(10):news = News(title=f'title{i}',content=f'info{i}',read_count= randint(0,1000))ses.add(news)ses.commit()def query_by_limit():with Session() as ses:newss = ses.query(News).limit(3).all()for n in newss:print(n)
def query_by_offset():with Session() as ses:newss = ses.query(News).offset(3).all()for n in newss:print(n) def query_by_page():# limit topN数据# offset 跳过n数据# 分页效果 1-3 4-6 7-9 # 3 0 1 (pagenum-1)*pagesize# 3 3 2 (2-1)*3 = 3# 3 6 3 (3-1)*3 = 6# 3 9 4 (4-1)*3 = 6with Session() as ses:# (pagenum-1)*pagesizenewss = ses.query(News).limit(3).offset(3).all()for n in newss:print(n)
def query_by_slice():with Session() as ses:# 从哪个索引开始,到哪个索引结束newss = ses.query(News).slice(3,6).all()for n in newss:print(n)
def query_by_qiepian():with Session() as ses:# 从哪个索引开始,到哪个索引结束newss = ses.query(News).all()[3:6]for n in newss:print(n)if __name__ == '__main__':# create_data()# query_by_limit()# query_by_offset()# query_by_page()# query_by_slice()query_by_qiepian()
懒加载
数据库迁移工具alembic使用
alembic是sqlalchemy的作者开发的,用来做ORM模型与数据库的 迁移与映射,
alembic使用方式跟git有点了类似,
alembic的所有命令都是以alembic开头,
alembic的迁移文件也是通过版本进行控制的,
安装
pip install alembic
使用
如创建一个models.py模块,然后在里面定义需要的模型类:
from sqlalchemy import Column,String,Integer,create_engine
from sqlalchemy.ext.declarative import declarative_baseHOSTNAME = '127.0.0.1'
PORT = '3306'
DATABASE = 'alembic_demo'
USERNAME = 'root'
PASSWORD = 'root'
DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE)
engine = create_engine(DB_URI)
Base = declarative_base(engine)class User(Base):__tablename__ = 'user'id =Column(Integer,primary_key=True,autoincrement=True)uname = Column(String(50),nullable=False)country = Column(String(50))
# ORM -> 迁移文件 -> 映射到数据库中
# import os
# print(os.path.dirname(__file__))
修改配置文件
在 alembic.ini 中,给 sqlalchemy.url 项设置数据库的连接方式。方式跟 sqlalchemy的方式是一样的。
sqlalchemy.url = driver://user:pass@localhost/dbname
给 sqlalchemy.url 项设置数据库的连接操作为:
sqlalchemy.url = mysql+pymysql://root:root@localhost/alembic_demo?charset=utf8
为了使用模型类更新数据库,需要在 alembic/env.py 文件中设置 target_metadata项,默认为target_metadata=None。
需要将 target_metadata 的值设置为模型 Base.metadata ,但是要导入 models
使用sys模块和os模块把当前项目的路径导入到path中:
导入 models 的操作为:
import sys,os
sys.path.append(os.path.dirname(os.path.dirname(__file__)))import models
设置target_metadata项操作为
target_metadata = models.Base.metadata
自动生成迁移文件
使用alembic revision --autogenerate -m "提示信息"将当前模型中 的状态生成迁移文件。
将生成的迁移文件映射到数据库中
使用alembic upgrade head将刚刚生成的迁移文件,真正映射到数 据库中。
同理,如果要降级,那么使用alembic downgrade head。
以后如果修改了模型,重复4、5步骤
常用alembic命令和参数解释
- init:创建一个alembic仓库。
- revision:创建一个新的版本文件。
- --autogenerate:自动将当前模型的修改,生成迁移脚本。
- -m:本次迁移做了哪些修改,用户可以指定这个参数,方便回顾
- upgrade:将指定版本的迁移文件映射到数据库中,会执行版本 文件中的upgrade函数。
如果有多个迁移脚本没有被映射到数据库中,那么会执行多个迁移脚本
- [head]:代表最新的迁移脚本的版本号。
- downgrade:会执行指定版本的迁移文件中的downgrade函数。
- heads:展示head指向的脚本文件版本号。
- history:列出所有的迁移版本及其信息。
- current:展示当前数据库中的版本号。
另外,在你第一次执行upgrade的时候,就会在数据库中创建一个名叫alembic_version表,这个表只会有一条数据,记录当前数据库映射的是哪个版本的迁移文件。
常见错误及解决办法
问题
创建新版本时报错 FAILED: Target database is not up to date
原因
主要是heads和current不相同。current落后于heads的版本
解决办法
将current移动到head上。alembic upgrade head
问题
创建新版本时报错 KeyError: '087f047901d6' 或者 FAILED: Can't locate revision identified by 'da3a8bee2343'
原因
数据库中存的版本号不在迁移脚本文件中
解决办法
删除versions中所有的迁移文件,删除数据库所有表
Flask-SQLAlchemy和alembic结合使用
-
配置好数据库连接文件 如config.py
HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'fs_alembic_demo' USERNAME = 'root' PASSWORD = 'root' DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) SQLALCHEMY_DATABASE_URI = DB_URI
-
将config.py文件 结合flask项目主要运行文件 如main.py
from flask import Flask from flask_sqlalchemy import SQLAlchemy import configapp = Flask(__name__) app.config.from_object(config) db = SQLAlchemy(app)class User(db.Model):__tablename__ = 'user'id = db.Column(db.Integer,primary_key=True,autoincrement=True)uname =db.Column(db.String(50),nullable=False)age = db.Column(db.Integer)gender=db.Column(db.String(2))@app.route('/') def hello_world():return 'Hello World!'if __name__ == '__main__':app.run()
-
使用alembic创建一个仓库(初始化仓库)
- 打开dos系统界面
- cd到当前项目目录中,注意:如果想要使用alembic,则需要先进入到安装了alembic的虚拟 环境中,不然就找不到这个命令。
- 然后执行命令 “alembic init [仓库的名字,推荐使用alembic]”
-
修改配置文件
在 alembic.ini 中,给 sqlalchemy.url 项设置数据库的连接方式。方式跟 sqlalchemy的方式是一样的。
sqlalchemy.url = driver://user:pass@localhost/dbname
给 sqlalchemy.url 项设置数据库的连接操作为:
sqlalchemy.url = mysql+pymysql://root:root@localhost/fs_ale mbic_demo?charset=utf8
为了使用模型类更新数据库,需要在 alembic/env.py 文件中设置 target_metadata项,默认为target_metadata=None。 需要将 target_metadata 的值设置为模型 Base.metadata ,但是要导入 momo 使用sys模块和os模块把当前项目的路径导入到path中: 导入 momo 的操作为:
import sys,ossys.path.append(os.path.dirname(os.path.dirname(__file__ )))import main
设置target_metadata项操作为:
target_metadata = main.db.Model.metadata
-
自动生成迁移文件
使用alembic revision --autogenerate -m "提示信息"将当前模型中的状态生成迁移文件。
-
将生成的迁移文件映射到数据库中
使用alembic upgrade head将刚刚生成的迁移文件,真正映射到数据库中。
同理,如果要降级,那么使用alembic downgrade head。
-
以后如果修改了模型,重复5、6步骤
Flask-Migrate
flask-migrate是基于Alembic进行的一个封装,并集成到Flask中, 所有的迁移操作其实都是Alembic做的,他能跟踪模型的变化,并将变化映射到数据库中
安装
pip install flask-migrate
使用方法
from flask import Flask
from flask_sqlalchemy import SQLAlchemyapp = Flask(__name__)# 数据库的变量
HOST = '192.168.30.151' #
127.0.0.1/localhost
PORT = 3306
DATA_BASE = 'flask_db'
USER = 'root'
PWD = '123'
DB_URI = f'mysql+pymysql://{USER}:{PWD}@{HOST}:{PORT}/{DATA_BASE}'
#mysql+pymysql://root:123@192.168.30.151/flask_db
app.config['SQLALCHEMY_DATABASE_URI'] =DB_URI
app.config['SQLALCHEMY_TRACK_MODIFICATIONS']= False
db = SQLAlchemy(app)# 创建模型类
class User(db.Model):__tablename__ = 't_user'id = db.Column(db.Integer,primary_key =True,autoincrement = True)name = db.Column(db.String(32))age = db.Column(db.Integer)def __repr__(self):return f'<User id={self.id} name={self.name}>'from flask_migrate import MigrateMigrate(app,db)
注意
创建Migrate(app,db)对象
-
创建迁移仓库
这个命令会创建migrations文件夹,所有迁移文件都放在里面
flask db init
-
生成脚本文件
flask db migrate
-
更新数据库
flask db upgrade
-
返回以前的版本
flask db downgrade version_
Flask项目结构重构
基本结构如下:可根据实际需求做微小调整。
|project_name
|--pro_name # 整个程序的包目录
|----__init__.py # 项目包文件
|----templates # 模板文件
|------common # 通用模板
|------errors # 错误页面
|------user # 用户模板
|------email # 邮件模板
|----static # 静态资源文件
|------js # JS脚本
|------css # 样式表
|------img # 图片
|------favicon.ico # 网站图表
|----user # 用户模块
|------__init__.py # 用户模块-包文件
|------views.py # 用户模块-视图文件
|----item # 产品模块
|------__init__.py # 产品模块-包文件
|------views.py # 产品模块-视图文件
|----models.py # 数据模型
|--app.py # 项目启动控制文件
|--config.py # 配置文件
|--requirements.txt # 依赖包列表
|--migrations # 数据库迁移目录
注意1
整个程序的包目录名不能为 app ,不然会报
Error: Failed to find Flask application or factory in module 'app'. Use 'FLASK_APP=app:name' to specify one.
注意2
项目启动控制文件名为 app.py ,不然会报
Error: Could not locate a Flask application. You did not provide the "FLASK_APP" environment variable, and a "wsgi.py" or "app.py" module was not found in the current directory.
解决方案2
使用.env文件解决
# .env
FLASK_APP=pro_name/init:create_app()
# manager
from pro_name import create_app
# pip install python-dotenvif __name__ == '__main__':app = create_app('dev')app.run()
# config
class BaseConfig:# 数据库的变量HOST = '192.168.30.151' #127.0.0.1/localhostPORT = 3306DATA_BASE = 'flask_db'USER = 'root'PWD = '123'DB_URI = f'mysql+pymysql://{USER}:{PWD}@{HOST}:{PORT}/{DATA_BASE}'SQLALCHEMY_DATABASE_URI = DB_URISQLALCHEMY_TRACK_MODIFICATIONS = Falseclass DevelopmentConfig(BaseConfig):DEBUG = Trueclass ProductionConfig(BaseConfig):DEBUG = Falseconfig = {'dev': DevelopmentConfig,'pro': ProductionConfig,'base': BaseConfig
}
# pro_name/__init__.py
from config import config
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migratedb = SQLAlchemy()def create_app(model = 'base'):app = Flask(__name__)# app.config.from_pyfile('config.py')obj = config.get(model)app.config.from_object(obj)db.init_app(app)Migrate(app,db)from pro_name.user import user_bpapp.register_blueprint(user_bp)return app
# models.py
from pro_name import db
# 创建模型类
class User(db.Model):__tablename__ = 't_user'id = db.Column(db.Integer,primary_key =
True,autoincrement = True)name = db.Column(db.String(32))pwd = db.Column(db.String(32))age = db.Column(db.Integer)city = db.Column(db.String(32))def __repr__(self):return f'<User id={self.id} name={self.name}>'
# pro_name/user/__init__.py
from flask.blueprints import Blueprint
user_bp = Blueprint('user',__name__)
from pro_name.user import view
# pro_name/user/view.py
from flask.views import MethodView
from flask import request,render_template
from pro_name.models import User
from pro_name.user import user_bp
class LoginView(MethodView):def _jump(self,msg = None):return
render_template('login.html',msg = msg)def get(self,msg = None):return self._jump()def post(self):uname = request.form.get('name')pwd = request.form.get('pwd')user = User.query.filter(User.name ==uname,User.pwd == pwd).first()if user:return '登录成功!'else:return self._jump(msg='登录的用户名或密码错误')user_bp.add_url_rule('/login/',view_func=LoginView.as_view('login'))
Ajax
Ajax基本使用
-
创建Ajax
XMLHttpRequest
对象let xhr = new XMLHttpRequest()
-
设置AJAX请求地址以及请求方式
通过 XMLHttpRequest.open() 方法用于指定 HTTP 请求的参数,或者说初始化 XMLHttpRequest 实例对象。它一共可以接受五个参数。
void open(string method,string url,optional boolean async,optional string user,optional string password );
- method :表示 HTTP 动词方法,比如 GET 、 POST 、 PUT 、 DELETE 、 HEAD 等。
- url : 表示请求发送目标 URL。
- async : 布尔值,表示请求是否为异步,默认为 true 。如果设为 false ,则 send() 方 法只有等到收到服务器返回了结果,才会进行下一步操作。该参数可选。由于同步 AJAX 请求会造成浏览器失去响应,许多浏览器已经禁止在主线程使用,只允许 Worker 里面使用。所以,这个参数轻易不应该设为 false 。
- user :表示用于认证的用户名,默认为空字符串。该参数可选。
- password :表示用于认证的密码,默认为空字符串。该参数可选。
案例:
xhr.open('GET','http://www.example.com')
-
发送请求
XMLHttpRequest.send() 方法用于实际发出 HTTP 请求。它的参数是可选 的,如果不带参数,就表示 HTTP 请求只包含头信息,也就是只 有一个 URL,典型例子就是 GET 请求;如果带有参数,就表示 除了头信息,还带有包含具体数据的信息体,典型例子就是 POST 请求。
xhr.send()
-
获取服务器端给客户端的响应数据 XMLHttpRequest 对象可以对以下事件指定监听函数
- XMLHttpRequest.onloadstart:loadstart 事件(HTTP 请求发出)的监听函数
- XMLHttpRequest.onprogress:progress事件(正在发送和加载数据)的监听函数
- XMLHttpRequest.onabort:abort 事件(请求中止,比如用户调用了 abort() 方法)的监听函数
- XMLHttpRequest.onerror:error 事件(请求失败)的监听函数
- XMLHttpRequest.onload:load 事件(请求成功完成)的监听函数
- XMLHttpRequest.ontimeout:timeout 事件(用户指定的时限超过了,请求还未完成)的监听函数
- XMLHttpRequest.onloadend:loadend 事件(请求完成,不管成功或失败)的监听函数
- XMLHttpRequest.onreadystatechange: readystatechange事件(当 readyState 属性变化)的监听函数
xhr.onload = function()
完整代码
// 创建AJAX对象
let xhr = new XMLHttpRequest()
// 设置请求的参数
xhr.open('GET','http://httpbin.org/get')
// 发送请求
xhr.send()
// 获取响应数据
xhr.onload = function () {// 获取响应的文本内容content = xhr.responseText// 打印数据到控制台console.log(content)// 获取div标签info_tag = document.getElementById('info')// 将数据填充到div中info_tag.innerHTML = content
}
AJAX的get请求参数
在get请求中,参数是拼接在url中的,所以此时可以获取到参数, 拼接到url即可
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>02_AJAX的get的参数传递</title>
</head>
<body><h1>02_AJAX的get的参数传递</h1>name:<input type="text" id="name"><br/>passwd: <input type="password" id="pwd">
<br/><input type="button" value="获取数据" onclick="submitForm()"><script>function submitForm(){let name = document.getElementById('name').valuelet pwd = document.getElementById('pwd').valueurl = 'http://httpbin.org/get'url = url +"?name="+name+"&pwd="+pwdlet xhr = new XMLHttpRequest()xhr.open('get',url)xhr.send()xhr.onload = function(){console.log(xhr.responseText)}}</script>
</body>
</html>
AJAX的post的使用
AJAX使用post的请求基本一样。但在传参时,有些不同,参数应该 放在body中,并通过XMLHttpRequest.setRequestHeader() 设置请求信息的格式
XMLHttpRequest.setRequestHeader() 方法用于设置浏览器发送的 HTTP 请求的头信息。该方法接受两个参数。第一个参数是字符串,表示头信息的字段名,第二个参数是字段值
注意
XMLHttpRequest.setRequestHeader() 该方法必须在 open() 之后、 send() 之前调 用。如果该方法多次调用,设定同一个字段,则每一次调用的 值会被合并成一个单一的值发送。
str的请求参数方式传递
function submitForm1(){// 获取input里面的数据uname =
document.getElementById('uname').valuepwd =
document.getElementById('pwd').value// 拼接参数args = 'uname='+uname+'&pwd='+pwd// 创建xhr对象let xhr = new XMLHttpRequest()// 设置请求的方式与请求地址xhr.open('POST','http://httpbin.org/post')// 设置请求内容的类型(推荐)xhr.setRequestHeader('ContentType','application/x-www-form-urlencoded')// 发送请求xhr.send(args)// 获取服务器响应xhr.onload = () => {console.log(xhr.responseText)}
}
json的请求参数方式传递
function submitForm2(){// 获取input里面的数据uname = document.getElementById('uname').valuepwd = document.getElementById('pwd').value// 拼接参数args = {'uname':uname,'pwd':pwd}args = JSON.stringify(args)// 创建xhr对象let xhr = new XMLHttpRequest()// 设置请求的方式与请求地址xhr.open('POST','http://httpbin.org/post')// 设置请求内容的类型(推荐)xhr.setRequestHeader('ContentType','application/json')// 发送请求xhr.send(args)// 获取服务器响应xhr.onload = () => {console.log(xhr.responseText)}
}
注意:
get 请求是不能提交 json 对象数据格式的,传统网站的表单提 交也是不支持 json 对象数据格式的
获取服务器端的响应
XMLHttpRequest.readyState 返回一个整数,表示实例对象的当前状态。该属 性只读。它可能返回以下值。
0:表示 XMLHttpRequest 实例已经生成,但是实例的 open() 方法还没有被调用。
1:表示 open() 方法已经调用,但是实例的 send() 方法还没有调用,仍然可以使用实例的 setRequestHeader() 方法,设定 HTTP 请求的头信息。
2:表示实例的 send() 方法已经调用,并且服务器返回的头信息和状态码已经收到。
3:表示正在接收服务器传来的数据体(body 部分)。这时,如果实例的 responseType 属性等于 text 或者空字符串, responseText 属性就会包含已经收到的部分信息。
4:表示服务器返回的数据已经完全接收,或者本次接收已经失败。
HTTP状态码
XMLHttpRequest.status 属性返回一个整数,表示服务器回应的 HTTP 状态 码。一般来说,如果通信成功的话,这个状态码是200;如果服务器没有返回状态码,那么这个属性默认是200。请求发出之前,该属性为 0 。
该属性只读在 XMLHttpRequest.onreadystatechange 事件中,我们规定当服务器响应已做好被处理的准备时,再执行任务
function get_data(){// 创建xhr对象let xhr = new XMLHttpRequest()// 设置请求的方式与请求地址xhr.open('get','http://httpbin.org/get')// 发送请求xhr.send()// 获取服务器响应xhr.onreadystatechange = () => {// 判断ajax状态码是否为4if (xhr.readyState == 4){// 判断Http状态码是否为200if (xhr.status == 200){//console.log(xhr.responseText)// 将数据转换成json类型data = JSON.parse(xhr.responseText)console.log(data)}else{// 如果没有正常响应做出处理console.log(xhr.status)}}else{console.log(xhr.readyState)}}
}
服务器端响应的数据格式
在真实的项目中,服务器端大多数情况下会以JSON对象作为响应数 据的格式。
在http请求与响应的过程中,无论是请求参数还是响应内容,如果是对象类型,一般都会被转换为对象字符串进行传输
JSON.parse() //将json字符串转换为json对象
AJAX错误处理
网络畅通,服务器端能接收到请求,服务器端返回的结果不是预期 的结果。可以判断服务器端返回的状态码,分别进行处理。 xhr.status获取http状态码
- 网络畅通,服务器端没有接收到请求,返回404状态。
- 检查请求地址是否错误 网络畅通,服务器端能接收到请求,服务器端返回500状态码
- 网络中断,请求无法发送到服务器端
在AJAX中设置一个专门处理请求失败的监听函数 XMLHttpRequest.onerror : error 事件(请求失败)的监听函数
xhr.onerror = function()
JQuery中的AJAX使用
-
jquery线上地址
jQuery.AJAX([settings])– type– url– data– contentType– beforSend 发送请求前可修改 XMLHttpRequest 对象的函数,如添加自定义 HTTP 头– success– error
-
参数的传递
json的形式传递
$.AJAX({type:'post',url:'http://httpbin.org/post',data:{name:'zs',age:18}// 参数会被转成name=zs&age=18success: function(resp){console.log(resp)} })
str的形式传递
$.AJAX({type:'post',url:'http://httpbin.org/get',data : 'name=zs&age=18'success: function(resp){console.log(resp)} })
服务器要求传递JSON
$.AJAX({type:'post',url:'http://httpbin.org/get',data:JSON.stringify({name:'zs',age:18})// 注意这一定要用JSON.stringify转换contentType: 'application/json'success: function(resp){console.log(resp)} })
beforeSend的使用
主要应用于发送请求前的处理:
• 获取前 将div设置面加载…
• 提交表单前 验证数据
如果在函数中返回ture继续执行发送请求,如果返回flase取消发送
$.AJAX({
type:'post',
url:'http://httpbin.org/get',
data:{
name:'zs',
age:18
}
beforeSend:function(){
alert("请求不会被发送")
return false
}
success: function(resp){
console.log(resp)
}
})
### jquery.get() 与 jquery.post()的使用#### GET请求> $.get(url,data,function(resp))样例代码```js
$.get('http://httpbin.org/get',
{name:'zs',age:18},function(resp){
console.log(resp)
})
POST请求
$.post(url,data,function(resp))
样例代码
$.post('http://httpbin.org/post',
{name:'zs',age:18},function(resp){console.log(resp)
})
Graphql
介绍
GraphQL 是Facebook于 2012 年在内部开发的数据查询语言,在 2015 年开源。
其数据由服务器上的一个Scheme提供,其查询返回的数据依赖请求的时候用户需要的精确数据。
官网:https://graphql.cn/
GraphQL和RESTful一样,都是一种网站架构,一种前后端通信规范,不涉及语言,不同语言有不同的实现方案。
GraphQL目前被认为是革命性的API工具,因为它可以让客户端在请求中指定希望得到的数据,而不像传统的RESTful那样只能呆板地在服务端进行预定义。
这样它就让前、后端团队的协作变得比以往更加的通畅,从而能够让组织更好地运作。
而实际上,GraphQL与RESTful都是基于HTTP进行数据的请求与接收,而且GraphQL也内置了很多RESTful模型的元素在里面。
那么在技术层面上,GraphQL和RESTful这两种API模型到底有什么异同呢?他们归根到底其实没多大区别,只不过GraphQL做了一些小改进,使得开发体验产生了较大的改变。