进程池线程池实现TCP高性能并发通信
使用进程池与线程池实现并发服务,为多个客户进行接收和发送消息的服务
代码实现
# 导入进程池
from multiprocessing import Pool, cpu_count
# 导入线程池
from multiprocessing.pool import ThreadPool
from socket import *
import os
from queue import Queuedef send_data(client, addr, q):# 子进程中无法使用input,而且子进程错误不会展示print(f"【send_data】准备向客户{addr}发送数据...")while True:msg = q.get()if not msg:print(f"【send_data】收到关闭通知, 发送功能关闭!")returnclient.send(f"消息 <{msg}> 已收到!".encode("gbk"))def recv_data(client, addr, q):while True:data = client.recv(1024).decode('gbk')q.put(data)# 客户端调用close; data为 '' (网络调试助手有缺陷无法执行close,可以使用浏览器测试)if not data:# 往队列写入None,通知发送消息的子线程关闭,并关闭服务套接字q.put('')client.close()print(f"【recv_data】客户{addr}关闭连接, 接收功能关闭!")breakprint(f"【recv_data】 {addr} 发来消息 : {data}\n")# 进程负责处理连接请求(一个进程跟进一个客户)
def process_connect(client, addr):print(f"由进程 {os.getpid()} 为新客户 {addr} 服务!")# 线程负责处理数据请求(一个线程处理客户的一个需求)t_pool = ThreadPool(2)# 创建一个队列,为接收和发送之间传递消息q = Queue()t_pool.apply_async(send_data, (client, addr, q))t_pool.apply_async(recv_data, (client, addr, q))def main():# 创建tcp监听套接字tcp_server_socket = socket(AF_INET, SOCK_STREAM)tcp_server_socket.bind(("192.168.0.180", 9000))tcp_server_socket.listen(128)# 进程池负责接收连接请求(进程池数与cpu处理器数量一致)pool = Pool(cpu_count())while True:# 等待连接请求,获取服务套接字client_socket, client_addr = tcp_server_socket.accept()pool.apply_async(process_connect, (client_socket, client_addr))if __name__ == '__main__':main()
运行效果: