【Netty 源码】NioEventLoop 源码分析 篇二

【Netty 源码】NioEventLoop 源码分析 篇二

1.NioEventLoop 继承关系

NioEventLoop继承自SingleThreadEventLoop,一次只能执行一个线程任务,因此在父类SingleThreadEventLoop中维护了 Queue tailTasks 线程队列。

NioEventLoop又间接继承了 AbstractScheduledEventExecutor ,因此也具有提交定时任务的能力

image-20240310191243677

2. Selector 的创建时机

1.构造方法执行时赋值Selector

NioEventLoop 有且仅有一个有参构造方法,在构造方法执行时,对成员对象Selector 进行赋值

io.netty.channel.nio.NioEventLoop#NioEventLoop

image-20240316200151955

2.两个Selector类型的成员变量

每个NioEventLoop 都维护了两个Selector

image-20240316194153562

在其有参构造方法执行时,调用 openSelector() 方法,这里截图部分代码片段

final Selector unwrappedSelector;try {//调用java原生的api创建SelectorunwrappedSelector = provider.openSelector();} catch (IOException e) {throw new ChannelException("failed to open a new selector", e);}//如果没有开启对key Set集合的优化,默认返回原生的Selector,原生Selector遍历key时,使用的是set集合,效率低//默认 false,也就是开启对key set的优化if (DISABLE_KEY_SET_OPTIMIZATION) {return new SelectorTuple(unwrappedSelector);}.......

默认 DISABLE_KEY_SET_OPTIMIZATION 等于false

    private static final boolean DISABLE_KEY_SET_OPTIMIZATION =SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);

因此默认情况下,代码会继续往下执行

									......Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {// Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.// This allows us to also do this in Java9+ without any extra flags.long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);long publicSelectedKeysFieldOffset =PlatformDependent.objectFieldOffset(publicSelectedKeysField);if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {PlatformDependent.putObject(unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);PlatformDependent.putObject(unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);return null;}// We could not retrieve the offset, lets try reflection as last-resort.}//开启暴力反射Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);if (cause != null) {return cause;}cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);if (cause != null) {return cause;}//将原生的setKeys集合替换selectedKeysField.set(unwrappedSelector, selectedKeySet);publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);.......

在这段代码中,我们可以看到Netty使用反射,将原生的Selector的两个成员变量 selectedKeys,publicSelectedKeys 进行替换,而替换后的对象SelectedSelectionKeySet,使用的是数组去存储keys

        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

image-20240316195357937

而原生的keys对象则是set集合

image-20240316195726333

所以Netty对原生的Selector进行优化 第一个Selector对象 selector 遍历keys的效率更高,而第二Selector对象 unwrappedSelector 则是对第一个selector功能的完善,很多能力还是需要原生的Selectoral去实现

3. NioEventLoop 什么时候启动NIO线程

这里我们通过idea断点追踪调用栈类分析

public static void main(String[] args) {EventLoop next = new NioEventLoopGroup().next();next.execute(new Runnable() {@Overridepublic void run() {System.out.println("任务执行");}});
}

image-20240316201853365

通过调用栈,发现main线程最终调用到 io.netty.util.concurrent.SingleThreadEventExecutor#addTask ,然后由nioEventLoopGroup-2-1线程调用run方法执行

NioEventLoop执行线程任务时,会调用父类SingleThreadEventExecutor.execute 方法,然后再调用 addTask 方法,将任务添加到 taskQueue 队列中。

public void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}boolean inEventLoop = inEventLoop();//将任务添加到队列中 只是添加  取出使用在后面addTask(task);//此时是main方法调用 false false 就是tureif (!inEventLoop) {// 开始执行线程任务startThread();if (isShutdown()) {boolean reject = false;try {if (removeTask(task)) {reject = true;}} catch (UnsupportedOperationException e) {// The task queue does not support removal so the best thing we can do is to just move on and// hope we will be able to pick-up the task before its completely terminated.// In worst case we will log on termination.}if (reject) {reject();}}}if (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);}
}
private void startThread() {// 更改线程状态if (state == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {boolean success = false;try {//继续往下跟doStartThread();success = true;} finally {if (!success) {STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);}}}}
}

io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread

往 线程池 中提交任务,调用 io.netty.channel.nio.NioEventLoop#run 方法

 private void doStartThread() {assert thread == null;//往线程池中提交任务executor.execute(new Runnable() {@Overridepublic void run() {thread = Thread.currentThread();if (interrupted) {thread.interrupt();}boolean success = false;updateLastExecutionTime();try {//调用NioEventLoop的run方法 启动nio线程,监听事件并从队列中弹出任务并执行SingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {for (;;) {int oldState = state;if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {break;}}// Check if confirmShutdown() was called at the end of the loop.if (success && gracefulShutdownStartTime == 0) {if (logger.isErrorEnabled()) {logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +"be called before run() implementation terminates.");}}try {// Run all remaining tasks and shutdown hooks.for (;;) {if (confirmShutdown()) {break;}}} finally {try {//关闭selectorcleanup();} finally {// Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify// the future. The user may block on the future and once it unblocks the JVM may terminate// and start unloading classes.// See https://github.com/netty/netty/issues/6596.FastThreadLocal.removeAll();STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);threadLock.countDown();if (logger.isWarnEnabled() && !taskQueue.isEmpty()) {logger.warn("An event executor terminated with " +"non-empty task queue (" + taskQueue.size() + ')');}terminationFuture.setSuccess(null);}}}}});}

·死循环监听、处理事件

protected void run() {for (;;) {try {try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.BUSY_WAIT:// fall-through to SELECT since the busy-wait is not supported with NIOcase SelectStrategy.SELECT:select(wakenUp.getAndSet(false));// 'wakenUp.compareAndSet(false, true)' is always evaluated// before calling 'selector.wakeup()' to reduce the wake-up// overhead. (Selector.wakeup() is an expensive operation.)//// However, there is a race condition in this approach.// The race condition is triggered when 'wakenUp' is set to// true too early.//// 'wakenUp' is set to true too early if:// 1) Selector is waken up between 'wakenUp.set(false)' and//    'selector.select(...)'. (BAD)// 2) Selector is waken up between 'selector.select(...)' and//    'if (wakenUp.get()) { ... }'. (OK)//// In the first case, 'wakenUp' is set to true and the// following 'selector.select(...)' will wake up immediately.// Until 'wakenUp' is set to false again in the next round,// 'wakenUp.compareAndSet(false, true)' will fail, and therefore// any attempt to wake up the Selector will fail, too, causing// the following 'selector.select(...)' call to block// unnecessarily.//// To fix this problem, we wake up the selector again if wakenUp// is true immediately after selector.select(...).// It is inefficient in that it wakes up the selector for both// the first case (BAD - wake-up required) and the second case// (OK - no wake-up required).if (wakenUp.get()) {selector.wakeup();}// fall throughdefault:}} catch (IOException e) {// If we receive an IOException here its because the Selector is messed up. Let's rebuild// the selector and retry. https://github.com/netty/netty/issues/8566rebuildSelector0();handleLoopException(e);continue;}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;if (ioRatio == 100) {try {//处理keysprocessSelectedKeys();} finally {// Ensure we always run tasks.runAllTasks();}} else {final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {// Ensure we always run tasks.final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}
}

因此我们从这可以知道,在首次调用EventLoop的executor方法,将会启动nio线程,重复调用并不会重复启动nio线程,因为有状态位进行控制

4. NIO线程会阻塞普通任务吗?

NioEventLoop 首先它是单线程 ,不仅仅会启动 Nio线程,有时还需要执行普通任务,那么nio会影响普通任务的执行吗?

Selector.select具有阻塞性,首先我们看netty是怎么处理的

io.netty.channel.nio.NioEventLoop#select

/*** 核心思想:没有task要做时,select阻塞1s,如果有task,wakeup去做。* @param oldWakenUp* @throws IOException*/
private void select(boolean oldWakenUp) throws IOException {Selector selector = this.selector;try {int selectCnt = 0;long currentTimeNanos = System.nanoTime();//按scheduled的task时间来计算select timeout时间。long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime();if (nextWakeupTime != normalizedDeadlineNanos) {nextWakeupTime = normalizedDeadlineNanos;}for (;;) {long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;if (timeoutMillis <= 0) { //已经有定时task需要执行了,或者超过最长等待时间了if (selectCnt == 0) {//非阻塞,没有数据返回0selector.selectNow();selectCnt = 1;}break;}// If a task was submitted when wakenUp value was true, the task didn't get a chance to call// Selector#wakeup. So we need to check task queue again before executing select operation.// If we don't, the task might be pended until select operation was timed out.// It might be pended until idle timeout if IdleStateHandler existed in pipeline.if (hasTasks() && wakenUp.compareAndSet(false, true)) {selector.selectNow();selectCnt = 1;break;}//设置超时时间,不会一直阻塞 其次//下面select阻塞中,别人唤醒也可以可以的int selectedKeys = selector.select(timeoutMillis);selectCnt ++;if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {// - Selected something,// - waken up by user, or// - the task queue has a pending task.// - a scheduled task is ready for processingbreak;}if (Thread.interrupted()) {// Thread was interrupted so reset selected keys and break so we not run into a busy loop.// As this is most likely a bug in the handler of the user or it's client library we will// also log it.//// See https://github.com/netty/netty/issues/2426if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely because " +"Thread.currentThread().interrupt() was called. Use " +"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");}selectCnt = 1;break;}long time = System.nanoTime();if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {// timeoutMillis elapsed without anything selected.selectCnt = 1;} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {// The code exists in an extra method to ensure the method is not too big to inline as this// branch is not very likely to get hit very frequently.selector = selectRebuildSelector(selectCnt);selectCnt = 1;break;}currentTimeNanos = time;}if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",selectCnt - 1, selector);}}} catch (CancelledKeyException e) {if (logger.isDebugEnabled()) {logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",selector, e);}// Harmless exception - log anyway}
}

可以看到 selector.select(timeoutMillis) 设置了超时时间。其次,nio线程只有首次NioEventloop调用executor方法才会启动,后续再次调用不会二次启动,并且会唤醒Selector.select

io.netty.util.concurrent.SingleThreadEventExecutor#execute

public void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}boolean inEventLoop = inEventLoop();//将任务添加到队列中 只是添加  取出使用在后面addTask(task);//此时是main方法调用 false false 就是tureif (!inEventLoop) {// 开始执行线程任务//二次调用不会执行startThread();if (isShutdown()) {boolean reject = false;try {if (removeTask(task)) {reject = true;}} catch (UnsupportedOperationException e) {// The task queue does not support removal so the best thing we can do is to just move on and// hope we will be able to pick-up the task before its completely terminated.// In worst case we will log on termination.}if (reject) {reject();}}}//唤醒Selectorif (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);}}

注意,此时我们是在NioEventloop的父类SingleThreadEventExecutor中跟踪源码,子类NioEventloop 对父类的 wakeup 方法进行了重写

io.netty.channel.nio.NioEventLoop#wakeup

@Override
protected void wakeup(boolean inEventLoop) {if (!inEventLoop && wakenUp.compareAndSet(false, true)) {//唤醒 selector 以便执行普通任务selector.wakeup();}
}

5.什么时候去获取Selector上的事件

回到run方法,处理逻辑都在这个switch代码块中

io.netty.channel.nio.NioEventLoop#run

image-20240323162005881

selectNowSupplier 是 IntSupplier 接口的实现类,重写了get方法,重写后的get方法会调用Selector.selectNow()方法,立即获取当前Selector上的IO事件
hasTasks() 判断当前Queue中是否有任务

在来看这个策略方法 :

​ 如果有任务,获取当前Selector上的IO事件,并立刻返回 ;如果没有任务,返回 SelectStrategy.SELECT,进入匹配case,执行select(wakenUp.getAndSet(false)) 方法 超时阻塞等待获取Selector上的io事件

 public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}

在当前Queue中有任务的情况下,即会处理keys,也会执行所有的任务

try {//处理keysprocessSelectedKeys();
} finally {// Ensure we always run tasks.runAllTasks();
}

6.Netty怎么处理NIO空轮询BUG

1.什么是NIO空轮询BUG

即使无客户端连接,NIO照样不断的从select本应该阻塞的Selector.select()中wake up出来,导致CPU100%问题

在这里插入图片描述

2.Netty处理方式

io.netty.channel.nio.NioEventLoop#select

for (;;) {int selectedKeys = selector.select(timeoutMillis);selectCnt ++;long time = System.nanoTime();if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {// timeoutMillis elapsed without anything selected.selectCnt = 1;} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {// The code exists in an extra method to ensure the method is not too big to inline as this// branch is not very likely to get hit very frequently.selector = selectRebuildSelector(selectCnt);selectCnt = 1;break;}}

在这截取了部分代码,可以看到Netty定义了一个 selectCnt ,当 selectCnt > SELECTOR_AUTO_REBUILD_THRESHOLD 时,就会调用selectRebuildSelector(selectCnt)创建一个新的selector。

SELECTOR_AUTO_REBUILD_THRESHOLD的默认值为512.用户可以设置 io.netty.selectorAutoRebuildThreshold 的值来进行控制

int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);

7.Netty怎么控制NIO事件处理与普通任务执行的任务时间

for循环执行在一个单线程中,又要执行 processSelectedKeys 处理Selector上的io事件,又要执行 runAllTasks 执行普通任务。

因此Netty需要协调两个任务的执行时间。

io.netty.channel.nio.NioEventLoop#ioRatio int类型的变量,默认值为50

private volatile int ioRatio = 50;

io.netty.channel.nio.NioEventLoop#run

  for (;;) {//ioRatio默认等于50 ,走elseif (ioRatio == 100) {try {processSelectedKeys();} finally {runAllTasks();}} else {//记录 Selector处理io事件的开始时间final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {//当前时间 - io事件开始时间 = io耗时时间final long ioTime = System.nanoTime() - ioStartTime// io耗时时间 * ioRatio比例 = 普通任务的执行时间runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/577406.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

六千字详解!一篇看懂 ArrayList 的扩容机制(完整源码解析)

☀️今天花了很久写了这篇关于 ArrayList 扩容机制源码解析的博客&#xff0c;在阅读源码的过程中发现了很多之前有误解的地方&#xff0c;也加深了对代码的理解&#xff0c;所以写下了这篇博客。 &#x1f3b6;本文附带了流程中所有的代码和附加解析&#xff0c;我有信心一定能…

五款常用在线JavaScript加密混淆工具详解:jscrambler、JShaman、jsfack、ipaguard和jjencode

摘要 本篇技术博客将介绍五款常用且好用的在线JavaScript加密混淆工具&#xff0c;包括 jscrambler、JShaman、jsfack、freejsobfuscator 和 jjencode。通过对这些工具的功能及使用方法进行详细解析&#xff0c;帮助开发人员更好地保护和加密其 JavaScript 代码&#xff0c;提…

STM32之HAL开发——串口配置(CubeMX)

串口引脚初始化&#xff08;CubeMX&#xff09; 选择RCC时钟来源 选择时钟频率&#xff0c;配置为最高频率72MHZ 将单片机调试模式打开 SW模式 选择窗口一配置为异步通信模式 点击IO口设置页面&#xff0c;可以看到当前使用的串口一的引脚。如果想使用复用功能&#xff0c;只需…

原型链-(前端面试 2024 版)

来讲一讲原型链 原型链只存在于函数之中 四个规则 1、引用类型&#xff0c;都具有对象特性&#xff0c;即可自由扩展属性。 2、引用类型&#xff0c;都有一个隐式原型 __proto__ 属性&#xff0c;属性值是一个普通的对象。 3、引用类型&#xff0c;隐式原型 __proto__ 的属…

Vue——案例01(查询用户)

一、案例实现页面 二、案例实现效果 1. 查询效果 2. 年龄升序 3. 年龄降序 4. 原顺序 三、案例实现思路 1. 定义界面所需标签样式 <div id"app"><h2>查询用户:</h2><input type"text" placeholder"请输入名字"/><b…

代码随想录算法训练营第35天| 435. 无重叠区间、763.划分字母区间、56. 合并区间

435. 无重叠区间 题目链接&#xff1a;无重叠区间 题目描述&#xff1a;给定一个区间的集合 intervals &#xff0c;其中 intervals[i] [starti, endi] 。返回 需要移除区间的最小数量&#xff0c;使剩余区间互不重叠 。 解题思想&#xff1a; 这道题目和射气球很像。 *“需…

rust使用Command库调用cmd命令或者shell命令,并支持多个参数和指定文件夹目录

想要在不同的平台上运行flutter doctor命令&#xff0c;就需要知道对应的平台是windows还是linux&#xff0c;如果是windows就需要调用cmd命令&#xff0c;如果是linux平台&#xff0c;就需要调用sh命令&#xff0c;所以可以通过cfg!实现不同平台的判断&#xff0c;然后调用不同…

代码随想录算法训练营第二十四天|77.组合、216.组合Ⅲ

文档链接&#xff1a;https://programmercarl.com/ LeetCode77.组合 题目链接&#xff1a;https://leetcode.cn/problems/combinations/ 思路&#xff1a; 回溯三部曲&#xff1a; 第一步&#xff1a;确定函数返回值和参数类型 第二步&#xff1a;确定终止条件 第三步&a…

JUC/多线程的基本使用(一)

一、基本使用 Thread、Runnable、FutureTask Java多线程-CSDN博客https://blog.csdn.net/m0_71534259/article/details/132381495?spm1001.2014.3001.5501 二、查看进程线程的方法 windows 任务管理器可以查看进程和线程数&#xff0c;也可以用来杀死进程 tasklist 查看…

ICLR2024:南洋理工发布!改几个参数就为大模型注入后门

随着大语言模型&#xff08;LLMs&#xff09;在处理自然语言处理&#xff08;NLP&#xff09;相关任务中的广泛应用&#xff0c;它们在人们日常生活中的作用日益凸显。例如&#xff0c;ChatGPT等模型已被用于各种文本生成、分类和情感分析任务。然而&#xff0c;这些模型潜在的…

上位机图像处理和嵌入式模块部署(qmacvisual非opencv算法编写)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 我们都知道&#xff0c;qmacvisual本身依赖于qtopencv的实现。大部分的界面都是依赖于qt的实现。图像算法部分&#xff0c;也是大部分都依赖于open…

RockChip Android8.1 Settings

一:Settings一级菜单 1、AndroidManifest.xml 每个APP对应都有一个AndroidManifest.xml,从该文件入手分析最为合适。 packages/apps/Settings/AndroidManifest.xml 根据<category android:name="android.intent.category.LAUNCHER" />可找到当前当前APP a…