单一入口文件
manage.py
# encoding: utf-8
import jsonfrom flask import Flask, jsonify, request
from flask_cors import CORS
from database import db,r
from sqlalchemy import text
from config import *
# 实例化flask对象
app = Flask(__name__)
# 配置转码
app.config['JSON_AS_ASCII'] = False
# 配置跨越
CORS(app, cors_allowed_origins="*")# 配置mysql数据库
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://{}:{}@{}:{}/{}'.format(mysql_user,mysql_password,mysql_host,mysql_port,mysql_db)
# 自动提交sql请求
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True# 初始化操作
db.init_app(app)"""
methodsurl:paramsget delete: argspost :data
"""
# 配置路由
@app.route('/', methods=['GET'])
def index():id = request.args.get('id', None)print('请求的默认参数{}'.format(id))userlist = db.session.execute(text("select * from user")).fetchall()# row._mapping 用于获取查询结果的键值对,从而确保每个 row 对象可以被正确地转换为字典。data = [dict(row._mapping) for row in userlist]return jsonify({"data": data})# 新建用户
@app.route('/user/add', methods=['POST'])
def add_user():data = request.get_json()db.session.execute(text("insert into user(email,password) values(:email,:password)"), data)db.session.commit()# 字典转json字符串json_data = json.dumps(data)r.set('userlist', json_data)return jsonify({"msg": "success"})# 删除用户
@app.route('/user/del', methods=['POST'])
def del_user():data = request.get_json()db.session.execute(text("delete from user where id=:id"), data)db.session.commit()return jsonify({"msg": "success"})# 更新用户
@app.route('/editUser', methods=['POST'])
def editUser():data = request.get_json()db.session.execute(text("update user set email=:email,password=:password where id=:id"), data)db.session.commit()return jsonify({"msg": "success"})@app.route('/user/query', methods=['GET'])
def query_user():try:id = request.args.get('id', None)if id is None:return jsonify({"error": "ID parameter is required"}), 400try:id = int(id)except ValueError:return jsonify({"error": "ID must be an integer"}), 400userlist = db.session.execute(text("select * from `user` where id=:id"), {"id": id}).fetchall()data = [dict(row._mapping) for row in userlist]return jsonify({"data": data})except Exception as e:return jsonify({"error": str(e)}), 500# redis 查询
@app.route('/redis/query', methods=['POST'])
def redis_query():data = request.get_json()userlist = r.get('userlist')data = json.loads(userlist)return jsonify({"data": data})# 启动flask
if __name__ == '__main__':app.run(debug=True, host='0.0.0.0', port=5000)
数据库
# -*- coding: utf-8 -*-from flask_sqlalchemy import SQLAlchemy
import pymysql# 通过pymysql 模拟mysqldbpymysql.install_as_MySQLdb()# 实例化mysql对象
db = SQLAlchemy()import redis# 实例化redis对象r = redis.Redis(host='localhost', port=6379, db=5,decode_responses=True)
配置文件
# -*- coding: utf-8 -*-mysql_user = 'root'
mysql_password = '123456'
mysql_host = '127.0.0.1'
mysql_port = 3306
mysql_db = 'social'
测试文件
# -*- coding: utf-8 -*-import requestsclass HttpApiTest:def test_get(self,url,data=None):res = requests.get(url,params=data)return res.textdef test_post(self,url,data=None):res = requests.post(url,data=data)return res.textif __name__ == '__main__':httpapi = HttpApiTest()res = httpapi.test_get('http://127.0.0.1:5000/',data={'id':'123'})res2 = httpapi.test_post('http://127.0.0.1:5000/user/user/add',data={'id':'123'})# res1 = httpapi.test_post('http://127.0.0.1:5000/user/update_user',data={'id':'123'})# print(res1)