0、目的
目的比较简单,测试,使用python来提交数据是非常简洁的,修改代码也容易,除了做人工智能,本身也是一个非常好的测试端工具
1、简单的post
一个简单的示例程序,将 headers 内容置为’application/json’,为了演示,每次修改提交数据number的值,向服务器程序的8000端口提交data,for 循环提交10次。
import json
import requests
import time
headers = {'Content-Type': 'application/json'}
data = {'name' : 'one screen','number' : 0,'reg' : 'test'
}
for num in range(1,10):data['number'] = numdatas = json.dumps(data)try:r = requests.post("http://127.0.0.1:8000/post/data", data=datas, headers=headers)print(r.text)except requests.exceptions.ConnectionError:print('connectionError') time.sleep(1)
使用nodejs 写一段服务端代码,简单,直接用express,进行接收数据,nodejs同样是非常翻方便简单的
var express = require('express');
var cors = require('cors');
var app = express();//创建express实例
app.use(cors());//为了解决跨域问题
var http = require('http').Server(app);
//var httpget = require('http');
var bodyParser = require('body-parser');
app.use(bodyParser.json({limit: '1mb'}));
app.use(bodyParser.urlencoded({ extended: true
}));
//var session = require('express-session');
/*
app.use(session({secret: 'secret',resave: true,// don't save session if unmodifiedsaveUninitialized: false,// don't create session until something storedcookie: {maxAge: 1000 * 60 * 10 //过期时间设置(单位毫秒)}
}));
*/
app.use(express.static(__dirname));app.post("/post/data",function(req,res){console.log(req.body);res.send("{ret:1}");
});app.get("/",function(req,res){res.send("{ret:ok}");});http.listen(8000, function () {console.log('listening on *:8000');
});
启动服务端程序,端口8000等待数据
客户端收到数据,打印返回,服务端收到数据打印内容,正常。
2、复杂一点的例子
修改一下headers
USER_AGENTS = ["Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)","Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; fr) Presto/2.9.168 Version/11.52",'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36'
]headers = {'Referer': 'http://127.0.0.1:8000/','User-Agent': random.choice(USER_AGENTS ),'Accept-Language': 'zh-CN,zh;q=0.9',}
我们使用import http.client, urllib.parse,开始写get 和post 程序,以下为代码清单
#coding=utf-8import http.client, urllib.parse
import http.client, urllib.parse
import randomUSER_AGENTS = ["Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)","Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Acoo Browser; SLCC1; .NET CLR 2.0.50727; Media Center PC 5.0; .NET CLR 3.0.04506)","Mozilla/4.0 (compatible; MSIE 7.0; AOL 9.5; AOLBuild 4337.35; Windows NT 5.1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
]def get_demo(num,keyword):page = urllib.parse.urlencode({'page':num})params = urllib.parse.urlencode({})headers = {'Referer': 'http://127.0.0.1:8000/','User-Agent': random.choice(USER_AGENTS ),'Accept-Language': 'zh-CN,zh;q=0.9',}conn = http.client.HTTPConnection("127.0.0.1",8000, timeout=10)conn.request("GET", "/test/"+str(num), params, headers)r1 = conn.getresponse()#print(r1.read())html = r1.read()data = html.decode('utf-8') # This will return entire content.print(data)content = data.find(keyword)print(content)if content != -1:print('bingo:'+page)else:print('try {},status:{}'.format(page, r1.status))def post_demo():params = urllib.parse.urlencode({'page': 1, 'name': 'qianbo'})headers = {"Content-type": "application/x-www-form-urlencoded","Accept": "application/json"}#conn = http.client.HTTPSConnection("127.0.0.1",8000)conn = http.client.HTTPConnection("127.0.0.1",8000)conn.request("POST", "/post/data", params, headers)response = conn.getresponse()print(response.status, response.reason)if not response.closed:data = response.read()print(data, type(data.decode('utf-8')))conn.close()
def main():get_demo(1,"qianbo")post_demo()if __name__ == '__main__':main()
在main函数中分别调用get 和 post程序,并且打印结果,如果需要https, 我们只要将
conn = http.client.HTTPConnection("127.0.0.1",8000) 改成conn = http.client.HTTPSConnection("127.0.0.1",8000)
服务端的nodejs 代码修改一下,清单如下:
var express = require('express');
var cors = require('cors');
var app = express();//创建express实例
app.use(cors());//为了解决跨域问题
var http = require('http').Server(app);
//var httpget = require('http');
var bodyParser = require('body-parser');
app.use(bodyParser.json({limit: '1mb'}));
app.use(bodyParser.urlencoded({ extended: true
}));app.use(express.static(__dirname));app.post("/post/data",function(req,res){console.log(req.body);res.send("{ret:1}");
});app.get("/",function(req,res){res.send("{ret:1,name:'qianbo'}");});app.get("/test/:page",function(req,res){var page = req.params.page;console.log("page is ",page);res.send("{ret:1,name:'qianbo'}");
});
http.listen(8000, function () {console.log('listening on *:8000');
});
其实就是增加了一个test/:page 接收,方便演示,运行后, 服务端和客户端的结果如下
python非常方便,可以节省很多时间去关心自己的业务和逻辑
说道逻辑,下面我们开始写一段有关业务的代码,以便于更好的示例
3、业务融入
下面我们开始计算业务上的医学心率数据
定义均值和标准差数据函数
def get_average(records):"""平均值"""return sum(records) / len(records)
def get_standard_deviation(records):"""方差 反映一个数据集的离散程度标准差 == 均方差 反映一个数据集的离散程度"""average = get_average(records)print("average:",average)test = sum([(x - average) ** 2 for x in records]) / len(records)return math.sqrt(test)
定义差值均方,反应心率中的变化成为
def get_rmssd(records):"""差值均方得平方根,反应hrv中得变化成分"""len1 = 1len2 = len(records) sum = 0.0for i in range(len1,len2-1):sum += math.pow(records[i] - records[i-1],2)#sum += t*treturn math.sqrt(sum / (len2-1))
定义post函数
def post2():str = json.dumps({'userid':1})print(str)headers = {"Content-type": "application/json","Accept": "text/plain"}conn = http.client.HTTPConnection("127.0.0.1",8000)conn.request('POST', "/hrv", str, headers)response = conn.getresponse()print(response.status, response.reason)data = response.read().decode('utf-8')print(data)conn.close()def post(datain):s = requests.session()#print(s.headers)headers1 = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:54.0) Gecko/20100101 Firefox/54.0','Content-Type': 'application/json'}#json = pd.DataFrame.to_json(datain)json1 = json.dumps(datain) # 转换为jsonresponse = s.post(url1,data=json1, headers=headers1,timeout=10)print(response)print(response.content)
使用python产生发送的数据,简单写一下,模拟业务数据产生
data = {'Id': 116,'Type':3,'Device':'B00007','Data':[]}
nowtime = time.mktime(time.localtime()) - 1000*60 *6 *60 # 往前推一个小时number = 6 #一分钟6个数据,一共60分钟 360个数据
for i in range(number): #x=np.append(x,)data['Data'].insert(0,{'heart':random.randint(70,90), 'Time':nowtime+i*166})d_calc = []
for i in range(10000):d_calc.append(60000/random.randint(70,100))res = get_standard_deviation(d_calc)
print(res)
res = get_rmssd(d_calc)
print(res)
后面就可以使用post 进行发送数据了,比较简单,不做运行了。感谢阅读。