代码:
import web import os.path import MySQLdb import json import hashlib import random import math import os from datetime import datetimeclass DateEncoder(json.JSONEncoder):def default(self, obj):if isinstance(obj, datetime):return obj.strftime("%Y-%m-%d %H:%M:%S")else:return json.JSONEncoder.default(self, obj)urls = ('/', 'indexHandler','/user/register', 'RegisterHandler','/user/login', 'LoginHandler','/user/logout', 'LogoutHandler','/user/getuserinfo', 'GetuserinfoHandler','/post/list', 'PostlistHandler','/post/detail', 'PostdetailHandler','/post/add', 'PostaddHandler','/post/edit', 'PosteditHandler','/post/delete', 'PostdeleteHandler','/reply/list', 'ReplylistHandler','/reply/detail', 'ReplydetailHandler','/reply/add', 'ReplyaddHandler','/reply/edit', 'ReplyeditHandler','/reply/delete', 'ReplydeleteHandler' )class BaseHandler:conn = Nonecursor = NonesecretKey = 'saacac3423@21212'pagesize = 20def __init__(self):self.conn = MySQLdb.Connection('127.0.0.1', 'root', '123456', 'my_bbs')self.cursor = self.conn.cursor(cursorclass = MySQLdb.cursors.DictCursor)def __del__(self):self.cursor.close()self.conn.close()def get_argument(self, name, default = ''):i = web.input()if name in i:if name == "id":return i.idelif name == "sessionId":return i.sessionIdelif name == "page":return i.pageelif name == "keyword":return i.keywordelif name == "username":return i.usernameelif name == "password":return i.passwordelif name == "nickname":return i.nicknameelif name == "title":return i.titleelif name == "content":return i.contentelif name == "contentId":return i.contentIdelse:return defaultdef getloginuserinfo(self, sessionId):try:sessionIdHead = self.get_secure_cookie("sessionId")except:sessionIdHead = ''if sessionIdHead is not None and sessionIdHead != '':sessionId = sessionIdHeadsql = "select id,username,nickname,addTime,sessionId from user where sessionId='%s'" % sessionIdself.cursor.execute(sql)data = self.cursor.fetchone()if data is None:data = {'id' : 0, 'username' : '', 'nickname' : '', 'addTime' : '', 'sessionId' : ''}return datadef response(self, code, msg, data):if code != 0:result = {'code' : code, 'msg' : msg, 'data' : None}else:result = {'code' : 0, 'msg' : '', 'data' : data}result = json.dumps(result, cls = DateEncoder, ensure_ascii = False)web.header('Content-Type', 'text/plain; charset=utf-8')web.header('Server', 'webpy-Framework')return resultdef error(self, code, msg):return self.response(code, msg, None)def success(self, data = {}):return self.response(0, '', data)class indexHandler(BaseHandler):def GET(self):web.header('Content-Type', 'text/plain; charset=utf-8')web.header('Server', 'webpy-Framework')return "此站接口使用python.webpy实现,<a href='api.html' target='_blank'>接口列表</a>"class RegisterHandler(BaseHandler):def GET(self):username = self.get_argument("username", "")password = self.get_argument("password", "")nickname = self.get_argument("nickname", "")sql = "select id,username,nickname,addTime from user where username='%s'" % usernameself.cursor.execute(sql)data = self.cursor.fetchone()if data != None:return self.error(1, '用户名已经存在')try:passwordMd5 = hashlib.md5(password.encode(encoding='utf-8')).hexdigest()sql = "insert into user(username, password, nickname) value('%s', '%s', '%s')" % (username, passwordMd5, nickname)self.cursor.execute(sql)self.conn.commit()insertId = self.cursor.lastrowidreturn self.success(insertId)except MySQLdb.Error as e:self.conn.rollback()return self.error(1, '注册失败')class LoginHandler(BaseHandler):def GET(self):username = self.get_argument("username", "")password = self.get_argument("password", "")passwordMd5 = hashlib.md5(password.encode(encoding='utf-8')).hexdigest()sql = "select id,username,nickname,addTime from user where username='%s' and password='%s'" % (username, passwordMd5)self.cursor.execute(sql)data = self.cursor.fetchone()if data == None:return self.error(1, '用户名或者密码错误')tmpSessionId = self.secretKey + str(data['id']) + str(data['addTime'])tmpSessionId = hashlib.md5(tmpSessionId.encode(encoding='utf-8')).hexdigest()try:sql = "update user set sessionId='%s' where id=%s" % (tmpSessionId, data['id'])self.cursor.execute(sql)self.conn.commit()data['sessionId'] = tmpSessionIdreturn self.success(data)except MySQLdb.Error as e:self.conn.rollback()return self.error(1, '保存会话id失败')class LogoutHandler(BaseHandler):def GET(self):sessionId = self.get_argument("sessionId", "")data = super().getloginuserinfo(sessionId)if data == None:return self.success(None)if data['sessionId'] == '':return self.success(data)try:sql = "update user set sessionId='' where sessionId='%s'" % sessionIdself.cursor.execute(sql)self.conn.commit()data['sessionId'] = ''return self.success(data)except MySQLdb.Error as e:self.conn.rollback()return self.error(1, '删除会话id失败')class GetuserinfoHandler(BaseHandler):def GET(self):sessionId = self.get_argument("sessionId", "")userinfo = super().getloginuserinfo(sessionId)return self.success(userinfo)class PostlistHandler(BaseHandler):def GET(self):page = self.get_argument("page", "1")keyword = self.get_argument("keyword", "")if page == "":page = 1page = int(page)if page <= 0:page = 1addsql = " isDel=0 "if keyword is not None and keyword != '':addsql = " isDel=0 and title like '%"+keyword+"%' "start = (page - 1) * self.pagesizesql1 = "select count(1) as count from content where %s" % addsqlself.cursor.execute(sql1)countdata = self.cursor.fetchone()totalpage = math.ceil(countdata['count'] / float(self.pagesize))data = []if totalpage > 0:sql2 = "select id,title,userId,userNickename,replyNum,updateTime from content where %s order by updateTime desc limit %s,%s" % (addsql, start, self.pagesize)self.cursor.execute(sql2)data = self.cursor.fetchall()return self.success({'totalpage' : totalpage, 'data' : data})class PostdetailHandler(BaseHandler):def GET(self):id = self.get_argument("id", "0")sql = "select id,title,content,userId,userNickename,replyNum,updateTime from content where isDel=0 and id=%s" % idself.cursor.execute(sql)data = self.cursor.fetchone()return self.success(data)class PostaddHandler(BaseHandler):def GET(self):title = self.get_argument("title", "")content = self.get_argument("content", "")sessionId = self.get_argument("sessionId", "")userinfo = super().getloginuserinfo(sessionId)userId = userinfo['id']userNickename = userinfo['nickname']if userId <= 0:return self.error(1, '请先登录')try:sql = "insert into content(title, content, userId, userNickename) value('%s', '%s', %s, '%s')" % (title, content, userId, userNickename)self.cursor.execute(sql)self.conn.commit()insertId = self.cursor.lastrowidreturn self.success(insertId)except MySQLdb.Error as e:self.conn.rollback()return self.error(1, '发帖失败')class PosteditHandler(BaseHandler):def GET(self):id = self.get_argument("id", "0")title = self.get_argument("title", "")content = self.get_argument("content", "")sessionId = self.get_argument("sessionId", "")userinfo = super().getloginuserinfo(sessionId)userId = userinfo['id']userNickename = userinfo['nickname']if userId <= 0:return self.error(1, '请先登录')try:sql = "update content set title='%s',content='%s',userId=%s,userNickename='%s' where id=%s and userId=%s" % (title, content, userId, userNickename, id, userId)self.cursor.execute(sql)self.conn.commit()return self.success(None)except MySQLdb.Error as e:self.conn.rollback()return self.error(1, '编辑帖子失败')class PostdeleteHandler(BaseHandler):def GET(self):id = self.get_argument("id", "0")sessionId = self.get_argument("sessionId", "")userinfo = super().getloginuserinfo(sessionId)userId = userinfo['id']userNickename = userinfo['nickname']if userId <= 0:return self.error(1, '请先登录')try:sql = "update content set isDel=1 where id=%s and userId=%s" % (id, userId)self.cursor.execute(sql)self.conn.commit()return self.success(None)except MySQLdb.Error as e:self.conn.rollback()return self.error(1, '删除帖子失败')class ReplylistHandler(BaseHandler):def GET(self):page = self.get_argument("page", "1")contentId = self.get_argument("contentId", "0")if page == "":page = 1page = int(page)if page <= 0:page = 1start = (page - 1) * self.pagesizesql1 = "select count(1) as count from reply where isDel=0 and contentId=%s" % contentIdself.cursor.execute(sql1)countdata = self.cursor.fetchone()totalpage = math.ceil(countdata['count'] / float(self.pagesize))data = []if totalpage > 0:sql2 = "select id,content,replyUserId,replyUserNickename,addTime from reply where isDel=0 and contentId=%s order by id asc limit %s,%s" % (contentId, start, self.pagesize)self.cursor.execute(sql2)data = self.cursor.fetchall()return self.success({'totalpage' : totalpage, 'data' : data})class ReplydetailHandler(BaseHandler):def GET(self):id = self.get_argument("id", "0")sql = "select id,content,replyUserId,replyUserNickename,addTime from reply where isDel=0 and id=%s" % idself.cursor.execute(sql)data = self.cursor.fetchone()return self.success(data)class ReplyaddHandler(BaseHandler):def GET(self):contentId = self.get_argument("contentId", "0")content = self.get_argument("content", "")sessionId = self.get_argument("sessionId", "")userinfo = super().getloginuserinfo(sessionId)userId = userinfo['id']userNickename = userinfo['nickname']if userId <= 0:return self.error(1, '请先登录')try:sql2 = "update content set replyNum=replyNum+1 where id=%s" % contentIdself.cursor.execute(sql2)sql1 = "insert into reply(contentId, content, replyUserId, replyUserNickename) value(%s, '%s', %s, '%s')" % (contentId, content, userId, userNickename)self.cursor.execute(sql1)self.conn.commit()insertId = self.cursor.lastrowidreturn self.success(insertId)except MySQLdb.Error as e:self.conn.rollback()return self.error(1, '回复失败')class ReplyeditHandler(BaseHandler):def GET(self):id = self.get_argument("id", "0")content = self.get_argument("content", "")sessionId = self.get_argument("sessionId", "")userinfo = super().getloginuserinfo(sessionId)userId = userinfo['id']userNickename = userinfo['nickname']if userId <= 0:return self.error(1, '请先登录')try:sql = "update reply set content='%s',replyUserId=%s,replyUserNickename='%s' where id=%s and replyUserId=%s" % (content, userId, userNickename, id, userId)self.cursor.execute(sql)self.conn.commit()return self.success(None)except MySQLdb.Error as e:self.conn.rollback()return self.error(1, '编辑回复失败')class ReplydeleteHandler(BaseHandler):def GET(self):id = self.get_argument("id", "0")sessionId = self.get_argument("sessionId", "")userinfo = super().getloginuserinfo(sessionId)userId = userinfo['id']userNickename = userinfo['nickname']if userId <= 0:return self.error(1, '请先登录')sql = "select id,content,replyUserId,replyUserNickename,addTime,contentId from reply where isDel=0 and id=%s" % idself.cursor.execute(sql)contentdata = self.cursor.fetchone()if contentdata is None:return self.error(1, '回复不存在')try:sql2 = "update content set replyNum=replyNum-1 where id=%s" % contentdata['contentId']self.cursor.execute(sql2)sql1 = "update reply set isDel=1 where id=%s and replyUserId=%s" % (id, userId)self.cursor.execute(sql1)self.conn.commit()return self.success(None)except MySQLdb.Error as e:self.conn.rollback()return self.error(1, '删除回复失败')if __name__ == "__main__":app = web.application(urls, globals())app.run()
输出:
D:\workspace\studys\study_pys\pc_app\dist>D:\software\Python310\python.exe D:\workspace\studys\study_bbs\start_web_webpy.py 1092
http://0.0.0.0:1092/