高级IO—poll,epoll,reactor

高级IO—poll,epoll,reactor

文章目录

  • 高级IO—poll,epoll,reactor
    • poll函数
      • poll函数接口
      • poll服务器
    • epoll
      • epoll的系统调用
        • epoll_create
        • epoll_ctl
        • epoll_wait
      • epoll的工作原理
      • epoll的工作方式
        • 水平触发
        • 边缘触发
      • epoll服务器
    • reactor

poll函数

poll函数是一个用于多路复用的系统调用,类似于select函数,用于监视一组文件描述符的状态。

poll函数接口

函数原型

 #include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
  • fds:指向一个struct pollfd结构数组的指针,每个结构体描述一个要监视的文件描述符及其关注的事件。

  • nfdsfds数组中的结构体数量。

  • timeout:超时时间(以毫秒为单位)。设置为-1,表示阻塞式,设置为0,表示非阻塞,设置为大于0的数,例如设置为5000,表示阻塞5秒,5秒后非阻塞返回一次。

struct pollfd结构体定义如下:

struct pollfd {int fd;       // 文件描述符short events; // 关注的事件short revents; // 实际发生的事件-short-整数
};
  • event字段用于设置关注的事件,可以使用如下宏:通过event&宏事件可以将事件添加到event中

image-20231124171956283

  • revent字段用于返回实际发生的事件,也可以使用上述宏进行判断。通过revent|宏事件可以得知revent中是否包含了该就绪事件

  • poll函数的返回值是就绪文件描述符的数量,返回值大于0表示已经有文件描述符就绪,返回值等于0表示没有文件描述符就绪,返回值小于0表示出错,可以使用perror输出错误信息。

总结一下:

  1. 在poll函数的pollfd结构体中,有event参数保存用户需要OS关心的事件,有revent参数保存OS告诉用户就绪的事件,即做到了输入输出分离,不需要像select函数借助第三方数组对sock进行管理。
  2. poll函数等待文件描述符理论上没有上限。由于参数fds是一个动态数组,并不是一个确定的结构。不同于位图,动态数组可以动态扩容。且动态数组的大小取决于用户传入的nfds。

poll服务器

现对select服务器套用poll函数,改造成poll服务器。只需要对实现业务hpp改造即可

pollserver.hpp

#pragma once#include <iostream>
#include <sys/select.h>
#include <string>
#include <functional>
#include<poll.h>
#include "Sock.hpp"using namespace std;namespace Poll_sv
{static const int defaultport = 8080;         // 默认端口号static const int defaultfd = -1;             // 默认套接字标志static  const int fdnum=2048;//设置文件描述符的数量using func_t = function<string(const string &)>;class PollServer{public:PollServer(func_t f, int port = defaultport) : _func(f), _port(port), _listensock(-1),_rfds(nullptr){}void initServer(){// 获取套接字_listensock = Sock::Socket();cout << "Sock success" << endl;// 绑定网络信息Sock::Bind(_listensock, _port);cout << "Bind success" << endl;// 把套接字设置为监听状态Sock::Listen(_listensock);cout << "Listen success" << endl;_rfds=new struct pollfd[fdnum];//指针指向一个成员是poll结构体的数组for(int i=0;i<fdnum;i++){_rfds[i].fd=defaultfd;_rfds[i].events=0;_rfds[i].revents=0;}_rfds[0].fd=_listensock;_rfds[0].events=POLLIN;//       cout << "initServer" << endl;}void Print(){cout << "now using socket: ";for (int i = 0; i < fdnum; i++){if(_rfds[i].fd!=defaultfd)cout<<_rfds[i].fd<<" ";//打印正在使用的fd}cout << endl;}void Accpter(int lsock){//  logMessage(DEBUG, "Accepter begin");string clientip;uint16_t clientport = 0;int sock = Sock::Accpet(lsock, &clientip, &clientport); // 若成功返回,返回一个用于通信的套接字if (sock < 0)return;logMessage(NORMAL, "accept success [%s:%d]", clientip.c_str(), clientport);int i=0;for(;i<fdnum;i++){if(_rfds[i].fd!=defaultfd)//这里是找到默认的位置,给后续需要使用的文件描述符用,因此是跳过已经被使用的位置continue;else break;}if(i==fdnum){close(sock);logMessage(WARNING,"fd full,please wait");}else{_rfds[i].fd=sock;_rfds[i].events=POLLIN;_rfds[i].revents=0;logMessage(NORMAL,"sock has set in rfds");}Print();//     logMessage(DEBUG, "Accepter end");}void Recver(int pos){//   logMessage(DEBUG, "Recver begin");char buffer[1024];ssize_t s = recv(_rfds[pos].fd, buffer, sizeof(buffer) - 1, 0);if (s > 0){buffer[s] = 0;cout << "client# " << buffer << endl;}else if (s == 0){close(_rfds[pos].fd);               // 关闭该套接字,关闭通信通道_rfds[pos].fd = defaultfd; // 将数组中的该套接字清除logMessage(NORMAL, "client quit");return;}else{close(_rfds[pos].fd);_rfds[pos].fd = defaultfd; // 将数组中的该套接字清除logMessage(ERROR, "recv error");return;}// 将客户端发来的数据原样写回去string resp = _func(buffer);write(_rfds[pos].fd, resp.c_str(), resp.size()); // 写回去//     logMessage(DEBUG, "Recever end");}void Handlerop(){for (int i = 0; i < fdnum; i++){if (_rfds[i].fd == defaultfd)//fd没有被设置则跳过continue;if(!(_rfds[i].events&POLLIN)) continue;//结构体不是被指定标志位POLLIN设置过则跳过if (_rfds[i].fd==_listensock&&_rfds[i].revents&POLLIN)// 此时i对应的数组位置是拿到连接的文件描述符,意味着在底层连接已经拿到,等待上层提取{Accpter(_listensock);}else if (_rfds[i].revents&POLLIN) // 此时存在数组内的对应套接字都是底层读资源就绪{Recver(i);}else{}}}void Start(){int timeout=-1;for(;;){//     cout<<"poll ready"<<endl;int n=poll(_rfds,fdnum,timeout);//     cout<<"poll finished"<<endl;switch(n){case 0:logMessage(NORMAL,"timeout...");break;case -1:logMessage(WARNING,"poll error");break;default:logMessage(NORMAL,"poll success");Handlerop();break;}}}~PollServer(){if (_listensock < 0) // 为什么是小于0?close(_listensock);if(_rfds!=nullptr)delete[]_rfds;}private:int _port;int _listensock;struct pollfd* _rfds;//指向poll结构体的指针func_t _func;};
}

image-20231124213444016

总结一下:

  1. poll函数通过输入输出分离,避免了借用第三方数组来记录sock和事件。
  2. poll解决了select具有管理文件描述符数量上限的问题。
  3. poll依旧存在遍历问题。由于检查就绪事件依旧是需要遍历整个数组,即时间复杂度为O(N)。

epoll模型可以解决遍历问题,使得检查就绪事件的时间复杂度优化到O(1)。

epoll

epoll的系统调用

epoll_create

epoll_create用于创建一个epoll模型

函数原型

int epoll_create(int size);
  • size 指定了 epoll 实例能够同时监视的文件描述符的数量上限。
  • 返回值为一个非负整数的文件描述符epollfd,表示创建的epoll模型对象。创建失败,返回值为-1。可以传递epollfd给epoll_ctl函数像epoll模型中添加、修改、删除需要监视的文件描述符以及事件。
  • 注意一下:使用epoll模型后,需要关闭epollfd。
epoll_ctl

epoll_ctl 函数用于向 epoll 实例中添加、修改或删除要监视的文件描述符,并设置关注的事件。

函数原型

#include <sys/epoll.h>int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
  • epfd:epoll 模型的文件描述符,由 epoll_create 函数返回。
  • op:操作类型,可以是这些值:
    EPOLL_CTL_ADD:将文件描述符 fd 添加到 epoll 模型中。
    EPOLL_CTL_MOD:修改已经添加到 epoll模型中的文件描述符 fd 的关注事件。
    EPOLL_CTL_DEL:从 epoll模型中删除文件描述符 fd
  • fd:要添加、修改或删除的文件描述符。
  • event:指向 struct epoll_event 结构体的指针,用于设置关注的事件。

struct epoll_event结构体定义如下:

struct epoll_event {_uint32_t events;  // 关注的事件epoll_data_t data;  // 用户数据
};
  • events字段用于设置关注的事件。可以使用以下宏设置:
  • EPOLLIN:可读事件。
  • EPOLLOUT:可写事件。
  • EPOLLPRI:紧急事件。
  • EPOLLERR:错误事件。
  • EPOLLHUP:挂起事件。
  • EPOLLET:边缘触发模式。
  • EPOLLONESHOT:一次性事件。

epoll_data_t结构定义如下:

typedef union epoll_data
{void *ptr;int fd;uint32_t u32;uint64_t u64;
} epoll_data_t;
  • fd是用户传递需要监管的文件描述符。

  • 调用成功返回0,失败返回-1。

epoll_wait

epoll_wait函数用于收集epoll模型中已经就绪的事件,并将就绪事件的个数返回

函数原型

#include <sys/epoll.h>int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
  • epfd:epoll 实例的文件描述符,由 epoll_create 函数返回。
  • events:指向 struct epoll_event 结构体数组的指针,用于存储就绪的事件。
  • maxeventsevents 数组的大小,表示最多可以存储多少个事件。
  • timeout:等待事件的超时时间,单位为毫秒。设置为-1,表示阻塞式,设置为0,表示非阻塞,设置为大于0的数,例如设置为5000,表示阻塞5秒,5秒后非阻塞返回一次。
  • 函数的返回值表示就绪的事件数量,返回值大于0表示已经有事件就绪,返回值等于0表示超时返回,无事发生。返回值小于0表示调用出错。

epoll的工作原理

数据本身只能按照输入设备—内存—CPU—内存—输出设备的方向流动。

image-20231125164652720

  1. 网络数据到达主机时,是先到达网卡,即网课外设。CPU有很多针脚,外设虽然不会与CPU有直接的数据流通,但外设可以将信号直接发送到CPU的阵脚上(硬件中断)。外设通过中断设备,将电子信号发送到CPU上。CPU会将该信号转发到中断向量表,根据信号的值找到对应的表中位置(下标),该表是一张函数指针数组,根据指针能够调用驱动方法,驱动方法将数据拷贝到内存上,即数据从硬件传输到了OS中。

  2. 在OS中,会以红黑树的方式管理sock和events。每个节点上都有sock和event,当然还有left指针和right指针。红黑树的优点是查找效率高。当用户告诉内核那些事件需要被关心时,OS会将需要关心的事件、sock放到该红黑树当中。当事件就绪,OS会将红黑树上的节点添加到就绪队列中,该就绪队列中的成员表示内核需要告诉用户,那些事件已经就绪,等待上层将该事件资源取走。实际上,一个节点即能存在于红黑树中,也能存在于就绪队列中。

image-20231125171013425

总结一下:

image-20231125172634228

网络数据到来后,外设通过信号中断将数据拷贝到内存上。细致的说,是底层收到数据后,贯穿网络协议栈向上交付数据,可以把红黑树的节点看作成文件描述符sock,根据sock找到对应的文件结构体struct file,该结构体内有指针指向接收缓存区,然后将数据填充到该文件接收缓冲区中。随后调用struct file中的回调方法(void* private data指针指向一个回调函数)-回调机制,该回调方法会将红黑树中的节点添加到就绪队列当中,表示该事件已经就绪,通知用户来取走数据。

image-20231126152314783

红黑树、就绪队列、epoll管理的文件struct file以及部分网络协议栈一整套可以认作成一个epoll模型。进程可以通过文件描述符表找到epoll_create的返回值epollfd,epollfd会指向自己的struct file,在struct file中有关联指针能够找到对应的epoll模型。

重新看待epoll的相关接口。操作系统需要提供接口该上层使用。

 #include <sys/epoll.h>
int epoll_create(int size);

epoll_create创建epoll模型,并返回一个epollfd供上层使用。

#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll_ctl根据提供的epollfd找到对应的epoll模型,根据提供的fd和event增删改红黑树中的节点。

int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);

epoll_wait根据epollfd找到对应的epoll模型,根据提供的events、maxevents、timeout管理就绪队列中的节点。而epoll_wait检测就绪事件,只需要检测已经就绪的事件,不需要遍历检测整个需要管理的事件表,只需要将就绪的事件(资源)从内核层拷贝到用户层。因此遍历的事件复杂度为O(1)。

其次,epoll_wait会按照传入的顺序,依次放回到就绪事件数组中。根据队列先进先出的特性,若就绪队列中有很多数据节点,一次性拿不完,可以下一次调用epoll_wait再拿。

而伴随epoll模型的创建的辅助数组,就是该管理需要关心事件的红黑树。红黑树节点是以kv形式存在,由于sock是不会重复的,因此sock作为key。

epoll相比于selectpoll,将大部分管理任务分配给操作系统负责,也因此epoll的接口相比较简单。

epoll的工作方式

前面都有提到select、poll、epoll在事件就绪的时候通知上层将事件处理。事件就绪可以认为底层的IO条件满足,可以进行某种IO行为,epoll就会通知上层进行这种IO行为将底层的数据提取走。

epoll有2种通知策略,即水平触发(LT)和边缘触发(ET)

水平触发
  1. 水平触发为Level Triggered,简称LT。epoll默认状态下是LT工作策略。

  2. 水平触发关心的是缓冲区的状态,当缓冲区可读的时候,就会一直发出通知,也就是当缓冲区中只要有数据就会发出通知,知道上层将缓冲区的所有数据读完。可以认为你在做老师布置的作业,写了但没写完的情况下,老师就会一直通知你写作业,直到作业完全写完。

  3. LT的优势在于:可以在读取数据的时候只读取一部分,在第二次调用epoll_wait时立刻返回并通知上层将底层数据读走。

  4. 支持阻塞读写和阻塞读写。‘

边缘触发
  1. 边缘触发为Edge Triggered。简称ET。
  2. 边缘触发关心的是缓冲区状态的变化,当缓冲区状态发生变化的时候才会发出通知,比如缓冲区中来了新的数据。底层有数据,ET模式下epoll只会通知上层一次,后续缓冲区来了新的数据,epoll才会再次通知。
  3. ET优势在于:ET是一次性通知的方式,倒逼上层尽量做到一次性将数据读完。其次是尽可能一次性读取多的数据,从而使得接收缓冲区可容纳下一次数据的空间尽可能的大,那么接收方的接收能力自然就强,能够告诉发送数据方:接收方的滑动窗口较大,让对方更新出更大的滑动窗口,提高数据发送的效率。
  4. ET模式下,文件描述符要求是非阻塞的。由于底层的事件到达增多,OS才会通知上层将数据取走,因此用户提取数据时,尽量调用一次read,recv就把数据取完,而为了避免一次性调用函数没有读完,就需要循环调用读取函数,当调用读取函数直到读取不到数据才算作读完数据。若fd是阻塞的,那么读取函数进行最后一次读取时读取不到数据,就会阻塞,因此fd需要是非阻塞的,读取不到数据直接返回。

LT模式下,fd可以是阻塞的也可以是非阻塞的。LT可以模拟ET工作方式。

epoll服务器

epollserver.cc

#include<iostream>
#include<functional>
#include<vector>
#include<memory>
#include"err.hpp"
#include"epollserver.hpp"
using namespace std;
using namespace EPoll_sv;// static void Usage(string proc)
// {
//     cerr<<"Usage:\n\t"<<proc<<" port "<<"\n\n";
// }string resp(const string& s)
{return s;
}int main(int argc,char* argv[])
{// if(argc!=2)// {//     Usage(argv[0]);//     exit(USAGE_ERR);// }unique_ptr<EpollServer> epolsv(new EpollServer(resp));epolsv->initServer();epolsv->start();return 0;
}

log.hpp

#pragma once#include <iostream>
#include <string>
#include<ctime>
#include <sys/types.h>#include <unistd.h>#include <stdio.h>
#include <stdarg.h>
using namespace std;
#define DEBUG   0
#define NORMAL  1
#define WARNING 2
#define ERROR   3
#define FATAL   4#define NUM 1024
#define LOG_STR "./logstr.txt"
#define LOG_ERR "./log.err"enum
{USAGE_ERR = 1,SOCKET_ERR,BIND_ERR,LISTEN_ERR,EPOLL_CREATE_ERR
};
const char* to_str(int level)
{switch(level){case DEBUG: return "DEBUG";case NORMAL: return "NORMAL";case WARNING: return "WARNING";case ERROR: return "ERROR";case FATAL: return "FATAL";default: return nullptr;}
}void logMessage(int level, const char* format,...)
{char logprestr[NUM];
snprintf(logprestr,sizeof(logprestr),"[%s][%ld][%d]",to_str(level),(long int)time(nullptr),getpid());char logeldstr[NUM];
va_list arg;
va_start(arg,format); 
vsnprintf(logeldstr,sizeof(logeldstr),format,arg);//arg是logmessage函数列表中的...cout<<logprestr<<logeldstr<<endl;}

Sock.hpp

#pragma once#include<iostream>
#include<string>
#include<cstring>
#include<sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "log.hpp"
#include "err.hpp"class Sock
{const static int backlog=32;public:static int Socket(){int sock=socket(AF_INET,SOCK_STREAM,0);//创建套接字if(sock<0)//创建失败{logMessage(FATAL,"create sock error");exit(SOCKET_ERR);}//创建成功logMessage(NORMAL,"create sock success");int opt=1;setsockopt(sock,SOL_SOCKET,SO_REUSEADDR|SO_REUSEPORT,&opt,sizeof(opt));//允许套接字关闭后立刻重启return sock;} static void Bind(int sock,int port){//绑定自己的网络信息struct sockaddr_in local;memset(&local,0,sizeof(local));//将结构体清空local.sin_family=AF_INET;//添加协议local.sin_port=htons(port);//添加端口号local.sin_addr.s_addr=htons(INADDR_ANY);//不绑定指定IP,可以接收任意IP主机发送来的数据//将本地设置的信息绑定到网络协议栈if (bind(sock,(struct sockaddr*)&local,sizeof(local))<0){logMessage(FATAL,"bind socket error");exit(BIND_ERR);}logMessage(NORMAL,"bind socket success");}static void Listen(int sock)//将套接字设置为监听{if(listen(sock,0)<0){logMessage(FATAL,"listen socket error");exit(LISTEN_ERR);}logMessage(NORMAL,"listen socket success");}static int Accpet(int listensock,string * clientip,uint16_t* clientport){struct sockaddr_in cli;socklen_t len= sizeof(cli);int sock=accept(listensock,(struct sockaddr*)&cli,&len);if(sock<0){logMessage(FATAL,"accept error");//这里accept失败为什么不退出}else{logMessage(NORMAL,"accept a new link,get new sock : %d",sock);*clientip=inet_ntoa(cli.sin_addr);*clientport=ntohs(cli.sin_port);}return sock;}
};
  • 默认的LT模式下的epollserver

epollserver.hpp

#pragma once#include <iostream>
#include <sys/select.h>
#include <string>
#include <functional>
#include<sys/epoll.h>
#include"err.hpp"
#include "Sock.hpp"using namespace std;namespace EPoll_sv
{static const int defaultport = 8080;static const int size = 128;static const int defaultvalue = -1;static const int defaultnum = 64;using func_t =function<string(const string&)>;class EpollServer{public:EpollServer(func_t fun,const int port = defaultport) :_num(defaultnum), _port(port), _listensock(defaultvalue), _epfd(defaultvalue),_func(fun){}void handlerEvent(int evs){for(int i=0;i<evs;i++)//直接遍历已经就绪的事件{uint32_t event=_reves[i].events;int sock=_reves[i].data.fd;if(sock==_listensock&&(event&EPOLLIN))//当前是将连接拿上应用层的文件描述符{string clientip;uint16_t clientport;int fd=Sock::Accpet(_listensock,&clientip,&clientport);if(fd<0){logMessage(NORMAL,"accpet sock error");continue;}struct epoll_event ev;ev.data.fd=fd;ev.events=EPOLLIN;epoll_ctl(_epfd,EPOLL_CTL_ADD,fd,&ev);}else if(event&EPOLLIN)//当前是通信的事件{char buffer[1024];int n=recv(sock,buffer,sizeof(buffer)-1,0);if(n>0){buffer[n]=0;cout<<"client# "<<buffer<<endl;string resp=_func(buffer);send(sock,resp.c_str(),resp.size(),0);//将数据发送回去给客户端}else if(n==0){epoll_ctl(_epfd,EPOLL_CTL_DEL,sock,nullptr);//将sock从epoll中的结构中移除close(sock);//关闭socklogMessage(NORMAL,"client quit");}else{epoll_ctl(_epfd,EPOLL_CTL_DEL,sock,nullptr);//将sock从epoll中的结构中移除close(sock);//关闭socklogMessage(ERROR,"communicate error");}}else{}}logMessage(DEBUG,"handlerEvent out");}void initServer(){// 获取套接字_listensock = Sock::Socket();cout << "Sock success" << endl;// 绑定网络信息Sock::Bind(_listensock, _port);cout << "Bind success" << endl;// 把套接字设置为监听状态Sock::Listen(_listensock);cout << "Listen success" << endl;_epfd=epoll_create(size);//调用成功返回一个epoll文件描述符,size表示是需要监听的文件描述符的数量if(_epfd<0){logMessage(FATAL,"epoll_create error");exit(EPOLL_CREATE_ERR);}//将listensock添加到epoll模型中struct epoll_event epev;epev.events=EPOLLIN;epev.data.fd=_listensock;epoll_ctl(_epfd,EPOLL_CTL_ADD,_listensock,&epev);//申请就绪时间的空间_reves=new struct epoll_event[_num];//申请一块空间,内含_num个事件数logMessage(NORMAL, "init server success");}void start(){//等待就绪事件int timeout=-1;for(;;){logMessage(DEBUG,"epoll_wait ready");int re=epoll_wait(_epfd,_reves,_num,timeout);logMessage(DEBUG,"epoll_wait end");switch (re){case 0://0个事件就绪,即超时重传logMessage(NORMAL,"timeout...");break;case -1://epoll_wait函数调用失败logMessage(ERROR,"epoll_wait error,code: %d,errstring: %s",errno,strerror(errno));    default://到这里时返回值都大于0,即re为已经就绪的事件数logMessage(NORMAL,"wait incident success");// handlerEvent(re);break;}}}~EpollServer(){if(_listensock!=defaultvalue){close(_listensock);}if(_epfd!=defaultvalue){close(_epfd);}if(_reves!=nullptr){delete[]_reves;}}private:int _port;int _listensock;int _epfd;struct epoll_event* _reves;int _num;//事件数func_t _func;//外部传递进来的函数};
}

image-20231126190327483

客户端连接上但不处理就会一直通知。

  • ET模式下的epollserver服务器

epollserver.hpp

#pragma once#include <iostream>
#include <sys/select.h>
#include <string>
#include <functional>
#include<sys/epoll.h>
#include<unistd.h>
#include<fcntl.h>
#include"err.hpp"
#include "Sock.hpp"using namespace std;namespace EPoll_sv
{static const int defaultport = 8080;static const int size = 128;static const int defaultvalue = -1;static const int defaultnum = 64;using func_t =function<string(const string&)>;class EpollServer{public:EpollServer(func_t fun,const int port = defaultport) :_num(defaultnum), _port(port), _listensock(defaultvalue), _epfd(defaultvalue),_func(fun){}void handlerEvent(int evs){for(int i=0;i<evs;i++)//直接遍历已经就绪的事件{uint32_t event=_reves[i].events;int sock=_reves[i].data.fd;if(event&EPOLLET){Sock::setNonBlock(sock);}if(sock==_listensock&&(event&EPOLLIN))//当前是将连接拿上应用层的文件描述符{string clientip;uint16_t clientport;int fd=Sock::Accpet(_listensock,&clientip,&clientport);if(fd<0){logMessage(NORMAL,"accpet sock error");continue;}struct epoll_event ev;ev.data.fd=fd;ev.events=EPOLLIN|EPOLLET;epoll_ctl(_epfd,EPOLL_CTL_ADD,fd,&ev);}else if(event&EPOLLIN)//当前是通信的事件{char buffer[1024];int n=recv(sock,buffer,sizeof(buffer)-1,0);if(n>0){buffer[n]=0;cout<<"client# "<<buffer<<endl;string resp=_func(buffer);send(sock,resp.c_str(),resp.size(),0);//将数据发送回去给客户端}else if(n==0){epoll_ctl(_epfd,EPOLL_CTL_DEL,sock,nullptr);//将sock从epoll中的结构中移除close(sock);//关闭socklogMessage(NORMAL,"client quit");}else{epoll_ctl(_epfd,EPOLL_CTL_DEL,sock,nullptr);//将sock从epoll中的结构中移除close(sock);//关闭socklogMessage(ERROR,"communicate error");}}else{}}//      logMessage(DEBUG,"handlerEvent out");}void initServer(){// 获取套接字_listensock = Sock::Socket();cout << "Sock success" << endl;// 绑定网络信息Sock::Bind(_listensock, _port);cout << "Bind success" << endl;// 把套接字设置为监听状态Sock::Listen(_listensock);cout << "Listen success" << endl;_epfd=epoll_create(size);//调用成功返回一个epoll文件描述符,size表示是需要监听的文件描述符的数量if(_epfd<0){logMessage(FATAL,"epoll_create error");exit(EPOLL_CREATE_ERR);}//将listensock添加到epoll模型中struct epoll_event epev;epev.events=EPOLLIN|EPOLLET;epev.data.fd=_listensock;epoll_ctl(_epfd,EPOLL_CTL_ADD,_listensock,&epev);//申请就绪时间的空间_reves=new struct epoll_event[_num];//申请一块空间,内含_num个事件数logMessage(NORMAL, "init server success");}void start(){//等待就绪事件int timeout=-1;for(;;){sleep(1);logMessage(DEBUG,"epoll_wait ready");int re=epoll_wait(_epfd,_reves,_num,timeout);logMessage(DEBUG,"epoll_wait end");switch (re){case 0://0个事件就绪,即超时重传logMessage(NORMAL,"timeout...");break;case -1://epoll_wait函数调用失败logMessage(ERROR,"epoll_wait error,code: %d,errstring: %s",errno,strerror(errno));    default://到这里时返回值都大于0,即re为已经就绪的事件数logMessage(NORMAL,"wait incident success");// handlerEvent(re);break;}}}~EpollServer(){if(_listensock!=defaultvalue){close(_listensock);}if(_epfd!=defaultvalue){close(_epfd);}if(_reves!=nullptr){delete[]_reves;}}private:int _port;int _listensock;int _epfd;struct epoll_event* _reves;int _num;//事件数func_t _func;//外部传递进来的函数};
}static void setNonBlock(int fd)//把文件描述符设置为非阻塞
{int n=fcntl(fd,F_GETFL);//获取文件描述符的状态,正常返回非-1的标志位,出错返回-1if(n<0){cerr<<"fcntl :"<<strerror(errno)<<endl;return ;}fcntl(fd,F_SETFL,n|O_NONBLOCK);//对文件描述符的状态进行设置,设置为非阻塞状态
}
  • 需要将文件描述符都设置成EPOLLET模式,在处理事件函数内,若sock是EPOLLET模式,就调用setNonBlock将该sock设置成非阻塞。

Sock.hpp

#pragma once#include<iostream>
#include<string>
#include<cstring>
#include<sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "log.hpp"
#include "err.hpp"class Sock
{const static int backlog=32;public:static void setNonBlock(int fd)//把文件描述符设置为非阻塞
{int n=fcntl(fd,F_GETFL);//获取文件描述符的状态,正常返回非-1的标志位,出错返回-1if(n<0){cerr<<"fcntl :"<<strerror(errno)<<endl;return ;}fcntl(fd,F_SETFL,n|O_NONBLOCK);//对文件描述符的状态进行设置,设置为非阻塞状态
}static int Socket(){int sock=socket(AF_INET,SOCK_STREAM,0);//创建套接字if(sock<0)//创建失败{logMessage(FATAL,"create sock error");exit(SOCKET_ERR);}//创建成功logMessage(NORMAL,"create sock success");int opt=1;setsockopt(sock,SOL_SOCKET,SO_REUSEADDR|SO_REUSEPORT,&opt,sizeof(opt));//允许套接字关闭后立刻重启return sock;} static void Bind(int sock,int port){//绑定自己的网络信息struct sockaddr_in local;memset(&local,0,sizeof(local));//将结构体清空local.sin_family=AF_INET;//添加协议local.sin_port=htons(port);//添加端口号local.sin_addr.s_addr=htons(INADDR_ANY);//不绑定指定IP,可以接收任意IP主机发送来的数据//将本地设置的信息绑定到网络协议栈if (bind(sock,(struct sockaddr*)&local,sizeof(local))<0){logMessage(FATAL,"bind socket error");exit(BIND_ERR);}logMessage(NORMAL,"bind socket success");}static void Listen(int sock)//将套接字设置为监听{if(listen(sock,0)<0){logMessage(FATAL,"listen socket error");exit(LISTEN_ERR);}logMessage(NORMAL,"listen socket success");}static int Accpet(int listensock,string * clientip,uint16_t* clientport){struct sockaddr_in cli;socklen_t len= sizeof(cli);int sock=accept(listensock,(struct sockaddr*)&cli,&len);if(sock<0){logMessage(FATAL,"accept error");//这里accept失败为什么不退出}else{logMessage(NORMAL,"accept a new link,get new sock : %d",sock);*clientip=inet_ntoa(cli.sin_addr);*clientport=ntohs(cli.sin_port);}return sock;}
};
  • Sock类内新增了setNonBlock函数,用于将传导过来的sock设置成非阻塞。

image-20231126191654022

可以看到底层事件就绪时,ET模式下的epollserver只通知了上层一次。

reactor

通过Reactor对底层事件的关心,底层有就绪事件,就通知上层的Connection类对象调用相关函数处理就绪事件。

image-20231128202431892

Connection类

using func_t = function<void(Connection *)>;class Connection{public:Connection(int sock, Tcpserver *tcps) : _consock(sock), _tcps(tcps){}~Connection() {}void Register(func_t r, func_t w, func_t e) // 注册方法表,将方法传递进来{_reader = r;_writer = w;_excepter = e;}void Close(){close(_consock);}public:string _inbuffer;  // 输入缓冲区string _outbuffer; // 输出缓冲区func_t _reader;    // 读操作func_t _writer;    // 写操作func_t _excepter;  // 处理异常操作int _consock;Tcpserver *_tcps; // 指向Tcpserver对象的指针};
  1. 每一个文件描述符需要配备独自的堆上的接收缓冲区和输出缓冲区。先描述再组织,因此创建了结构体Connection,结构体内含文件描述符,及其配备的输出缓冲区,接收缓冲区,该缓冲区类型为string,当前该缓冲区只能处理字符串。三个回调函数。三个回调方法通过外部传参。回调方法分别是读事件方法、写事件方法、异常事件方法。
  2. 由于Connection对象内的回调函数是Tcpserver对象赋予的,因此需要先了解一下三个回调函数。

Accepter:负责获取连接,并将新连接添加到Connection对象内。

 void Accepter(Connection *con) // 针对listenfd,将获取到的连接从底层拿到应用层{for (;;){logMessage(DEBUG, "enter Accepter");string clientip;uint16_t clientport;int err = 0;int sock = _sock.Accpet(&clientip, &clientport, &err); // 获取成功,返回新的文件描述符用于通信,客户端的ip和port通过参数返回if (sock > 0){// 连接拿上来了,将fd添加到con对象中AddConnection(sock, EPOLLIN | EPOLLET,bind(&Tcpserver::Recver, this, placeholders::_1),bind(&Tcpserver::Sender, this, placeholders::_1),bind(&Tcpserver::Excepter, this, placeholders::_1));logMessage(DEBUG, "get new link,[%s:%d]", clientip.c_str(), clientport);}else{if (err == EAGAIN || err == EWOULDBLOCK)break; // 读完了else if (err == EINTR)continue; // 因为中断,继续读elsebreak; // 错误}}}

Recver:负责读取底层的数据

  void Recver(Connection *con) // 读事件{char buffer[1024];while (true){ssize_t i = recv(con->_consock, buffer, sizeof(buffer) - 1, 0);if (i > 0){buffer[i] = 0;con->_inbuffer += buffer;//每次读到的数据放到配套的缓冲区内logMessage(DEBUG, "recv str: %s", con->_inbuffer.c_str());_func(con);}else if (i == 0) // 断开连接,异常处理{if (con->_excepter){con->_excepter(con);return;}}else{if (errno == EAGAIN || errno == EWOULDBLOCK){break; // 读完了}else if (errno == EINTR) // 因信号中断,继续读{continue;}else{if (con->_excepter){con->_excepter(con);return;}}}}}
  • 将每次读取到的数据填充到配备的接收缓冲区,读到完整报文后,调用_func函数处理数据。

Sender:将sock配备的输出缓冲区内的数据发回给客户端。

void Sender(Connection *con) // 写事件{while (true){ssize_t i = send(con->_consock, con->_outbuffer.c_str(), sizeof(con->_outbuffer), 0);if (i > 0){if (con->_outbuffer.empty()) // 内容当前send函数一次性发完了{logMessage(DEBUG, "sender finish");con->_tcps->EnableReadWrite(con, true, false); // 发完了,把写通道关闭// sleep(2);break;}else{logMessage(DEBUG, "sender not finish");con->_outbuffer.erase(0, i); // 如果一次性没发完,那么就将发完的部分删减掉,剩余的下次再发}}else{if (errno == EAGAIN || errno == EWOULDBLOCK) // 上次发完了,这次再发就会err为这两个字段{break;}else if (errno == EINTR){continue; // 因信号中断了,继续发送}else{logMessage(DEBUG, "excepter");if (con->_excepter) // 异常了,执行异常事件{con->_excepter(con);return;}}}}

Tcpserver.hpp

#pragma once#include <iostream>
#include <sys/select.h>
#include <string>
#include <functional>
#include <sys/epoll.h>
#include <unordered_map>
#include <assert.h>
#include "Epoller.hpp"
#include "err.hpp"
#include "Sock.hpp"
#include "until.hpp"using namespace std;namespace TCP_sv
{static const int defaultport = 8080;static const int Gnum = 64;class Tcpserver;class Connection;using func_t = function<void(Connection *)>;class Connection{public:Connection(int sock, Tcpserver *tcps) : _consock(sock), _tcps(tcps){}~Connection() {}void Register(func_t r, func_t w, func_t e) // 注册方法表,将方法传递进来{_reader = r;_writer = w;_excepter = e;}void Close(){close(_consock);}public:string _inbuffer;  // 输入缓冲区string _outbuffer; // 输出缓冲区func_t _reader;    // 读操作func_t _writer;    // 写操作func_t _excepter;  // 处理异常操作int _consock;Tcpserver *_tcps; // 指向Tcpserver对象的指针};class Tcpserver{private:void Recver(Connection *con) // 读事件{char buffer[1024];while (true){ssize_t i = recv(con->_consock, buffer, sizeof(buffer) - 1, 0);if (i > 0){buffer[i] = 0;con->_inbuffer += buffer;//每次读到的数据放到配套的缓冲区内logMessage(DEBUG, "recv str: %s", con->_inbuffer.c_str());_func(con);}else if (i == 0) // 断开连接,异常处理{if (con->_excepter){con->_excepter(con);return;}}else{if (errno == EAGAIN || errno == EWOULDBLOCK){break; // 读完了}else if (errno == EINTR) // 因信号中断,继续读{continue;}else{if (con->_excepter){con->_excepter(con);return;}}}}}void Sender(Connection *con) // 写事件{while (true){ssize_t i = send(con->_consock, con->_outbuffer.c_str(), sizeof(con->_outbuffer), 0);if (i > 0){if (con->_outbuffer.empty()) // 内容当前send函数一次性发完了{logMessage(DEBUG, "sender finish");con->_tcps->EnableReadWrite(con, true, false); // 发完了,把写通道关闭// sleep(2);break;}else{logMessage(DEBUG, "sender not finish");con->_outbuffer.erase(0, i); // 如果一次性没发完,那么就将发完的部分删减掉,剩余的下次再发}}else{if (errno == EAGAIN || errno == EWOULDBLOCK) // 上次发完了,这次再发就会err为这两个字段{break;}else if (errno == EINTR){continue; // 因信号中断了,继续发送}else{logMessage(DEBUG, "excepter");if (con->_excepter) // 异常了,执行异常事件{con->_excepter(con);return;}}}}}void Excepter(Connection *con) // 异常事件{logMessage(DEBUG, "enter excepter");_epoller.Control(con->_consock, 0, EPOLL_CTL_DEL);con->Close();_Connections.erase(con->_consock);logMessage(DEBUG, "out excepter");delete con;}void Accepter(Connection *con) // 针对listenfd,将获取到的连接从底层拿到应用层{for (;;){logMessage(DEBUG, "enter Accepter");string clientip;uint16_t clientport;int err = 0;int sock = _sock.Accpet(&clientip, &clientport, &err); // 获取成功,返回新的文件描述符用于通信,客户端的ip和port通过参数返回if (sock > 0){// 连接拿上来了,将fd添加到con对象中AddConnection(sock, EPOLLIN | EPOLLET,bind(&Tcpserver::Recver, this, placeholders::_1),bind(&Tcpserver::Sender, this, placeholders::_1),bind(&Tcpserver::Excepter, this, placeholders::_1));logMessage(DEBUG, "get new link,[%s:%d]", clientip.c_str(), clientport);}else{if (err == EAGAIN || err == EWOULDBLOCK)break; // 读完了else if (err == EINTR)continue; // 因为中断,继续读elsebreak; // 错误}}}bool Isexist(int sock){auto iter = _Connections.find(sock);return iter != _Connections.end(); // 判断sock是否存在connection集合中}void AddConnection(int sock, uint32_t event, func_t reader, func_t writer, func_t excepter){if (event & EPOLLET) // 如果是ET模式,就将文件描述符设置为非阻塞Until::setNonBlock(sock);Connection *con = new Connection(sock, this);con->Register(reader, writer, excepter); // 把外面的函数传进去初始化内部函数bool n = _epoller.Add_Event(sock, event); // 告诉内核需要监管那些事件--将fd和事件添加到epoll模型中logMessage(DEBUG, "Add event num: %d", n);assert(n);(void)n;_Connections.insert(pair<int, Connection *>(sock, con)); // 将fd和con对象添加到map中进行管理}void Loop(int timeout){// logMessage(DEBUG,"enter Loop");int n = _epoller.Wait(_revs, _num, timeout); // 获取已经就绪的事件for (int i = 0; i < n; i++)  //遍历就绪事件                // epoll_wait出错,n是-1,此时i不<n,就进不去for循环{                                            // 拿到就绪事件的fd和event// sleep(2);// logMessage(DEBUG, "enter Loop for");//提取sock和eventint sock = _revs[i].data.fd;uint32_t event = _revs[i].events;//处理异常事件--如果是异常事件,那么会进入读事件和写事件,但是读事件是不就绪的即读出错,就会走到处理异常的代码区。
//同样的写事件也会发送写出错,走到处理异常的代码区if (event & EPOLLERR)//      logMessage(DEBUG, "event & EPOLLERR");event |= (EPOLLIN | EPOLLOUT);if (event & EPOLLHUP)//      logMessage(DEBUG, "event & EPOLLHUP");event |= (EPOLLIN | EPOLLOUT); // 如果事件异常了,就将该事件设置为读写事件// listenfd事件就绪if ((event & EPOLLIN) && (Isexist(sock)) && (_Connections[sock]->_reader)){logMessage(DEBUG, "con->_reader");_Connections[sock]->_reader(_Connections[sock]);}if ((event & EPOLLOUT) && (Isexist(sock)) && (_Connections[sock]->_writer)){logMessage(DEBUG, "con->_writer");_Connections[sock]->_writer(_Connections[sock]);}}//   logMessage(DEBUG, "quit Loop");}public:Tcpserver(func_t fun, int port = defaultport) : _port(port), _func(fun){}void inittcpserver(){logMessage(DEBUG, "enter inittcpserver\n");// 1.创建文件描述符_sock.Socket();    // 创建文件描述符--用于建立连接_sock.Bind(_port); // 绑定端口号和ip_sock.Listen();    // 将文件描述符设置为监视状态// 2.创建epoll模型_epoller.Create();//3.将listenfd添加到con对象,即添加到epoll中,并且注册配备的缓冲区和回调函数AddConnection(_sock.Fd(), EPOLLIN | EPOLLET,\bind(&Tcpserver::Accepter, this, placeholders::_1), nullptr, nullptr); // 对于listenfd来说,只关心读取事件logMessage(DEBUG, "quit inittcpserver\n");_revs = new struct epoll_event[Gnum]; // 创建一个事件集合,供后续存放已经就绪的事件使用_num = Gnum;}void EnableReadWrite(Connection *con, bool readable, bool writable){//判断uint32_t event = (readable ? EPOLLIN : 0) | (writable ? EPOLLOUT : 0) | EPOLLET;_epoller.Control(con->_consock, event, EPOLL_CTL_MOD);}void Distribute()//事件派发s{logMessage(DEBUG, "enter Distribute");while (true){Loop(-1);}logMessage(DEBUG, "quit Distribute");}~Tcpserver(){_epoller.Close();if (_revs != nullptr)delete[] _revs;}private:uint16_t _port;Sock _sock;Epoller _epoller;unordered_map<int, Connection *> _Connections; // 建立sock和connection对象的映射表struct epoll_event *_revs;                     // 用来存储返回的事件func_t _func;int _num; // 可监管的事件总数};
}
  1. 该代码中的bind用法是bind类内成员函数。用法是第一个参数需要传递类内成员函数对象的指针,第二个参数需要传递类对象的指针,后面才是传递类成员函数需要的参数。对一个成员函数对象使用bind后形成一个新的函数对象。例如调用AddConnection函数时,第三四五参数是仿函数对象,通过bind将传入的类内成员函数对象转换为仿函数对象。以传递Recver函数为例,在bind表达式内,第一个参数传递Recver函数对象指针,第二个参数传递Tcpserver类指针,第三个参数传递的是需要传递给Recver函数对象的参数,即con指针。通过bind,将Tcpserver内的Recver成员函数转换为Connection类内的_reader成员函数。

image-20231129180452445

  1. 建立sock和Connection对象的映射关系,以sock作为key值,Connection对象的指针作为value值,建立unordered_map数据结构进行组织管理,Connection对象内是针对key值的sock配备的缓冲区,处理函数。

  2. 在epoll中读取到数据,处理完后不能立刻发送回给客户端。因为发送缓冲区是否具有空间是未知的。服务器启动后,发送条件是就绪的,是可以直接发送,但会存在一次性发送完的情况,可以下一次调用sender的时候再发送,这就要求每一个sock需要配备自己的发送缓冲区。并且将发送事件注册到epoll中,让epoll管理。由于服务器的需求以接收事件居多,发送事件相比需求不大,因此对于epoll来说接收事件是常规设置,发送事件是按需设置。

main.cc

#include <iostream>
#include <functional>
#include <vector>
#include <memory>
#include "err.hpp"
#include "tcpserver.hpp"
#include "protocol.hpp"
using namespace std;
using namespace TCP_sv;static void Usage(string proc)
{cerr << "Usage:\n\t" << proc << " port "<< "\n\n";
}string resp(const string &s)
{return s;
}bool cal(const Request &req, Response &resp)
{// req已经有结构化完成的数据啦,你可以直接使用resp._exitcode = NONE;resp._result = NONE;switch (req._op){case '+':resp._result = req._x + req._y;break;case '-':resp._result = req._x - req._y;break;case '*':resp._result = req._x * req._y;break;case '/':{if (req._y == 0)resp._exitcode = DIV_ZERO;elseresp._result = req._x / req._y;}break;case '%':{if (req._y == 0)resp._exitcode = MOD_ZERO;elseresp._result = req._x % req._y;}break;default:resp._exitcode = OP_ERR;break;}return true;
}void calculate(Connection *conn)
{string onepackage;// 从完整报文中取出有效载荷while (handleOnePackage(conn->_inbuffer, &onepackage))// 如果是读到一个完整的报文,就进入while循环体内对报文进行处理,形成响应返回给client端{//去报头string req_str;if(!deLength(onepackage,&req_str)){logMessage(FATAL,"delength err");return;}cout << "有效载荷: " << req_str << endl;// 反序列化:用有效载荷去构造req对象Request req;if (!req.Deserialize(req_str)){logMessage(FATAL, "Deserialize err");return;}// 用req对象的成员去构造resp对象--处理函数,然后构造响应Response resp;cal(req, resp);string respstr;// 用resp对象去序列化出一个报文if (!resp.Serialize(&respstr)){logMessage(FATAL, "resp serialize err");return;}// 将报文加上报头,然后填充到输出缓冲区中conn->_outbuffer += enLength(respstr);cout << "-----result: " << conn->_outbuffer << endl;}//if (conn->_writer)conn->_writer(conn);if (!conn->_outbuffer.empty()) // 如果没有发送完conn->_tcps->EnableReadWrite(conn, true, true);//如果这次数据没发完,下次epoll检查就绪事件时当前的sock的写事件还是就绪的,那么下次epoll就自动调用写函数继续写elseconn->_tcps->EnableReadWrite(conn, true, false);//通过回调指针调用tcp对象的函数。这次写完了,将写事件关闭
}int main(int argc, char *argv[])
{if (argc != 2){Usage(argv[0]);exit(USAGE_ERR);}uint16_t port = atoi(argv[1]);unique_ptr<Tcpserver> selsv(new Tcpserver(calculate, port));selsv->inittcpserver();selsv->Distribute();return 0;
}

protocol.hpp

#pragma once
#include<iostream>
#include<string>
#include <sys/types.h>
#include <sys/socket.h>
#include <cstring>
#include <jsoncpp/json/json.h>
#include"log.hpp"
using namespace std;#define SEP " "
#define SEP_LEN strlen(SEP)//strlen统计'\0'之前的字符个数,而sizeof统计的是所占内存的空间大小,使用sizeof会越界出问题
#define LINE_SEP "\r\n"
#define LINE_SEP_LEN strlen(LINE_SEP)enum {NONE=0,DIV_ZERO,MOD_ZERO,OP_ERR
};
//"x op y"->"text_len"\r\n"x op y"\r\n---給内容加上报头
std::string enLength(const std::string& text)//协议定制
{std::string send_str=to_string(text.size());send_str+=LINE_SEP;send_str+=text;send_str+=LINE_SEP;return send_str;
}
//"text_len"\r\n"x op y"\r\n -> "x op y"---去掉报头,取出里面的内容
bool deLength(const std::string& str,string* ret)//协议定制
{auto it=str.find(LINE_SEP);//找到报头if(it==std::string::npos) return false;//如果没找到则直接返回int len=stoi(str.substr(0,it));//取出字符串的长度*ret=str.substr(it+LINE_SEP_LEN,len);//取出数据return true;
}class Request
{
public:
Request():_x(0),_y(0),_op(0){}
Request(int x,int y,int op):_x(x),_y(y),_op(op){}bool Serialize(std::string* out)//序列化,将传入的x op y转化为字符串"x op y"
{*out="";*out+=to_string(_x);*out+=SEP;*out+=to_string(_op);*out+=SEP;*out+=to_string(_y);return true;
}bool Deserialize( const string& origin)//反序列化,将传过来的字符串拆出来传参給_x _op _y
{//"_xSEP_opSEP_y"-> _x,_op,_yauto leftit=origin.find(SEP);cout<<"Deserialize找到了leftSEP: "<<leftit<<endl;auto rightit=origin.rfind(SEP);cout<<"Deserialize找到了rightSEP: "<<rightit<<endl;if(leftit==string::npos|| rightit==string::npos) return false;if(leftit==rightit) return false;int opsize=rightit-leftit-1;cout<<"opsize: "<<opsize<<endl;
//1 43 1--leftit=1,rightit=4,opsize=rightit-leftit-1=4-1-1=2;
//1 3 1--leftit=1,right=3,opsize=rightit-leftit-1=3-1-1=1// if(rightit-(leftit+SEP_LEN)!=1) return false;if(rightit-(leftit+SEP_LEN)!=opsize) return false;//+号ASCII码是43,从char转int被解析成43即stringlen为两位,这里的运算rightit-(leftit+SEP_LEN)!=1就出问题
//4-(1+1)==2;3-(1+1)=1std::string origin_x=origin.substr(0,leftit);std::string origin_y=origin.substr(rightit+SEP_LEN);if(origin_x.empty()) return false;if(origin_y.empty()) return false;cout<<"origin_x: "<<origin_x<<" origin_y: "<<origin_y<<endl;_x=stoi(origin_x);int opf=stoi(origin.substr(leftit,rightit));_op=opf;cout<<"opf: "<<opf<<"_op: "<<_op<<endl;_y=stoi(origin_y);return true;}public:int _x;int _y;char _op;
};class Response
{
public:
Response():_exitcode(0),_result(0){}
Response(int exitcode,int result):_exitcode(exitcode),_result(result){}
bool Serialize(string*out)//序列化
{//_exitcode _result ->"_exitcodeSEP_result"
*out="";
*out+=to_string(_exitcode);
*out+=SEP;
*out+=to_string(_result);return true;
}bool Deserialize(const string& in)//反序列化
{//_exitcodeSEP_result"->_exitcode _resultauto pos=in.find(SEP);
if(pos==string::npos) return false;string excstr=in.substr(0,pos);
string resstr=in.substr(pos+SEP_LEN);
if(excstr.empty()||resstr.empty()) return false;_exitcode=stoi(excstr);
_result=stoi(resstr);
return true;}public:
int _exitcode;//退出码
int _result;//结果
};//"text_len"\r\n"x op y"\r\n
bool handleOnePackage(string& inbuffer,string*out)
{*out="";auto pos=inbuffer.find(LINE_SEP);//找\r\nif(pos==string::npos) return false;//没找到报头和有效载荷之间的分隔符---如果字节流式的报文没读全就继续读string text_len=inbuffer.substr(0,pos);//报头是有效载荷的长度int len=stoi(text_len);int totallen=text_len.size()+LINE_SEP_LEN*2+len;//整个报文的长度if(inbuffer.size()<totallen) {logMessage(WARNING,"got uncomplete message");return false;//报文没读完继续读}logMessage(NORMAL,"got complete message");*out=inbuffer.substr(0,totallen);inbuffer.erase(0,totallen);return true;}

log.hpp

#pragma once#include <iostream>
#include <string>
#include<ctime>
#include <sys/types.h>#include <unistd.h>#include <stdio.h>
#include <stdarg.h>
using namespace std;
#define DEBUG   0
#define NORMAL  1
#define WARNING 2
#define ERROR   3
#define FATAL   4
#define ERROR_EPOLL_CREATE 5#define NUM 1024
#define LOG_STR "./logstr.txt"
#define LOG_ERR "./log.err"
const char* to_str(int level)
{switch(level){case DEBUG: return "DEBUG";case NORMAL: return "NORMAL";case WARNING: return "WARNING";case ERROR: return "ERROR";case FATAL: return "FATAL";case ERROR_EPOLL_CREATE: return "ERROR_EPOLL_CREATE";default: return nullptr;}
}void logMessage(int level, const char* format,...)
{char logprestr[NUM];
snprintf(logprestr,sizeof(logprestr),"[%s][%ld][%d]",to_str(level),(long int)time(nullptr),getpid());char logeldstr[NUM];
va_list arg;
va_start(arg,format); 
vsnprintf(logeldstr,sizeof(logeldstr),format,arg);//arg是logmessage函数列表中的...cout<<logprestr<<logeldstr<<endl;}

Sock.hpp

#pragma once#include<iostream>
#include<string>
#include<cstring>
#include<sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "log.hpp"
#include "err.hpp"const static int backlog=32;
const static int defaultsock=-1;
class Sock
{const static int backlog=32;public:Sock(int sock=defaultsock):_listensock(sock){}void Socket(){_listensock=socket(AF_INET,SOCK_STREAM,0);//创建套接字if(_listensock<0)//创建失败{logMessage(FATAL,"create sock error");exit(SOCKET_ERR);}//创建成功logMessage(NORMAL,"create sock success,origin sock: %d\n",_listensock);int opt=1;setsockopt(_listensock,SOL_SOCKET,SO_REUSEADDR|SO_REUSEPORT,&opt,sizeof(opt));//允许套接字关闭后立刻重启} void Bind(int port){//绑定自己的网络信息struct sockaddr_in local;memset(&local,0,sizeof(local));//将结构体清空local.sin_family=AF_INET;//添加协议local.sin_port=htons(port);//添加端口号local.sin_addr.s_addr=htons(INADDR_ANY);//不绑定指定IP,可以接收任意IP主机发送来的数据//将本地设置的信息绑定到网络协议栈if (bind(_listensock,(struct sockaddr*)&local,sizeof(local))<0){logMessage(FATAL,"bind socket error");exit(BIND_ERR);}logMessage(NORMAL,"bind socket success");}void Listen()//将套接字设置为监听{if(listen(_listensock,0)<0){logMessage(FATAL,"listen socket error");exit(LISTEN_ERR);}logMessage(NORMAL,"listen socket success");}int Accpet(string * clientip,uint16_t* clientport,int*err){logMessage(DEBUG,"enter Accept");*err=errno;struct sockaddr_in cli;socklen_t len= sizeof(cli);logMessage(DEBUG,"will accept");//拿上来连接后,第二次调用到这里,调用accept函数阻塞住了,难道不是设定了sock是非阻塞了吗?11.17.21.24int sock=accept(_listensock,(struct sockaddr*)&cli,&len);logMessage(DEBUG,"accept done");if(sock<0){logMessage(FATAL,"accept error");//这里accept失败为什么不退出}else{logMessage(NORMAL,"accept a new link,get new sock : %d",sock);*clientip=inet_ntoa(cli.sin_addr);*clientport=ntohs(cli.sin_port);}logMessage(DEBUG,"quit Accept");return sock;}int Fd(){return _listensock;}private:int _listensock;
};

until.hpp

#pragma once#include<iostream>
#include<unistd.h>
#include<fcntl.h>
#include<string.h>
#include<cerrno>
using namespace std;
class Until
{
public:
static void setNonBlock(int fd)//把文件描述符设置为非阻塞
{int n=fcntl(fd,F_GETFL);//获取文件描述符的状态,正常返回非-1的标志位,出错返回-1if(n<0){cerr<<"fcntl :"<<strerror(errno)<<endl;return ;}fcntl(fd,F_SETFL,n|O_NONBLOCK);//对文件描述符的状态进行设置,设置为非阻塞状态
}};void Print_log()
{cout<<"print_log"<<endl;
}void Download()
{cout<<"download_something"<<endl;
}

Epoller.hpp

#pragma once#include <iostream>
#include <sys/select.h>
#include <string>
#include <functional>
#include <sys/epoll.h>
#include "Sock.hpp"using namespace std;
static const int defaultfd = -1; // 默认fd
static const int size = 128;     //
class Epoller
{
public:Epoller(int fd = defaultfd) : _epfd(fd) {}~Epoller(){if (_epfd != defaultfd){close(_epfd);}}void Create(){logMessage(DEBUG, "enter epoller create");_epfd = epoll_create(size); // 将管理事件数传进去,创建一个具有指定事件数的epoll模型if (_epfd < 0){logMessage(ERROR, "epoll_create error");exit(ERROR_EPOLL_CREATE);}logMessage(DEBUG, "out epoller create,_epfd: %d\n", _epfd);}// 用户告知内核,需要底层监管那些事件bool Add_Event(int sock, uint16_t event) // 将sock和event添加到epoll模型中{struct epoll_event epv;epv.events = event;epv.data.fd = sock;int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &epv);return n == 0;}// 内核告诉用户,已经就绪了多少个事件int Wait(struct epoll_event revent[], int num, int timeout) // 监管num个事件,就绪事件保存在revent数组中{logMessage(DEBUG, "enter epoll_Wait");int n = epoll_wait(_epfd, revent, num, timeout);logMessage(DEBUG, "quit epoll_Wait,get n: %d", n);//     sleep(2);return n; // 内核帮助用户监管事件,返回已经就绪的事件数量}bool Control(int sock, uint32_t event, int action){struct epoll_event epv;epv.events = event;epv.data.fd = sock;int n = 0;if (action == EPOLL_CTL_MOD){logMessage(NORMAL,"enter Control MOD");n = epoll_ctl(_epfd, action, sock, &epv);}else if (action == EPOLL_CTL_DEL){n = epoll_ctl(_epfd, action, sock, nullptr);}elsen = -1;return n == 0;}void Close(){if (_epfd != defaultfd){close(_epfd);}}private:int _epfd;
};

由于该reactor处理的是类似于接收“1+1”的完整报文的数据,返回客户端所得数的业务,因此客户端也需要能够具备发送完整报文的能力。

calclient.cc

#include<iostream>
#include<string>
#include<memory>
#include"calclient.hpp"
using namespace std;
using namespace client;
static void Usage(string proc)
{cout<<"\nUsage :\n\t"<<proc<<" serverip serverport\n"<<endl;
}
int main(int argc, char* argv[])
{if(argc!=3){Usage(argv[0]);exit(1);}string serverip=argv[1];
uint16_t serverport=atoi(argv[2]);unique_ptr<calclient> tc(new calclient(serverip,serverport));tc->initclient();
tc->start();return 0;
}

calclient.hpp

#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <ctype.h>
#include"protocol.hpp"
using namespace std;
#define NUM 1024
namespace client
{class calclient
{public:
calclient(const string& ip,const uint16_t& port)
:_sock(-1)
,_port(port)
,_ip(ip)
{}void initclient()
{
//1.创建sockfd
_sock=socket(AF_INET,SOCK_STREAM,0);
if(_sock<0)
{cerr<<"socket create error"<<endl;exit(2);
}
//2.绑定 ip port,不显示绑定,OS自动绑定
}void start()
{
struct sockaddr_in ser;
bzero(&ser,sizeof(ser));
socklen_t len=sizeof(ser);
ser.sin_family=AF_INET;
ser.sin_port=htons(_port);
ser.sin_addr.s_addr=inet_addr(_ip.c_str());
if(connect(_sock,(struct sockaddr *)&ser,len)!=0)
{cerr<<"connect error"<<endl;
}else
{string line;string inbuffer;while(true){cout<<"mycal>>: ";//输入"xopy"getline(cin,line);Request req=ParseLine(line);//用"xopy"取出x op y构造Request对象string context;req.Serialize(&context);//序列化,用x op y构造字符串"xSEPopSEPy"string send_str=enLength(context);//定制协议---"x op y"->"text_len"\r\n"x op y"\r\n---給内容加上报头cout<<"calclient send str: "<<send_str<<endl;send(_sock,send_str.c_str(),send_str.size(),0);//客户端把报文发送給服务器string package;if(!recvPackage(_sock,inbuffer,&package)) continue;//服务器处理完数据,客户端接收服务器发送来的报文//  "content_len"\r\n"exitcode result"\r\nstring reser_len;if(!deLength(package,&reser_len)) continue;//去报头//  "content_len"\r\n"exitcode result"\r\n -> "exitcode result"Response rep;rep.Deserialize(reser_len);//反序列化://_exitcodeSEP_result"->_exitcode _resultcout<<"_exitcode: "<<rep._exitcode<<endl;cout<<"_result: "<<rep._result<<endl;}
}
}~calclient()
{if(_sock>=0) close(_sock);
}Request ParseLine(const string& line)
{//"xopy"->取出来到x op y 上
int i=0;
int status=0;
int num=line.size();
string left,right;
char op;while(i<num)
{
switch(status)
{case 0:{if(!isdigit(line[i])){op=line[i];//取出运算符**status=1;}elseleft.push_back(line[i++]);//取出左操作数}break;case 1:i++;status=2;break;case 2:right.push_back(line[i++]);break;
}
}
cout<<"left: "<<stoi(left)<<" op: "<<op<<" right: "<<stoi(right)<<endl;
return Request(stoi(left),stoi(right),op);//返回Request对象}private:
int _sock;
uint16_t _port;
string _ip;};
}

image-20231129194945169
reactor保证了事件就绪,还负责了IO,并且还完成了业务处理。负责了IO过程+业务处理,该过程称为半同步。实际上可以将业务放到其他处理逻辑上,只负责IO过程,这称为半异步。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/231614.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

云时空社会化商业 ERP 系统 Shiro 反序列化漏洞复现

0x01 产品简介 时空云社会化商业ERP&#xff08;简称时空云ERP&#xff09; &#xff0c;该产品采用JAVA语言和Oracle数据库&#xff0c; 融合用友软件的先进管理理念&#xff0c;汇集各医药企业特色管理需求&#xff0c;通过规范各个流通环节从而提高企业竞争力、降低人员成本…

井盖倾斜监测方式,智能井盖传感器效果

大家是否都曾经想过&#xff0c;为什么路面上的井盖容易发生事故&#xff1f;其实这主要是因为井盖倾斜或者位移等异常状态出现时&#xff0c;由于人员巡查的范围较大从而无法及时察觉所导致的。为了保障道路行人和车辆的安全&#xff0c;对于井盖的监测需要不断完善和升级。而…

C#开发的OpenRA游戏之属性SelectionDecorations(13)

C#开发的OpenRA游戏之属性SelectionDecorations(13) 在前面分析SelectionDecorations属性类时,会发现它有下面这个属性: public class SelectionDecorations : SelectionDecorationsBase, IRender { readonly Interactable interactable; 它是定义了一个Interactabl…

【产品经理】AI在SaaS产品中的应用及挑战

随着ChatGPT大模型在全球的爆火&#xff0c;AI迅速在各个行业内&#xff0c;助力于各行业的效率提升。而SaaS领域&#xff0c;AI同样也大有可为。 AI&#xff08;人工智能&#xff0c;Artificial Intelligence的缩写&#xff09;近一年来一直处于舆论风口&#xff0c;随着ChatG…

小航助学题库蓝桥杯题库c++选拔赛(22年3月)(含题库教师学生账号)

需要在线模拟训练的题库账号请点击 小航助学编程在线模拟试卷系统&#xff08;含题库答题软件账号&#xff09; 需要在线模拟训练的题库账号请点击 小航助学编程在线模拟试卷系统&#xff08;含题库答题软件账号&#xff09;

统信桌面版arm系统安装火狐浏览器和浏览器驱动

一、系统信息 二、下载浏览器和驱动 1、浏览器 https://security.debian.org/debian-security/pool/updates/main/f/firefox-esr/firefox-esr_115.5.0esr-1~deb10u1_arm64.deb 2、驱动 https://github.com/mozilla/geckodriver/releases geckodriver-v0.33.0-linux-aarch6…

【Web】NewStarCTF Week3 个人复现

①Include &#x1f350; ?filephpinfo 提示查下register_argc_argv 发现为on LFI包含 pearcmd命令执行学习 pearcmd.php文件包含妙用 ?file/usr/local/lib/php/pearcmd&config-create/<?eval($_POST[a])?>./ha.php ?file./ha post传&#xff1a; asystem…

被DDoS攻击了怎么办?为什么要选择高防ip?

在当今互联网高度发达的时代&#xff0c;许多企业都依赖于网络来开展业务、推广产品、提供服务。然而&#xff0c;网络攻击&#xff0c;尤其是分布式拒绝服务&#xff08;DDoS&#xff09;攻击&#xff0c;已经成为一种日益严重的威胁。面对这种攻击&#xff0c;如何保护您的业…

STM32F407-14.3.6-01输入捕获模式

输入捕获模式 在输入捕获模式下&#xff0c;当相应的 ICx⑦ 信号检测到跳变沿后&#xff0c;将使用捕获/比较寄存器 (TIMx_CCRx⑪) 来锁存计数器的值。发生捕获事件时&#xff0c;会将相应的 CCXIF⑬ 标志&#xff08;TIMx_SR 寄存器&#xff09;置 1&#xff0c; 并可发送中断…

GoLong的学习之路,进阶,Redis

这个redis和上篇rabbitMQ一样&#xff0c;在之前我用Java从原理上进行了剖析&#xff0c;这里呢&#xff0c;我做项目的时候&#xff0c;也需要用到redis&#xff0c;所以这里也将去从怎么用的角度去写这篇文章。 文章目录 安装redis以及原理redis概念redis的应用场景有很多red…

Linux(fork+exec创建进程)

1.进程创建 内核设计与实现43页; 执行了3次ps -f ,ps -f的父进程的ID(PPID)都是一样的,即bash. 实际上Linux上这个bash就是不断的复制自身,然后把复制出来的用exec替换成想要执行的程序(比如ps); 运行ps,发现ps是bash的一个子进程;原因就是bash把自己复制一份,然后替换成ps;…

云时空社会化商业 ERP 系统 service SQL 注入漏洞复现

0x01 产品简介 时空云社会化商业ERP&#xff08;简称时空云ERP&#xff09; &#xff0c;该产品采用JAVA语言和Oracle数据库&#xff0c; 融合用友软件的先进管理理念&#xff0c;汇集各医药企业特色管理需求&#xff0c;通过规范各个流通环节从而提高企业竞争力、降低人员成本…