【Linux网络编程】Reactor模式与Proactor模式
Reactor模式
Reactor 模式是指主线程即 IO 处理单元只负责监听文件描述符上是否有事件发生,有则立刻将该事件通知给工作线程即逻辑单元,除此之外,主线程不做任何其它实质性的动作。读写数据,接受新的连接,以及处理客户请求均在工作线程中完成。
同步 IO 模型实现 Reactor 模式的工作流程:
- 主线程往 epoll 内核事件表中注册 socket 上的读就绪事件。
- 主线程调用 epoll_wait 等待 socket 上有数据可读。
- 当 socket 上有数据可读时,epoll_wait 通知主线程,主线程则将 socket 可读事件放入请求队列。
- 睡眠在请求队列上的工作线程被唤醒,它从 socket 上读取数据,并处理客户请求,然后往 epoll 内核事件表中注册写就绪事件。
- 主线程调用 epoll_wait 等待 socket 可写。
- 当 socket 可写时,epoll_wait 通知主线程,主线程将 socket 可写事件放入请求队列中。
- 睡眠在请求队列上的某个工作线程被唤醒,它往 socket 上写入服务器处理客户请求的结果。
如下为 Reactor 模式的工作流程图:
工作线程从请求队列中取出事件后,将根据事件的类型来决定如何处理它:对于可读事件则执行读数据和处理请求的操作;对于可写事件则执行写数据的操作。因此并没有所谓的“读工作线程”和“写工作线程”。
Proactor 模式
与 Reactor 模式不同,Proactor 模式将所有的 IO 操作交由主线程与内核去处理,工作线程仅仅负责业务逻辑。
异步 IO 模型(以 aio_read 与 aio_write 为例)实现 Proactor 模式的工作流程:
- 主线程调用 aio_read 函数向内核注册 socket 读完成事件,并告诉内核用户的读缓冲区的位置,以及读操作完成时如何通知应用程序。
- 主线程继续处理其它逻辑。
- 当 socket 上的数据被读入用户缓冲区后,内核将向应用程序发送一个信号,以通知应用程序数据已经可用。
- 应用程序预先定义好的信号处理函数选择一个工作线程来处理客户请求。工作线程处理完客户请求后,调用 aio_write 函数向内核注册 socket 上的写完成事件,并告诉内核用户写缓冲区的位置,以及写操作完成时如何通知应用程序。
- 主线程继续处理其它逻辑。
- 当用户缓冲区中的数据写入 socket 后,内核向应用程序发送一个信号,以通知应用程序数据已经发送完毕。
- 应用程序预先定义好的信号处理函数选择一个工作线程来做善后处理,比如决定是否关闭 socket。
如下为 Proactor 模式的工作流程图:
连接 socket 上的读写事件是通过 aio_read / aio_write 向内核注册的,因此内核将通过信号来向应用程序报告连接 socket 上的读写事件。所以,主线程中的 epoll_wait 调用仅用来检测监听 socket 上的连接请求事件,而不能用来检测连接 socket 上的读写事件。
Reactor 模型代码
#include "webserver.h"WebServer::WebServer(int port, int trigMode, int timeoutMs) : port_(port), timeoutMs_(timeoutMs), isClose_(false), epoller_(new Epoller()) {InitEventMode_(trigMode);if(!InitSocket_()) { isClose_ = true };
}WebServer::~WebServer() {close(listenFd_);isClose_ = true;
}void WebServer::Start() {while (!isClose_) {int eventCnt = epoller_->Wait(timeoutMs_);for (int i = 0; i < eventCnt; ++i) {int fd = epoller_->GetEventFd(i); // 获取事件对应的fduint32_t events = epoller_->GetEvents(i); // 获取事件的类型if (fd == listenFd_) {DealListen_();} else if (events & EPOLLIN) {DealRead_(); // 子线程中执行} else if (events & EPOLLOUT) {DealWrite_(); // 子线程中执行} else if (events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)) {CloseConn_();} else {perror("Unexpected event");}}}
}void WebServer::DealListen_() {struct sockaddr_in cliaddr;socklen_t len = sizeof(cliaddr);do {int cfd = accept(listenFd_, (struct sockaddr*)&cliaddr, &len);if (cfd < 0) { return ; }// 获取客户端信息char cliIp[16];inet_ntop(AF_INET, &cliaddr.sin_addr.s_addr, cliIp, sizeof(cliIp));unsigned short cliPort = ntohs(cliaddr.sin_port);// 输出客户端的信息printf("client's ip is %s, and port is %d\n", cliIp, cliPort );SetFdNonblock(cfd);epoller_->AddFd(cfd, connEvent_ | EPOLLIN);} while (listenEvent_ & EPOLLET);
}void WebServer::InitEventMode_(int trigMode) {listenEvent_ = EPOLLRDHUP;connEvent_ = EPOLLRDHUP | EPOLLONESHOT; // 注册EPOLLONESHOT,防止多线程同时操作一个socket的情况switch ((trigMode)){case 0:break;case 1:connEvent_ |= EPOLLET;break;case 2:listenEvent_ |= EPOLLET;break;case 3:connEvent_ |= EPOLLET;listenEvent_ |= EPOLLET;break;default:connEvent_ |= EPOLLET;listenEvent_ |= EPOLLET;break;}
}bool WebServer::InitSocket_() {// 创建socketlistenFd_= socket(AF_INET, SOCK_STREAM, 0);if (listenFd_ == -1) {perror("socket");return false;}// 设置端口复用int optval = 1;setsockopt(listenFd_, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));struct sockaddr_in saddr;saddr.sin_family = AF_INET;saddr.sin_addr.s_addr = INADDR_ANY;saddr.sin_port = htons(port_);// 绑定int ret = bind(listenFd_, (struct sockaddr*)&saddr, sizeof(saddr));if (ret == -1) {perror("bind");return false;}// 监听ret = listen(listenFd_, 128);if (ret == -1) {perror("listen");return false;}// 将listenFd_注册至epoll事件表中ret = epoller_->AddFd(listenFd_, listenEvent_ | EPOLLIN);if (ret == 0) {perror("Add listen error");return false;}// 设置listenFd_非阻塞SetFdNonblock(listenFd_);return true;
}int WebServer::SetFdNonblock(int fd) {assert(fd > 0);return fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
}