目录
1、回顾
2、Acceptor
2.1 Socket
2.1.1 Socket.h
2.1.2 Socket.cc
2.2 Acceptor
2.2.1 Acceptor.h
2.2.2 Acceptor.cc
1、回顾
Channel、Poller、EPollPoller、EventLoop相当于Reactor模型中的Reactor反应堆和Demultiplex事件分发器
Thread、EventLoopThread、EventLoopThreadPool:如果只有baseloop的话,这个线程既要负责处理连接请求,还要处理读写事件,效率低,所以muduo库是采用Mutiple Reactors,可以通过setthreadnum设置底层线程的数量(设置的是subloop工作线程的数量)。
以上部分已完成。
2、Acceptor
现在我们看Acceptor
Acceptor就是处理accept,监听新用户的连接,拿到跟客户端通信的clientfd,打包成channel,根据muduo的轮询算法找一个subloop,把subloop唤醒(每一个loop都有一个由系统API eventfd创建的wakeupfd(带有线程间通知notify机制的fd),在subloop的poller中监听的,mainloop通过给wakeupfd写一个整数去唤醒subloop),把接收的channel给subloop。
我们打开TCPServer,处理一下Acceptor,
Acceptor运行在我们的baseloop(mainreactor)里面
2.1 Socket封装fd
Acceptor对socket类进行了封装,先来看一下socket
2.1.1 Socket.h
#pragma once
#include "noncopyable.h"class InetAddress;//封装socket fd
class Socket:noncopyable
{
public:explicit Socket(int sockfd):sockfd_(sockfd){}~Socket();int fd()const{return sockfd_;}void bindAddress(const InetAddress& localaddr);void listen();int accept(InetAddress* peeraddr);void shutdownWrite();void setTcpNoDelay(bool on);//直接发送,数据不进行TCP缓存 void setReuseAddr(bool on);void setReusePort(bool on);void setKeepAlive(bool on);
private:const int sockfd_;
};
2.1.2 Socket.cc
#include "Socket.h"
#include "Logger.h"
#include "InetAddress.h"#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <strings.h>
#include <netinet/tcp.h>Socket::~Socket()
{close(sockfd_); // 调用系统的close
}void Socket::bindAddress(const InetAddress &localaddr)
{if (0 != bind(sockfd_, (sockaddr *)localaddr.getSockAddr(), sizeof(sockaddr_in))){LOG_FATAL("bind scokfd:%d fail\n", sockfd_);}
}
void Socket::listen()
{if (0 != ::listen(sockfd_, 1024)){LOG_FATAL("listen sockfd:%d fail\n", sockfd_);}
}int Socket::accept(InetAddress *peeraddr)
{/*** 1. accept函数的参数不合法* 2. 对返回的connfd没有设置非阻塞* Reactor模型 one loop per thread* poller + non-blocking IO*/sockaddr_in addr;socklen_t len;bzero(&addr, sizeof addr);int connfd = ::accept(sockfd_, (sockaddr *)&addr, &len);if (connfd >= 0){peeraddr->setSockAddr(addr);}return connfd;
}void Socket::shutdownWrite()
{if (::shutdown(sockfd_, SHUT_WR) < 0){LOG_ERROR("shutdownWrite error");}
}void Socket::setTcpNoDelay(bool on)
{int optval = on ? 1 : 0;::setsockopt(sockfd_, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof optval); // IPPROTO_TCP表示协议级别
}void Socket::setReuseAddr(bool on)
{int optval = on ? 1 : 0;::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof optval); // SQL_SOCKET表示socket级别
}
void Socket::setReusePort(bool on)
{int optval = on ? 1 : 0;::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof optval);
}
void Socket::setKeepAlive(bool on)
{int optval = on ? 1 : 0;::setsockopt(sockfd_, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof optval);
}
2.2 Acceptor
Acceptor相当于对accept的一个封装,Acceptor是运行在baseloop中的
一条新连接的回调:拿到跟客户端通信的clientfd,打包成channel,根据muduo的轮询算法找一个subloop,把subloop唤醒(getnextloop函数实现)把接收的channel分发给subloop,去监听已连接用户的读写事件。
2.2.1 Acceptor.h
#pragma once
#include "noncopyable.h"
#include "Socket.h"
#include "Channel.h"#include <functional>class EventLoop;
class InetAddress;class Acceptor:noncopyable
{
public:using NewConnectionCallback=std::function<void(int sockfd,const InetAddress&)>;Acceptor(EventLoop* loop,const InetAddress& listenAddr,bool reuseport);~Acceptor();void setNewConnectionCallback(const NewConnectionCallback& cb){NewConnectionCallback_=cb;}bool listenning() const{return listenning_;}void listen();
private:void handleRead();EventLoop* loop_;//Acceptor用的就是用户定义的哪个baseLoop,也称为mainLoopSocket acceptSocket_;Channel acceptChannel_;NewConnectionCallback NewConnectionCallback_;bool listenning_;
};
2.2.2 Acceptor.cc
#include "Acceptor.h"
#include "Logger.h"
#include "InetAddress.h"#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include <unistd.h>static int createNonblocking()//创建非阻塞的I/O
{int sockfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);if (sockfd < 0){LOG_FATAL("%s:%s:%d listen socket create err:%d\n", __FILE__, __FUNCTION__, __LINE__, errno);}
}Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport):loop_(loop),acceptSocket_(createNonblocking())//创建socket,acceptChannel_(loop,acceptSocket_.fd())//channel和poller都是通过请求本线程的loop和poller通信 ,listenning_(false)
{acceptSocket_.setReuseAddr(true);acceptSocket_.setReusePort(true);acceptSocket_.bindAddress(listenAddr);//bind绑定套接字 //TcpServer::start() Acceptor.listen 如果有新用户的连接,就要执行一个回调(connfd=》打包成channel=》唤醒subloop)//baseLoop => acceptChannel_(listenfd)有事件发生 => 底层反应堆调用回调 acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead,this));//绑定回调
}
Acceptor::~Acceptor()
{acceptChannel_.disableAll();acceptChannel_.remove();
}void Acceptor::listen()
{listenning_=true;acceptSocket_.listen();//listenacceptChannel_.enableReading();//acceptChannel_=>Poller
}//listenfd有事件发生,就是有新用户连接了
void Acceptor::handleRead()
{InetAddress peerAddr;int connfd=acceptSocket_.accept(&peerAddr);if(connfd>=0){if(NewConnectionCallback_){NewConnectionCallback_(connfd,peerAddr);//轮询找到subloop,唤醒,分发当前的新客户端的channel}else//客户端没有办法去服务 {::close(connfd);}}else{LOG_ERROR("%s:%s:%d accept err:%d\n", __FILE__, __FUNCTION__, __LINE__, errno);if(errno==EMFILE)//EMFILE错误的解决方法:1、调整当前进程文件描述符的上限 2、说明单台服务器不足以支撑现有的流量,进行集群和分布式部署{LOG_ERROR("%s:%s:%d sockfd reached limit\n", __FILE__, __FUNCTION__, __LINE__);}}
}