epoll
上次说到了poll,它存在效率问题,因此出现了改进的poll----epoll。
目前epoll是公认的效率最高的多路转接的方案。
快速了解epoll接口
epoll_create:
这个参数其实已经被废弃了。 这个值只要大于0就可以了。
这是用来创建一个epoll模型的。
创建成功了就返回一个文件描述符。失败了返回-1
epoll_wait:
第一个参数就是epoll_create的返回值。后面俩参数是用户定义的缓冲区,用来返回就绪的文件描述符和事件。这里的timeout的单位是毫秒,跟poll那里一样。
这个接口的返回值是:已经就绪的fd的个数。
epoll_event结构体
这里依旧是以位图的形式传递标记位。
epoll_ctl:
第一个参数依旧是epfd,op表示我们要进行哪些操作
看名字也能大概看出来,分别是新增,修改和删除。
epoll原理
在Linux内核中,专门为epoll设计了一套独立的模型
首先在内核中设计了一颗红黑树,它以fd为键值,其结点的值一般是一个结构体,里面包含了要关心的fd,以及这个fd关心的事件,还有一些链接字段等,然后还会有一个等待队列,如果有fd就绪了,就会将结点链入到这个就绪队列中,等待上层来处理,注意:这个结点既可以在红黑树中,也可以同时在就绪队列中。我们用epoll的时候,OS会将一个回调函数注册到底层,底层一旦就绪,就会自动调用这个回调函数,比如将数据向上交付,交给tcp的接受队列,查找红黑树中的fd,然后构建就绪结点插入到就绪队列中。
这样,用户只需要从就绪队列中获取就绪结点就可以了。因此我们把这三个组合起来就是epoll模型。
这样回想起来之间了解的接口,比如epoll_create,它创建的就是一套epoll模型和注册底层的回调机制。但是有没有可能创建了多个epoll模型呢?那么多个模型OS也要管理起来,(比如可以记录红黑树的头结点指针和队列的头结点指针,就可以将这两个结构放在一起。)所以内核会创建一个struct file,把它也当作一个文件,struct file中也存在一个指针指向这个模型,OS再将这个文件添加到进程的文件描述符表里,通过fd来找到,这就解释了为什么epoll_create返回的也是一个文件描述符,因为epoll模型也被接入到了struct file中了。
如下
到这里,我们发现,select&&poll,他俩跟epoll完全不一样。
所以现在对着图来看epoll的接口,就会发现简单的很多了,epoll_create就是为了创建epoll模型,epoll_ctl就是拿着epfd,进行的操作其实就是在修改红黑树。
epoll_wait也是拿着epfd,看就绪队列是否有fd就绪。
另外这里的红黑树的性质,像不像我们在使用select/poll时用户维护的数组?性质是一样的,不过这次是由内核维护的。所以epoll是单独设计的一点,还体现在用户不需要再使用额外的数据结构来管理文件描述符和要关心的事件了。
epoll的优势:
1.检测就绪的时间复杂度是O(1)。因为只要看就绪队列是否为空就可以了。
获取就绪fd的时间复杂度是O(n)。
2.fd和event没有上限。
3.epoll的返回值 n,表示有几个fd就绪了,并且在内核中,会把这些结点一个一个弹出。就绪事件是连续的。这样就不需要用户再进行遍历排除非法的fd了,也就是需要浪费用户额外的时间了。
在进入代码前,可以先了解一下cmake
CMake
cmake是一个自动生成makefile的工具。
在CentOS下可以用
sudo yum install -y cmake
来安装Cmake,如果是Ubuntu的话把yum改成apt就好了。
cmake --version
这个命令来查看cmake的版本。
关于使用
首先要创建一个文件:CMakeLists.txt
因为我的CentOS只能安装到2.8.12.2这样的版本,所以在这里我选择最低的版本为2.8这样的,写好后,我们执行
cmake .
然后就会生成一大堆文件,包括makefile,
vim一下这个Makefile
发现已经帮我们写好了
我们可以直接make
接下来就跟我们熟悉的一样了。
所以其实makefile已经有取代方案了,今后我们面对复杂的项目,使用CMake会简单很多。
epoll的工作模式
epoll有两种工作模式:
LT(Level Triggered): 水平触发
一般epoll的默认工作模式就是LT模式。
也就是说,在LT模式下,如果上层不及时处理数据,LT会一直通知,直到数据全部被处理。
ET(发Edge Triggered):边缘触发
我们以TCP协议为例,如果我们能一次将数据全部读走,那么我们就能给对方通知一个更大的窗口,从概率上让对方能一次发送更多的数据,因此IO效率也可能就更高。
epollecho服务简单实现
首先对epoll进行简单的封装
Epoller.hpp
#pragma once#include "nocopy.hpp"
#include "Log.hpp"
#include <cerrno>
#include <cstring>
#include <sys/epoll.h>class Epoller : nocopy
{static const int size = 128;
public:Epoller(){_epfd = epoll_create(size);if(_epfd == -1){lg(Error,"epoll_create error: %s",strerror(errno));}else{lg(Info,"epoll_create success: %d",_epfd);}}int Epollwait(struct epoll_event revents[],int num){int n = epoll_wait(_epfd,revents,num,-1); // -1表示阻塞等待return n;}int EpollUpdata(int oper,int sock,uint32_t event){int n = 0;if(event == EPOLL_CTL_DEL) // 这里对于删除操作单独处理了{n = epoll_ctl(_epfd,oper,sock,nullptr);if(n != 0){lg(Error,"epoll_ctl delete error");}}else {// EPOLL_CTL_ADD || EPOLL_CTL_MODstruct epoll_event ev;ev.events = event;ev.data.fd = sock; // 为了方便后期它就绪时,我们知道是哪个fd。n = epoll_ctl(_epfd,oper,sock,&ev); // 本质上是对红黑树进行插入或修改结点if(n != 0){lg(Error,"epoll_ctl error");}}return n;}~Epoller(){if(_epfd >= 0)close(_epfd);}
private:int _epfd;int _timeout;
};
EpollServer.hpp
#pragma once#include <iostream>
#include <memory>
#include <sys/epoll.h>
#include "Socket.hpp"
#include "Log.hpp"
#include "Epoller.hpp"
#include "nocopy.hpp"uint32_t EVENT_IN = (EPOLLIN);
uint32_t EVENT_OUT = (EPOLLOUT);class EpollServer : nocopy
{const int num = 64;
public:EpollServer(uint16_t port = 8080):_port(port),_listensocket_ptr(new Sock()),_epoller_ptr(new Epoller()){}void Init(){_listensocket_ptr->Socket();_listensocket_ptr->Bind(_port);_listensocket_ptr->Listen();lg(Info,"create listen socket success :%d\n",_listensocket_ptr->Fd());}void Start(){// 首先将listensock添加到epoll中(还有其关心的事件),也就是添加到内核的红黑树中_epoller_ptr->EpollUpdata(EPOLL_CTL_ADD,_listensocket_ptr->Fd(),EVENT_IN);struct epoll_event revs[num]; // num是我们自己定的,不受接口限制while(true){int n = _epoller_ptr->Epollwait(revs,num);if(n > 0){// 有事件就绪了lg(Debug,"event happend, fd is : %d",revs[0].data.fd);Dispatcher(revs,n);}else if(n == 0){lg(Info,"time out ....\n");}else {lg(Error,"epoll_wait eroor\n");}}}void Dispatcher(struct epoll_event* revs,int num){for(int i = 0; i < num; ++i){uint32_t events = revs[i].events;int fd = revs[i].data.fd;if(events & EVENT_IN){if(fd == _listensocket_ptr->Fd()){//新链接Accepter();}else {// 普通读取Recver(fd);}}else // 这里只考虑读取事件了{}}}void Accepter(){std::string clientip;uint16_t clientport;int sock = _listensocket_ptr->Accept(&clientip,&clientport);if(sock < 0){lg(Error,"Accept error, fd is : %d",sock);}// 获取到链接后,不能直接read,而是将fd添加进epoll_epoller_ptr->EpollUpdata(EPOLL_CTL_ADD,sock,EVENT_IN);lg(Info,"get a new link, clientip : %s, clientport: %d",clientip.c_str(),clientport);}void Recver(int fd){// 这里依旧只是一个demo,并没有处理到位char buffer[1024];ssize_t n = read(fd,buffer,sizeof(buffer) - 1);if(n > 0){buffer[n] = 0;std::cout << "get a massge: " << buffer << std::endl;//然后回显给对方std::string echo_str = "server echo $ ";echo_str += buffer;write(fd,echo_str.c_str(),echo_str.size());}else if(n == 0){lg(Info,"client quit, me too,close fd is: %d",fd);// 这里关闭时有一个细节,要先从epoll中移除// 再close,避免先close导致fd非法再从epoll中移除而报错_epoller_ptr->EpollUpdata(EPOLL_CTL_DEL,fd,0);close(fd);}else {lg(Warning,"recver error,close ..");_epoller_ptr->EpollUpdata(EPOLL_CTL_DEL,fd,0);close(fd);}}~EpollServer(){_listensocket_ptr->Close();}private:std::unique_ptr<Sock> _listensocket_ptr;std::unique_ptr<Epoller> _epoller_ptr;uint16_t _port;
};
在读取这里,以TCP为例,因为TCP是面向字节流的,我们并不能保证每次读取都是能读到完整的数据, 如果当前缓冲区里只有半个请求,我们读吗?读了之后放哪呢?在这里我们并没有处理,因此会在Ractor中解决。
我们可以通过单独设计一个类来进行防拷贝
nocopy.hpp
#pragma onceclass nocopy
{
public: nocopy(){}nocopy(const nocopy &) = delete;const nocopy& operator=(const nocopy &) = delete;
};
Reactor
Reactor模式是一种事件驱动的并发编程模型,它解决了在高并发环境下处理大量客户端请求的问题。
Reactor其实是一种半同步半异步的模型,同步是因为调用epoll,要自己等,异步体现在它可以进行回调处理。
代码:
首先我们先对设置非阻塞的函数进行一个封装
Comm.hpp
#pragma once#include <unistd.h>
#include <fcntl.h>void SetNonBlockOrDie(int sock)
{int fl = fcntl(sock,F_GETFL);if(fl < 0)exit(4);fcntl(sock,F_SETFL,fl | O_NONBLOCK);
}
TcpServer.hpp
#pragma once#include <iostream>
#include <string>
#include <memory>
#include <cerrno>
#include <unordered_map>
#include <functional>
#include "Comm.hpp"
#include "Log.hpp"
#include "nocopy.hpp"
#include "Epoller.hpp"
#include "Socket.hpp"class Connection;
class TcpServer;uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);const static int g_buffer_size = 128;using func_t = std::function<void(std::weak_ptr<Connection>)>;
using except_func = std::function<void(std::weak_ptr<Connection>)>;// 对每个链接进行管理
class Connection
{
public:Connection(int sock): _sock(sock){}void SetHandler(func_t recv_cb, func_t send_cb, except_func except_cb){_recv_cb = recv_cb;_send_cb = send_cb;_except_cb = except_cb;}int SockFd(){return _sock;}void AppendInBuffer(const std::string &info){_inbuffer += info;}void AppendOutBuffer(const std::string &info){_outbuffer += info;}std::string &InBuffer(){return _inbuffer;}std::string &OutBuffer(){return _outbuffer;}void SetWeakPtr(std::weak_ptr<TcpServer> tcp_server_ptr){_tcp_server_ptr = tcp_server_ptr;}~Connection(){}private:int _sock;std::string _inbuffer; // 输入缓冲区,但是无法接收二进制流std::string _outbuffer;public:// 回调方法func_t _recv_cb;func_t _send_cb;func_t _except_cb;// 回指指针std::weak_ptr<TcpServer> _tcp_server_ptr;std::string _ip;uint16_t _port;
};class TcpServer : public std::enable_shared_from_this<TcpServer>, public nocopy
{static const int num = 64;public:TcpServer(uint16_t port, func_t OnMessage): _port(port),_OnMessage(OnMessage),_quit(true),_epoller_ptr(new Epoller()),_listensock_ptr(new Sock()){}void Init(){_listensock_ptr->Socket();SetNonBlockOrDie(_listensock_ptr->Fd());_listensock_ptr->Bind(_port);_listensock_ptr->Listen();lg(Info, "create listen socket success: %d", _listensock_ptr->Fd());Addconnection(_listensock_ptr->Fd(), EVENT_IN, std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);}void Addconnection(int sock, uint32_t event, func_t recv_cb, func_t send_cb, except_func except_cb,const std::string &ip = "0.0.0.0", uint16_t port = 0){// 1.给sock建立一个Connection对象,并插入到map中std::shared_ptr<Connection> new_connection(new Connection(sock));new_connection->SetWeakPtr(shared_from_this()); // shared_from_this()是返回当前对象的shared_ptr,目的是设置回指new_connection->SetHandler(recv_cb, send_cb, except_cb);new_connection->_ip = ip;new_connection->_port = port;// 2.添加到unordered_map中_connections.insert(std::make_pair(sock, new_connection));// 3.添加对应的fd和事件到epoll中_epoller_ptr->EpollUpdata(EPOLL_CTL_ADD, sock, event);}// 链接管理器void Accepter(std::weak_ptr<Connection> conn){auto connection = conn.lock();while (true){struct sockaddr_in peer;socklen_t len = sizeof(peer);int sock = ::accept(connection->SockFd(), (struct sockaddr *)&peer, &len); // :: 表示使用系统原生的接口if (sock > 0){uint16_t peerport = ntohs(peer.sin_port);char ipbuf[128];inet_ntop(AF_INET, &peer.sin_addr.s_addr, ipbuf, sizeof(ipbuf));lg(Debug, "get a new client,get info[%s : %d],sockfd : %d", ipbuf, peerport, sock);SetNonBlockOrDie(sock);// listensock只要设置recv_cb,而其他sock读写异常都要关心设置Addconnection(sock, EVENT_IN,std::bind(&TcpServer::Recver, this, std::placeholders::_1),std::bind(&TcpServer::Sender, this, std::placeholders::_1),std::bind(&TcpServer::Excepter, this, std::placeholders::_1),ipbuf, peerport);}else{if (errno == EWOULDBLOCK)break;else if (errno == EINTR)continue;elsebreak; // 说明是真出错了}}}// 事件管理器// 这里不需要关心数据的格式,服务器只要IO数据即可,有没有读完,报文的细节交给上层void Recver(std::weak_ptr<Connection> conn){if (conn.expired())return;auto connection = conn.lock();int sock = connection->SockFd();while (true) // 循环读取数据,因为是ET模式{char buffer[g_buffer_size];memset(buffer, 0, sizeof(buffer));int n = recv(sock, buffer, sizeof(buffer) - 1, 0); // 虽然这里flags是0,但是sock已经设置成非阻塞了if (n > 0){connection->AppendInBuffer(buffer);}else if (n == 0){lg(Info, "sockfd : %d,client info %s:%d,quit...", connection->SockFd(), connection->_ip.c_str(), connection->_port);connection->_except_cb(connection);return;}else{if (errno = EWOULDBLOCK)break;if (errno == EINTR)continue;else{lg(Warning, "sockfd: %d,client info %s:%d recv error...", connection->SockFd(), connection->_ip.c_str(), connection->_port);connection->_except_cb(connection);return;}}}// 数据读上来了,但是不一定完整,交给上层处理_OnMessage(connection);}// 发送这里会麻烦一些void Sender(std::weak_ptr<Connection> conn){if (conn.expired())return;auto connection = conn.lock();auto &outbuffer = connection->OutBuffer();while (true){ssize_t n = send(connection->SockFd(), outbuffer.c_str(), sizeof(outbuffer), 0); // 同理,不会再阻塞了if (n > 0){outbuffer.erase(0, n);if (outbuffer.empty())break;}else if (n == 0){return;}else{if (errno = EWOULDBLOCK)break;if (errno == EINTR)continue;else{lg(Warning, "sockfd: %d,client info %s:%d send error...", connection->SockFd(), connection->_ip.c_str(), connection->_port);connection->_except_cb(connection);return;}}}// 如果发送缓冲区被打满了,可能还没发完就退出循环了// 此时才来判断是否要关心写事件就绪if(!outbuffer.empty()){// 开启对写事件的关心EnableEvent(connection->SockFd(),true,true);}else {// 关闭对写事件的关心EnableEvent(connection->SockFd(),true,false);}}void Excepter(std::weak_ptr<Connection> conn){if(conn.expired());auto connection = conn.lock();int fd = connection->SockFd();lg(Warning,"Excepter hander sockfd: %d,client info %s : %d, excepter hander",fd,connection->_ip.c_str(),connection->_port);// 1.移除对该fd的关心_epoller_ptr->EpollUpdata(EPOLL_CTL_DEL,fd,0);// 2.再关闭fdclose(fd);lg(Debug,"close fd done %d ...",fd);// 3.再从unordered_map中移除_connections.erase(fd);}void EnableEvent(int sock,bool readable,bool writeable){uint32_t events = 0;events = ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET);_epoller_ptr->EpollUpdata(EPOLL_CTL_MOD,sock,events);}bool IsConnectionSafe(int fd){auto iter = _connections.find(fd);if(iter == _connections.end())return false;else return true;}void Dispatcher(int timeout){int n = _epoller_ptr->Epollwait(revs,num,timeout);for(int i = 0; i < n; ++i){uint32_t event = revs[i].events;int sock = revs[i].data.fd;//这里只处理读写事件if((event & EPOLLIN) && IsConnectionSafe(sock)){if(_connections[sock]->_recv_cb)_connections[sock]->_recv_cb(_connections[sock]);}if((event & EPOLLOUT) && IsConnectionSafe(sock)){if(_connections[sock]->_send_cb)_connections[sock]->_send_cb(_connections[sock]);}}}void Loop(){_quit = false;while(!_quit){Dispatcher(-1); // 阻塞式}_quit = true;}void PrintConnection(){std::cout << "_connections fd list: ";for (auto &connection : _connections){std::cout << connection.second->SockFd() << ", ";std::cout << "inbuffer: " << connection.second->InBuffer().c_str();}std::cout << std::endl;}~TcpServer(){}private:std::shared_ptr<Epoller> _epoller_ptr; // 内核std::shared_ptr<Sock> _listensock_ptr; // 监听socketstd::unordered_map<int, std::shared_ptr<Connection>> _connections; // 哈希表管理链接struct epoll_event revs[num];uint16_t _port;bool _quit;// 处理上层信息(跟上层的应用场景有关)func_t _OnMessage;
};
其中在写,也就是Sender那里要说明:
select/poll/epoll因为发送缓冲区经常有空间,因此写事件是经常就绪的,如果我们对它进行EPOLLOUT设置关心,EPOLLOUT几乎每次都就绪,server经常返回,浪费CPU资源,
因此:对于读事件,我们设置常关心,对于写事件,按需关心 -> 也就是直接写入,如果写入完成,那么就结束了,不需要关心,如果因为一些原因(比如发送缓冲区满了)而没写完,也就是outbuffer里面还有数据,此时再设置关心,等下次写完了,再去掉关心。
Main.cc
#include <iostream>
#include <functional>
#include <memory>
#include "Log.hpp"
#include "TcpServer.hpp" // 只处理IO
// 这里可以把之前的网络计算器搬过来,用来当作上层应用// for debug
void DefaultOnmessage(std::weak_ptr<Connection> conn)
{if(conn.expired()) return;auto connection_ptr = conn.lock();std::cout << "上层得到了数据: " << connection_ptr->InBuffer() << std::endl;// 在这里可以加入多线程(线程池),那么主线程只负责读取数据,在这里交给其他线程处理数据//std::string response_str = ?// if(response_str.empty()) return;// lg(Debug, "%s", response_str.c_str());// response_str 发送出去// connection_ptr->AppendOutBuffer(response_str);// 正确的理解发送?// connection_ptr->_send_cb(connection_ptr);auto tcpserver = connection_ptr->_tcp_server_ptr.lock();tcpserver->Sender(connection_ptr);
}int main()
{std::shared_ptr<TcpServer> epoll_svr(new TcpServer(8080,DefaultOnmessage));epoll_svr->Init();epoll_svr->Loop();return 0;
}
总之我们的服务仅仅只是处理数据,将读取上来,而并没有做处理,我们也可以加入上层应用,比如之前的网络版本计算器,现在没有加入,导致读取上来的数据是堆积在一起的。
在大型服务器中,我们还可以加入多线程,主线程专门只做连接,和读取数据,其他线程处理数据。
还可以加入链接管理机制,比如用一个小堆,里面放入每个链接的链接时长,对于长时间没有相应的链接,我们可以判定超时而将它关闭,如果在超时前发送了消息,那么就重置它的超时时间。
总之Reactor就是一个反应堆,它是一个半同步半异步模型,说是同步,体现在它也要进行等待,它参与了这个过程,异步体现在它可以将数据交给其他线程处理,然后只将结果返回。
就像打地鼠游戏一样,玩家监视着很多地洞,哪个洞冒出了地鼠,我们玩家就要去处理。