从0到1开发go-tcp框架【3-读写协程分离、引入消息队列、进入连接管理器、引入连接属性】【基础篇完结】

从0到1开发go-tcp框架【3-读写协程分离、引入消息队列、进入连接管理器、引入连接属性】

1 读写协程分离[v0.7]

  1. 添加一个Reader和Writer之间通信的channel
  2. 添加一个Writer goroutine
  3. Reader由之前直接发送给客户端改为发送给通信channel
  4. 启动Reader和Writer一起工作

zinx/znet/connection.go

package znetimport ("fmt""github.com/kataras/iris/v12/x/errors""io""net"
)type Connection struct {Conn       *net.TCPConnConnID     uint32isClosed   boolmsgChannel chan []byte//告知当前的连接已经退出/停止(由Reader告知writer退出)ExitChan   chan boolMsgHandler *MsgHandle
}func NewConnection(conn *net.TCPConn, connID uint32, msgHandle *MsgHandle) *Connection {c := &Connection{Conn:       conn,ConnID:     connID,MsgHandler: msgHandle,isClosed:   false,msgChannel: make(chan []byte),ExitChan:   make(chan bool, 1),}return c
}func (c *Connection) StartWriter() {fmt.Println("[Writer Goroutine is running]")defer fmt.Println("[conn Writer  goroutine exit!]", c.RemoteAddr().String())//不断的阻塞等待channel的消息,然后将channel中的消息写给客户端for {select {case data := <-c.msgChannel://有数据写给客户端if _, err := c.Conn.Write(data); err != nil {fmt.Println("Send data error , ", err)return}case <-c.ExitChan://代表reader已经退出,此时writer也需要退出return}}
}func (c *Connection) StartReader() {fmt.Println("reader goroutine is running...")defer fmt.Println("[Reader goroutine is exit] connID=", c.ConnID, " remote addr is ", c.RemoteAddr().String())defer c.Stop()//读取数据for {//创建一个拆包对象dp := NewDataPack()//读取客户端的msg Head 二进制流 8字节headData := make([]byte, dp.GetHeadLen())if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {fmt.Println("read msg head err ", err)break}//拆包,将读取到的headData封装为msgmsg, err := dp.UnPack(headData)if err != nil {fmt.Println("unpack msg err ", err)break}//根据dataLen,再次读取Data,放在msg.Data中,var data []byte//如果数据包中有数据,则读取if msg.GetMsgLen() > 0 {data = make([]byte, msg.GetMsgLen())//将切片data读满if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {fmt.Println("read msg data err ", err)break}}msg.SetData(data)//封装请求,改为router处理r := Request{conn: c,msg:  msg,}go c.MsgHandler.DoMsgHandler(&r)}
}//启动连接
func (c *Connection) Start() {fmt.Printf("ConnID %d is Start...", c.ConnID)//开启读、写go c.StartReader()go c.StartWriter()
}//停止连接
func (c *Connection) Stop() {fmt.Println("Connection Stop()...ConnectionID = ", c.ConnID)if c.isClosed {return}c.isClosed = truec.Conn.Close()c.ExitChan <- trueclose(c.msgChannel)close(c.ExitChan)
}//获取当前连接的Conn对象
func (c *Connection) GetTCPConnection() *net.TCPConn {return c.Conn
}//获取当前连接模块的id
func (c *Connection) GetConnectionID() uint32 {return c.ConnID
}//获取远程客户端的TCP状态 IP:Port
func (c *Connection) RemoteAddr() net.Addr {return c.Conn.RemoteAddr()
}//发送数据
func (c *Connection) SendMsg(msgId uint32, data []byte) error {if c.isClosed {return errors.New("connection closed\n")}//将data进行封包dp := NewDataPack()binaryMsg, err := dp.Pack(NewMessage(msgId, data))if err != nil {fmt.Println("Pack error msg id=", msgId)return errors.New("pack error msg")}//将数据发送给客户端if _, err := c.Conn.Write(binaryMsg); err != nil {fmt.Println("write msg id ", msgId, " error ", err)return errors.New("conn write err ")}return nil
}

测试

myDemo/ZinxV0.7/client.go

  • client0.go
package mainimport ("fmt""io""myTest/zinx/znet""net""time"
)/*
模拟客户端
*/
func main() {fmt.Println("client start...")time.Sleep(time.Second * 1)//1 创建服务器连接conn, err := net.Dial("tcp", "127.0.0.1:8092")if err != nil {fmt.Println("client start err ", err)return}for {//发送封装后的数据包dp := znet.NewDataPack()binaryMsg, err := dp.Pack(znet.NewMessage(0, []byte("Zinx client0 test msg")))if err != nil {fmt.Println("client pack msg err ", err)return}if _, err := conn.Write(binaryMsg); err != nil {fmt.Println("client write err ", err)return}//服务器应该给我们回复一个message数据,msgId为1,内容为ping...ping...//1 先读取流中的head部分,得到Id和dataLenbinaryHead := make([]byte, dp.GetHeadLen())if _, err := io.ReadFull(conn, binaryHead); err != nil {fmt.Println("client read head err ", err)break}//将二进制的head拆包到msg中msgHead, err := dp.UnPack(binaryHead)if err != nil {fmt.Println("client unpack msgHead err ", err)break}if msgHead.GetMsgLen() > 0 {//2 有数据, 再根据dataLen进行二次读取,将data读出来msg := msgHead.(*znet.Message)msg.Data = make([]byte, msg.GetMsgLen())if _, err := io.ReadFull(conn, msg.Data); err != nil {fmt.Println("read msg data error ", err)return}fmt.Println("--------> Receive Server msg , ID=", msg.Id, " ,len=", msg.DataLen, " ,data=", string(msg.Data))}//cpu阻塞,让出cpu时间片,避免无限for循环导致其他程序无法获取cpu时间片time.Sleep(time.Second * 1)}
}
  • client1.go

在这里插入图片描述

myDemo/ZinxV0.7/server.go

package mainimport ("fmt""myTest/zinx/ziface""myTest/zinx/znet"
)//自定义一个Router,测试路由功能
type PingRouter struct {znet.BaseRouter
}func (pr *PingRouter) Handler(request ziface.IRequest) {fmt.Println("call router handler...")//先读取客户端数据,再回写ping...ping...ping...fmt.Println("receive from client msgId=", request.GetMsgID(),"data=", string(request.GetData()))//回写pingerr := request.GetConnection().SendMsg(0, []byte("ping...ping...ping..."))if err != nil {fmt.Println(err)}
}//定义第二个Router
type HelloRouter struct {znet.BaseRouter
}func (hr *HelloRouter) Handler(request ziface.IRequest) {fmt.Println("receive from client msgId=", request.GetMsgID(),"data=", string(request.GetData()))err := request.GetConnection().SendMsg(1, []byte("hello zinx, I'm the other handler"))if err != nil {fmt.Println(err)}
}func main() {s := znet.NewServer("[Zinx v0.7]")//添加自定义路由(PingRouter和HelloRouter)router0 := &PingRouter{}s.AddRouter(0, router0)router1 := &HelloRouter{}s.AddRouter(1, router1)s.Serve()
}

结果:
在这里插入图片描述

  • 接受多个客户端也可以
    在这里插入图片描述
  • 当client0退出时,不会影响client1
    在这里插入图片描述

2 创建消息队列及多任务[v0.8]

  1. 创建一个消息队列,MsgHandler消息管理模块增加:TaskQueue、WorkerPoolSize
  2. 创还能多任务worker的工作池并且启动
  3. 将之前发送的消息,全部改为把消息发送给消息队列和worker工作池来处理

在这里插入图片描述

实现消息队列机制和工作池机制(集成到自定义框架)

  1. 创建一个消息队列:MsgHandler消息管理模块
  2. 创建多任务worker的工作池并启动
  3. 将之前发送的消息,全部改为把消息发送给消息队列和worker工作池来处理
  4. 将消息队列机制集成到Zinx框架中
  • 开启并调用消息队列及worker工作池
  • 将从客户端处理的消息,发送给当前Worker的工作池来处理

zinx/znet/server.go

package znetimport ("fmt""myTest/zinx/util""myTest/zinx/ziface""net"
)type Server struct {Name       stringIPVersion  stringIP         stringPort       intMsgHandler *MsgHandle
}func NewServer(name string) *Server {s := &Server{Name:       name,IPVersion:  "tcp4",IP:         util.GlobalObject.Host,Port:       util.GlobalObject.TcpPort,MsgHandler: NewMsgHandle(),}return s
}func (s *Server) Start() {//启动服务监听端口fmt.Printf("[Zinx] Server Name :%s , listen IP :%v , Port: %d is starting \n", s.Name, s.IP, s.Port)fmt.Printf("[Zinx] Version :%s , MaxConn:%v , MaxPackageSize: %d \n", util.GlobalObject.Version, util.GlobalObject.MaxConn, util.GlobalObject.MaxPackageSize)var cid uint32 = 0go func() {//0 开启消息队列及Worker工作池s.MsgHandler.StartWorkerPool()addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))if err != nil {fmt.Printf("resolve tcp addr error %v\n", err)return}listener, err := net.ListenTCP(s.IPVersion, addr)if err != nil {fmt.Println("listen ", s.IPVersion, " err ", err)return}fmt.Println("[start] Zinx server success ", s.Name, "Listening...")//阻塞连接,处理业务for {conn, err := listener.AcceptTCP()if err != nil {fmt.Println("Accept err ", err)continue}dealConn := NewConnection(conn, cid, s.MsgHandler)cid++//开启goroutine处理启动当前conngo dealConn.Start()}}()
}func (s *Server) Stop() {}func (s *Server) Serve() {s.Start()//阻塞,一直读取客户端所发送过来的消息select {}
}func (s *Server) AddRouter(msgId uint32, router ziface.IRouter) {s.MsgHandler.AddRouter(msgId, router)
}

zinx/znet/connection.go

package znetimport ("fmt""github.com/kataras/iris/v12/x/errors""io""myTest/zinx/util""net"
)type Connection struct {Conn       *net.TCPConnConnID     uint32isClosed   boolmsgChannel chan []byte//告知当前的连接已经退出/停止(由Reader告知writer退出)ExitChan   chan boolMsgHandler *MsgHandle
}func NewConnection(conn *net.TCPConn, connID uint32, msgHandle *MsgHandle) *Connection {c := &Connection{Conn:       conn,ConnID:     connID,MsgHandler: msgHandle,isClosed:   false,msgChannel: make(chan []byte),ExitChan:   make(chan bool, 1),}return c
}func (c *Connection) StartWriter() {fmt.Println("[Writer Goroutine is running]")defer fmt.Println("[conn Writer  goroutine exit!]", c.RemoteAddr().String())//不断的阻塞等待channel的消息,然后将channel中的消息写给客户端for {select {case data := <-c.msgChannel://有数据写给客户端if _, err := c.Conn.Write(data); err != nil {fmt.Println("Send data error , ", err)return}case <-c.ExitChan://代表reader已经退出,此时writer也需要退出return}}
}func (c *Connection) StartReader() {fmt.Println("reader goroutine is running...")defer fmt.Println("[Reader goroutine is exit] connID=", c.ConnID, " remote addr is ", c.RemoteAddr().String())defer c.Stop()//读取数据for {//创建一个拆包对象dp := NewDataPack()//读取客户端的msg Head 二进制流 8字节headData := make([]byte, dp.GetHeadLen())if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {fmt.Println("read msg head err ", err)break}//拆包,将读取到的headData封装为msgmsg, err := dp.UnPack(headData)if err != nil {fmt.Println("unpack msg err ", err)break}//根据dataLen,再次读取Data,放在msg.Data中,var data []byte//如果数据包中有数据,则读取if msg.GetMsgLen() > 0 {data = make([]byte, msg.GetMsgLen())//将切片data读满if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {fmt.Println("read msg data err ", err)break}}msg.SetData(data)//封装请求,改为router处理r := Request{conn: c,msg:  msg,}//判断是否开启workerPool,如果没有开启则直接创建协程处理;如果开启则通过workerPool处理if util.GlobalObject.WorkerPoolSize > 0 {c.MsgHandler.SendMsgToTaskQueue(&r)} else {go c.MsgHandler.DoMsgHandler(&r)}}
}//启动连接
func (c *Connection) Start() {fmt.Printf("ConnID %d is Start...", c.ConnID)//开启读、写go c.StartReader()go c.StartWriter()
}//停止连接
func (c *Connection) Stop() {fmt.Println("Connection Stop()...ConnectionID = ", c.ConnID)if c.isClosed {return}c.isClosed = truec.Conn.Close()c.ExitChan <- trueclose(c.msgChannel)close(c.ExitChan)
}//获取当前连接的Conn对象
func (c *Connection) GetTCPConnection() *net.TCPConn {return c.Conn
}//获取当前连接模块的id
func (c *Connection) GetConnectionID() uint32 {return c.ConnID
}//获取远程客户端的TCP状态 IP:Port
func (c *Connection) RemoteAddr() net.Addr {return c.Conn.RemoteAddr()
}//发送数据
func (c *Connection) SendMsg(msgId uint32, data []byte) error {if c.isClosed {return errors.New("connection closed\n")}//将data进行封包dp := NewDataPack()binaryMsg, err := dp.Pack(NewMessage(msgId, data))if err != nil {fmt.Println("Pack error msg id=", msgId)return errors.New("pack error msg")}//将数据发送给客户端if _, err := c.Conn.Write(binaryMsg); err != nil {fmt.Println("write msg id ", msgId, " error ", err)return errors.New("conn write err ")}return nil
}

zinx/znet/msgHandler.go

package znetimport ("fmt""myTest/zinx/util""myTest/zinx/ziface""strconv"
)type MsgHandle struct {//msgId与对应的router对应Api map[uint32]ziface.IRouter//负责worker取任务的消息队列TaskQueue []chan ziface.IRequest//业务工作worker池的goroutine数量WorkerPoolSize uint32
}func NewMsgHandle() *MsgHandle {return &MsgHandle{Api:            make(map[uint32]ziface.IRouter),TaskQueue:      make([]chan ziface.IRequest, util.GlobalObject.WorkerPoolSize),WorkerPoolSize: util.GlobalObject.WorkerPoolSize,}
}func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) {//判断是否有对应的routerif _, ok := mh.Api[request.GetMsgID()]; !ok {fmt.Println("msgId ", request.GetMsgID(), "does not exist handler, need to add router")return}//call handlerrouter := mh.Api[request.GetMsgID()]router.PreHandle(request)router.Handler(request)router.PostHandler(request)
}func (mh *MsgHandle) AddRouter(msgId uint32, router ziface.IRouter) {if _, ok := mh.Api[msgId]; ok {//如果已经存在了对应的router,则提示panic("repeat api, msgId = " + strconv.Itoa(int(msgId)))}mh.Api[msgId] = routerfmt.Println("msgId ", msgId, "Add router success ")
}//启动一个worker工作池(开启工作池的动作只能发生一次,一个zinx框架只能有一个worker工作池)
func (mh *MsgHandle) StartWorkerPool() {for i := 0; i < int(mh.WorkerPoolSize); i++ {//开辟任务队列mh.TaskQueue[i] = make(chan ziface.IRequest, util.GlobalObject.MaxWorkerTaskLen)//启动workergo mh.startOneWorker(i, mh.TaskQueue[i])}
}func (mh *MsgHandle) startOneWorker(workerId int, taskQueue chan ziface.IRequest) {fmt.Println("Worker ID=", workerId, " is started...")for {select {//从任务队列中取消息(如果有消息过来,出列的就是request,然后执行该request所绑定的业务)case request := <-taskQueue:mh.DoMsgHandler(request)}}
}//将消息交给taskQueue,由Worker进行处理
func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) {//通过取余数的方式来达到负载均衡workID := request.GetConnection().GetConnectionID() % util.GlobalObject.WorkerPoolSizefmt.Println("Add ConnID=", request.GetConnection().GetConnectionID()," requestID=", request.GetMsgID()," workID=", workID)//将消息发送给对应worker的任务队列mh.TaskQueue[workID] <- request
}

zinx/ziface/imsgHandler.go

package zifacetype IMsgHandler interface {DoMsgHandler(request IRequest)AddRouter(msgId uint32, router IRouter)StartWorkerPool()SendMsgToTaskQueue(request IRequest)
}

测试

myDemo/ZinxV0.8/Server.go

同myDemo/ZinxV0.7/Server.go,修改一下NewServer时候所传的Zinx的名称即可

myDemo/ZinxV0.8/Client.go

同myDemo/ZinxV0.7/Client.go

myDemo/ZinxV0.8/zinx.json

{"Name": "Zinx Server Application","Version": "V0.8","Host": "0.0.0.0","TcpPort": 8092,"MaxConn": 30,"MaxPackageSize": 1024,"WorkerPoolSize": 10
}

在这里插入图片描述

在这里插入图片描述

3 连接管理器(connManager)[v0.9]

3.1 连接管理器(conn)的定义与实现

创建一个连接管理模块ConnManager

  • 添加连接
  • 删除连接
  • 根据连接ID查找对应的连接
  • 总连接个数
  • 清理全部的连接

3.2 将连接管理模块集成到Zinx框架中

  1. 给server添加一个ConnMgr属性
  2. 修改NewServer方法,加入ConnMgr初始化
  3. 判断当前连接数是否超出最大值MaxConn
  4. 当server停止的时候(调用server.Stop方法),应该加入ConnMgr.ClearConn()

3.3 提供创建连接/销毁连之前所需的Hook函数

给我们自定义框架Zinx提供创建连接之后/销毁连接之前所要处理的一些业务。提供给用户能够注册的Hook函数

  • 添加OnConnStart()
  • 添加OnConnStop()

zinx/ziface/iserver.go

package zifacetype IServer interface {Start()Stop()Serve()AddRouter(msgId uint32, router IRouter)GetConnMgr() IConnManager//注册创OnConnStart钩子函数SetOnConnStart(func(conn IConnection))SetOnConnStop(func(conn IConnection))//调用OnConnStart钩子函数CallOnConnStart(conn IConnection)CallOnConnStop(conn IConnection)
}

zinx/ziface/iconnmanager.go

package zifacetype IConnManager interface {Add(conn IConnection)Remove(conn IConnection)Get(connID uint32) (IConnection, error)Len() intClearConn()
}

zinx/znet/connmanager.go

package znetimport ("fmt""github.com/kataras/iris/v12/x/errors""myTest/zinx/util""myTest/zinx/ziface""sync"
)type ConnManager struct {connections map[uint32]ziface.IConnection //管理的连接集合connLock    sync.RWMutex                  //保护连接集合的读写锁
}func NewConnManager() *ConnManager {return &ConnManager{connections: make(map[uint32]ziface.IConnection, util.GlobalObject.MaxConn),}
}
func (cm *ConnManager) Add(conn ziface.IConnection) {//添加写锁cm.connLock.Lock()defer cm.connLock.Unlock()cm.connections[conn.GetConnectionID()] = connfmt.Println("connectionID=", conn.GetConnectionID(), " add to ConnManager success, conn num=", cm.Len())
}func (cm *ConnManager) Remove(conn ziface.IConnection) {//保护共享资源mapcm.connLock.Lock()defer cm.connLock.Unlock()delete(cm.connections, conn.GetConnectionID())fmt.Println("connectionID=", conn.GetConnectionID(), " remote from ConnManager success, conn num=", cm.Len())
}func (cm *ConnManager) Get(connID uint32) (ziface.IConnection, error) {cm.connLock.RLock()defer cm.connLock.RUnlock()if conn, ok := cm.connections[connID]; ok {return conn, nil} else {return nil, errors.New("connection NOT FOUND")}
}func (cm *ConnManager) Len() int {return len(cm.connections)
}func (cm *ConnManager) ClearConn() {cm.connLock.Lock()defer cm.connLock.Unlock()for connID, conn := range cm.connections {//停止连接conn.Stop()//删除连接delete(cm.connections, connID)}fmt.Println("Clear All connections success! conn num=", cm.Len())
}

zinx/znet/connection.go

package znetimport ("fmt""github.com/kataras/iris/v12/x/errors""io""myTest/zinx/util""myTest/zinx/ziface""net"
)type Connection struct {Conn       *net.TCPConnConnID     uint32isClosed   boolmsgChannel chan []byte//告知当前的连接已经退出/停止(由Reader告知writer退出)ExitChan   chan boolMsgHandler *MsgHandleTcpServer  ziface.IServer
}func NewConnection(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandle *MsgHandle) *Connection {c := &Connection{Conn:       conn,ConnID:     connID,MsgHandler: msgHandle,isClosed:   false,msgChannel: make(chan []byte),ExitChan:   make(chan bool, 1),TcpServer:  server,}//将conn添加到connMgr中c.TcpServer.GetConnMgr().Add(c)return c
}func (c *Connection) StartWriter() {fmt.Println("[Writer Goroutine is running]")defer fmt.Println("[conn Writer  goroutine exit!]", c.RemoteAddr().String())//不断的阻塞等待channel的消息,然后将channel中的消息写给客户端for {select {case data := <-c.msgChannel://有数据写给客户端if _, err := c.Conn.Write(data); err != nil {fmt.Println("Send data error , ", err)return}case <-c.ExitChan://代表reader已经退出,此时writer也需要退出return}}
}func (c *Connection) StartReader() {fmt.Println("reader goroutine is running...")defer fmt.Println("[Reader goroutine is exit] connID=", c.ConnID, " remote addr is ", c.RemoteAddr().String())defer c.Stop()//读取数据for {//创建一个拆包对象dp := NewDataPack()//读取客户端的msg Head 二进制流 8字节headData := make([]byte, dp.GetHeadLen())if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {fmt.Println("read msg head err ", err)break}//拆包,将读取到的headData封装为msgmsg, err := dp.UnPack(headData)if err != nil {fmt.Println("unpack msg err ", err)break}//根据dataLen,再次读取Data,放在msg.Data中,var data []byte//如果数据包中有数据,则读取if msg.GetMsgLen() > 0 {data = make([]byte, msg.GetMsgLen())//将切片data读满if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {fmt.Println("read msg data err ", err)break}}msg.SetData(data)//封装请求,改为router处理r := Request{conn: c,msg:  msg,}//判断是否开启workerPool,如果没有开启则直接创建协程处理;如果开启则通过workerPool处理if util.GlobalObject.WorkerPoolSize > 0 {c.MsgHandler.SendMsgToTaskQueue(&r)} else {go c.MsgHandler.DoMsgHandler(&r)}}
}//启动连接
func (c *Connection) Start() {fmt.Printf("ConnID %d is Start...", c.ConnID)//开启读、写go c.StartReader()go c.StartWriter()//执行钩子函数c.TcpServer.CallOnConnStart(c)
}//停止连接
func (c *Connection) Stop() {fmt.Println("Connection Stop()...ConnectionID = ", c.ConnID)if c.isClosed {return}c.isClosed = true//连接关闭之前执行hook关闭的钩子函数c.TcpServer.CallOnConnStop(c)c.Conn.Close()c.ExitChan <- true//连接conn关闭时,需要从连接管理模块中移除c.TcpServer.GetConnMgr().Remove(c)close(c.msgChannel)close(c.ExitChan)
}//获取当前连接的Conn对象
func (c *Connection) GetTCPConnection() *net.TCPConn {return c.Conn
}//获取当前连接模块的id
func (c *Connection) GetConnectionID() uint32 {return c.ConnID
}//获取远程客户端的TCP状态 IP:Port
func (c *Connection) RemoteAddr() net.Addr {return c.Conn.RemoteAddr()
}//发送数据
func (c *Connection) SendMsg(msgId uint32, data []byte) error {if c.isClosed {return errors.New("connection closed\n")}//将data进行封包dp := NewDataPack()binaryMsg, err := dp.Pack(NewMessage(msgId, data))if err != nil {fmt.Println("Pack error msg id=", msgId)return errors.New("pack error msg")}//将数据发送给客户端if _, err := c.Conn.Write(binaryMsg); err != nil {fmt.Println("write msg id ", msgId, " error ", err)return errors.New("conn write err ")}return nil
}

zinx/znet/server.go

package znetimport ("fmt""myTest/zinx/util""myTest/zinx/ziface""net"
)type Server struct {Name       stringIPVersion  stringIP         stringPort       intMsgHandler *MsgHandleConnMgr    *ConnManager//创建连接之前的Hook函数OnConnStart func(conn ziface.IConnection)OnConnStop  func(conn ziface.IConnection)
}func NewServer(name string) *Server {s := &Server{Name:       name,IPVersion:  "tcp4",IP:         util.GlobalObject.Host,Port:       util.GlobalObject.TcpPort,MsgHandler: NewMsgHandle(),ConnMgr:    NewConnManager(),}return s
}func (s *Server) Start() {//启动服务监听端口fmt.Printf("[Zinx] Server Name :%s , listen IP :%v , Port: %d is starting \n", s.Name, s.IP, s.Port)fmt.Printf("[Zinx] Version :%s , MaxConn:%v , MaxPackageSize: %d \n", util.GlobalObject.Version, util.GlobalObject.MaxConn, util.GlobalObject.MaxPackageSize)var cid uint32 = 0go func() {//0 开启消息队列及Worker工作池s.MsgHandler.StartWorkerPool()addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))if err != nil {fmt.Printf("resolve tcp addr error %v\n", err)return}listener, err := net.ListenTCP(s.IPVersion, addr)if err != nil {fmt.Println("listen ", s.IPVersion, " err ", err)return}fmt.Println("[start] Zinx server success ", s.Name, "Listening...")//阻塞连接,处理业务for {conn, err := listener.AcceptTCP()if err != nil {fmt.Println("Accept err ", err)continue}//判断当前连接数是否超过最大连接数,如果超过则关闭新创建的连接if s.ConnMgr.Len() >= util.GlobalObject.MaxConn {//TODO 给客户端返回一个超出最大连接的错误包fmt.Println("-----------------》 Tcp Conn exceed, conn num=", util.GlobalObject.MaxConn)conn.Close()//关闭当前连接,等待下一次连接【如果当前连接数小于最大连接数】continue}dealConn := NewConnection(s, conn, cid, s.MsgHandler)cid++//开启goroutine处理启动当前conngo dealConn.Start()}}()
}func (s *Server) Stop() {//释放相关资源fmt.Println("[STOP] Zinx server name ", s.Name)s.ConnMgr.ClearConn()
}func (s *Server) Serve() {s.Start()//阻塞,一直读取客户端所发送过来的消息select {}
}func (s *Server) GetConnMgr() ziface.IConnManager {return s.ConnMgr
}func (s *Server) AddRouter(msgId uint32, router ziface.IRouter) {s.MsgHandler.AddRouter(msgId, router)
}//注册创OnConnStart钩子函数
func (s *Server) SetOnConnStart(hookFunc func(conn ziface.IConnection)) {s.OnConnStart = hookFunc
}func (s *Server) SetOnConnStop(hookFunc func(conn ziface.IConnection)) {s.OnConnStop = hookFunc
}//调用OnConnStart钩子函数
func (s *Server) CallOnConnStart(conn ziface.IConnection) {if s.OnConnStart != nil {fmt.Println("---------> call OnConnStart()")s.OnConnStart(conn)}
}func (s *Server) CallOnConnStop(conn ziface.IConnection) {if s.OnConnStop != nil {fmt.Println("----------> call OnConnStop()")s.OnConnStop(conn)}
}

测试

myDemo/ZinxV0.9/Server.go
package mainimport ("fmt""myTest/zinx/ziface""myTest/zinx/znet"
)//自定义一个Router,测试路由功能
type PingRouter struct {znet.BaseRouter
}func (pr *PingRouter) Handler(request ziface.IRequest) {fmt.Println("call router handler...")//先读取客户端数据,再回写ping...ping...ping...fmt.Println("receive from client msgId=", request.GetMsgID(),"data=", string(request.GetData()))//回写pingerr := request.GetConnection().SendMsg(0, []byte("ping...ping...ping..."))if err != nil {fmt.Println(err)}
}//定义第二个Router
type HelloRouter struct {znet.BaseRouter
}func (hr *HelloRouter) Handler(request ziface.IRequest) {fmt.Println("receive from client msgId=", request.GetMsgID(),"data=", string(request.GetData()))err := request.GetConnection().SendMsg(1, []byte("hello zinx, I'm the other handler"))if err != nil {fmt.Println(err)}
}//连接创建成功之后需要执行的逻辑
func DoConnBegin(conn ziface.IConnection) {fmt.Println("=====>Do Conn Begin...")if err := conn.SendMsg(202, []byte("do connection begin...")); err != nil {fmt.Println("err")}
}//连接断开之前要执行的逻辑
func DoConnLost(conn ziface.IConnection) {fmt.Println("=====>Do Conn Lost...")fmt.Println("connID=", conn.GetConnectionID(), " is Lost....")
}func main() {s := znet.NewServer("[Zinx v0.9]")//添加自定义路由(PingRouter和HelloRouter)router0 := &PingRouter{}s.AddRouter(0, router0)router1 := &HelloRouter{}s.AddRouter(1, router1)//注册hook钩子函数s.SetOnConnStart(DoConnBegin)s.SetOnConnStop(DoConnLost)s.Serve()
}

测试代码中的myDemo/ZinxV0.9/Client.go和myDemo/ZinxV0.8/Client.go一样。

  • 为了方便测试超过最大连接数的报错信息,我们可以修改配置文件
    在这里插入图片描述
//将最大连接数设置为2,然后我们复制Client.go,可以多起几个Client来进行测试
{"Name": "Zinx Server Application","Version": "V0.9","Host": "0.0.0.0","TcpPort": 8092,"MaxConn": 2,"MaxPackageSize": 1024,"WorkerPoolSize": 10
}

测试最大连接数与连接管理:
在这里插入图片描述

测试钩子函数:
在这里插入图片描述

4 添加连接属性并测试【v0.10】

通过map[string]interface{}来存储连接的属性值,通过RWLock来保证读写connection属性值安全

  • 设置连接属性
  • 获取连接属性
  • 移除连接属性

zinx/ziface/iconnection.go

package zifaceimport "net"type IConnection interface {//启动连接Start()//停止连接Stop()//获取当前连接的Conn对象GetTCPConnection() *net.TCPConn//获取当前连接模块的idGetConnectionID() uint32//获取远程客户端的TCP状态 IP:PortRemoteAddr() net.Addr//发送数据SendMsg(msgId uint32, data []byte) errorSetProperty(key string, value interface{})GetProperty(key string) (interface{}, error)RemoveProperty(key string)
}//定义一个处理连接业务的方法
type HandleFunc func(*net.TCPConn, []byte, int) error

zinx/znet/connection.go

package znetimport ("fmt""github.com/kataras/iris/v12/x/errors""io""myTest/zinx/util""myTest/zinx/ziface""net""sync"
)type Connection struct {Conn       *net.TCPConnConnID     uint32isClosed   boolmsgChannel chan []byte//告知当前的连接已经退出/停止(由Reader告知writer退出)ExitChan     chan boolMsgHandler   *MsgHandleTcpServer    ziface.IServerproperty     map[string]interface{}propertyLock sync.RWMutex
}func NewConnection(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandle *MsgHandle) *Connection {c := &Connection{Conn:       conn,ConnID:     connID,MsgHandler: msgHandle,isClosed:   false,msgChannel: make(chan []byte),ExitChan:   make(chan bool, 1),TcpServer:  server,property: make(map[string]interface{}),}//将conn添加到connMgr中c.TcpServer.GetConnMgr().Add(c)return c
}func (c *Connection) StartWriter() {fmt.Println("[Writer Goroutine is running]")defer fmt.Println("[conn Writer  goroutine exit!]", c.RemoteAddr().String())//不断的阻塞等待channel的消息,然后将channel中的消息写给客户端for {select {case data := <-c.msgChannel://有数据写给客户端if _, err := c.Conn.Write(data); err != nil {fmt.Println("Send data error , ", err)return}case <-c.ExitChan://代表reader已经退出,此时writer也需要退出return}}
}func (c *Connection) StartReader() {fmt.Println("reader goroutine is running...")defer fmt.Println("[Reader goroutine is exit] connID=", c.ConnID, " remote addr is ", c.RemoteAddr().String())defer c.Stop()//读取数据for {//创建一个拆包对象dp := NewDataPack()//读取客户端的msg Head 二进制流 8字节headData := make([]byte, dp.GetHeadLen())if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {fmt.Println("read msg head err ", err)break}//拆包,将读取到的headData封装为msgmsg, err := dp.UnPack(headData)if err != nil {fmt.Println("unpack msg err ", err)break}//根据dataLen,再次读取Data,放在msg.Data中,var data []byte//如果数据包中有数据,则读取if msg.GetMsgLen() > 0 {data = make([]byte, msg.GetMsgLen())//将切片data读满if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {fmt.Println("read msg data err ", err)break}}msg.SetData(data)//封装请求,改为router处理r := Request{conn: c,msg:  msg,}//判断是否开启workerPool,如果没有开启则直接创建协程处理;如果开启则通过workerPool处理if util.GlobalObject.WorkerPoolSize > 0 {c.MsgHandler.SendMsgToTaskQueue(&r)} else {go c.MsgHandler.DoMsgHandler(&r)}}
}//启动连接
func (c *Connection) Start() {fmt.Printf("ConnID %d is Start...", c.ConnID)//开启读、写go c.StartReader()go c.StartWriter()//执行钩子函数c.TcpServer.CallOnConnStart(c)
}//停止连接
func (c *Connection) Stop() {fmt.Println("Connection Stop()...ConnectionID = ", c.ConnID)if c.isClosed {return}c.isClosed = true//连接关闭之前执行hook关闭的钩子函数c.TcpServer.CallOnConnStop(c)c.Conn.Close()c.ExitChan <- true//连接conn关闭时,需要从连接管理模块中移除c.TcpServer.GetConnMgr().Remove(c)close(c.msgChannel)close(c.ExitChan)
}//获取当前连接的Conn对象
func (c *Connection) GetTCPConnection() *net.TCPConn {return c.Conn
}//获取当前连接模块的id
func (c *Connection) GetConnectionID() uint32 {return c.ConnID
}//获取远程客户端的TCP状态 IP:Port
func (c *Connection) RemoteAddr() net.Addr {return c.Conn.RemoteAddr()
}//发送数据
func (c *Connection) SendMsg(msgId uint32, data []byte) error {if c.isClosed {return errors.New("connection closed\n")}//将data进行封包dp := NewDataPack()binaryMsg, err := dp.Pack(NewMessage(msgId, data))if err != nil {fmt.Println("Pack error msg id=", msgId)return errors.New("pack error msg")}//将数据发送给客户端if _, err := c.Conn.Write(binaryMsg); err != nil {fmt.Println("write msg id ", msgId, " error ", err)return errors.New("conn write err ")}return nil
}func (c *Connection) SetProperty(key string, value interface{}) {c.propertyLock.Lock()defer c.propertyLock.Unlock()c.property[key] = value
}func (c *Connection) GetProperty(key string) (interface{}, error) {c.propertyLock.RLock()defer c.propertyLock.RUnlock()if value, ok := c.property[key]; ok {return value, nil} else {return nil, errors.New("no property found")}
}func (c *Connection) RemoveProperty(key string) {c.propertyLock.Lock()defer c.propertyLock.Unlock()delete(c.property, key)
}

测试

myDemo/ZinxV0.10/Server.go
package mainimport ("fmt""myTest/zinx/ziface""myTest/zinx/znet"
)//自定义一个Router,测试路由功能
type PingRouter struct {znet.BaseRouter
}func (pr *PingRouter) Handler(request ziface.IRequest) {fmt.Println("call router handler...")//先读取客户端数据,再回写ping...ping...ping...fmt.Println("receive from client msgId=", request.GetMsgID(),"data=", string(request.GetData()))//回写pingerr := request.GetConnection().SendMsg(0, []byte("ping...ping...ping..."))if err != nil {fmt.Println(err)}
}//定义第二个Router
type HelloRouter struct {znet.BaseRouter
}func (hr *HelloRouter) Handler(request ziface.IRequest) {fmt.Println("receive from client msgId=", request.GetMsgID(),"data=", string(request.GetData()))err := request.GetConnection().SendMsg(1, []byte("hello zinx, I'm the other handler"))if err != nil {fmt.Println(err)}
}//连接创建成功之后需要执行的逻辑
func DoConnBegin(conn ziface.IConnection) {fmt.Println("=====>Do Conn Begin...")if err := conn.SendMsg(202, []byte("do connection begin...")); err != nil {fmt.Println("err")}//给conn设置属性conn.SetProperty("Name", "ziyi")conn.SetProperty("士兵突击", "https://www.bilibili.com/video/BV1Lk4y1N7tC/")
}//连接断开之前要执行的逻辑
func DoConnLost(conn ziface.IConnection) {fmt.Println("=====>Do Conn Lost...")fmt.Println("connID=", conn.GetConnectionID(), " is Lost....")//读取属性property, _ := conn.GetProperty("Name")fmt.Println("Get Property Name=", property)property, _ = conn.GetProperty("士兵突击")fmt.Println("Get Property 士兵突击=", property)
}func main() {s := znet.NewServer("[Zinx v0.10]")//添加自定义路由(PingRouter和HelloRouter)router0 := &PingRouter{}s.AddRouter(0, router0)router1 := &HelloRouter{}s.AddRouter(1, router1)//注册hook钩子函数s.SetOnConnStart(DoConnBegin)s.SetOnConnStop(DoConnLost)s.Serve()
}
myDemo/ZinxV0.10/Client.go
package mainimport ("fmt""io""myTest/zinx/znet""net""time"
)/*
模拟客户端
*/
func main() {fmt.Println("client start...")time.Sleep(time.Second * 1)//1 创建服务器连接conn, err := net.Dial("tcp", "127.0.0.1:8092")if err != nil {fmt.Println("client start err ", err)return}for {//发送封装后的数据包dp := znet.NewDataPack()binaryMsg, err := dp.Pack(znet.NewMessage(0, []byte("Zinx client0 test msg")))if err != nil {fmt.Println("client pack msg err ", err)return}if _, err := conn.Write(binaryMsg); err != nil {fmt.Println("client write err ", err)return}//服务器应该给我们回复一个message数据,msgId为1,内容为ping...ping...//1 先读取流中的head部分,得到Id和dataLenbinaryHead := make([]byte, dp.GetHeadLen())if _, err := io.ReadFull(conn, binaryHead); err != nil {fmt.Println("client read head err ", err)break}//将二进制的head拆包到msg中msgHead, err := dp.UnPack(binaryHead)if err != nil {fmt.Println("client unpack msgHead err ", err)break}if msgHead.GetMsgLen() > 0 {//2 有数据, 再根据dataLen进行二次读取,将data读出来msg := msgHead.(*znet.Message)msg.Data = make([]byte, msg.GetMsgLen())if _, err := io.ReadFull(conn, msg.Data); err != nil {fmt.Println("read msg data error ", err)return}fmt.Println("--------> Receive Server msg , ID=", msg.Id, " ,len=", msg.DataLen, " ,data=", string(msg.Data))}//cpu阻塞,让出cpu时间片,避免无限for循环导致其他程序无法获取cpu时间片time.Sleep(time.Second * 1)}
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/54148.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

英特尔傲腾CAS报错unknown error cache acceleration software could not start cache

英特尔傲腾CAS报错unknown error cache acceleration software could not start cache 文章目录 英特尔傲腾CAS报错unknown error cache acceleration software could not start cache我是怎么遇到这个问题的我是如何解决的实验步骤打Primo Cache蓝屏补丁拔掉原来的系统盘开关机…

【数据结构】常见的排序算法

常见的排序算法 常见的排序算法插入排序之直接插入排序时间复杂度特性总结 插入排序之希尔排序时间复杂度 选择排序之直接选择排序特性总结 选择排序之堆排序时间复杂度特性总结 交换排序之冒泡排序特性总结 交换排序之快速排序hoare版本挖坑法双指针法快速排序的优化1&#xf…

HTML5中Canvas学习笔记:Canvas

目录 一、HTML中Canvas画图strokeStyle 和 fillStyle 的区别是什么&#xff1f; 二、如何设置一幅canvas图中某个颜色透明&#xff1f; 三、H5 canvas中strokeRect参数如果是小数&#xff0c;如何处理&#xff1f; 四、H5 Canvas中如何画圆角矩形框&#xff1f; 一、HTML中…

python GUI nicegui初识一(登录界面创建)

最近尝试了python的nicegui库&#xff0c;虽然可能也有一些不足&#xff0c;但个人感觉对于想要开发不过对ui设计感到很麻烦的人来说是很友好的了&#xff0c;毕竟nicegui可以利用TailwindCSS和Quasar进行ui开发&#xff0c;并且也支持定制自己的css样式。 这里记录一下自己利…

K8S系列文章之 一键部署K8S环境

部署的原理是基于自动化部署工具 Ansible 实现的&#xff0c;需要提前安装Ansible 并配置下主机节点环境 1. 安装 Ansible 首先ansible基于python2.X 环境&#xff0c;默认centos都已经安装好了python2环境 // 最好更新下库 // yum update yum install -y epel-release yum i…

Qt 编译 Android 项目,输出乱码

乱码如下&#xff1a; :-1: error: 娉 C:\Qt\6.5.0\android_arm64_v8a\src\android\java\src\org\qtproject\qt\android\bindings\QtActivity.java浣跨敤鎴栬鐩栦簡宸茶繃鏃剁殑 API銆 娉 鏈夊叧璇︾粏淇℃伅, 璇蜂娇鐢-Xlint:deprecation 閲嶆柊缂栬瘧銆 正确的应该是&#…

20天突破英语四级高频词汇——第①天

2&#xfeff;0天突破英语四级高频词汇~第一天加油(ง •_•)ง&#x1f4aa; &#x1f433;博主&#xff1a;命运之光 &#x1f308;专栏&#xff1a;英语四级高频词汇速记 &#x1f30c;博主的其他文章&#xff1a;点击进入博主的主页 目录 2&#xfeff;0天突破英语四级…

Godot 4 源码分析 - Path2D与PathFollow2D

学习演示项目dodge_the_creeps&#xff0c;发现里面多了一个Path2D与PathFollow2D 研究GDScript代码发现&#xff0c;它主要用于随机生成Mob var mob_spawn_location get_node(^"MobPath/MobSpawnLocation")mob_spawn_location.progress randi()# Set the mobs dir…

【C#学习笔记】类型转换

文章目录 类型转换字符转数字GetNumericValueConvert.ToInt32隐式转换计算 字符串转数字Parse 或 TryParse 方法 字节数组转整数 as&#xff0c;is强制类型转换isas 用户定义的转换 类型转换 我们简单地将值类型分为5种&#xff1a;整数型&#xff0c;浮点型&#xff0c;布尔型…

复原 IP 地址——力扣93

文章目录 题目描述回溯题目描述 回溯 class Solution{public:static constexpr int seg_count=4<

虚拟化中的中断机制:X86与PIC 8259A探索(上)

本系列深入探讨虚拟化中断技术&#xff0c;从X86架构和PIC 8259A的基础&#xff0c;到IOAPIC和MSI的编程&#xff0c;再到MSIX技术与Broiler设备的实战应用&#xff0c;全面剖析中断虚拟化的前沿进展。 X86 中断机制 ​ 在计算机架构中&#xff0c;CPU 运行的速度远远大于外设…

在校外连接校内实验室服务器

zerotier 内网穿透 一、zerotier的操作 去官网注册、登录、创建网络 zerotier官网 我使用微软账号登录的&#xff0c;这个随便 点 Create A Network NETWORK ID点ID进去 二、服务器(校内)上的操作 1. Ubuntu配置SSH 如果出现不在sudoers列表的问题查看这里 sudo apt …