服务端增加根据上传附件格式 xlsx 类型,将表格第一个sheet数据批量快速导入数据库
服务端
import socketserver
import json
import os
#import pymysql
import cx_Oracle #Oracle 数据库连接
import time
import tqdm
import pandas as pd
import openpyxlclass MyServer(socketserver.BaseRequestHandler):def handle(self):self.add_ip = self.client_address[0]self.add_post = str(self.client_address[1])while True:try:data = self.request.recv(102400)#如果获取为空就退出if not data:break#否则解码处理数据self.data = json.loads(data.decode('utf-8'))# data 是获取字典内容,self.client_address 是 ip地址与 端口print('客户端的消息:',self.data,self.client_address)#数据库对访问记录存档self.log_record()# 定义处理规则self.visit_response()#将结果反馈给客户端self.request.sendall(self.fankui.encode('utf-8'))except :#print('连接异常')break#定义处理规则def visit_response(self):#获取时间in_time = self.get_current_time()[0:14]print('self.data1111',self.data)if self.data['leixing'] == '文件传递':try:file_download_statr,path,file_type = self.file_download()# 如果文件名类型是 xlsx 就把数据导入数据库# time.sleep(0.5)try:print('file_type', file_type)if str(file_type).upper() == 'XLSX':wb = openpyxl.load_workbook(path)#wb.sheetnames[0] 第一个sheet名ws = wb[wb.sheetnames[0]]data_import = pd.read_excel(path, sheet_name=wb.sheetnames[0], header=0, dtype='str',keep_default_na=False);data_import = data_import.where(data_import.notnull(), None);file_path_state1 = os.path.exists(path)print('path', type(path), file_path_state1)ws_type, excel_nr,sheet_name = excel_type(str(path))print('结果', ws_type, excel_nr,sheet_name)table_name = 'ceshi_tmp_' + str(self.add_ip).replace('.', '')excel_in_state = excel_in_new(table_name, ws_type, excel_nr,path,sheet_name)print('excel_in_state', excel_in_state)except:print('数据导入数据库失败')self.fankui = '{"leixing":"%s","ziduan1":"%s","ziduan2":"%s","ziduan3":"%s" }' % (self.data['leixing'], self.add_ip, excel_in_state, file_download_statr)except:self.fankui = '{"leixing":"%s","ziduan1":"%s","ziduan2":"%s","ziduan3":"%s" }' % (self.data['leixing'], self.add_ip, self.add_post, '传递失败')elif self.data['leixing'] == '文件下载': #***********************20230906新增self.user_ip = self.service_user()print('user_ip', self.user_ip, type(self.user_ip))#限制ip地址,不在里面的不允许if self.add_ip in ('134.35.33.250','134.80.99.107','134.80.98.158','134.80.98.173','134.35.10.10') or self.add_ip in self.user_ip:# self.user_ip 是通过 shzc.yytowz_service_user 这个表维护# 获取sql语句生成表格download_start, file_path, file_name, file_size = Oracle_download(self.data['mac'], in_time,str(self.data['ziduan1']).replace("^","'"))print('download_start', download_start, '大小字节', file_size, '文件名', file_name)try:# 将表格返回客户端file_download_statr = self.file_up(file_path,self.data['ziduan3'])self.fankui = '{"leixing":"%s","ziduan1":"%s","ziduan2":"%s","ziduan3":"%s" }' % (self.data['leixing'], file_path, file_size, file_download_statr)except:self.fankui = '{"leixing":"%s","ziduan1":"%s","ziduan2":"%s","ziduan3":"%s" }' % (self.data['leixing'], file_path, file_size, '传递失败')else:self.fankui = '{"leixing":"%s","ziduan1":"%s","ziduan2":"%s","ziduan3":"%s" }' % (self.data['leixing'], 'file_path', '0', '传递失败')elif self.data['leixing'] == '发起访问':try:self.fankui = '{"leixing":"%s","ziduan1":"%s","ziduan2":"%s","ziduan3":"%s" }' % (self.data['leixing'], self.add_ip, self.add_post, '访问成功')except:self.fankui = '{"leixing":"%s","ziduan1":"%s","ziduan2":"%s","ziduan3":"%s" }' % (self.data['leixing'], self.add_ip, self.add_post, '访问失败')else:try:self.fankui = '{"leixing":"%s","ziduan1":"%s","ziduan2":"%s","ziduan3":"%s" }' % (self.data['leixing'], self.add_ip, self.add_post, '访问成功')except:self.fankui = '{"leixing":"%s","ziduan1":"%s","ziduan2":"%s","ziduan3":"%s" }' % (self.data['leixing'], self.add_ip, self.add_post, '访问失败')#文件下载函数def file_download(self):in_time = self.get_current_time()[0:14]file_path = self.data['ziduan1']file_size = self.data['ziduan2']file_hz = file_path.split('/')[-1]file_type = file_hz.split('.')[-1]print('接收文件名:',file_hz,' 接收文件大小:',file_size,' 字节')# 文件传输的缓冲区BUFFER_SIZE = 4096# 接受客户端信息filename, file_size, new_filename = self.data['ziduan1'], self.data['ziduan2'], str(self.data['ziduan3'])[0:6]#判断月文件夹是否存在,不存在创建一个file_path_state1 = os.path.exists('./file_server/'+new_filename)if file_path_state1 == False:os.mkdir('./file_server/'+new_filename)# 获取文件的名字filename = os.path.basename(filename)path = './file_server/'+new_filename +'/' + self.data['mac'] + '_' + in_time + '_' + file_hzprint('filename',os.path.isfile(filename),filename)file_size = int(file_size)if os.path.isfile(path):f = open(path, "wb")else:f = open(path, "wb")rece_size = 0while rece_size < file_size:data = self.request.recv(BUFFER_SIZE)f.write(data)rece_size += len(data)else:return '传递成功',path,file_typedef file_up(self,fujian_label,in_time):# 文件传输的缓冲区BUFFER_SIZE = 4096# 传递文件到指定目录下filename = fujian_label.replace('/', '//')# 文件大小file_size = os.path.getsize(filename)# 创建连接chuandi_tup = '{"leixing":"%s","ziduan1":"%s","ziduan2":"%d","ziduan3":"%s" }' % ('文件提取', filename, file_size, in_time)self.request.sendall(chuandi_tup.encode('utf-8'))# 文件传输progress = tqdm.tqdm(range(file_size), f"发送{filename}", unit="B", unit_divisor=1024)with open(filename, "rb") as f:for _ in progress:# 读取文件bytes_read = f.read(BUFFER_SIZE)if not bytes_read:break# sendall确保及时网络忙碌的时候,数据仍然可以传输self.request.sendall(bytes_read)progress.update(len(bytes_read))# 关闭资源self.request.close()#数据库登录def mysql_execute(self, in_sql, leixing):# 登录数据库#conn = pymysql.connect(host='127.0.0.1', port=3306, user='szc', password='szcNSP850219', database='szc_sql',charset='utf8')dsn = "134.80.200.216/pdbzbjs1"try:conn = cx_Oracle.connect(user="zbweb", password="zibo_533_03", dsn=dsn, encoding="UTF-8")except:time.sleep(10)conn = cx_Oracle.connect(user="zbweb", password="zibo_533_03", dsn=dsn, encoding="UTF-8")# 得到一个可以执行SQL语句的光标对象cursor = conn.cursor()# 数据库执行导入的语句if leixing == '数量':# 反馈数量count = cursor.execute(in_sql)elif leixing == '单条':# 反馈单条cursor.execute(in_sql)count = cursor.fetchone()[0]elif leixing == '多条':# 反馈多条cursor.execute(in_sql)count = cursor.fetchall()elif leixing == '编辑':count = cursor.execute(in_sql)conn.commit()# 关闭光标对象cursor.close()# 关闭数据库连接conn.close()# 反馈return count# 时间计算def get_current_time(self):ct = time.time()local_time = time.localtime(ct)data_head = time.strftime("%Y%m%d%H%M%S", local_time)data_secs = abs(ct - round(ct)) * 1000time_stamp = "%s%03d" % (data_head, data_secs)return time_stamp#日志留存def log_record(self):in_time = self.get_current_time()[0:14]ziduan1 = str(self.data['ziduan1']).replace("'","^")sql = "insert into shzc.yytowz_service_title (leixing,ziduan1,ziduan2,ziduan3,ip_id,post_id,in_time,mac,hostname,ip) values('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s') " % (self.data['leixing'], ziduan1, self.data['ziduan2'], self.data['ziduan3'], str(self.client_address[0]),str(self.client_address[1]),in_time, self.data['mac'], self.data['hostname'], self.data['ip'])num = self.mysql_execute(sql,'编辑')#print('num',num)return num# shzc.yytowz_service_user 获取文件下载ip范围def service_user(self):sql = "select distinct a.ip_id from shzc.yytowz_service_user a where STATUS='1' "num = self.mysql_execute(sql, '多条')ip_list = []for ip in num:for ip_id in ip:ip_list.append(ip_id)return ip_list# 时间计算
def get_current_time(input_date='0'):# 如果时间传入为空if input_date == '0':ct = time.time() # - 24 * 60 * 60 #如果是取昨天日期是减数值local_time = time.localtime(ct)data_head = time.strftime("%Y%m%d%H%M%S", local_time)data_secs = abs(ct - round(ct)) * 1000time_stamp = "%s%03d" % (data_head, data_secs)else:time_stamp = input_date + '120000001'return time_stampdef file_transfer(user,file_name):#获取月份in_month = get_current_time()[0:6]# 文件传递给服务器file_path = file_namefile_statr = user.file_up(file_path, in_month)# 如果执行结果不成功,再次执行一次,保底if file_statr['ziduan3'] != '传递成功':file_statr = file_transfer(user,file_name)return file_statrelse:return file_statr#这里用作程序预备,目前建立必要的文件夹
def server_init():# 程序执行前先确认 ./file_server/ 是否存在,不存在新建file_path_state1 = os.path.exists('./file_server')if file_path_state1 == False:os.mkdir('./file_server')file_path_state1 = os.path.exists('./file_server/file_out')if file_path_state1 == False: os.mkdir('./file_server/file_out')#文件数据生成,分mac与时间,不然没法同步下载
def Oracle_download(mac,in_time,sql='0'):# 结果数据生成表格准备发送dsn = "134.80.200.216/pdbzbjs1"conn = cx_Oracle.connect(user="zbweb", password="zibo_533_03", dsn=dsn, encoding="UTF-8")df = pd.read_sql("""%s""" % sql, con=conn)df.to_excel("./file_server/file_out/"+mac+"_"+in_time+"_结果下载.xlsx", index=False)# 文件大小file_path = './file_server/file_out/'+mac+'_'+in_time+'_结果下载.xlsx'file_name = file_path.split('/')[-1]file_size = os.path.getsize(file_path)#返回根据语句处理结果与return "结果下载",file_path,file_name,file_size#获取表格数据 #.replace(" ","").replace("[","").replace("]","").replace("'","").split(",")
def excel_type(path):excel_nr ={}wb = openpyxl.load_workbook(path)print('sheetnames',wb.sheetnames)ws = wb[wb.sheetnames[0]]sheet_name = wb.sheetnames[0]minr = ws.min_rowminc = ws.min_columnmaxr = ws.max_rowmaxc = ws.max_column# 数据库内容print(minr, minc, maxr, maxc)ws_type = {'hang': maxr, 'lie': maxc}rngs = ws.iter_rows(min_row=minr, min_col=minc, max_row=maxr, max_col=maxc)row_cs = 0for row in rngs:#标题做表头if row_cs == 0:value = [c.value for c in row]# print(len(value))row_bt = []for valus_id in range(len(value)):row_bt.append('y' + str(value[valus_id] +'_'+ str(valus_id)))excel_nr[row_cs] = row_btelif row_cs == 1:value = [c.value for c in row]# print(len(value))row_nr = []for valus_id in range(len(value)):row_nr.append(str(value[valus_id]))excel_nr[row_cs] = row_nrrow_cs += 1try:excel_nr.pop(None)except:passreturn ws_type, excel_nr,sheet_name#数据库登录
def mysql_execute(in_sql,leixing):# 登录数据库#conn = pymysql.connect(host='127.0.0.1', port=3306, user='szc', password='szcNSP850219', database='szc_sql',charset='utf8')dsn = "134.80.200.216/pdbzbjs1"try:conn = cx_Oracle.connect(user="zbweb", password="zibo_533_03", dsn=dsn, encoding="UTF-8")except:time.sleep(10)conn = cx_Oracle.connect(user="zbweb", password="zibo_533_03", dsn=dsn, encoding="UTF-8")# 得到一个可以执行SQL语句的光标对象cursor = conn.cursor()# 数据库执行导入的语句if leixing == '数量':# 反馈数量count = cursor.execute(in_sql)elif leixing == '单条':# 反馈单条cursor.execute(in_sql)count = cursor.fetchone()[0]elif leixing == '多条':# 反馈多条cursor.execute(in_sql)count = cursor.fetchall()elif leixing == '编辑':count = cursor.execute(in_sql)conn.commit()# 关闭光标对象cursor.close()# 关闭数据库连接conn.close()# 反馈return countdef excel_in_new(table_name,ws_type,excel_nr,path,sheet_name):print('结果',table_name, ws_type)# 先删除这个表sql = "drop table zhyw." + table_nametry:mysql_execute(sql, '编辑')except:pass# 创建临时表sql = "create table zhyw." + table_name + " ("for i in range(ws_type['lie']):# print('i',i,excel_nr[0][i])sql = sql + excel_nr[0][i] + " varchar2(100),"sql = sql[:-1] + ")"print('sql:', sql)try:mysql_execute(sql, '编辑')except:passdsn = "134.80.200.216/pdbzbjs1";conn = cx_Oracle.connect(user="zbweb", password="zibo_533_03", dsn=dsn, encoding="UTF-8");cursor = conn.cursor();data_import = pd.read_excel(path, sheet_name=sheet_name, header=0, dtype='str', keep_default_na=False);data_import = data_import.where(data_import.notnull(), None);df = data_import.apply(lambda x: tuple(x), axis=1).values.tolist() # dataframe 需要改改为包含元组的列表才可以导入数据库#deal_export ="insert into zhyw.shc_caigou_kucun_mx values(:1,:2,:3,:4,:5,:6,:7,:8,:9,:10,:11,:12,:13,:14,:15,:16,:17,:18,:19)"deal_export = "insert into zhyw." + table_name +" values("for i in range(ws_type['lie']):deal_export = deal_export + ":" + str(i+1) +","deal_export = deal_export[:-1] + ")"print('deal_export',deal_export)cursor.prepare(deal_export); # 编译sqlprint(cursor.executemany(None, df));conn.commit();return "zhyw."+table_nameif __name__ == '__main__':#服务器文件夹准备server_init()#服务器开始s = socketserver.ThreadingTCPServer(('134.35.10.10', 8967), MyServer)#类似实现连接循环s.serve_forever()
客户端与之前无变化
import socket
import json
import time
import os
import tqdm
import uuid #获取系统macclass My_Main():def __init__(self):# 程序执行前先确认 ./file_server/file_work_order/ 是否存在,不存在新建file_path_state1 = os.path.exists('./file_main')if file_path_state1 == False: os.mkdir('./file_main')# 定义服务端地址self.ip_num, self.port_num = '134.35.10.10', 8967# macself.mac = uuid.UUID(int=uuid.getnode()).hex[-12:]# 获取主机名self.hostname = socket.gethostname()# 获取IPself.ip = socket.gethostbyname(self.hostname)#发起访问调用模块def socket_dlgc(self,leixing, name_text, pass_text):# 获取14位长度时间,年月日时分秒的self.in_time = get_current_time()[0:14]tcp_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)tcp_client.connect((self.ip_num, self.port_num))if None == name_text:print('与服务器断开连接')#发送msg = '{"leixing":"%s","ziduan1":"%s","ziduan2":"%s","ziduan3":"%s","mac":"%s","hostname":"%s","ip":"%s" }' % (str(leixing), str(name_text), str(pass_text), self.in_time,self.mac, self.hostname, self.ip)tcp_client.send(msg.encode("utf-8")) # 说话 #data = tcp_client.recv(102400) # 听话js_data = json.loads(data.decode('utf-8'))tcp_client.close()return js_data# 文件下载函数def file_download(self,sql):# 获取14位长度时间,年月日时分秒的self.in_time = get_current_time()[0:14]# 套接字是对访问的ip地址和端口反馈,需要从开始定好tcp_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)tcp_client.connect((self.ip_num, self.port_num))# 申请数据下载chuandi_tup = '{"leixing":"%s","ziduan1":"%s","ziduan2":"%d","ziduan3":"%s","mac":"%s","hostname":"%s","ip":"%s" }' % ('文件下载', sql, 1, self.in_time+'.xlsx',self.mac, self.hostname, self.ip)tcp_client.send(chuandi_tup.encode("utf-8")) # 说话self.data = tcp_client.recv(102400) # 听话js_data = json.loads(self.data.decode('utf-8'))#print(js_data, type(js_data))if js_data['ziduan1'] not in ('', None):file_size = int(js_data['ziduan2'])#print(file_size)filename = js_data['ziduan3']rece_size = 0recv_data = tcp_client.recv(4096)if recv_data: # 如果获取数据不为空try:with open('./file_main/' + filename, "wb")as f:f.write(recv_data)while rece_size < file_size:recv_data = tcp_client.recv(4096)f.write(recv_data)rece_size += len(self.data)js_data = {'leixing': '文件下载', 'ziduan1': js_data['ziduan3'], 'ziduan2': file_size, 'ziduan3': '传递成功'}except:js_data = {'leixing': '文件下载', 'ziduan1': js_data['ziduan3'], 'ziduan2': file_size, 'ziduan3': '传递失败'}# 关闭套接字tcp_client.close()return js_data#客户端文件上传,fujian_label 文件全路径def file_up(self,fujian_label):# 获取14位长度时间,年月日时分秒的self.in_time = get_current_time()[0:14]# 文件传输的缓冲区BUFFER_SIZE = 4096# 创建连接s = socket.socket()s.connect((self.ip_num, self.port_num))# 传递文件到指定目录下filename = fujian_label.replace('/', '//')# 文件大小file_size = os.path.getsize(filename)chuandi_tup = '{"leixing":"%s","ziduan1":"%s","ziduan2":"%d","ziduan3":"%s","mac":"%s","hostname":"%s","ip":"%s" }' % ('文件传递', filename, file_size, self.in_time,self.mac, self.hostname, self.ip )s.send(chuandi_tup.encode())# 文件传输progress = tqdm.tqdm(range(file_size), f"发送{filename}", unit="B", unit_divisor=BUFFER_SIZE)with open(filename, "rb") as f:for _ in progress:# 读取文件bytes_read = f.read(BUFFER_SIZE)if not bytes_read:breaktry:# sendall确保及时网络忙碌的时候,数据仍然可以传输s.sendall(bytes_read)progress.update(len(bytes_read))except:js_data = {'leixing': '文件传递', 'ziduan1': self.ip_num, 'ziduan2': self.port_num, 'ziduan3': '传递失败'}break#文件传递完后,看看是否有反馈,有的话函数返回try:data = s.recv(102400) # 听话js_data = json.loads(data.decode('utf-8'))except:js_data = {'leixing': '文件传递', 'ziduan1': self.ip_num, 'ziduan2': self.port_num, 'ziduan3': '传递失败'}# 关闭资源s.close()return js_data# 时间计算
def get_current_time(input_date='0'):# 如果时间传入为空if input_date == '0':ct = time.time() # - 24 * 60 * 60 #如果是取昨天日期是减数值local_time = time.localtime(ct)data_head = time.strftime("%Y%m%d%H%M%S", local_time)data_secs = abs(ct - round(ct)) * 1000time_stamp = "%s%03d" % (data_head, data_secs)else:time_stamp = input_date + '120000001'return time_stamp#文件传递给服务器
def file_transfer(user,file_name,num=0):# 防止有语法错误等原因导致死循环,限制最多处理4次if num < 5:# 文件传递给服务器try:file_statr = user.file_up(file_name)num += 1except:file_statr = {'leixing': '文件传递', 'ziduan1': file_name, 'ziduan2': '0', 'ziduan3': '传递失败'}num += 1# 如果执行结果不成功,再次执行一次,保底if file_statr['ziduan3'] != '传递成功':file_statr = file_transfer(user,file_name,num)return file_statrelse:return file_statrelse:file_statr = {'leixing': '文件传递', 'ziduan1': file_name, 'ziduan2': '0', 'ziduan3': '传递失败'}return file_statr#文件传递给服务器
def file_gain(user,sql,num=0):#防止有语法错误等原因导致死循环,限制最多处理4次if num < 5:# 文件传递给服务器try:file_statr = user.file_download(sql)num += 1except:file_statr = {'leixing': '文件下载', 'ziduan1': sql, 'ziduan2': '0', 'ziduan3': '传递失败'}num += 1# 如果执行结果不成功,再次执行一次,保底if file_statr['ziduan3'] != '传递成功':file_statr = file_gain(user,sql,num)return file_statrelse:return file_statrelse:file_statr = {'leixing': '文件下载', 'ziduan1': sql, 'ziduan2': '0', 'ziduan3': '传递失败'}return file_statrdef use_show():print('请您选择需要处理的类型(请输入选择的编码):')print('类型:发起访问 编码:1')print('类型:文件传递 编码:2')print('类型:文件下载 编码:3')def use_choice(use_input):if use_input == '1':use_type = '发起访问'elif use_input == '2':use_type = '文件传递'elif use_input == '3':use_type = '文件下载'else:use_type = '类型不详'return use_typedef use_decision(user,use_input):if use_input == '1':name_text = input('请您输入需要发起的标题:')pass_text = input('请您输入需要发起的内容:')socket_dlgc_statr = user.socket_dlgc('发起访问', name_text, pass_text)print('系统反馈:', socket_dlgc_statr)elif use_input == '2':wav_write = input('请您输入需要发送文件名:')transfer_statr = file_transfer(user, wav_write)print('系统反馈:',transfer_statr)elif use_input == '3':sql = sql_out()gain_statr = file_gain(user, sql)print('系统反馈:', gain_statr)else:use_type = '类型不详'pass#获取sql语句处理
def sql_out():# 语句存储sql_text = ''# 循环执行while True:sql = input('请您输入需要发送的语句:').replace("'", "^")try:sql_out = sql.index(';')#print('sql_out', sql_out, type(sql_out))except:sql_out = -1#print('sql_out', sql_out, type(sql_out))if sql_out == -1:sql_text = sql_text + ' ' + sqlelse:sql_text = sql_text + ' ' + sqlbreakreturn sql_text.replace(";", " ")if __name__ == '__main__':# 加载类user = My_Main()# 循环执行while True:use_show()use_input = input('请您输入选择的编码:')use_type = use_choice(use_input)print('您选择需要处理的类型:',use_type)#类型不详就断开if use_type=='类型不详':breakuse_decision(user, use_input)