服务器
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
import jsondata = {'result': 'this is a test'}
host = ('localhost', 8888)class Request(BaseHTTPRequestHandler):def do_GET(self):self.send_response(200)self.send_header('Content-type', 'application/json')self.end_headers()self.wfile.write(json.dumps(data))def do_POST(self):content_length = int(self.headers['Content-Length'])post_data = self.rfile.read(content_length)print("Received POST data: %s" % post_data)self.send_response(200)self.send_header('Content-type', 'application/json')self.end_headers()self.wfile.write(json.dumps(data))if __name__ == '__main__':server = HTTPServer(host, Request)print "Starting server, listen at: %s:%s" % hostserver.serve_forever()
异步测试
import json
import subprocess
import threadingimport requestsdef post_with_request():url = 'http://localhost:8888'data = {'key': 'value'}try:requests.post(url, json=data)except requests.exceptions.RequestException as e:passdef post_with_curl():url = 'http://localhost:8888'data = {'key': 'value'}json_data = json.dumps(data)command = ['curl', '-X', 'POST', '-H', 'Content-Type: application/json', '-d', json_data, url]process = subprocess.Popen(command)# ret_code = process.wait()# out = process.stdout.read()# err = process.stderr.read()if __name__ == '__main__':post_with_curl()