[Netty Source Code] NioEventLoop Source Code Analysis, Part 2
1. NioEventLoop inheritance hierarchy
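NioEventLoop extends SingleThreadEventLoop and does all of its work on a single thread, one task at a time. Tasks submitted through execute() are queued in taskQueue, which is declared further up the hierarchy in SingleThreadEventExecutor; SingleThreadEventLoop additionally maintains a Queue named tailTasks for tasks that run at the end of each event-loop iteration.
NioEventLoop also indirectly extends AbstractScheduledEventExecutor, so it can run scheduled (delayed) tasks as well.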
2. When the Selector is created
1. The Selector is assigned in the constructor
NioEventLoop has exactly one constructor, which takes parameters; the Selector member fields are assigned while it runs.
io.netty.channel.nio.NioEventLoop#NioEventLoop
2. Two member fields of type Selector
Each NioEventLoop holds two Selectors: selector and unwrappedSelector.
Both are produced by openSelector(), which the constructor calls. An excerpt:
```java
final Selector unwrappedSelector;
try {
    // Create the Selector with the JDK's native API
    unwrappedSelector = provider.openSelector();
} catch (IOException e) {
    throw new ChannelException("failed to open a new selector", e);
}

// If key-set optimization is disabled, return the plain JDK Selector, whose selected keys
// are kept in a Set, which is slower to iterate.
// DISABLE_KEY_SET_OPTIMIZATION defaults to false, i.e. the optimization is enabled.
if (DISABLE_KEY_SET_OPTIMIZATION) {
    return new SelectorTuple(unwrappedSelector);
}
// .......
```
By default, DISABLE_KEY_SET_OPTIMIZATION is false:

```java
private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
        SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
```

So by default, execution continues past this check:
```java
// ......
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
    // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
    // This allows us to also do this in Java9+ without any extra flags.
    long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
    long publicSelectedKeysFieldOffset =
            PlatformDependent.objectFieldOffset(publicSelectedKeysField);

    if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
        PlatformDependent.putObject(unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
        PlatformDependent.putObject(unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
        return null;
    }
    // We could not retrieve the offset, lets try reflection as last-resort.
}

// Force access to the private fields via reflection
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
    return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
    return cause;
}

// Replace the JDK's native key sets
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
// .......
```
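This code replaces two fields of the JDK Selector implementation, selectedKeys and publicSelectedKeys, with Netty's own SelectedSelectionKeySet, which stores the keys in an array. On Java 9+ Netty first tries to do this with sun.misc.Unsafe and falls back to reflection if that is not possible.

```java
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
```

The JDK's own fields, by contrast, are backed by a HashSet.
The result is two Selector objects. `selector` is the optimized one (a SelectedSelectionKeySetSelector wrapping the JDK Selector): iterating its selected keys walks an array instead of a HashSet, which is faster. `unwrappedSelector` is the underlying JDK Selector with its key-set fields patched; `selector` delegates to it, and operations that need the raw JDK object, such as registering channels, still go through `unwrappedSelector`.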
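To make the array-versus-HashSet point concrete, here is a minimal sketch of an array-backed Set in the spirit of SelectedSelectionKeySet. ArrayKeySet is a hypothetical name and this is not Netty's code: it assumes, as Netty's class does, that the selector only ever calls add() and that the event loop iterates the set and then calls reset() once per round. Unlike a real Set, add() does not check for duplicates.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical, simplified stand-in for Netty's SelectedSelectionKeySet (not Netty code).
// Keys live in a plain array: add() appends, iteration walks indices 0..size-1.
class ArrayKeySet<E> extends AbstractSet<E> {
    private Object[] keys = new Object[4];
    private int size;

    @Override
    public boolean add(E e) {
        if (e == null) {
            return false;
        }
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, size << 1); // grow by doubling
        }
        keys[size++] = e; // O(1) append: no hashing, no duplicate check
        return true;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            private int idx;

            @Override
            public boolean hasNext() {
                return idx < size;
            }

            @Override
            @SuppressWarnings("unchecked")
            public E next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return (E) keys[idx++];
            }
        };
    }

    // Null out the used slots so processed keys can be garbage-collected, then reuse the array.
    void reset() {
        Arrays.fill(keys, 0, size, null);
        size = 0;
    }
}
```

Appending is a single array store and iteration is a plain index walk, which is where the gain over iterating a HashSet comes from.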
3. When does NioEventLoop start its NIO thread?
Here we set a breakpoint in IDEA and follow the call stack, using this test program:
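```java
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;

public class EventLoopStartDemo {
    public static void main(String[] args) {
        // The group's threads are non-daemon, so this demo keeps running until it is killed.
        EventLoop next = new NioEventLoopGroup().next();
        next.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("task executed");
            }
        });
    }
}
```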
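The call stack shows the main thread eventually reaching io.netty.util.concurrent.SingleThreadEventExecutor#addTask, after which the task's run method is executed by the thread nioEventLoopGroup-2-1.
When NioEventLoop is asked to execute a task, it goes through the inherited SingleThreadEventExecutor.execute method, which calls addTask to put the task into the taskQueue.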
```java
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    // Add the task to the queue; it is only enqueued here and taken out later
    addTask(task);
    // Called from main: inEventLoop is false, so !inEventLoop is true
    if (!inEventLoop) {
        // Start the event-loop thread
        startThread();
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}
```
```java
private void startThread() {
    // Change the thread state
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                // Keep following this call
                doStartThread();
                success = true;
            } finally {
                if (!success) {
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}
```
io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread
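It submits a task to executor (by default a ThreadPerTaskExecutor, which creates a new thread for each submitted task), and that task calls io.netty.channel.nio.NioEventLoop#run.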
```java
private void doStartThread() {
    assert thread == null;
    // Submit a task to the executor
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                // Call NioEventLoop#run: start the NIO loop, which listens for events
                // and takes tasks off the queue to run them
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                for (;;) {
                    int oldState = state;
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }

                // Check if confirmShutdown() was called at the end of the loop.
                if (success && gracefulShutdownStartTime == 0) {
                    if (logger.isErrorEnabled()) {
                        logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                "be called before run() implementation terminates.");
                    }
                }

                try {
                    // Run all remaining tasks and shutdown hooks.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } finally {
                    try {
                        // Close the selector
                        cleanup();
                    } finally {
                        // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
                        // the future. The user may block on the future and once it unblocks the JVM may terminate
                        // and start unloading classes.
                        // See https://github.com/netty/netty/issues/6596.
                        FastThreadLocal.removeAll();

                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        threadLock.countDown();
                        if (logger.isWarnEnabled() && !taskQueue.isEmpty()) {
                            logger.warn("An event executor terminated with " +
                                    "non-empty task queue (" + taskQueue.size() + ')');
                        }
                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });
}
```
· An infinite loop that listens for and handles events
```java
protected void run() {
    for (;;) {
        try {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;

                case SelectStrategy.BUSY_WAIT:
                    // fall-through to SELECT since the busy-wait is not supported with NIO

                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));

                    // 'wakenUp.compareAndSet(false, true)' is always evaluated
                    // before calling 'selector.wakeup()' to reduce the wake-up
                    // overhead. (Selector.wakeup() is an expensive operation.)
                    //
                    // However, there is a race condition in this approach.
                    // The race condition is triggered when 'wakenUp' is set to
                    // true too early.
                    //
                    // 'wakenUp' is set to true too early if:
                    // 1) Selector is waken up between 'wakenUp.set(false)' and
                    //    'selector.select(...)'. (BAD)
                    // 2) Selector is waken up between 'selector.select(...)' and
                    //    'if (wakenUp.get()) { ... }'. (OK)
                    //
                    // In the first case, 'wakenUp' is set to true and the
                    // following 'selector.select(...)' will wake up immediately.
                    // Until 'wakenUp' is set to false again in the next round,
                    // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                    // any attempt to wake up the Selector will fail, too, causing
                    // the following 'selector.select(...)' call to block
                    // unnecessarily.
                    //
                    // To fix this problem, we wake up the selector again if wakenUp
                    // is true immediately after selector.select(...).
                    // It is inefficient in that it wakes up the selector for both
                    // the first case (BAD - wake-up required) and the second case
                    // (OK - no wake-up required).

                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                // the selector and retry. https://github.com/netty/netty/issues/8566
                rebuildSelector0();
                handleLoopException(e);
                continue;
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    // Process the selected keys
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}
```
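From this we can see that the first call to the EventLoop's execute method starts the NIO thread. Later calls do not start it again, because the state field guards the start: only the caller whose compareAndSet moves it from ST_NOT_STARTED to ST_STARTED calls doStartThread().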
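The start-once behavior can be reproduced with plain JDK classes. The sketch below is hypothetical (LazyEventLoop, its state constants and its polling loop are illustrative, not Netty's implementation): execute() always enqueues, and only the caller that wins the compareAndSet from ST_NOT_STARTED to ST_STARTED creates the worker thread. Where Netty would block in Selector.select(timeout), this sketch simply parks for 1 ms when the queue is empty.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

// Hypothetical sketch (not Netty code): a single-thread executor whose worker
// thread is created lazily by the first execute() call and never twice.
class LazyEventLoop {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
    final AtomicInteger threadsStarted = new AtomicInteger();
    private volatile Thread thread;

    void execute(Runnable task) {
        taskQueue.offer(task);                  // like addTask(): enqueue only
        if (Thread.currentThread() != thread) { // like !inEventLoop()
            startThread();
        }
    }

    private void startThread() {
        // Cheap read first, then CAS: only one caller can move the state to ST_STARTED.
        if (state.get() == ST_NOT_STARTED && state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            Thread worker = new Thread(this::runLoop, "lazy-event-loop");
            worker.setDaemon(true);
            threadsStarted.incrementAndGet();
            worker.start();
        }
    }

    private void runLoop() {
        thread = Thread.currentThread();
        for (;;) {
            Runnable task = taskQueue.poll();
            if (task != null) {
                task.run();
            } else {
                // Netty would block in Selector.select(timeout) here; the sketch just parks for 1 ms.
                LockSupport.parkNanos(1_000_000L);
            }
        }
    }
}
```

Calling execute() several times from main creates exactly one worker thread; every later call finds the state already at ST_STARTED and skips the CAS.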
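4. Does the NIO thread block ordinary tasks?
A NioEventLoop is single-threaded: besides running the NIO loop, the same thread sometimes has to run ordinary tasks. Does the NIO work get in the way of those tasks?
Selector.select is a blocking call, so let's first look at how Netty deals with that: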
io.netty.channel.nio.NioEventLoop#select
```java
/**
 * Core idea: when there is no task to run, select blocks for up to 1 s;
 * when a task arrives, the selector is woken up so the task can run.
 * @param oldWakenUp
 * @throws IOException
 */
private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        // Compute the select timeout from the deadline of the next scheduled task.
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

        long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime();
        if (nextWakeupTime != normalizedDeadlineNanos) {
            nextWakeupTime = normalizedDeadlineNanos;
        }

        for (;;) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                // A scheduled task is already due, or the maximum wait time has passed
                if (selectCnt == 0) {
                    // Non-blocking; returns 0 if nothing is ready
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

            // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
            // Selector#wakeup. So we need to check task queue again before executing select operation.
            // If we don't, the task might be pended until select operation was timed out.
            // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }

            // Blocks for at most timeoutMillis, never indefinitely;
            // while blocked here, another thread can also wake it up
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;

            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                // - Selected something,
                // - waken up by user, or
                // - the task queue has a pending task.
                // - a scheduled task is ready for processing
                break;
            }
            if (Thread.interrupted()) {
                // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                // As this is most likely a bug in the handler of the user or it's client library we will
                // also log it.
                //
                // See https://github.com/netty/netty/issues/2426
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because " +
                            "Thread.currentThread().interrupt() was called. Use " +
                            "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }
                selectCnt = 1;
                break;
            }

            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // timeoutMillis elapsed without anything selected.
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // The code exists in an extra method to ensure the method is not too big to inline as this
                // branch is not very likely to get hit very frequently.
                selector = selectRebuildSelector(selectCnt);
                selectCnt = 1;
                break;
            }

            currentTimeNanos = time;
        }

        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                        selectCnt - 1, selector);
            }
        }
    } catch (CancelledKeyException e) {
        if (logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                    selector, e);
        }
        // Harmless exception - log anyway
    }
}
```
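The timeout arithmetic at the top of the loop is easy to misread, so here it is in isolation. The formula is copied from the code above; SelectTimeout is a hypothetical wrapper. Adding 500000 ns (0.5 ms) before dividing by 1000000 rounds the remaining time to the nearest millisecond, and a result of 0 or less sends the loop to the non-blocking selectNow() branch. When no scheduled task is pending, delayNanos returns a default of 1 second, which is where the "up to 1 s" in the Javadoc comes from.

```java
// Hypothetical wrapper around the timeout formula used in NioEventLoop#select.
class SelectTimeout {
    // Remaining nanoseconds converted to milliseconds, rounded to the nearest millisecond
    // (+500000 ns, i.e. +0.5 ms, before the integer division).
    static long timeoutMillis(long selectDeadLineNanos, long currentTimeNanos) {
        return (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    }
}
```

For example, 1.4 ms remaining gives 1, 1.5 ms gives 2, and 0.4 ms gives 0, which takes the selectNow() path instead of blocking.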
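Note that selector.select(timeoutMillis) is given a timeout, so it never blocks indefinitely. In addition, the NIO thread is started only by the first call to execute on the NioEventLoop; later calls do not start a second thread, but they do wake up a Selector.select call that is in progress: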
io.netty.util.concurrent.SingleThreadEventExecutor#execute
```java
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    // Add the task to the queue; it is only enqueued here and taken out later
    addTask(task);
    // Called from main: inEventLoop is false, so !inEventLoop is true
    if (!inEventLoop) {
        // Start the event-loop thread
        // (on later calls startThread() finds the state already ST_STARTED and does nothing)
        startThread();
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    // Wake up the Selector
    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}
```
Note that we are reading the parent class SingleThreadEventExecutor here; the subclass NioEventLoop overrides its wakeup method:
io.netty.channel.nio.NioEventLoop#wakeup
```java
@Override
protected void wakeup(boolean inEventLoop) {
    if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
        // Wake up the selector so the ordinary task can run
        selector.wakeup();
    }
}
```
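The wakenUp flag and Selector.wakeup() can be exercised with the JDK alone. In this sketch (WakeupDemo is a hypothetical class that keeps only the flag-handling shape of NioEventLoop#wakeup), repeated wakeup requests within one round cost a single Selector.wakeup() call, and a wakeup that arrives before select() starts still makes the next select() return immediately, as the Selector.wakeup() Javadoc specifies.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch (not Netty code) of the wakenUp guard around Selector.wakeup().
class WakeupDemo {
    private final AtomicBoolean wakenUp = new AtomicBoolean();
    final AtomicInteger wakeupCalls = new AtomicInteger(); // real Selector.wakeup() calls
    final Selector selector;

    WakeupDemo() throws IOException {
        selector = Selector.open();
    }

    // Same shape as NioEventLoop#wakeup: skip when called from the loop thread, and let
    // only the first caller per round pay for the (expensive) Selector.wakeup().
    void wakeup(boolean inEventLoop) {
        if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
            wakeupCalls.incrementAndGet();
            selector.wakeup();
        }
    }

    // One loop round: clear the flag, then block for at most timeoutMillis.
    // Returns how long select() actually blocked, in milliseconds.
    long selectOnce(long timeoutMillis) throws IOException {
        wakenUp.set(false);
        long start = System.nanoTime();
        selector.select(timeoutMillis);
        return (System.nanoTime() - start) / 1_000_000L;
    }
}
```

A task submitter on another thread only needs to call wakeup(false); whether it runs just before or during select(), the loop thread stops waiting and gets to runAllTasks() promptly.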
5. When are the events on the Selector fetched?
Back in the run method, the decision logic lives in the switch block:
io.netty.channel.nio.NioEventLoop#run
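selectNowSupplier implements the IntSupplier interface; its get method calls Selector.selectNow(), which returns immediately with the number of keys whose IO events are ready.
hasTasks() checks whether the task queue has pending tasks.
Now the strategy method itself:
If there are tasks, it calls selectNow() to fetch the ready IO events without blocking and returns that count (0 or more), which matches none of the case labels and so falls through to default. If there are no tasks, it returns SelectStrategy.SELECT, and the matching case calls select(wakenUp.getAndSet(false)), which blocks with a timeout while waiting for IO events on the Selector.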
```java
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
```
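A standalone version of the decision can be run against a real JDK Selector. Two assumptions: it uses java.util.function.IntSupplier (method getAsInt) instead of Netty's io.netty.util.IntSupplier (method get), and the SELECT constant is copied by value (-1, matching Netty's SelectStrategy.SELECT). StrategyDemo and selectNowSupplier are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.function.IntSupplier;

// Hypothetical, standalone version of the select-strategy decision (not Netty code).
class StrategyDemo {
    // Copied by value from Netty's SelectStrategy.SELECT (CONTINUE is -2, BUSY_WAIT is -3).
    static final int SELECT = -1;

    // With pending tasks: non-blocking selectNow() via the supplier.
    // Without tasks: tell the loop to block in select(...).
    static int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) {
        return hasTasks ? selectSupplier.getAsInt() : SELECT;
    }

    // Counterpart of NioEventLoop's selectNowSupplier: returns the number of ready keys.
    static IntSupplier selectNowSupplier(Selector selector) {
        return () -> {
            try {
                return selector.selectNow();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
    }
}
```

Any result of 0 or more (a count of ready keys) matches none of the negative case labels in run(), which is why the loop falls through to default and goes straight on to processSelectedKeys() and runAllTasks().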
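Whichever case was taken, after the switch the loop both processes the selected keys and runs the queued tasks. In the ioRatio == 100 branch, for example:

```java
try {
    // Process the selected keys
    processSelectedKeys();
} finally {
    // Ensure we always run tasks.
    runAllTasks();
}
```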
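6. How does Netty deal with the NIO empty-poll bug?
1. What is the NIO empty-poll bug
Even when there are no client connections and no ready events, Selector.select(), which should block, keeps waking up immediately and returning 0. The loop then spins and drives CPU usage to 100%. This is a JDK bug, most commonly seen with the epoll-based Selector on Linux.
2. How Netty handles it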
io.netty.channel.nio.NioEventLoop#select
```java
for (;;) {
    int selectedKeys = selector.select(timeoutMillis);
    selectCnt ++;
    // ...... (early-exit checks omitted)
    long time = System.nanoTime();
    if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
        // timeoutMillis elapsed without anything selected.
        selectCnt = 1;
    } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
        // The code exists in an extra method to ensure the method is not too big to inline as this
        // branch is not very likely to get hit very frequently.
        selector = selectRebuildSelector(selectCnt);
        selectCnt = 1;
        break;
    }
    currentTimeNanos = time;
}
```
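In this excerpt, Netty keeps a counter, selectCnt, of consecutive select calls. It is reset to 1 whenever select actually blocked for the full timeoutMillis, so it only keeps growing while select returns early with nothing to do. Once selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD, Netty calls selectRebuildSelector(selectCnt), which creates a new Selector and moves the registered channels over to it.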
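What rebuilding involves can be sketched with the JDK API alone: open a new Selector, register every channel from the old one with the same interest ops and attachment, cancel the old keys and close the old Selector. SelectorRebuilder is a hypothetical helper written for illustration; it follows the general idea of Netty's rebuild rather than its exact code (Netty also updates the SelectionKey stored in each channel and handles failures per channel).

```java
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical helper (not Netty's rebuildSelector): move every valid registration
// from an old Selector to a fresh one, then close the old Selector.
class SelectorRebuilder {
    static Selector rebuild(Selector oldSelector) throws IOException {
        Selector newSelector = Selector.open();
        for (SelectionKey key : oldSelector.keys()) {
            if (!key.isValid()) {
                continue; // already cancelled or its channel is closed
            }
            SelectableChannel channel = key.channel();
            // Keep the same interest ops and attachment on the new Selector.
            channel.register(newSelector, key.interestOps(), key.attachment());
            key.cancel(); // drop the registration with the old Selector
        }
        oldSelector.close();
        return newSelector;
    }
}
```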
SELECTOR_AUTO_REBUILD_THRESHOLD defaults to 512; it can be changed with the system property io.netty.selectorAutoRebuildThreshold:

```java
int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
```
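The counting rule itself can be modeled in a few lines. SpinDetector is a hypothetical, simplified model: it only looks at select(timeoutMillis) calls that returned with nothing selected, resets the count when a call really blocked for the whole timeout, and reports when the count reaches the threshold. The threshold is read from the same system property with Integer.getInteger, roughly the JDK counterpart of SystemPropertyUtil.getInt.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of the premature-return counter in NioEventLoop#select.
// It only sees select(timeoutMillis) calls that returned with nothing selected.
class SpinDetector {
    private final int rebuildThreshold;
    private int selectCnt;

    SpinDetector(int rebuildThreshold) {
        this.rebuildThreshold = rebuildThreshold;
    }

    // Same property key and default value as Netty.
    static int thresholdFromSystemProperty() {
        return Integer.getInteger("io.netty.selectorAutoRebuildThreshold", 512);
    }

    // Record one empty return from select(timeoutMillis) that started at startNanos and
    // ended at endNanos. Returns true when the Selector should be rebuilt.
    boolean onEmptySelect(long startNanos, long endNanos, long timeoutMillis) {
        selectCnt++;
        if (endNanos - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= startNanos) {
            selectCnt = 1; // blocked for the full timeout: a normal return, not a spin
            return false;
        }
        if (rebuildThreshold > 0 && selectCnt >= rebuildThreshold) {
            selectCnt = 1; // start counting again against the new Selector
            return true;
        }
        return false;
    }
}
```

With the default threshold, 512 consecutive early returns trigger one rebuild, while a single select that waits out its full timeout puts the count back to 1.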
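7. How does Netty balance time between NIO event handling and ordinary tasks?
The for loop runs on a single thread, which has to call processSelectedKeys to handle the IO events on the Selector and also call runAllTasks to run ordinary tasks.
Netty therefore has to divide the thread's time between the two.
io.netty.channel.nio.NioEventLoop#ioRatio is an int field whose default value is 50:

```java
private volatile int ioRatio = 50;
```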
io.netty.channel.nio.NioEventLoop#run
```java
for (;;) {
    // ioRatio defaults to 50, so the else branch is taken
    if (ioRatio == 100) {
        try {
            processSelectedKeys();
        } finally {
            runAllTasks();
        }
    } else {
        // Record when the Selector's IO processing starts
        final long ioStartTime = System.nanoTime();
        try {
            processSelectedKeys();
        } finally {
            // current time - IO start time = time spent on IO
            final long ioTime = System.nanoTime() - ioStartTime;
            // time budget for ordinary tasks = ioTime * (100 - ioRatio) / ioRatio
            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
        }
    }
}
```
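The budget formula is plain arithmetic, shown here with a hypothetical helper. With the default ioRatio of 50, tasks get as much time as the IO just took; with ioRatio = 80 and 8 ms of IO, tasks get 8 × 20 / 80 = 2 ms. The range check mirrors the 0 < ioRatio <= 100 rule enforced by NioEventLoop#setIoRatio, and ioRatio == 100 is handled separately in run() (runAllTasks() with no time limit). The budget is soft: runAllTasks(long) only checks the clock every 64 tasks, so a slow task can overrun it.

```java
// Hypothetical helper for the task-time budget computed in NioEventLoop#run.
class IoRatioBudget {
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        // Same valid range as NioEventLoop#setIoRatio: 0 < ioRatio <= 100.
        if (ioRatio <= 0 || ioRatio > 100) {
            throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
        }
        // run() never evaluates this formula for ioRatio == 100; it calls runAllTasks() without a limit.
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }
}
```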