分页查询
分页查询是处理大量数据时常用的技术,通过分页可以将数据分成多个小部分,方便用户逐页查看。SQLAlchemy 提供了简单易用的方法来实现分页查询。
本篇我们也会在最终实现这样的分页效果:
1. 什么是分页查询
分页查询是将查询结果按照一定数量分成多页展示,每页显示固定数量的记录。分页查询通常使用两个参数:
limit
:每页显示的记录数量。offset
:跳过的记录数量。
例如,要查询第二页,每页显示 10 条记录:
limit
:10offset
:10
2. 使用 SQLAlchemy 实现分页查询
基本查询
首先,我们需要一个基本的查询来获取数据:
import db
from model import Studentdef basic_query():students = db.session.query(Student).all()for stu in students:print(stu.to_dict())
使用 limit
和 offset
前文中,我们已经了解到 SQLAlchemy 提供了 limit
和 offset
方法来实现分页查询。limit
限制返回的记录数量,offset
跳过指定数量的记录。
import db
from model import Studentdef paginated_query(page, per_page):q = db.select(Student).limit(per_page).offset((page - 1) * per_page)students = db.session.execute(q).scalars()for stu in students:print(stu.to_dict())
例如,要获取第 2 页,每页显示 10 条记录:
paginated_query(2, 10)
对应的 SQL 语句:
SELECT * FROM tb_student LIMIT 10 OFFSET 10;
3. 前后端实现分页功能
后端分页
在后端实现分页功能时,可以创建一个函数来处理分页逻辑。这个函数接受 page
和 per_page
参数,并返回当前页的数据和总页数。
import db
from model import Studentdef get_paginated_students(page, per_page):total = db.session.query(Student).count()q = db.select(Student).limit(per_page).offset((page - 1) * per_page)students = db.session.execute(q).scalars()return {'total': total,'page': page,'per_page': per_page,'pages': (total + per_page - 1) // per_page,'data': [stu.to_dict() for stu in students]}
前端分页
在前端实现分页时,可以使用后端提供的分页数据来渲染页面:
{"total": 100,"page": 2,"per_page": 10,"pages": 10,"data": [{"id": 11, "name": "Student 11", ...},{"id": 12, "name": "Student 12", ...},...]
}
前端可以根据这些数据渲染分页控件和当前页的数据。
[拓展] Flask 分页演示
下面是一个前后端不分离的 Flask 项目,代码文件比较多,你需要自行理一下。同时也要保证 Flask
、 Flask-SQLAlchemy
与 Flask-MysqlDB
的安装。
pip install flask
pip install flask-sqlalchemy # 兼容 Flask 的 SQLAlchemy 框架,提供 ORM 功能
pip install flask-mysqldb # 为 Flask-SQLAlchemy 提供 MySQL 驱动
Flask 项目目录如下:
flask_app/ # 项目目录
├── templates/ # 模板目录
│ └── list.html # 模板文件
├── config.py # Flask 配置文件
├── db.py # 数据库核心文件,包含重要操作
├── manage.py # Flask 路由和业务视图文件
└── models.py # 数据库模型文件
首先看一下配置文件 config.py
:
class Config:SQLALCHEMY_DATABASE_URI = 'mysql://root:0908@localhost:3306/db_flask_demo_school?charset=utf8mb4' # 数据库连接。自行替换数据库用户名称和密码以及实际数据库名SQLALCHEMY_ECHO = False # 是否打印执行的 SQL 语句及其耗时DEBUG = True # 是否启用调试模式
db.py
"""
Create database:> create database db_flask_demo_school charset=utf8mb4
"""
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import *db = SQLAlchemy()
models.py
from db import *class Student(db.Model):__tablename__ = 'tb_student2'id = db.Column(db.Integer, primary_key=True, comment="主键")name = db.Column(db.String(15), index=True, comment="姓名")age = db.Column(db.SmallInteger, comment="年龄")sex = db.Column(db.Boolean, comment="性别")email = db.Column(db.String(128), unique=True, comment="邮箱地址")money = db.Column(db.Numeric(10, 2), default=0.0, comment="钱包")def to_dict(self):return {'id': self.id,'name': self.name,'age': self.age,'sex': self.sex,'email': self.email,'money': float(self.money)}def __repr__(self):return f'<{self.__class__.__name__}: {self.name}>'
然后就是 manage.py
,编写了路由与业务代码:
from pathlib import Path
from flask import Flask, jsonify, request, render_template
from config import Config
from models import db, Studentapp = Flask(__name__, template_folder='./templates')
app.config.from_object(Config)db.init_app(app)@app.route('/', methods=['GET'])
def index():"""没啥用,勿看"""title = Path(__file__).namereturn title@app.route('/students', methods=['POST'])
def create_student():"""采集访问的信息,创建学生"""sex = request.form.get('sex')sex = int(sex) if sex.isdigit() else 0student = Student(name=request.form.get('name', '未知'),age=request.form.get('age', 0),sex=bool(sex),email=request.form.get('email', ''),money=request.form.get('money', 0),)if request.form.get('id', None) is not None:student.id = request.form['id']db.session.add(student)db.session.commit()return jsonify({'success': True,'data': student.to_dict(),'msg': 'success'}), 201@app.route('/students', methods=['DELETE'])
def delete_students():"""删除学生表的所有记录"""db.session.execute(db.delete(Student))db.session.commit()return jsonify({'success': True,'data': None,'msg': 'success'})@app.route('/students', methods=['GET'])
def get_students():# 旧版本 2.x 获取全部数据# students = Student.query.all()# 新版本 3.1.x 获取全部数据students = db.session.execute(db.select(Student).where()).scalars()return jsonify({'success': True,'data': {'count': Student.query.count(),'students': [student.to_dict() for student in students]},'msg': 'success'})@app.route('/students/<int:student_id>', methods=['GET'])
def get_student(student_id):# 根据主键查询数据,不存在则为 Nonestudent = db.session.get(Student, student_id)if not student:return jsonify({'success': False,'data': None,'msg': 'student not found'})return jsonify({'success': True,'data': student.to_dict(),'msg': 'success'})@app.route('/students/data', methods=['GET'])
def students_data():"""这里是分页器的使用,不同于我们所使用的 limit 和 offset 需要自己编写"""# 不采取数据分页时,大量数据时会导致服务器运存膨胀,这是非常不妥的page = request.args.get('page', 1, type=int)per_page = request.args.get('size', 3, type=int)# 创建分页器对象pagination = Student.query.paginate(page=page, per_page=per_page, max_per_page=20)print('当前页对象', pagination)print('总数据量', pagination.total)print('当前页数据列表', pagination.items)print('总页码', pagination.pages)print()print('是否有上一页', pagination.has_prev)print('上一页页码', pagination.prev_num)print('上一页对象', pagination.prev())print('上一页对象的数据列表', pagination.prev().items)print()print('是否有下一页', pagination.has_next)print('下一页页码', pagination.next_num)print('下一页对象', pagination.next())print('下一页对象的数据列表', pagination.next().items)# """前后端分离推荐使用的 json 结果,这里没用到"""data = {"page": pagination.page, # 当前页码"pages": pagination.pages, # 总页码"has_prev": pagination.has_prev, # 是否有上一页"prev_num": pagination.prev_num, # 上一页页码"has_next": pagination.has_next, # 是否有下一页"next_num": pagination.next_num, # 下一页页码"items": [{"id": item.id,"name": item.name,"age": item.age,"sex": item.sex,"money": item.money,} for item in pagination.items]}return render_template('list.html', **locals())if __name__ == '__main__':with app.app_context():db.drop_all() # 启动时先删除相关表,后创建相关表db.create_all()app.run('0.0.0.0', 9527)
最后就是 list.html
这个模板文件,呈现一个分页的演示:
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>Title</title><style>body {font-family: Arial, sans-serif;background-color: #f4f7fa;color: #333;}table {border-collapse: collapse;margin: 50px auto;width: 80%;box-shadow: 0 2px 10px rgba(0, 0, 0, 0.1);background-color: #fff;}th, td {padding: 12px 15px;text-align: center;}th {background-color: #007bff;color: #fff;text-transform: uppercase;}tr:nth-child(even) {background-color: #f2f2f2;}tr:hover {background-color: #e9f5ff;}.page {margin: 20px auto;text-align: center;}.page a, .page span {padding: 8px 16px;margin: 0 4px;color: #007bff;background: #fff;border: 1px solid #007bff;border-radius: 4px;text-decoration: none;transition: background-color 0.3s, color 0.3s;}.page a:hover {background-color: #007bff;color: #fff;}.page span {background-color: #007bff;color: #fff;}</style>
</head>
<body><table border="1" align="center" width="600"><tr><th>ID</th><th>Age</th><th>Name</th><th>Sex</th><th>Money</th></tr>{% for student in pagination.items %}<tr><td>{{ student.id }}</td><td>{{ student.age }}</td><td>{{ student.name }}</td><td>{{ "男" if student.sex else "女" }}</td><td>{{ student.money }}</td></tr>{% endfor %}<tr align="center"><td colspan="5" class="page">{% if pagination.has_prev %}<a href="?page=1">首 页</a><a href="?page={{ pagination.page - 1 }}">上一页</a><a href="?page={{ pagination.page - 1 }}">{{ pagination.page - 1 }}</a>{% endif %}<span>{{ pagination.page }}</span>{% if pagination.has_next %}<a href="?page={{ pagination.page + 1 }}">{{ pagination.page + 1 }}</a><a href="?page={{ pagination.page + 1 }}">下一页</a><a href="?page={{ pagination.pages }}">尾 页</a>{% endif %}</td></tr></table>
</body>
</html>
为了确保能够有一定数量的数据,请你另外新建一个 request.py
,用于创建大量数据(如果你知道 faker 的使用,也可以自己弄一些数据),先启动 manage.py
,保证后端服务的开启和路由可用,然后直接运行该文件后可添加测试数据:
# request.py
import requests # pip install requestsstudents = [ # 虚拟数据,务必当真{'name': '王毅','age': 21,'sex': 1,'email': 'wangyi@gmail.com','money': 4488.5},{'name': '张晓','age': 19,'sex': 0,'email': 'zhangxiao@example.com','money': 2389.75},{'name': '李春阳','age': 23,'sex': 1,'email': 'lichunyang@outlook.com','money': 6715.32},{'name': '刘瑞','age': 20,'sex': 0,'email': 'liurui@yahoo.com','money': 3456.89},{'name': '陈欢','age': 22,'sex': 1,'email': 'chenhuan@gmail.com','money': 5678.12},{'name': '吴娜','age': 18,'sex': 0,'email': 'wuna@example.org','money': 1234.56},{'name': '赵丹','age': 24,'sex': 0,'email': 'zhaoda@outlook.com','money': 7890.43},{'name': '孙宇','age': 21,'sex': 1,'email': 'sunyu@yahoo.co.jp','money': 4567.89},{'name': '黄宇','age': 19,'sex': 1,'email': 'huangyu@gmail.com','money': 2345.67},{'name': '杨静','age': 22,'sex': 0,'email': 'yangjing@example.com','money': 6789.01}
]
for student in students:response = requests.request('POST', 'http://127.0.0.1:9527/students', data=student)print('添加一条记录', response.json())
确定 Flask 项目正常启动,并且上面的数据也完成了注入,如果你发现启动失败了,请检查路由、数据库连接是否有问题,你可能需要一定的 Flask 基础知识。接下来如何访问我们渲染的模板呢?
根据路由视图和设置的访问端口(9527):
@app.route('/students/data', methods=['GET'])
def students_data():...return render_template('list.html', **locals())
我们直接在浏览器访问:http://127.0.0.1:9527/students/data 这个地址即可。
上述案例是演示所用,随意写的,小部分代码参考了某机构的教程代码示例,平台原因无法标注,路由设计也是很随便的,这种代码如果存在版权纠纷,emmm.....,请联系我删除,谢谢。无私开源,只为搞懂后端开发的学习,请勿钻牛角……