目录
1、Callbacks.h
2、TcpServer.h
3、TcpServer.cc
1、Callbacks.h
回调操作
#pragma once#include <memory>
#include <functional>class Buffer;
class TcpConnection;using TcpConnectionPtr=std::shared_ptr<TcpConnection>;
using ConnectionCallback=std::function<void(const TcpConnectionPtr&)>;
using CloseCallback=std::function<void(const TcpConnectionPtr&)>;
using WriteCompleteCallback=std::function<void(const TcpConnectionPtr&)>;
using MessageCallback=std::function<void(const TcpConnectionPtr&,Buffer*,Timestamp)>;
2、TcpServer.h
#pragma once/*** 用户使用muduo编写服务器程序* 在这里加上头文件,用户使用的时候就不用加了
*/
#include "EventLoop.h"
#include "Acceptor.h"
#include "InetAddress.h"
#include "noncopyable.h"
#include "EventLoopThreadPool.h"
#include "Callbacks.h"#include <functional>
#include <string>
#include <memory>
#include <atomic>
#include <unordered_map>//对外的服务器编程使用的类
class TcpServer:noncopyable
{
public:using ThreadInitCallback=std::function<void(EventLoop*)>;enum Option//预制两个选项来表示端口是否可重用{kNoReusePort,//不可重用kReusePort,//可重用};TcpServer(EventLoop* loop,const InetAddress& listenAddr,const std::string& nameArg,Option option=kNoReusePort);~TcpServer();void setThreadInitcallback(const ThreadInitCallback& cb){threadInitCallback_=cb;}void setConnectionCallback(const ConnectionCallback& cb){connectionCallback_=cb;}void setMessageCallback(const MessageCallback& cb){messageCallback_=cb;}void setWriteCompleteCallback(const WriteCompleteCallback& cb){writeCompleteCallback_=cb;}//设置底层subloop的个数void setThreadNum(int numThreads);//开启服务器监听void start();
private:void newConnection(int sockfd,const InetAddress& peerAddr);void removeConnection(const TcpConnectionPtr& conn);void removeConnectionInLoop(const TcpConnectionPtr& conn);using ConnectionMap=std::unordered_map<std::string,TcpConnectionPtr>;EventLoop* loop_;//baseloop 用户定义的loopconst std::string ipPort_;//保存服务器相关的ip地址,端口号const std::string name_;//保存服务器名称std::unique_ptr<Acceptor> acceptor_;//运行在mainloop,任务就是监听新连接事件std::shared_ptr<EventLoopThreadPool> threadPool_;//one loop per threadConnectionCallback connectionCallback_;//有新连接时的回调MessageCallback messageCallback_;//有读写消息时的回调WriteCompleteCallback writeCompleteCallback_;//消息发送完成以后的回调ThreadInitCallback threadInitCallback_;//loop线程初始化的回调std::atomic_int started_;int nextConnId_;ConnectionMap connections_;//保存所有的连接
};
3、TcpServer.cc
mainLoop相当于reactor模型的reactor反应堆的角色
poller相当于是多路分发器的角色,掌控epoll的所有操作
Acceptor创建listenfd,封装成acceptchannel,通过enableReading向poller上注册读事件,并将listenfd添加到poller中,poller监听acceptChannel上的事件,有事件发生时执行一个读事件的回调(因为之前约定的是对读事件感兴趣),读事件的回调绑定的是handleRead,handleRead中accept函数返回一个跟客户端通信的connfd,然后去执行相应的回调
IOLoop运行在主线程中,想要把当前connfd封装的channel发送给subloop1,但是subloop1还没有被唤醒,我是在主线程中操作IOLoop的,很明显IOLoop和subLoop1不在一个线程,所以执行queueinloop唤醒subloop1,相当于在subloop1中的wakeupfd中写了一个数字,然后就把这个新的TcpConnection注册到subloop1中,操作到其它subloop上也是一样的过程。
重写代码:
#include "TcpServer.h"
#include "Logger.h"EventLoop* CheckLoopNotNull(EventLoop* loop)
{if(loop==nullptr){LOG_FATAL("%s:%s:%d mainLoop is null!\n",__FILE__,__FUNCTION__,__LINE__);}return loop;
}TcpServer::TcpServer(EventLoop *loop,const InetAddress &listenAddr,const std::string &nameArg,Option option):loop_(CheckLoopNotNull(loop)),ipPort_(listenAddr.toIpPort()),name_(nameArg),acceptor_(new Acceptor(loop,listenAddr,option==kReusePort)),threadPool_(new EventLoopThreadPool(loop,name_)),connectionCallback_(),messageCallback_(),nextConnId_(1)
{//当有用户连接时,会执行TcpServer::newConnection回调 acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection,this,std::placeholders::_1,std::placeholders::_2));
}TcpServer::~TcpServer()
{}// 设置底层subloop的个数
void TcpServer::setThreadNum(int numThreads)
{threadPool_->setThreadNum(numThreads);
}// 开启服务器监听 loop.loop()
void TcpServer::start()
{if(started_++==0)//防止一个TcpServer对象被start多次{threadPool_->start(threadInitCallback_);//启动底层的loop线程池loop_->runInLoop(std::bind(&Acceptor::listen,acceptor_.get()));}
}// void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr);
// void TcpServer::removeConnection(const TcpConnectionPtr &conn);
// void TcpServer::removeConnectionInLoop(const TcpConnectionPtr &conn);