netty核心流程(一):服务端如何建立连接

news/2024/9/22 13:31:26/文章来源:https://www.cnblogs.com/aries-laosi/p/18333482

为了接收连接请求, Netty 服务端应该做些什么事情?

根据Java NIO 的知识,服务端在准备接收客户端连接之前做了下面几个工作,我们可以带着问题往下看。

  • 服务端对连接请求是如何初始化的?
  • 如何把用户定义的处理逻辑 childHandler 加入到 Netty 的处理流程里?
  • 如何在 Socket 上绑定一个端口?
  • 如何把请求连接的 OP_ACCEPT 事件(客户端请求连接的事件)注册到 Selector 上的?
  • Netty 线程是如何轮询 ServerSocketChannel 的网络连接事件的?
  • 对于客户端发来的连接请求,服务端是如何处理的?
  • 连接成功建立后,如何读客户端发来的数据?

我们会根据 Netty 底层的源码来解析上面这几个部分是如何实现的,源码讲解时我们只讲解主线代码,只要能够理解 Netty 大体是如何实现上述功能就可以了,否则会影响理解。

服务端对连接请求是如何初始化的?

我们根据上节课 NettyServer 的代码讲起,首先我们先分析一下端口绑定的那行代码:

我们对 bind() 方法进行代码追踪,找到方法 doBind():

 

initAndRegister() 功能是初始化 ServerSocketChannel,然后把对应的网络事件注册到 Selector 的方法,我们具体看看:

// 创建和初始化一个 ServerSocketChannel,并注册到 Selector 轮询复用组件上去。
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // 通过工厂类获取 ServerSocketChannel
        channel = channelFactory.newChannel();
        // 初始化 ServerSocketChannel
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            channel.unsafe().closeForcibly();
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }
    // 拿出来之前创建的 EventLoopGroup, 然后把 Channel 注册到 EventLoopGroup上,目的是轮询各种 channel 上的网络事件,
    // 我们猜测是不是 让 EvetnLoopGroup 中的独立线程利用一个 Selector 来注册 Channel,并轮询网络事件。
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

可以看到,通过 channelFactory.newChannel() 获得一个 Channel。这个 channelFactory 的实现类很多,这里用的是类 ReflectiveChannelFactory 来实现的,顾名思义是通过反射获得的 Channel。我们再验证下 ReflectiveChannelFactory 是如何给我们实例化 Channel 的。

 

可以看到果然是通过反射后得到的 ServerSocketChannel 的实例。

那么,工厂类是如何知道反射哪个 Channel 呢?这就要看我们上一章写的服务端的代码了。

 

可以看到,服务端用来接收请求的类是 NioServerSocketChannel。那么,这个类究竟有没有封装 ServerSocketChannel 呢?我们看 NioServerSocketChannel 的构造方法就可以了:

在这里,我们就真的找到了创建 ServerSocketChannel 实例的代码。也就是说,NioServerSocketChannel 是对 ServerSocketChannel 的封装。到这里,ServerSocketChannel 的实例化就成功了,接着我们看看对 ServerSocketChannel 做了哪些初始化的工作?

init(channel) 方法是对 ServerSocketChannel 的初始化。因为是服务端代码,对应的实现类就是 ServerBootstrap,再看它对 init(channel) 的具体实现:

 
java
void init(Channel channel) {
    // 对ServerSocketChannel 进行相关网络参数的指定。
    setChannelOptions(channel, newOptionsArray(), logger);
    // 初始化相关的一些属性
    setAttributes(channel, newAttributesArray());

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);

    // 第一个拦截器:对网络请求处理链路中加入一个自己内置的一个处理逻辑。初始化了网络请求的处理链路
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                // 第二个拦截器:
                pipeline.addLast(handler);
            }
            // 循环执行下面的任务
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // 第三个拦截器:加入设定的拦截器
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

可以看到,初始化了一些相关的网络参数,比如上节课 Netty Demo 中 Server 端代码的 SO_BACKLOG 属性。然后初始化一些相关属性。

最后一段重要的代码是在 Pipeline(拦截器链) 上加入第一个内置的拦截器;第二个拦截器是内部配置的拦截器;然后,通过 eventLoop() 循环处理连接请求,同时还实例化了第三个拦截器 ServerBootstrapAcceptor,然后把第三个拦截器放入拦截器链中,这个过程是启用了一个新的线程来完成的。ServerBootstrapAcceptor 的构造方法的参数包括用户设置的 channel(也就是 NioServerSocketChannel),以及用户设置的 childgroup 线程组,以及处理客户端连接的逻辑 childHandler,最后是一些配置和属性的参数。

初始化就结束了。

如何把用户定义的处理连接请求的逻辑加入到 Netty 里?

然后,进入了 ServerBootstrapAcceptor:

这里会看到第四个拦截器。其中,这第四个拦截器才是真正的用户设置的拦截器。随后还会有个监听器,用于拦截器链中所有的拦截器都执行完了以后,再做一些收尾工作。

这段逻辑包涵了 4 个拦截器的创建,比较复杂:

现在,服务端连接的初始化已经完成了,但是还没有在 Selector 上注册相应的事件,接下来我们看看这个功能是如何实现的。

如何把请求连接的 OP_ACCEPT 事件(客户端请求连接的事件)注册到 Selector 上的?

我们看一下相应的源码:

 

注册网络接受连接请求的事件就在调用 register() 方法里了。

由于我们用的是线程组,这里的实现类是 MultithreadEventLoopGroup,其实这里面就是一组 EventLoop。EventLoop 本质上一个线程,每个 EventLoop 会对应一个 Selector。所以,EventLoop 是一个循序处理网络事件的线程。这里通过调用 next() 返回一个线程组里面的一个线程。

接着,我们追踪到 AbstractNioChannel 类中的 doRegister()方法中:

在这里,我们就能看到已经把 ServerSocketChannel 以及它对应的 OP_ACCEPT(OP_ACCEPT=0) 事件注册到 Selector 上了。

至此,我们已经把连接事件注册到了对应线程的 Selector 上。

给 ServerSocketChannel 绑定端口

我们回到上一章的 Demo,从 bind() 方法开始看:

这里就已经把端口绑定到 socket 上了,就不继续深究内部的代码了。

至此,服务端创建连接的准备工作都分析完了。

Netty 线程是如何轮询 ServerSocketChannel 的网络连接事件的?

首先,我们看看 NioEventLoopGroup 的父类 MultithreadEventLoopGroup 的构造方法:

static {
    // 默认的线程数为 CPU 核数的两倍
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    // 设定线程数
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

MultithreadEventLoopGroup 是一个多线程集合,如果你不设置线程数的话。默认线程数是 CPU 的核数*2。

 

当要增加一个线程时,调用的是 newChild() 方法,我们再看看里面做了什么:

 

我们可以看到,线程组里面的元素是 NioEventLoop 类。

很明显,这是一个单线程的线程池。

然后,我们代码跟踪,可以看到往 Selector 里注册网络事件。

private void register0(SelectableChannel ch, int interestOps, NioTask<?> task) {
    try {
        // 注册网络事件
        ch.register(unwrappedSelector, interestOps, task);
    } catch (Exception e) {
        throw new EventLoopException("failed to register a channel", e);
    }
}

这里注册的是 OP_ACCEPT 网络事件。

那么,这个线程是在哪里轮询网络事件的呢?这就要找到线程的 run() 方法了

protected void run() {
    int selectCnt = 0;
    for (;;) {
        try {
            int strategy;
            try {
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:
                    continue;

                case SelectStrategy.BUSY_WAIT:

                // 通过轮询的方式不断尝试监听新的网络事件
                case SelectStrategy.SELECT:
                    // 设定每次轮询的间隔时间
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE; // nothing on the calendar
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        if (!hasTasks()) {
                            // 真正的尝试轮询新的网络事件
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                    // fall through
                default:
                }

通过for(;;)死循环来不断尝试发现新的网络事件,如果有就返回。但有个轮询间隔时间来控制执行频率:

private int select(long deadlineNanos) throws IOException {
    if (deadlineNanos == NONE) {
        // 轮询网络事件
        return selector.select();
    }
    long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
    return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}

这里是真正地使用 Java NIO 的 select() 来轮询网络事件。

至此,如果有客户端发起连接的请求,我们的服务端就可以创建连接了。

对于客户端发来的连接请求,服务端是如何处理的?

服务端收到客户端的连接请求后,会做些什么呢?我们还是回到 NioEventLoop 类中的 run() 方法,调用 select() 方法后,会调用下面的方法:

 

这个方法就是处理网络事件集合的:

 

轮询网络事件集合中所有的事件,并对事件进行处理,直到事件集合里的事件都轮询完了。好,我们看下对单一的事件是怎么处理的:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            return;
        }
        if (eventLoop == this) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
        }
        return;
    }

    try {
        int readyOps = k.readyOps();

        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
           unsafe.forceFlush();
        }

        // 对连接的处理
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

这个方法用来处理所有的网络事件,我们只要关心最后的对连接的处理就可以了,用到的方法是 unsafe.read(),方法的实现类是 AbstractNioMessageChannel。

方法 doReadMessages() 的目的主要是把新建立的连接放入 List 集合里,这个方法如下:

protected int doReadMessages(List<Object> buf) throws Exception {
    // 跟客户端建立连接,获取到对应的 Channel
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
            // 把创建好的连接放入一个 List 里面。
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}

可以看到调用了底层的工具的方法:SocketUtils.accept(javaChannel())。目的就是建立连接。

 

可以看到调用了底层 NIO 的 serverSocketChannel 的 accept()方法,实现了连接的建立。

至此,连接就建立成功了。

但是,建立完连接就可以不用管了吗?不是的,因为连接建立成功后,我们要把新建立的 SocketChannel 的读事件注册到对应线程的 Selector 上,这样才能收到客户端的数据,这个工作是怎么做到的呢?

上一章讲第三个拦截器的时候,涉及到了类 ServerBootstrapAcceptor,其实它的主要任务是把连接的网络读事件注册到 Selector 上:

 

可以看到,childGroup 是我们在 Demo 中设置的子线程组,专门负责读写事件的注册的。childGroup 首先会拿出一个线程给这个 channel 使用,然后通过 register 方法来把 channel 的读事件注册到这个线程对应的 Selector 上。

服务端创建连接的流程

下面用一个流程图来总结下,Netty 服务端连接的建立流程:

 

总结

 

首先,本章讲解了连接的初始化,包括 ServerSocketChannel 的实例化和初始化。然后,讲解了用户自定义的 Handler 是如何加到 Netty 里的,同时介绍了四个拦截器。

接着,又讲解了 ServerSocketChannel 是如何关注 OP_ACCEPT 事件的,以及对应的 Parent 线程是如何轮询 OP_ACCEPT 网络事件的。

最后,讲解了连接是如何创建的,以及创建成功后是如何用 Child 线程关注 OP_READ 网络事件以及轮询网络读事件的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/774313.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Logisim-008-16位无符号比较器

仓库地址 https://gitee.com/gitliang/logisim-to-cpu

springboot项目使用自定义starter

首先是自定义的starter部分 pom文件<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http:…

SQL执行顺序和逻辑

SQL执行顺序和逻辑 MySQL的执行顺序:(9) SELECT (6) SUM(聚合函数) (10) DISTINCT <select_list> (1) FROM <left_table> (3) <join_type> JOIN <right_table> (2) ON <join_condition> (4) WHERE <where_condition> …

后缀数组 - half

后缀数组 后缀数组可以解决有关后缀的问题废话。那么暴力做法肯定是把每个后缀全部取出来,然后按照字典序排序,但是这样复杂度是 \(\Theta(n^2\log n)\) 的。 后缀数组可以解决以下问题:最长重复子串 多个串的最长公共子串 不同子串个数算法详解 面对这些问题,我们需要 \(3…

Misc专项

一:文件操作与隐写 1、文件类型的识别 1、文件头完好情况: (1)file命令 使用file命令识别:识别出file.doc为jpg类型(2)winhex 通过winhex工具查看文件头类型,根据文件头部内容去判断文件的类型eg:JPG类型(3)notepad++ 下载HEXeditor插件,查看文件的头部信息,和010e…

zookeeper未授权访问(CVE-2014-085)

漏洞描述 默认安装配置完的zookeeper允许未授权访问,管理员未配置访问控制列表(ACL)。导致攻击者可以在默认开放的2181端口下通过执行envi命令获得大量敏感信息(系统名称、java环境)导致任意用户可以在网络不受限的情况下进行未授权访问读取数据 漏洞影响 Apache ZooKeepe…

如何自动实现本地AD中禁用的用户从地址列表中隐藏掉?

我的博客园:https://www.cnblogs.com/CQman/ 如何自动实现本地AD中禁用的用户从地址列表中隐藏掉?需求信息:用户本地AD用户通过ADConnect同步到O365,用户想实现在本地已做同步的OU中禁用某一用户后,其可以自动实现把该用户从地址列表中隐藏掉。用户的ADConnect同步工具装在…

go高并发之路——消息中间件kafka(上)

一般高并发的业务都是某个时间段的请求量特别大,比如本人负责的直播业务,基本上一天就两个高峰段:早上和晚上的特定时间段。其它的时间里,流量基本都比较平稳。那么面对流量高峰,我们可以采取哪些措施呢?常见的有服务器和DB提前扩容、监控告警(盯监控)、流量削峰、加缓…

数组及数组JVM内存划分day4

java中第一个存储数据的容器:数组特点:1、数组的长度大小是固定的2、同一个数组中,存储的元素数据类型是一样的数组的定义语句格式:数据类型[] 数组名;举例:int[] arr; // 定义了一个可以存储int类型的一维数组,数组名叫做arr数组没有初始化,无法直接使用,数组是我们第…

借助流程表单设计器开源为流程化办公赋能

借助流程表单设计器做好数据管理工作,为行业发展贡献整套低代码技术平台解决方案。在经济高速发展的今天,想要实现流程化办公和数字化转型,需要利用更优质的平台为其加油助力。低代码技术平台、流程表单设计器开源为各行各业流程化办公高效赋能,一起摆脱信息孤岛、部门沟通…

AI/机器学习(计算机视觉/NLP)方向面试复习1

1. 判断满二叉树 所有节点的度要么为0,要么为2,且所有的叶子节点都在最后一层。 #include <iostream> using namespace std; class TreeNode { public:int val;TreeNode* left;TreeNode* right; //创建的时候输入参数x,会把x给val,nullptr给left和right TreeNode(int…

【PlantSaver】电容式土壤湿度传感器使用及原理(并以Arduino实验)

1.湿度检测原理 关于这个传感器检测的原理,网上找的资料不多。类似传感器经典的设计是美国DECAGON 公司生产的ECH2O 系列传感器。其结构如下:式中: ε0 = 8.85410-12 为真空介电常数,单位 F/m; S 为板间遮盖面积,单位 m2 ; C 为板间电容量,单位F; δ 为板件厚度,m; ε …