Golang网络模型netpoll源码解析

news/2024/11/28 5:36:54/文章来源:https://www.cnblogs.com/MelonTe/p/18571255

0、引言

在学习完了Socket编程的基础知识、Linux系统提供的I/O多路复用的实现以及Golang的GMP调度模型之后,我们进而学习Golang的网络模型——netpoll。本文将从为什么需要使用netpoll模型,以及netpoll的具体流程实现两个主要角度来展开学习。当前使用的Go的版本为1.22.4,Linux系统。

1、为什么要使用netpoll模型?

首先,什么是多路复用?

多路,指的是存在着多个需要服务的对象;复用,指的是重复利用一个单元来为上述的多个目标提供服务。

我们知道,Linux系统为用户提供了三个内核实现的IO多路复用技术的系统调用,用发展时间来排序分别为:select->poll->epoll。其中,epoll在当今使用的最为广泛,对比与select调用,它有以下的优势:

  • fd数量灵活:可监听的fd数量上限灵活,使用方可以在调用epoll_create操作时自行指定。
  • 更少的内核拷贝次数:在内核中,使用红黑树的结构来存储需要监听的fd,相比与调用select每次需要将所有的fd拷贝进内核,监听到事件后再全部拷贝回用户态,epoll只需要将需要监听的fd添加到事件表后,即可多次监听。
  • 返回结果明确epoll运行将就绪事件添加到就绪事件列表中,当用户调用epoll_wait操作时,内核只返回就绪事件,而select返回的是所有的事件,需要用户再进行一次遍历,找到就绪事件再处理。

需要注意的是,在不同的条件环境下,epoll的优势可能反而作用不明显。epoll只适用在监听fd基数较大且活跃度不高的场景,如此epoll事件表的空间复用和epoll_wait操作的精准才能体现出其优势;而当处在fd基数较小且活跃度高的场景下,select反而更加简单有效,构造epoll的红黑树结构的消耗会成为其累赘。

考虑到场景的多样性,我们会选择使用epoll去完成内核事件监听的操作,那么如何将golangepoll结合起来呢?

在 Go 语言的并发模型中,GMP 框架实现了一种高效的协程调度机制,它屏蔽了操作系统线程的细节,用户可以通过轻量级的 Goroutine 来实现细粒度的并发操作。然而,底层的 IO 多路复用机制(如 Linux 的 epoll)调度的单位仍然是线程(M)。为了将 IO 调度从线程层面提升到协程层面,充分发挥 Goroutine 的高并发优势,netpoll 应运而生。

接下来我们就来学习netpoll框架的实现。

2、netpoll实现原理

2.1、核心结构

1、pollDesc

为了将IO调度从线程提升到协程层面,netpoll框架有个重要的核心结构pollDesc,它有两个,一个为表层,含有指针指向了里层的pollDesc。本文中讲到的pollDesc都为里层pollDesc

表层pollDesc定位在internel/poll/fd_poll_runtime.go文件中:

type pollDesc struct {runtimeCtx uintptr
}

使用一个runtimeCtx指针指向其底层实现实例。

里层的位于runtime/netpoll.go中。

//网络poller描述符
type pollDesc struct {//next指针,指向在pollCache链表结构中,以下个pollDesc实例。link  *pollDesc      //指向fdfd    uintptr//读事件状态标识器,状态有四种://1、pdReady:表示读操作已就绪,等待处理//2、pdWait:表示g将要被阻塞等待读操作就绪,此时还未阻塞//3、g:读操作的g已经被阻塞,rg指向阻塞的g实例//4、pdNil:空rg atomic.Uintptr wg atomic.Uintptr //...
}

pollDesc的核心字段是读/写标识器rg/wg,它用于标识fd的io事件状态,并且持有被阻塞的g实例。当后续需要唤醒这个g处理读写事件的时候,可以通过pollDesc追溯得到g的实例进行操作。有了pollDesc这个数据结构,Golang就能将对处理socket的调度单位从线程Thread转换成协程G

2、pollCache

pollCache缓冲池采用了单向链表的方式存储多个pollDesc实例。

type pollCache struct {lock  mutexfirst *pollDesc
}

其包含了两个核心方法,分别是alloc()free()

//从pollCache中分配得到一个pollDesc实例
func (c *pollCache) alloc() *pollDesc {lock(&c.lock)//如果链表为空,则进行初始化if c.first == nil {//pdSize = 248const pdSize = unsafe.Sizeof(pollDesc{})//4096 / 248 = 16n := pollBlockSize / pdSizeif n == 0 {n = 1}//分配指定大小的内存空间mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)//完成指定数量的pollDesc创建for i := uintptr(0); i < n; i++ {pd := (*pollDesc)(add(mem, i*pdSize))pd.link = c.firstc.first = pd}}pd := c.firstc.first = pd.linklockInit(&pd.lock, lockRankPollDesc)unlock(&c.lock)return pd
}
//free用于将一个pollDesc放回pollCache
func (c *pollCache) free(pd *pollDesc) {//...lock(&c.lock)pd.link = c.firstc.first = pdunlock(&c.lock)
}

2.2、netpoll框架宏观流程

在宏观的角度下,netpoll框架主要涉及了以下的几个流程:

  • poll_init:底层调用epoll_create指令,在内核态中开辟epoll事件表。
  • poll_open:先构造一个pollDesc实例,然后通过epoll_ctl(ADD)指令,向内核中添加要监听的socket,并将这一个fd绑定在pollDesc中。pollDesc含有状态标识器rg/wg,用于标识事件状态以及存储阻塞的g。
  • poll_wait:当g依赖的事件未就绪时,调用gopark方法,将g置为阻塞态存放在pollDesc中。
  • net_poll:GMP调度器会轮询netpoll流程,通常会用非阻塞的方式发起epoll_wait指令,取出就绪的pollDesc,提前出其内部陷入阻塞态的g然后将其重新添加到GMP的调度队列中。(以及在sysmon流程和gc流程都会触发netpoll)

3、流程源码实现

3.1、流程入口

我们参考以下的简易TCP服务器实现框架,走进netpoll框架的具体源码实现。

// 启动 tcp server 代码示例
func main() {//创建TCP端口监听器,涉及以下事件://1:创建socket fd,调用bind和accept系统接口函数//2:调用epoll_create,创建eventpool//3:调用epoll_ctl(ADD),将socket fd注册到epoll事件表l, _ := net.Listen("tcp", ":8080")// eventloop reactor 模型for {//等待TCP连接到达,涉及以下事件://1:循环+非阻塞调用accept//2:若未就绪,则调用gopark进行阻塞//3:等待netpoller轮询唤醒//4:获取到conn fd后注册到eventpool//5:返回connconn, _ := l.Accept()// goroutine per conngo serve(conn)}
}// 处理一笔到来的 tcp 连接
func serve(conn net.Conn) {//关闭conn,从eventpool中移除fddefer conn.Close()var buf []byte//读取conn中的数据,涉及以下事件://1:循环+非阻塞调用recv(read)//2:若未就绪,通过gopark阻塞,等待netpoll轮询唤醒_, _ = conn.Read(buf)//向conn中写入数据,涉及以下事件://1:循环+非阻塞调用writev (write)//2:若未就绪,通过gopark阻塞,等待netpoll轮询唤醒_, _ = conn.Write(buf)
}

3.2、Socket创建

net.Listen方法为入口,进行创建socket fd,调用的方法栈如下:

方法 文件
net.Listen() net/dial.go
net.ListenConfig.Listen() net/dial.go
net.sysListener.listenTCP() net/tcpsock_posix.go
net.internetSocket() net/ipsock_posix.go
net.socket() net/sock_posix.go

核心的调用在net.socket()方法内,源码核心流程如下:

func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) (fd *netFD, err error) {//进行socket系统调用,创建一个sockets, err := sysSocket(family, sotype, proto)//绑定socket fdfd, err = newFD(s, family, sotype, net);//...//进行了以下事件://1、通过syscall bind指令绑定socket的监听地址//2、通过syscall listen指令发起对socket的监听//3、完成epollEvent表的创建(全局执行一次)//4、将socket fd注册到epoll事件表中,监听读写就绪事件err := fd.listenStream(ctx, laddr, listenerBacklog(), ctrlCtxFn);
}

首先先执行了sysSocket系统调用,创建一个socket,它是一个整数值,用于标识操作系统中打开的文件或网络套接字;接着调用newFD方法包装成netFD对象,以便实现更高效的异步 IO 和 Goroutine 调度。

3.3、poll_init

紧接3.2中的net.socket方法,在内部还调用了net.netFD.listenStream()poll_init的调用栈如下:

方法 文件
net.netFD.listenStream() net/sock_posix.go
net.netFD.init() net/fd_unix.go
poll.FD.init() internal/poll/fd_unix.go
poll.pollDesc.init() internal/poll/fd_poll_runtime.go
runtime.poll_runtime_pollServerInit() runtime/netpoll.go
runtime.netpollinit() runtime/netpoll_epoll.go

net.netFD.listenStream()核心步骤如下:

func (fd *netFD) listenStream(ctx context.Context, laddr sockaddr, backlog int, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) error {//....//通过Bind系统调用绑定监听地址if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {return os.NewSyscallError("bind", err)}//通过Listen系统调用对socket进行监听if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {return os.NewSyscallError("listen", err)}//fd.init()进行了以下操作://1、完成eventPool的创建//2、将socket fd注册到epoll事件表中if err = fd.init(); err != nil {return err}//...return nil
}
  • 使用Bind系统调用绑定需要监听的地址
  • 使用Listen系统调用监听socket
  • 调用fd.init完成eventpool的创建以及fd的注册

net.netFD.init()方法在内部转而调用poll.FD.init()

func (fd *netFD) init() error {return fd.pfd.Init(fd.net, true)
}func (fd *FD) Init(net string, pollable bool) error {fd.SysFile.init()// We don't actually care about the various network types.if net == "file" {fd.isFile = true}if !pollable {fd.isBlocking = 1return nil}err := fd.pd.init(fd)if err != nil {// If we could not initialize the runtime poller,// assume we are using blocking mode.fd.isBlocking = 1}return err
}

然后又转入到poll.pollDesc.init()的调用中。

func (pd *pollDesc) init(fd *FD) error {//通过sysOnce结构,完成epoll事件表的唯一一次创建serverInit.Do(runtime_pollServerInit)//完成init后,进行poll_openctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))//...//绑定里层的pollDesc实例pd.runtimeCtx = ctxreturn nil
}

这里的poll.pollDesc表层pollDesc,表层pd的init是poll_initpoll_open流程的入口:

  • 执行serverInit.Do(runtime_pollServerInit),其中serverInit是名为sysOnce的特殊结构,它会保证执行的方法在全局只会被执行一次,然后执行runtime_pollServerInit,完成poll_init操作
  • 完成poll_init后,调用runtime_pollOpen(uintptr(fd.Sysfd))将fd加入到eventpool中,完成poll_open操作
  • 绑定里层的pollDesc实例

我们先来关注serverInit.Do(runtime_pollServerInit)中,执行的runtime_pollServerInit方法,它定位在runtime/netpoll.go下:

//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {netpollGenericInit()
}
func netpollGenericInit() {if netpollInited.Load() == 0 {lockInit(&netpollInitLock, lockRankNetpollInit)lock(&netpollInitLock)if netpollInited.Load() == 0 {//进入netpollinit调用netpollinit()netpollInited.Store(1)}unlock(&netpollInitLock)}
}
func netpollinit() {var errno uintptr//进行epollcreate系统调用,创建epoll事件表epfd, errno = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)//...//创建pipe管道,接收信号,如程序终止://r:信号接收端,会注册对应的read事件到epoll事件表中//w:信号发送端,有信号到达的时候,会往w发送信号,并对r产生读就绪事件r, w, errpipe := nonblockingPipe()//...//在epollEvent中注册监听r的读就绪事件ev := syscall.EpollEvent{Events: syscall.EPOLLIN,}*(**uintptr)(unsafe.Pointer(&ev.Data)) = &netpollBreakRderrno = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, r, &ev)//...//使用全局变量缓存pipe的读写端netpollBreakRd = uintptr(r)netpollBreakWr = uintptr(w)
}

netpollinit()方法内部,进行了以下操作:

  • 执行epoll_create指令创建了epoll事件表,并返回epoll文件描述符epfd

  • 创建了两个pipe管道,当向w端写入信号的时候,r端会发生读就绪事件。

  • 注册监听r的读就绪事件。

  • 缓存管道。

在这里,我们创建了两个管道r以及w,并且在eventpool中注册了r的读就绪事件的监听,当我们向w管道写入数据的时候,r管道就会产生读就绪事件,从而打破阻塞的epoll_wait操作,进而执行其他的操作。

3.3、poll_open

方法 文件
net.netFD.listenStream() net/sock_posix.go
net.netFD.init() net/fd_unix.go
poll.FD.init() internal/poll/fd_unix.go
poll.pollDesc.init() internal/poll/fd_poll_runtime.go
runtime.poll_runtime_pollOpen() runtime/netpoll.go
runtime.netpollopen runtime/netpoll_epoll.go

poll.pollDesc.init()方法中,完成了poll_init流程后,就会进入到poll_open流程,执行runtime.poll_runtime_pollOpen()

//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {//获取一个pollDesc实例pd := pollcache.alloc()lock(&pd.lock)wg := pd.wg.Load()if wg != pdNil && wg != pdReady {throw("runtime: blocked write on free polldesc")}rg := pd.rg.Load()if rg != pdNil && rg != pdReady {throw("runtime: blocked read on free polldesc")}//绑定socket fd到pollDesc中pd.fd = fd//...//初始化读写状态标识器为无状态pd.rg.Store(pdNil)pd.wg.Store(pdNil)//...unlock(&pd.lock)//将fd添加进epoll事件表中errno := netpollopen(fd, pd)//...//返回pollDesc实例return pd, 0
}
func netpollopen(fd uintptr, pd *pollDesc) uintptr {var ev syscall.EpollEvent//通过epollctl操作,在EpollEvent中注册针对fd的监听事件//操作类型宏指令:EPOLL_CTL_ADD——添加fd并注册监听事件//事件类型:epollevent.events://1、EPOLLIN:监听读就绪事件//2、EPOLLOUT:监听写就绪事件//3、EPOLLRDHUP:监听中断事件//4、EPOLLET:使用边缘触发模式ev.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLETtp := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load())*(*taggedPointer)(unsafe.Pointer(&ev.Data)) = tpreturn syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int32(fd), &ev)
}

不仅在net.Listen()流程中会触发poll open,在net.Listener.Accept流程中也会,当我们获取到了连接之后,也需要为这个连接封装成一个pollDesc实例,然后执行poll_open流程将其注册到epoll事件表中。

func (fd *netFD) accept()(netfd *netFD, err error){// 通过 syscall accept 接收到来的 conn fdd, rsa, errcall, err := fd.pfd.Accept()// ...// 封装到来的 conn fdnetfd, err = newFD(d, fd.family, fd.sotype, fd.net)// 将 conn fd 注册到 epoll 事件表中err = netfd.init()// ...return netfd,nil
}

3.4、poll_close

当连接conn需要关闭的时候,最终会进入到poll_close流程,执行epoll_ctl(DELETE)删除对应的fd。

方法 文件
net.conn.Close net/net.go
net.netFD.Close net/fd_posix.go
poll.FD.Close internal/poll/fd_unix.go
poll.FD.decref internal/poll/fd_mutex.go
poll.FD.destroy internal/poll/fd_unix.go
poll.pollDesc.close internal/poll/fd_poll_runtime.go
poll.runtime_pollClose internal/poll/fd_poll_runtime.go
runtime.poll_runtime_pollClose runtime/netpoll.go
runtime.netpollclose runtime/netpoll_epoll.go
syscall.EpollCtl runtime/netpoll_epoll.go
//go:linkname poll_runtime_pollClose internal/poll.runtime_pollClose
func poll_runtime_pollClose(pd *pollDesc) {if !pd.closing {throw("runtime: close polldesc w/o unblock")}wg := pd.wg.Load()if wg != pdNil && wg != pdReady {throw("runtime: blocked write on closing polldesc")}rg := pd.rg.Load()if rg != pdNil && rg != pdReady {throw("runtime: blocked read on closing polldesc")}netpollclose(pd.fd)pollcache.free(pd)
}
func netpollclose(fd uintptr) uintptr {var ev syscall.EpollEventreturn syscall.EpollCtl(epfd, syscall.EPOLL_CTL_DEL, int32(fd), &ev)
}

3.5、poll_wait

poll_wait流程最终会执行gopark将g陷入到用户态阻塞

方法 文件
poll.pollDesc.wait internal/poll/fd_poll_runtime.go
poll.runtime_pollWait internal/poll/fd_poll_runtime.go
runtime.poll_runtime_pollWait runtime/netpoll.go
runtime.netpollblock runtime/netpoll.go
runtime.gopark runtime/proc.go
runtime.netpollblockcommit runtime/netpoll.go

在表层pollDesc中,会通过其内部的里层pollDesc指针,调用到runtime下的netpollblock方法。

/*针对某个 pollDesc 实例,监听指定的mode 就绪事件- 返回true——已就绪  返回false——因超时或者关闭导致中断- 其他情况下,会通过 gopark 操作将当前g 阻塞在该方法中
*/
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {//针对mode事件,获取相应的状态gpp := &pd.rgif mode == 'w' {gpp = &pd.wg}for {//关心的io事件就绪,直接返回if gpp.CompareAndSwap(pdReady, pdNil) {return true}//关心的io事件未就绪,则置为等待状态,G将要被阻塞if gpp.CompareAndSwap(pdNil, pdWait) {break}//...}//...//将G置为阻塞态gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceBlockNet, 5)//当前g从阻塞态被唤醒,重置标识器old := gpp.Swap(pdNil)if old > pdWait {throw("runtime: corrupted polldesc")}//判断是否是因为所关心的事件触发而唤醒return old == pdReady
}

在gopark方法中,会闭包调用netpollblockcommit方法,其中会根据g关心的事件类型,将其实例存储到pollDesc的rg或wg容器中。

// 将 gpp 状态标识器的值由 pdWait 修改为当前 g 
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))if r {//增加等待轮询器的例程计数。//调度器使用它来决定是否阻塞//如果没有其他事情可做,则等待轮询器。netpollAdjustWaiters(1)}return r
}

接着我们来关注何时会触发poll_wait流程。

首先是在listener.Accept流程中,如果当前尚未有连接到达,则执行poll wait将当前g阻塞挂载在该socket fd对应pollDesc的rg中。

// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {//...for {//以非阻塞模式发起一次accept,尝试接收conns, rsa, errcall, err := accept(fd.Sysfd)if err == nil {return s, rsa, "", err}switch err {//忽略中断类错误case syscall.EINTR:continue//尚未有到达的conncase syscall.EAGAIN://进入poll_wait流程,监听fd的读就绪事件,当有conn到达表现为fd可读。if fd.pd.pollable() {//假如读操作未就绪,当前g会被阻塞在方法内部,直到因为超时或者就绪被netpoll ready唤醒。if err = fd.pd.waitRead(fd.isFile); err == nil {continue}}//...}
}
// 指定 mode 为 r 标识等待的是读就绪事件,然后走入更底层的 poll_wait 流程
func (pd *pollDesc) waitRead(isFile bool) error {return pd.wait('r', isFile)
}

其次分别是在conn.Read/conn.Write流程中,假若conn fd下读操作未就绪(无数据到达)/写操作未就绪(缓冲区空间不足),则会执行poll wait将g阻塞并挂载在对应的pollDesc中的rg/wg中。

func (fd *FD) Read(p []byte) (int, error) {//...for {//非阻塞模式进行一次read调用n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)if err != nil {n = 0//进入poll_wait流程,并标识关心读就绪事件if err == syscall.EAGAIN && fd.pd.pollable() {if err = fd.pd.waitRead(fd.isFile); err == nil {continue}}}err = fd.eofError(n, err)return n, err}
}
func (fd *FD)Write(p []byte)(int,error){// ... for{// ...// 以非阻塞模式执行一次syscall write操作n, err := ignoringEINTRIO(syscall.Write, fd.Sysfd, p[nn:max])if n >0{nn += n}// 缓冲区内容都已写完,直接退出if nn ==len(p){return nn, err}// 走入 poll_wait 流程,并标识关心的是该 fd 的写就绪事件if err == syscall.EAGAIN && fd.pd.pollable(){// 倘若写操作未就绪,当前g 会 park 阻塞在该方法内部,直到因超时或者事件就绪而被 netpoll ready 唤醒if err = fd.pd.waitWrite(fd.isFile); err ==nil{continue}}// ...  }

3.6、net_poll

netpoll流程至关重要,它会在底层调用系统的epoll_wait操作,找到触发事件的fd,然后再逆向找到绑定fd的pollDesc实例,返回内部阻塞的g叫给上游处理唤醒。其调用栈如下:

方法 文件
runtime.netpoll runtime/netpoll_epoll.go
runtime.netpollready runtime/netpoll.go
runtime.netpollunblock runtime/netpoll.go

netpoll具体的源码如下:

//netpoll用于轮询检查是否有就绪的io事件
//若发现了就绪的io事件,检查是否有pollDesc中的g关心其事件
//若找到了关心其io事件就绪的g,添加到list返回给上游处理
func netpoll(delay int64) (gList, int32) {if epfd == -1 {return gList{}, 0}var waitms int32//根据传入的delay参数,决定调用epoll_wait的模式://delay < 0:设为阻塞模式(在 gmp 调度流程中,如果某个 p 迟迟获取不到可执行的 g 时,会通过该模式,使得 thread 陷入阻塞态,但该情况全局最多仅有一例)//delay = 0:设为非阻塞模式(通常情况下为此模式,包括 gmp 常规调度流程、gc 以及全局监控线程 sysmon 都是以此模式触发的 netpoll 流程)//delay > 0:设为超时模式(在 gmp 调度流程中,如果某个 p 迟迟获取不到可执行的 g 时,并且通过 timer 启动了定时任务时,会令 thread 以超时模式执行 epoll_wait 操作)if delay < 0 {waitms = -1} else if delay == 0 {waitms = 0} else if delay < 1e6 {waitms = 1} else if delay < 1e15 {waitms = int32(delay / 1e6)} else {waitms = 1e9}//最多接收128个io就绪事件var events [128]syscall.EpollEvent
retry://以指定模式调用epoll_waitn, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms)//...//存储关心io事件就绪的G实例var toRun gListdelta := int32(0)//遍历返回的就绪事件for i := int32(0); i < n; i++ {ev := events[i]if ev.Events == 0 {continue}//pipe接收端的信号处理,检查是否需要退出netpollif *(**uintptr)(unsafe.Pointer(&ev.Data)) == &netpollBreakRd {if ev.Events != syscall.EPOLLIN {println("runtime: netpoll: break fd ready for", ev.Events)throw("runtime: netpoll: break fd ready for something unexpected")}//...continue}var mode int32//记录io就绪事件的类型if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {mode += 'r'}if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {mode += 'w'}// 根据 epollevent.data 获取到监听了该事件的 pollDesc 实例if mode != 0 {tp := *(*taggedPointer)(unsafe.Pointer(&ev.Data))pd := (*pollDesc)(tp.pointer())//...//检查是否为G所关心的事件delta += netpollready(&toRun, pd, mode)}}return toRun, delta
}
func netpollready(toRun *gList, pd *pollDesc, mode int32) int32 {delta := int32(0)var rg, wg *gif mode == 'r' || mode == 'r'+'w' {//就绪事件包含读就绪,尝试唤醒pd内部的rgrg = netpollunblock(pd, 'r', true, &delta)}if mode == 'w' || mode == 'r'+'w' {//就绪事件包含读就绪,尝试唤醒pd内部的wgwg = netpollunblock(pd, 'w', true, &delta)}//存在G实例,则加入list中if rg != nil {toRun.push(rg)}if wg != nil {toRun.push(wg)}return delta
}
func netpollunblock(pd *pollDesc, mode int32, ioready bool, delta *int32) *g {//获取存储的g实例gpp := &pd.rgif mode == 'w' {gpp = &pd.wg}for {old := gpp.Load()//...new := pdNilif ioready {new = pdReady}//将gpp的值从g置换成pdReadyif gpp.CompareAndSwap(old, new) {if old == pdWait {old = pdNil} else if old != pdNil {*delta -= 1}//返回需要唤醒的g实例return (*g)(unsafe.Pointer(old))}}
}

那么,我们也同样需要关注在哪个环节进入了net_poll流程。

首先,是在GMP调度器中的findRunnable方法中被调用,用于找到可执行的G实例。具体的实现在之前的GMP调度文章中有讲解,这里只关心涉及到net_poll方面的源码。

findRunnable方法定位在runtime/proc.go

func findRunnable()(gp *g, inheritTime, tryWakeP bool){// ../*同时满足下述三个条件,发起一次【非阻塞模式】的 netpoll 流程:- epoll事件表初始化过- 有 g 在等待io 就绪事件- 没有空闲 p 在以【阻塞或超时】模式发起 netpoll 流程*/if netpollinited()&& atomic.Load(&netpollWaiters)>0&& atomic.Load64(&sched.lastpoll)!=0{// 以非阻塞模式发起一轮 netpoll,如果有 g 需要唤醒,一一唤醒之,并返回首个 g 给上层进行调度if list := netpoll(0);!list.empty(){// non-blocking// 获取就绪 g 队列中的首个 ggp := list.pop()// 将就绪 g 队列中其余 g 一一置为就绪态,并添加到全局队列injectglist(&list)// 把首个g 也置为就绪态casgstatus(gp,_Gwaiting,_Grunnable)// ...   //返回 g 给当前 p进行调度return gp,false,false}}// .../*同时满足下述三个条件,发起一次【阻塞或超时模式】的 netpoll 流程:- epoll事件表初始化过- 有 g 在等待io 就绪事件- 没有空闲 p 在以【阻塞或超时】模式发起 netpoll 流程*/if netpollinited()&&(atomic.Load(&netpollWaiters)>0|| pollUntil !=0)&& atomic.Xchg64(&sched.lastpoll,0)!=0{// 默认为阻塞模式  delay :=int64(-1)// 存在定时时间,则设为超时模式if pollUntil !=0{delay = pollUntil - now// ...   }// 以【阻塞或超时模式】发起一轮 netpolllist := netpoll(delay)// block until new work is available }// ...    
}

其次,是位于同文件下的sysmon方法中,它会被一个全局监控者G执行,每隔10ms发一次非阻塞的net_poll流程。

// The main goroutine.
func main(){
// ...
// 新建一个 m,直接运行 sysmon 函数systemstack(func(){newm(sysmon,nil,-1)})// ...
}// 全局唯一监控线程的执行函数
func sysmon(){
// ...
for{
// ...
/*同时满足下述三个条件,发起一次【非阻塞模式】的 netpoll 流程:- epoll事件表初始化过- 没有空闲 p 在以【阻塞或超时】模式发起 netpoll 流程- 距离上一次发起 netpoll 流程的时间间隔已超过 10 ms*/lastpoll :=int64(atomic.Load64(&sched.lastpoll))if netpollinited()&& lastpoll !=0&& lastpoll+10*1000*1000< now {// 以非阻塞模式发起 netpolllist := netpoll(0)// non-blocking - returns list of goroutines// 获取到的  g 置为就绪态并添加到全局队列中if!list.empty(){// ...injectglist(&list)// ...}}// ...  }
}

最后,还会发生在GC流程中。

func pollWork() bool{// ...// 若全局队列或 p 的本地队列非空,则提前返回/*同时满足下述三个条件,发起一次【非阻塞模式】的 netpoll 流程:- epoll事件表初始化过- 有 g 在等待io 就绪事件- 没有空闲 p 在以【阻塞或超时】模式发起 netpoll 流程*/if netpollinited()&& atomic.Load(&netpollWaiters)>0&& sched.lastpoll !=0{// 所有取得 g 更新为就绪态并添加到全局队列if list := netpoll(0);!list.empty(){injectglist(&list)return true}}// ...
}

4、参考博文

感谢观看,本篇博文参考了小徐先生的文章,非常推荐大家去观看并且进入到源码中学习,链接如下:

万字解析 golang netpoll 底层原理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/841929.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

骁龙 8 Elite 至尊版 对比 电脑端cpu

台式/笔记本 参考https://socpk.com/cpu/ 骁龙 8 Elite是260 M4 (4+6) 是360而M4对比笔记本cpu参考

线性版本HierHolzer正确性说明

晚上在研究怎么求欧拉图回路,看到 \(O(n+m)\) 版本的 HierHolzer 算法实现,让我很迷惑。 void dfs(int x){for(int i = 1;i <= 500; ++i){if(g[x][i]){--g[x][i]; --g[i][x];dfs(i);}}ans[++cnt] = x; }OI-Wiki 上对于这段代码的描述是这样的:将找回路的 DFS 和 Hierholz…

plus_one

2024/11/26 --2024/11/28 验证哥德巴赫猜想 打印漏斗 1.统计字符 1. 不需要数组,在循环中统计各个种类的字符 2. 一定把各个种类初始化为0 3. 一个一个字符输入,不是字符串一次输入哦 4. 空格是 回车是 \n 5. 大小写字母的ASCII码不连续,所以是(s >= a&& s<…

Python基础语法 11月22日到11月26日学习过程

Python的环境配置 python安装安装地址官网网址:https://www.python.org 华为云镜像站地址:https://mirrors.huaweicloud.com/homepython根目录介绍根目录截图python的根目录【安装目录】:D:\soft\Python37Scriptspip # 从python官网上下载第三方的库 pip3.7 pip3Lib # py…

快速搭建和访问 FTP 服务器

随着以 minio 为代表的分布式系统的广泛应用,使用 FTP 的场景就越来越少了,目前仍然在一些简单的应用场景中使用。 本篇博客使用 fauria/vsftpd 的 docker 镜像,介绍 FTP 服务器搭建的两种方式:匿名访问方式 和 使用账号密码访问方式。然后使用 SpringBoot 程序通过代码访问…

Beta阶段——第十周Scrum Meeting记录

1.目前进度: (1)实现沙盒模式,基础逻辑门组件的搭建功能; (2)组件的增加,移动,旋转,删除; (3)逻辑电路的布线及删除; (4)高低电平测试;2.目前团队中存在的问题: (1)前期未能很好的使用Github仓库,导致工作进度难以同步; (2)大多数成员对Unity和C#编程语…

从软件工程的角度,谈模块为什么总是不兼容

前言 今天刚刷上Apatch,发现其没有提供Zygisk,又去酷安搜了一搜,似乎有人反应刷Lsposed不起作用,大致了解了一下,并查了些资料。下面我开始猜测以及进行理论。 说是从软件工程出发,但是实际上我并不算一个好学生,更无法代表软件工程,这或许很标题党,但是我确实想以这个…

uml用例图-2024/11/26

超市进销存管理系统

MySQL报错:sql_mode=only_full_group_by解决方法

MySQL报错:sql_mode=only_full_group_by解决方法 登录mysql之后,执行命令查看当前的sql_mode配置 select @@global.sql_mode;​​ 可以发现MySQL的sql_mode是开启了ONLY_FULL_GROUP_NY。 解决方法 把 sql_mode 中的 ONLY_FULL_GROUP_NY​去掉,其他不变即可。 找到MySQL的配置…

使用Lombok导致打印的tostring中缺少父类的属性

背景 实体类UserDto extends BaseEntity,两个类的上方都有标注,Lombok的@Data注解,但是使用时UserDto的实例对象调用toString方法时发现,只打印出来自身子类的属性信息,并没有打印出来父类的信息。@Data public class UserDto extends BaseEntity { /*** 姓名*/@TableFiel…

20222322 2024-2025-1 《网络与系统攻防技术》实验五实验报告

1.实验内容 1.1实验要求 (1)从www.besti.edu.cn、baidu.com、sina.com.cn中选择一个DNS域名进行查询,获取相关信息。 (2)尝试获取BBS、论坛、QQ、MSN中某一好友的IP地址,并查询获取该好友所在的具体地理位置。 (3)使用nmap开源软件对靶机环境进行扫描,回答以下问题并给…

华为鸿蒙智家品牌升级背后:开拓者,引领者,赋能者

今天,华为重磅推出全新品牌“华为鸿蒙智家”亮相华为Mate品牌盛典。 华为作为产业的开拓者,一直引领产业进化,带动产业从懵懂到成熟。这一次品牌升级将借势鸿蒙,为空间智能产业打开更大的想象空间。持续进化,带来更高阶的智感 作为一个热门赛道,科技巨头和家电企业均积极…