前言
我们都知道Netty
的NioEventLoop
是负责轮询检测IO事件、处理IO事件、执行所有任务等三个过程,服务端一旦在Netty服务端启动,就具备新连接处理的能力,而Netty处理新连接的整体步骤大致如下:
- NioEventLoop轮询检测是否有新连接。
- 检测到新连接时,为当前连接创建NioSocketChannel,也就是客户端连接的channel。
- 为channel分配一个NioEventLoop。
- 将当前channel的读写事件注册到NioEventLoop的selector上,此后这个channel的读写事件都由当前NioEventLoop处理。
本文就会基于这个脉络对Netty新连接接入的处理流程进行分析。
源码分析
整体流程
在服务端NioEventLoop启动之后,其selector就会开始轮询检测IO事件,假设我们的服务端端口为8888,那么在命令行键入:
telnet 127.0.0.1 8888
那么NioEventLoop的run方法就会通过select方法轮询检测到连接事件,于是代码就会走到processSelectedKeys来处理这个连接任务。
@Overrideprotected void run() {for (;;) {try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.SELECT:select(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}default:// fallthrough}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;if (ioRatio == 100) {try {processSelectedKeys();} finally {// Ensure we always run tasks.runAllTasks();}} else {final long ioStartTime = System.nanoTime();try {//处理IO事件processSelectedKeys();} finally {// Ensure we always run tasks.final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}//略}}
因为上一步检测到了IO事件,所以selectedKeys不为空,于是将selectedKeys读写索引进行重置之后,直接调用processSelectedKeysOptimized进行处理。
private void processSelectedKeys() {if (selectedKeys != null) {processSelectedKeysOptimized(selectedKeys.flip());} else {processSelectedKeysPlain(selector.selectedKeys());}}
于是我们来到processSelectedKeysOptimized方法,拿到这个key和channel调用processSelectedKey(k, (AbstractNioChannel) a);
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {for (int i = 0;; i ++) {final SelectionKey k = selectedKeys[i];if (k == null) {break;}// null out entry in the array to allow to have it GC'ed once the Channel close// See https://github.com/netty/netty/issues/2363selectedKeys[i] = null;final Object a = k.attachment();if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);} else {@SuppressWarnings("unchecked")NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}if (needsToSelectAgain) {// null out entries in the array to allow to have it GC'ed once the Channel close// See https://github.com/netty/netty/issues/2363for (;;) {i++;if (selectedKeys[i] == null) {break;}selectedKeys[i] = null;}selectAgain();// Need to flip the optimized selectedKeys to get the right reference to the array// and reset the index to -1 which will then set to 0 on the for loop// to start over again.//// See https://github.com/netty/netty/issues/1523selectedKeys = this.selectedKeys.flip();i = -1;}}}
因为我们的事件是连接事件,所以走到了unsafe.read();。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();if (!k.isValid()) {final EventLoop eventLoop;try {eventLoop = ch.eventLoop();} catch (Throwable ignored) {// If the channel implementation throws an exception because there is no event loop, we ignore this// because we are only trying to determine if ch is registered to this event loop and thus has authority// to close ch.return;}// Only close ch if ch is still registerd to this EventLoop. ch could have deregistered from the event loop// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is// still healthy and should not be closed.// See https://github.com/netty/netty/issues/5125if (eventLoop != this || eventLoop == null) {return;}// close the channel if the key is not valid anymoreunsafe.close(unsafe.voidPromise());return;}try {int readyOps = k.readyOps();// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise// the NIO JDK channel implementation may throw a NotYetConnectedException.if ((readyOps & SelectionKey.OP_CONNECT) != 0) {// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking// See https://github.com/netty/netty/issues/924int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.if ((readyOps & SelectionKey.OP_WRITE) != 0) {// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to writech.unsafe().forceFlush();}// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead// to a spin loopif ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();if (!ch.isOpen()) {// Connection already closed - no need to handle write.return;}}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}}
接着代码就走到AbstractNioMessageChannel的read方法,这就是我们的核心步骤所在,它整体处理流程为:
- 调用JDK底层API创建一个channel,并基于这个channel封装成一个NioSocketChannel。
- 将这个channel添加到readBuf这个列表中。
- 更新当前连接数,并判断当前连接数是否超过最大值(默认为16),如果超过则结束本次循环,反之继续处理新连接,完成后进入步骤4。
- 基于pipeline传 播ChannelRead、channelReadComplete,如果有错误还会执行exceptionCaught等事件。
@Overridepublic void read() {assert eventLoop().inEventLoop();final ChannelConfig config = config();final ChannelPipeline pipeline = pipeline();final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();allocHandle.reset(config);boolean closed = false;Throwable exception = null;try {try {do {int localRead = doReadMessages(readBuf);if (localRead == 0) {break;}if (localRead < 0) {closed = true;break;}allocHandle.incMessagesRead(localRead);} while (allocHandle.continueReading());} catch (Throwable t) {exception = t;}int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;pipeline.fireChannelRead(readBuf.get(i));}readBuf.clear();allocHandle.readComplete();pipeline.fireChannelReadComplete();if (exception != null) {closed = closeOnReadError(exception);pipeline.fireExceptionCaught(exception);}if (closed) {inputShutdown = true;if (isOpen()) {close(voidPromise());}}} finally {// Check if there is a readPending which was not processed yet.// This could be for two reasons:// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method//// See https://github.com/netty/netty/issues/2254if (!readPending && !config.isAutoRead()) {removeReadOp();}}}}
NioSocketChannel创建
从上述的流程我们了解了新连接接入的整体流程,接下来我们需要对整体流程进行展开分析,先说说收到新连接接入请求时NioSocketChannel创建这一步。
int localRead = doReadMessages(readBuf);
步入这段代码,我们来到了NioServerSocketChannel的doReadMessages方法 ,可以看到它的步骤比较简单:
- 基于javaChannel通过JDK底层代码创建一个channel
- 调用accept接受传入的连接请求并创建一个新的 SocketChannel 实例来表示与客户端的连接。
- 将其封装为NioSocketChannelbuf列表中,该列表即readBuf中,后续事件的传播就会通过这个列表拿到我们的channel进行传播。
- 返回1,说明添加了一个channel到列表中。
@Overrideprotected int doReadMessages(List<Object> buf) throws Exception {SocketChannel ch = javaChannel().accept();try {if (ch != null) {buf.add(new NioSocketChannel(this, ch));return 1;}} catch (Throwable t) {logger.warn("Failed to create a new channel from an accepted socket.", t);try {ch.close();} catch (Throwable t2) {logger.warn("Failed to close a socket.", t2);}}return 0;}
连接数更新
完成channel的创建之后会进行连接数的更新,代码如下,可以看到这段代码会基于上一步的返回值1进行调用。
allocHandle.incMessagesRead(localRead);
于是我们来到了DefaultMaxMessagesRecvByteBufAllocator的incMessagesRead,可以看到该方法仅仅对totalMessages 进行数值更新,而totalMessages 的作用我们会在连接数校验时提及,这里先略过。
@Overridepublic final void incMessagesRead(int amt) {totalMessages += amt;}
处理完一个channel之后代码就会来到这段判断
while (allocHandle.continueReading())
步入其内部我们即可明白totalMessages 是用于校验当前连接数是大于最大连接连接总数的,如果不是则继续处理新连接,反之结束循环。
@Overridepublic boolean continueReading() {return config.isAutoRead() &&attemptedBytesRead == lastBytesRead &&totalMessages < maxMessagePerRead &&totalBytesRead < Integer.MAX_VALUE;}
新连接NioEventLoop分配和Selector注册
我们在我完成尽可能的channel创建之后,来到这段代码,可以看到它会从readBuf拿到我们所有的channel调用fireChannelRead完成事件传播。
int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;pipeline.fireChannelRead(readBuf.get(i));}
而fireChannelRead内部会从headContext开始遍历pipeline上所有的inboundHandler
@Overridepublic final ChannelPipeline fireChannelRead(Object msg) {AbstractChannelHandlerContext.invokeChannelRead(head, msg);return this;}
每个处理器执行完之后都会依靠DefaultChannelPipeline的channelRead完成向下传播。
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ctx.fireChannelRead(msg);}
最终来到ServerBootstrapAcceptor,可以看到这段代码做到的事情也是非常清晰的:
- 添加childHandler
- 完成选项配置。
- 完成属性配置。
- 为当前channel分配一个eventLoop线程。
@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);for (Entry<ChannelOption<?>, Object> e: childOptions) {try {if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {logger.warn("Unknown channel option: " + e);}} catch (Throwable t) {logger.warn("Failed to set a channel option: " + child, t);}}for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}try {childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}}
添加childHandler
child.pipeline().addLast(childHandler);
对应配置代码
.childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(new ChildHandler());//..}});
完成选项配置
for (Entry<ChannelOption<?>, Object> e: childOptions) {try {if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {logger.warn("Unknown channel option: " + e);}} catch (Throwable t) {logger.warn("Failed to set a channel option: " + child, t);}}
对应的选项即是我们配置ServerBootstrap时用到的childOption中设置的选项。
//开始设置各种参数b.group(bossGroup, workerGroup)//设置IO模型.channel(NioServerSocketChannel.class)//用于给每个连接都设置一些TCP参数.childOption(ChannelOption.TCP_NODELAY, true)
完成属性配置
for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}
对应配置代码
b.group(bossGroup, workerGroup)//设置IO模型.channel(NioServerSocketChannel.class)//用于给每个连接都设置一些TCP参数.childOption(ChannelOption.TCP_NODELAY, true)//给每一个连接设置attr.childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
为当前channel分配一个eventLoop线程
这步骤就比较核心了,register方法会进行:
- eventLoop获取。
try {childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}
启动线程,然后基于这个线程执行register0
@Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {if (eventLoop == null) {throw new NullPointerException("eventLoop");}if (isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));return;}if (!isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));return;}AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {register0(promise);} else {try {eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}}
内部逻辑
private void doStartThread() {assert thread == null;executor.execute(new Runnable() {@Overridepublic void run() {thread = Thread.currentThread();if (interrupted) {thread.interrupt();}boolean success = false;updateLastExecutionTime();try {SingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {for (;;) {int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {break;}}// Check if confirmShutdown() was called at the end of the loop.if (success && gracefulShutdownStartTime == 0) {logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +"before run() implementation terminates.");}try {// Run all remaining tasks and shutdown hooks.for (;;) {if (confirmShutdown()) {break;}}} finally {try {cleanup();} finally {STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);threadLock.release();if (!taskQueue.isEmpty()) {logger.warn("An event executor terminated with " +"non-empty task queue (" + taskQueue.size() + ')');}terminationFuture.setSuccess(null);}}}}});}
线程启动后会调用fireChannelActive
private void register0(ChannelPromise promise) {try {// check if the channel is still open as it could be closed in the mean time when the register// call was outside of the eventLoopif (!promise.setUncancellable() || !ensureOpen(promise)) {return;}boolean firstRegistration = neverRegistered;doRegister();neverRegistered = false;registered = true;// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the// user may already fire events through the pipeline in the ChannelFutureListener.pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);pipeline.fireChannelRegistered();// Only fire a channelActive if the channel has never been registered. This prevents firing// multiple channel actives if the channel is deregistered and re-registered.if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {// This channel was registered before and autoRead() is set. This means we need to begin read// again so that we process inbound data.//// See https://github.com/netty/netty/issues/4805beginRead();}}} catch (Throwable t) {// Close the channel directly to avoid FD leak.closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}
于是调用了HeadContext的readIfIsAutoRead
@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelActive();readIfIsAutoRead();}
拿到channel调用read
private void readIfIsAutoRead() {if (channel.config().isAutoRead()) {channel.read();}}
这个channel会调用
@Overridepublic Channel read() {pipeline.read();return this;}
然后
@Overridepublic final ChannelPipeline read() {tail.read();return this;}
最终不断遍历来到了HeadContext
@Overridepublic ChannelHandlerContext read() {final AbstractChannelHandlerContext next = findContextOutbound();EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeRead();} else {Runnable task = next.invokeReadTask;if (task == null) {next.invokeReadTask = task = new Runnable() {@Overridepublic void run() {next.invokeRead();}};}executor.execute(task);}return this;}
然后调用DefaultChannelPipeline的read
@Overridepublic void read(ChannelHandlerContext ctx) {unsafe.beginRead();}
其内部调用AbstractChannel的doBeginRead
@Overridepublic final void beginRead() {assertEventLoop();if (!isActive()) {return;}try {doBeginRead();} catch (final Exception e) {invokeLater(new Runnable() {@Overridepublic void run() {pipeline.fireExceptionCaught(e);}});close(voidPromise());}}
最终完成读事件的注册,自此所有流程分析完成
@Overrideprotected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;}readPending = true;final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {selectionKey.interestOps(interestOps | readInterestOp);}}
常见面试题
简单介绍一下服务端channel和客户端channel的区别
回答这个问题我们不妨看看channel的类图,可以看到顶层的channel定义了channel的行为,而AbstractChannel定义的channel的大体骨架,而我们的服务端和客户端都是基于AbstractChannel衍生创建的,接下来我们不妨基于代码的形式了解一下其设计。
先来看看顶层channel接口,它定义了socket、读、写、连接等事件绑定以及id、unsafe、pipeline等重要组件获取的方法。
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {/*** Returns the globally unique identifier of this {@link Channel}.*/ChannelId id();/*** Return the {@link EventLoop} this {@link Channel} was registered to.*/EventLoop eventLoop();/*** Returns the parent of this channel.** @return the parent channel.* {@code null} if this channel does not have a parent channel.*/Channel parent();/*** Returns the configuration of this channel.*/ChannelConfig config();/*** Returns {@code true} if the {@link Channel} is open and may get active later*/boolean isOpen();/*** Returns {@code true} if the {@link Channel} is registered with an {@link EventLoop}.*/boolean isRegistered();/*** Return {@code true} if the {@link Channel} is active and so connected.*/boolean isActive();/*** Return the {@link ChannelMetadata} of the {@link Channel} which describe the nature of the {@link Channel}.*/ChannelMetadata metadata();/*** Returns the local address where this channel is bound to. The returned* {@link SocketAddress} is supposed to be down-cast into more concrete* type such as {@link InetSocketAddress} to retrieve the detailed* information.** @return the local address of this channel.* {@code null} if this channel is not bound.*/SocketAddress localAddress();/*** Returns the remote address where this channel is connected to. The* returned {@link SocketAddress} is supposed to be down-cast into more* concrete type such as {@link InetSocketAddress} to retrieve the detailed* information.** @return the remote address of this channel.* {@code null} if this channel is not connected.* If this channel is not connected but it can receive messages* from arbitrary remote addresses (e.g. {@link DatagramChannel},* use {@link DatagramPacket#recipient()} to determine* the origination of the received message as this method will* return {@code null}.*/SocketAddress remoteAddress();/*** Returns the {@link ChannelFuture} which will be notified when this* channel is closed. This method always returns the same future instance.*/ChannelFuture closeFuture();/*** Returns {@code true} if and only if the I/O thread will perform the* requested write operation immediately. Any write requests made when* this method returns {@code false} are queued until the I/O thread is* ready to process the queued write requests.*/boolean isWritable();/*** Get how many bytes can be written until {@link #isWritable()} returns {@code false}.* This quantity will always be non-negative. If {@link #isWritable()} is {@code false} then 0.*/long bytesBeforeUnwritable();/*** Get how many bytes must be drained from underlying buffers until {@link #isWritable()} returns {@code true}.* This quantity will always be non-negative. If {@link #isWritable()} is {@code true} then 0.*/long bytesBeforeWritable();/*** Returns an <em>internal-use-only</em> object that provides unsafe operations.*/Unsafe unsafe();/*** Return the assigned {@link ChannelPipeline}.*/ChannelPipeline pipeline();/*** Return the assigned {@link ByteBufAllocator} which will be used to allocate {@link ByteBuf}s.*/ByteBufAllocator alloc();@OverrideChannel read();@OverrideChannel flush();/*** <em>Unsafe</em> operations that should <em>never</em> be called from user-code. These methods* are only provided to implement the actual transport, and must be invoked from an I/O thread except for the* following methods:* <ul>* <li>{@link #invoker()}</li>* <li>{@link #localAddress()}</li>* <li>{@link #remoteAddress()}</li>* <li>{@link #closeForcibly()}</li>* <li>{@link #register(EventLoop, ChannelPromise)}</li>* <li>{@link #deregister(ChannelPromise)}</li>* <li>{@link #voidPromise()}</li>* </ul>*/interface Unsafe {/*** Return the assigned {@link RecvByteBufAllocator.Handle} which will be used to allocate {@link ByteBuf}'s when* receiving data.*/RecvByteBufAllocator.Handle recvBufAllocHandle();/*** Return the {@link SocketAddress} to which is bound local or* {@code null} if none.*/SocketAddress localAddress();/*** Return the {@link SocketAddress} to which is bound remote or* {@code null} if none is bound yet.*/SocketAddress remoteAddress();/*** Register the {@link Channel} of the {@link ChannelPromise} and notify* the {@link ChannelFuture} once the registration was complete.*/void register(EventLoop eventLoop, ChannelPromise promise);/*** Bind the {@link SocketAddress} to the {@link Channel} of the {@link ChannelPromise} and notify* it once its done.*/void bind(SocketAddress localAddress, ChannelPromise promise);/*** Connect the {@link Channel} of the given {@link ChannelFuture} with the given remote {@link SocketAddress}.* If a specific local {@link SocketAddress} should be used it need to be given as argument. Otherwise just* pass {@code null} to it.** The {@link ChannelPromise} will get notified once the connect operation was complete.*/void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);/*** Disconnect the {@link Channel} of the {@link ChannelFuture} and notify the {@link ChannelPromise} once the* operation was complete.*/void disconnect(ChannelPromise promise);/*** Close the {@link Channel} of the {@link ChannelPromise} and notify the {@link ChannelPromise} once the* operation was complete.*/void close(ChannelPromise promise);/*** Closes the {@link Channel} immediately without firing any events. Probably only useful* when registration attempt failed.*/void closeForcibly();/*** Deregister the {@link Channel} of the {@link ChannelPromise} from {@link EventLoop} and notify the* {@link ChannelPromise} once the operation was complete.*/void deregister(ChannelPromise promise);/*** Schedules a read operation that fills the inbound buffer of the first {@link ChannelInboundHandler} in the* {@link ChannelPipeline}. If there's already a pending read operation, this method does nothing.*/void beginRead();/*** Schedules a write operation.*/void write(Object msg, ChannelPromise promise);/*** Flush out all write operations scheduled via {@link #write(Object, ChannelPromise)}.*/void flush();/*** Return a special ChannelPromise which can be reused and passed to the operations in {@link Unsafe}.* It will never be notified of a success or error and so is only a placeholder for operations* that take a {@link ChannelPromise} as argument but for which you not want to get notified.*/ChannelPromise voidPromise();/*** Returns the {@link ChannelOutboundBuffer} of the {@link Channel} where the pending write requests are stored.*/ChannelOutboundBuffer outboundBuffer();}
}
而AbstractChannel则定义了核心变量的创建和初始化构造方法,还有上述接口大部分核心方法的实现。
private final Channel parent;private final ChannelId id;private final Unsafe unsafe;private final DefaultChannelPipeline pipeline;private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);private final CloseFuture closeFuture = new CloseFuture(this);private volatile SocketAddress localAddress;private volatile SocketAddress remoteAddress;private volatile EventLoop eventLoop;private volatile boolean registered;/** Cache for the string representation of this channel */private boolean strValActive;private String strVal;/*** Creates a new instance.** @param parent* the parent of this channel. {@code null} if there's no parent.*/protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();unsafe = newUnsafe();pipeline = newChannelPipeline();}
接下来就是服务端channel了,从构造方法不难看出NioServerSocketChannel感兴趣的事件为OP_ACCEPT即客户端连接,而配置是用NioServerSocketChannelConfig进行管理。
public NioServerSocketChannel(ServerSocketChannel channel) {super(null, channel, SelectionKey.OP_ACCEPT);config = new NioServerSocketChannelConfig(this, javaChannel().socket());}
而客户端AbstractNioByteChannel感兴趣的事件为OP_READ,说明更关注消息读取。
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {super(parent, ch, SelectionKey.OP_READ);}
简述Netty新连接处理过程
由上述源码可知,其核心步骤为:
- 服务端对应的bossGroup轮询到accpect事件,即检测到了新的连接。
- 为当前连接创建NioSocketChannel,并完成unsafe、pipeline等重要组件初始化。
- 为当前channel分配nioEventLoop,并将这个channel的读事件注册到selector上,自此这条新连接就可以正常进行网络交互和数据读写了。
netty是在哪里检测到有新连接接入的
bossGroup的nioEventLoop线程会去轮询accept事件,然后通过jdk底层的accept方法完成这个连接的创建。
新连接时怎么注册到NioEventLoop线程的
bossGroup的NioEventLoop线程,轮询到accept得到一个新连接之后,pipeline会通过遍历的方式走到ServerBootstrapAcceptor的channelRead,该方法中childGroup就会对客户端的channel分配一个NioEventLoop线程。
try {childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}
查看register方法,可以看到next的调用。
@Overridepublic ChannelFuture register(Channel channel) {return next().register(channel);}
步入next内部,可以看到chooser基于轮询算法从childGroup拿到一个NioEventLoop,最终调用register将这个channel注册到NioEventLoop线程上。
@Overridepublic EventExecutor next() {return chooser.next();}
参考文献
netty源码分析(17)- 新连接接入处理逻辑总结:https://www.jianshu.com/p/d983e690411d
netty源码分析(14)- channel代码架构总结:https://www.jianshu.com/p/17681b62be95