基于多线程的Reactor模式的 回声服务器 EchoServer

记录下  

一个线程专门用来接受accept获取客户端的fd 

获取fd之后 从剩余的执行线程中 找到一个连接客户端数量最少的线程

然后将客户端的fd加入到这个线程中并通过EPOLL监听这个fd

线程之间通过eventfd来通信  将客户端的fd传到 对应的线程中  

参考了MediaServer   引入EventPollerPoll 和 EventPoller的 概念  

最少两个两个线程 设置为1的话 会改成2

cpp代码:

#include "durian.h"#include <sys/epoll.h>namespace DURIAN
{EventPoller::EventPoller(int id){m_id = id;}EventPoller::~EventPoller(){printf("~EventPoller signal m_id = %d m_run_flag = %d\n",m_id,m_run_flag);Wait();}bool EventPoller::Init(){m_poll_fd = epoll_create1(0);if(m_poll_fd == -1){return false;}m_event_fd = eventfd(0,0);if(m_event_fd == -1){printf("new fd failed\n");close(m_poll_fd);return false ;}return true;}void EventPoller::RunLoop(){static const int MAX_EVENTS = 1024;struct epoll_event events[MAX_EVENTS];while(m_run_flag){int ready_count = epoll_wait(m_poll_fd,events,MAX_EVENTS,2000);if(ready_count == -1){if(errno != EINTR){//exit(1);}//ready_count = 0;}else if(ready_count == 0){if(m_run_flag == false){//printf("time out and runflag = false exit thread\n");//break;}}for(int i = 0;i<ready_count;i++){const struct epoll_event &ev = events[i];int fd = events[i].data.fd;if(ev.events &(EPOLLIN | EPOLLERR |EPOLLHUP)){auto handler = m_accept_handlers[fd];handler(fd);}else if(ev.events & (EPOLLOUT | EPOLLERR | EPOLLHUP)){auto it = m_buffer_pool.find(fd);if(it!= m_buffer_pool.end()){auto &buf = it->second;if(buf.WriteData(fd) == false){Close(fd);}}}}}}int EventPoller::GetEventFD(){return m_event_fd;}int EventPoller::GetClients(){return m_accept_handlers.size();}void EventPoller::Stop(){m_run_flag = false;}void EventPoller::Start(){//printf("Enter EventPoller Start  m_id = %d pollfd = %d eventid = %d\n",m_id,m_poll_fd,m_event_fd);m_run_flag = true;m_thread_id = std::thread(&EventPoller::RunLoop,this);}void EventPoller::Wait(){if(m_thread_id.joinable()){m_thread_id.join();}}bool EventPoller::Add2Epoll(int fd){if(m_accept_handlers.count(fd) != 0){return false;}int flags = 1;if(ioctl(fd,FIONBIO,&flags) == -1){return false;}struct epoll_event ev;ev.events = EPOLLIN |EPOLLOUT |EPOLLET;ev.data.fd = fd;if(epoll_ctl(m_poll_fd,EPOLL_CTL_ADD,fd,&ev)==-1){return false;}return true;}void EventPoller::DeliverConn(int conn_fd){//printf("DeliverConn fd = %d\n",conn_fd);uint64_t count = conn_fd;if(write(m_event_fd,&count,sizeof(count)) == -1){printf("Deliverconn write failed\n");}}bool EventPoller::AddListener(int fd,ACCEPTER on_accept){if(Add2Epoll(fd) == false){return false;}std::cout<<"EventPoller AddListener fd = "<<fd<<std::endl;m_accept_handlers[fd] = [this,on_accept]( int server_fd){for(;;){int new_fd = accept(server_fd,nullptr,nullptr);std::cout<<"accept client fd = "<<new_fd<<std::endl;	if(new_fd == -1){if(errno!= EAGAIN){Close(server_fd);}return 0;}int enable = 1;setsockopt(new_fd,IPPROTO_TCP,TCP_NODELAY,&enable,sizeof(enable));on_accept(new_fd);}return 0;};return true;}bool EventPoller::AddEventer(int fd, EVENTER on_event){if(Add2Epoll(fd) == false){return false;}m_accept_handlers[fd] = [this,on_event](int cfd){for(;;){uint64_t count;if(read(cfd,&count,sizeof(count)) == -1){if(errno != EAGAIN){Close(cfd);}return 0;}on_event(count);}return 0;};return true;}bool EventPoller::AddReader(int fd, READER on_read){	if(Add2Epoll(fd) == false){return false;}m_accept_handlers[fd] = [this,on_read](int cfd){for(;;){char buf[4096] = {0};ssize_t ret = read(cfd,buf,sizeof(buf));if(ret == -1){if(errno != EAGAIN){Close(cfd);}return -1;}if(ret == 0){Close(cfd);printf("客户端关闭了连接 %d\n",cfd);return 0 ;}on_read(cfd,buf,ret);}};return true;}void EventPoller::Close(int fd){m_accept_handlers.erase(fd);m_buffer_pool.erase(fd);close(fd);}bool EventPoller::FlushData(int fd, const char * buf, size_t len){WriteBuffer *wb = nullptr;auto it = m_buffer_pool.find(fd);if(it == m_buffer_pool.end()){while(len >0){ssize_t ret = write(fd,buf,len);if(ret == -1){if(errno != EAGAIN){Close(fd);return false;}wb = &m_buffer_pool[fd];break;}buf+= ret;len-=ret;}if(len == 0){//Successreturn true;}}else{wb = &it->second;}wb->Add2Buffer(buf,len);return true;}static size_t g_pool_size = 0;
void EventPollerPool::SetPoolSize(size_t size)
{g_pool_size = size;
}
EventPollerPool & EventPollerPool::Instance()
{static std::shared_ptr<EventPollerPool> s_instance(new EventPollerPool()); static EventPollerPool &s_instance_ref = *s_instance; return s_instance_ref; 
}EventPollerPool::EventPollerPool()
{auto size = g_pool_size;auto cpus = std::thread::hardware_concurrency();size = size > 0 ? size : cpus;std::cout<<"Thread size:"<<size<<std::endl;if(size <2)size = 2;for (int i = 0; i < size; ++i) {std::shared_ptr<EventPoller> poller = std::make_shared<EventPoller>(i);m_pollers.emplace_back(poller);}
}std::shared_ptr<EventPoller> EventPollerPool::GetPoller()
{if(m_pollers.size()>1){int min_clients = 10000;int target_index = 0;for(int i = 1;i<m_pollers.size();i++){if(m_pollers[i]-> GetClients() < min_clients){min_clients = m_pollers[i]->GetClients();target_index = i;}}//printf("target index = %d min_clients = %d\n",target_index,min_clients);return m_pollers[target_index];}return m_pollers[0];}
std::shared_ptr<EventPoller> EventPollerPool::GetFirstPoller()
{return m_pollers[0];
}void EventPollerPool::StartPollers()
{for(int i = 1;i<m_pollers.size();i++){m_pollers[i]->Init();int event_fd = m_pollers[i]->GetEventFD();m_pollers[i]->AddEventer(event_fd,[&,i](uint64_t cfd){READER reader = [&,i](int fd,const char*data,size_t len){printf("Len[%s] content[%d] m_pollers[i] = %p i = %d\n",data,len,m_pollers[i],i);m_pollers[i]->FlushData(fd,data,len);return 0;};m_pollers[i]->AddReader(cfd,reader);return 0;});m_pollers[i]->Start();	}
}void EventPollerPool::Stop()
{for(int i = 0;i<m_pollers.size();i++){m_pollers[i]->Stop();}}}

头文件

#include <string.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <errno.h>
#include <netinet/tcp.h>#include <sys/eventfd.h>
#include <signal.h>#include <iostream>
#include <memory>
#include <list>
#include <vector>
#include <functional>
#include <thread>
#include <mutex>#include <unordered_map>namespace DURIAN
{class WriteBuffer{private:std::list<std::string> buf_items;size_t offset = 0;public:bool IsEmpty() const{return buf_items.empty();}void Add2Buffer(const char* data,size_t len){if(buf_items.empty() || buf_items.back().size()+len >4096){buf_items.emplace_back(data,len);}else{buf_items.back().append(data,len);}}bool WriteData(int fd){while (IsEmpty() == false){auto const &item = buf_items.front();const char *p = item.data() + offset;size_t len = item.size() -offset;while(len >0){ssize_t ret = write(fd,p,len);if(ret == -1){if(errno == EAGAIN){return true;}return false;}offset += ret;p+=ret;len-= ret;}buf_items.pop_front();}return true;}};using ACCEPTER = std::function<int(int)>;using WRITER = std::function<int(int)>;using EVENTER = std::function<int(int)>;using READER = std::function<int(int,const char *data,size_t)>;//static thread_local std::unordered_map<int fd,READER>g_th_handlers;class EventPoller{private:int m_poll_fd = -1;int m_id;bool m_run_flag = false;std::unordered_map<int,ACCEPTER> m_accept_handlers;std::unordered_map<int,WriteBuffer> m_buffer_pool;std::mutex m_connction_lock;int m_event_fd;std::thread m_thread_id ;std::vector<int>m_connections;void RunLoop();public:EventPoller(int i);~EventPoller();int GetEventFD();int GetClients();std::vector<int> & GetConnections();bool Init();void Start();void Stop();void Wait();	void DeliverConn(int conn_fd);bool AddListener(int fd,ACCEPTER on_listen);bool AddEventer(int fd,EVENTER on_event);bool AddReader(int fd,READER on_read);void Close(int fd);bool Add2Epoll(int fd);bool FlushData(int fd,const char *buf,size_t len);};class EventPollerPool{public:static EventPollerPool &Instance();static void SetPoolSize(size_t size = 0);std::shared_ptr<EventPoller>GetPoller(); std::shared_ptr<EventPoller>GetFirstPoller(); 	void StartPollers();void Stop(); private:int m_size;std::vector<std::shared_ptr<EventPoller>> m_pollers;EventPollerPool();		  };}

main文件

#include "durian.h"static bool g_run_flag = true;
void sig_handler(int signo)
{signal(SIGINT, SIG_IGN);signal(SIGTERM, SIG_IGN);signal(SIGKILL, SIG_IGN);g_run_flag = false;printf("Get exit flag\n");if (SIGINT == signo || SIGTSTP == signo || SIGTERM == signo|| SIGKILL == signo){g_run_flag = false;printf("\033[0;31mprogram exit by kill cmd !\033[0;39m\n");}}bool StartServer()
{int listen_fd = socket(AF_INET,SOCK_STREAM,0);if(listen_fd == -1){printf("Create socket failed\n");return false;}else{printf("Server listen fd is:%d\n",listen_fd);}int reuseaddr = 1;if(setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR,&reuseaddr ,sizeof(reuseaddr)) == -1){return false;}struct sockaddr_in listen_addr = {0};listen_addr.sin_family  = AF_INET;listen_addr.sin_addr.s_addr = INADDR_ANY;listen_addr.sin_port = htons(8888);if(bind(listen_fd,(struct sockaddr*)&listen_addr,sizeof(listen_addr)) == -1){printf("bind failed\n");return false;}if(listen(listen_fd,100) == -1){printf("listen failed\n");return false;}DURIAN::EventPollerPool::SetPoolSize(1);DURIAN::EventPollerPool& pool = DURIAN::EventPollerPool::Instance(); pool.StartPollers();auto poller = pool.GetFirstPoller(); if(poller->Init()){if(poller->AddListener(listen_fd,[&](int conn_fd){printf("将新的fd加到epoll监听 fd =%d\n",conn_fd);//Deliver client fd to other pollerspool.GetPoller()->DeliverConn(conn_fd);return 0;}) == false){return false;}poller->Start();}while(g_run_flag){sleep(2);}pool.Stop();}void StopServer()
{DURIAN::EventPollerPool& pool = DURIAN::EventPollerPool::Instance(); pool.Stop();
}int main(int argc,char *argv[])
{printf(" cpp version :%d\n",__cplusplus);int thread_size = 1;bool run_flag = true;signal(SIGPIPE,SIG_IGN);signal(SIGTERM, sig_handler);signal(SIGKILL, sig_handler);signal(SIGINT,sig_handler); StartServer();return 0;
}

性能测试

ulimit -HSn 102400

ab -n 100000 -c 20000 http://192.168.131.131:8888/index.html
 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/130850.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring Cloud 微服务系列文章合集,一次性看个够!

微服务架构图 为了方便大家可以直接下载编辑&#xff0c;这里用的ProcessOn画的架构图&#xff0c;可以直接克隆一个出来进行编辑&#xff0c;地址&#xff1a;https://www.processon.com/view/6523a1b37fde9c4bb35c7278 微服务系列文章合集&#xff0c;点击阅读 Spring Cl…

深度学习DAY3:FFNNLM前馈神经网络语言模型

1 神经网络语言模型NNLM的提出 文章&#xff1a;自然语言处理中的语言模型预训练方法&#xff08;ELMo、GPT和BERT&#xff09; https://www.cnblogs.com/robert-dlut/p/9824346.html 语言模型不需要人工标注语料&#xff08;属于自监督模型&#xff09;&#xff0c;所以语言…

阿里云上了新闻联播

我是卢松松&#xff0c;点点上面的头像&#xff0c;欢迎关注我哦&#xff01; 阿里新任的CEO吴泳铭上央视新闻联播了! 在昨天的新闻联播里&#xff0c;出席科技座谈会&#xff0c;有一个特别镜头&#xff0c;出现了阿里新任CEO吴泳铭的镜头。 这个信号意义明显&#xff0c;我…

【angular】TodoList小项目(已开源)

参考&#xff1a;https://segmentfault.com/a/1190000013519099 文章目录 准备工作headerTodo、Doing、Done样式&#xff08;HTMLCSS&#xff09;功能&#xff08;TS&#xff09;将输入框内容加入todoList&#xff08;addTodo&#xff09;将todo事件改到doing 服务 参考开源 效…

原生JS-鼠标拖动

原生JS-鼠标拖动 通过鼠标的点击事件通过h5的属性 通过鼠标的点击事件 步骤&#xff1a; 1. 鼠标按下div。 2. 鼠标移动&#xff0c;div跟着移动 原生js&#xff0c;实现拖拽效果。1. 给被拖拽的div加上 onmousedown 鼠标【按下事件】。鼠标按下的时候&#xff0c;开始监听鼠标…

YOLOv3 | 核心主干网络,特征图解码,多类损失函数详解

https://zhuanlan.zhihu.com/p/76802514) 文章目录 1. 核心改进1.1主干网络1.2 特征图解码1.2.1 检测框&#xff08;位置&#xff0c;宽高&#xff09;解码1.2.2 检测置信度解码1.2.3 类别解码 1.3 训练损失函数1.3.1 正负样本定义1.3.2 损失函数 1. 核心改进 1.1主干网络 更…

Python操作Hive数据仓库

Python连接Hive 1、Python如何连接Hive&#xff1f;2、Python连接Hive数据仓库 1、Python如何连接Hive&#xff1f; Python连接Hive需要使用Impala查询引擎 由于Hadoop集群节点间使用RPC通信&#xff0c;所以需要配置Thrift依赖环境 Thrift是一个轻量级、跨语言的RPC框架&…

【AI视野·今日Sound 声学论文速览 第二十一期】Mon, 9 Oct 2023

AI视野今日CS.Sound 声学论文速览 Mon, 9 Oct 2023 Totally 13 papers &#x1f449;上期速览✈更多精彩请移步主页 Interesting: &#x1f4da;MBTFNet,用于歌声质量增强的多带宽时频神经网络 (from 西工大 Audio, Speech and Language Processing Group (ASLPNPU),) Daily…

go语言教程4:switch和map

文章目录 switchswitch匹配字典 go语言教程&#xff1a;安装入门➡️for循环➡️数组、切片和指针 switch和map&#xff0c;一个是控制流&#xff0c;一个是数据结构&#xff0c;之所以把两个不同类型的知识点放在一起讲解&#xff0c;是因为二者有着极其相似的运行逻辑&#…

linux下安装ffmpeg的详细教程、ffmpeg is not installed

1、下载解压 wget http://www.ffmpeg.org/releases/ffmpeg-6.0.tar.gz tar -zxvf ffmpeg-6.0.tar.gz 2、 进入解压后目录,输入如下命令/usr/local/ffmpeg为自己指定的安装目录 cd ffmpeg-6.0 ./configure --prefix/usr/local/ffmpeg make sudo make install 3、配置变量 v…

神经网络(MLP多层感知器)

分类 神经网络可以分为多种不同的类型&#xff0c;下面列举一些常见的神经网络类型&#xff1a; 前馈神经网络&#xff08;Feedforward Neural Network&#xff09;&#xff1a;前馈神经网络是最基本的神经网络类型&#xff0c;也是深度学习中最常见的神经网络类型。它由若干个…

IDEA使用模板创建webapp时,web.xml文件版本过低的一种解决方法

创建完成后的web.xml 文件&#xff0c;版本太低 <!DOCTYPE web-app PUBLIC"-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN""http://java.sun.com/dtd/web-app_2_3.dtd" ><web-app><display-name>Archetype Created Web Appl…