Netty New Connection Acceptance: A Source Code Analysis

Preface

As we know, Netty's NioEventLoop is responsible for three things: polling for I/O events, processing those I/O events, and running all queued tasks. Once the Netty server has started, it is ready to handle new connections. The overall steps Netty takes to process a new connection are roughly:

  1. The NioEventLoop polls to check for new connections.
  2. When a new connection is detected, a NioSocketChannel, i.e. the channel for the client connection, is created for it.
  3. A NioEventLoop is assigned to the channel.
  4. The channel's read/write interest is registered with that NioEventLoop's selector; from then on, all of this channel's read/write events are handled by that NioEventLoop.

This article analyzes Netty's new-connection acceptance along exactly this thread.
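Before diving into the source, here is a minimal sketch of the kind of server this analysis assumes. It is only a sketch: the class name EchoServer, the inline echo handler, and port 8888 are illustrative choices, not anything taken from the Netty code base.

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public final class EchoServer {
    public static void main(String[] args) throws Exception {
        // bossGroup accepts new connections; workerGroup handles their I/O
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) {
                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                         @Override
                         public void channelRead(ChannelHandlerContext ctx, Object msg) {
                             ctx.writeAndFlush(msg); // echo the message back
                         }
                     });
                 }
             });
            b.bind(8888).sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

With bossGroup accepting connections and workerGroup doing the per-connection I/O, everything analyzed below happens between bind() and the first byte read.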

Source Code Analysis

Overall Flow

After the server-side NioEventLoop starts, its selector begins polling for I/O events. Suppose the server listens on port 8888 and we type this on the command line:

telnet 127.0.0.1 8888

The NioEventLoop's run method then picks up the connection event via select, and execution moves on to processSelectedKeys to handle it.

@Override
protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
                    // fallthrough
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    // process I/O events
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // remainder omitted
    }
}

Because the previous step detected an I/O event, selectedKeys is not empty, so after flipping the selectedKeys array (resetting its read/write index) processSelectedKeysOptimized is called directly.

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized(selectedKeys.flip());
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

This brings us to processSelectedKeysOptimized, which takes each key together with the channel attached to it and calls processSelectedKey(k, (AbstractNioChannel) a):

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    for (int i = 0;; i ++) {
        final SelectionKey k = selectedKeys[i];
        if (k == null) {
            break;
        }
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys[i] = null;

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            for (;;) {
                i++;
                if (selectedKeys[i] == null) {
                    break;
                }
                selectedKeys[i] = null;
            }

            selectAgain();
            // Need to flip the optimized selectedKeys to get the right reference to the array
            // and reset the index to -1 which will then set to 0 on the for loop
            // to start over again.
            //
            // See https://github.com/netty/netty/issues/1523
            selectedKeys = this.selectedKeys.flip();
            i = -1;
        }
    }
}

Since our event is a connection (accept) event, execution reaches unsafe.read():

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // If the channel implementation throws an exception because there is no event loop, we ignore this
            // because we are only trying to determine if ch is registered to this event loop and thus has authority
            // to close ch.
            return;
        }
        // Only close ch if ch is still registerd to this EventLoop. ch could have deregistered from the event loop
        // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
        // still healthy and should not be closed.
        // See https://github.com/netty/netty/issues/5125
        if (eventLoop != this || eventLoop == null) {
            return;
        }
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        // the NIO JDK channel implementation may throw a NotYetConnectedException.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle write.
                return;
            }
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

Execution then arrives at AbstractNioMessageChannel's read method, which is the core of the whole process. Its overall flow is:

  1. Call the underlying JDK API to create a channel and wrap it in a NioSocketChannel.
  2. Add the channel to the readBuf list.
  3. Update the count of connections read in this pass and check it against the per-read maximum (16 by default); if the maximum is reached, the loop ends, otherwise it keeps accepting new connections, and then step 4 follows.
  4. Propagate channelRead and channelReadComplete through the pipeline, plus exceptionCaught if an error occurred.
@Override
public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }

                allocHandle.incMessagesRead(localRead);
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }

        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (exception != null) {
            closed = closeOnReadError(exception);
            pipeline.fireExceptionCaught(exception);
        }

        if (closed) {
            inputShutdown = true;
            if (isOpen()) {
                close(voidPromise());
            }
        }
    } finally {
        // Check if there is a readPending which was not processed yet.
        // This could be for two reasons:
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
        //
        // See https://github.com/netty/netty/issues/2254
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

Creating the NioSocketChannel

The flow above gives the big picture of new-connection acceptance. Next we expand each stage, starting with the creation of the NioSocketChannel when a connection request arrives.

int localRead = doReadMessages(readBuf);

Stepping into that call brings us to NioServerSocketChannel's doReadMessages method, whose steps are fairly simple:

  1. Obtain a channel through the JDK's underlying code via javaChannel().
  2. accept() takes the incoming connection request and creates a new SocketChannel instance representing the connection to the client.
  3. Wrap it in a NioSocketChannel and add it to the buf list, which is the readBuf seen above; later event propagation fetches our channel from this list.
  4. Return 1, indicating that one channel was added to the list.
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}
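For reference, the NioSocketChannel constructor invoked here looks roughly like this in Netty 4.1 (quoted from memory, so treat the exact shape as approximate): it records the server channel as the parent, registers interest in OP_READ through its superclass chain, and creates the per-channel config.

// NioSocketChannel (Netty 4.1), approximate shape
public NioSocketChannel(Channel parent, SocketChannel socket) {
    super(parent, socket); // the AbstractNioByteChannel chain sets the read interest to OP_READ
    config = new NioSocketChannelConfig(this, socket.socket());
}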

Updating the Connection Count

Once the channel has been created, the connection count is updated by the line below, called with the return value 1 from the previous step.

 allocHandle.incMessagesRead(localRead);

This lands us in DefaultMaxMessagesRecvByteBufAllocator's incMessagesRead. The method does nothing but add to totalMessages; what totalMessages is for will come up in the per-read limit check, so we skip it for now.

@Override
public final void incMessagesRead(int amt) {
    totalMessages += amt;
}

After one channel has been handled, execution reaches this loop condition:

while (allocHandle.continueReading())

Stepping inside makes clear that totalMessages is compared against the maximum number of messages allowed per read: as long as the limit has not been reached, the loop keeps handling new connections, otherwise it ends.

@Override
public boolean continueReading() {
    return config.isAutoRead() &&
           attemptedBytesRead == lastBytesRead &&
           totalMessages < maxMessagePerRead &&
           totalBytesRead < Integer.MAX_VALUE;
}
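The default of 16 for maxMessagePerRead can be changed through the channel options. A sketch, with 32 chosen arbitrarily; note as an assumption about versions that ChannelOption.MAX_MESSAGES_PER_READ exists in Netty 4.x but later 4.1 releases deprecate it in favor of configuring a MaxMessagesRecvByteBufAllocator directly.

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;

final class ReadCapConfig {
    // Sketch: raise the per-read cap checked by continueReading() (32 is arbitrary).
    static ServerBootstrap withReadCap(ServerBootstrap b) {
        return b.option(ChannelOption.MAX_MESSAGES_PER_READ, 32);
    }
}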

Assigning a NioEventLoop to the New Connection and Registering with the Selector

After creating as many channels as the read loop allows, we reach the code below, which takes every channel in readBuf and calls fireChannelRead to propagate the event.

int size = readBuf.size();
for (int i = 0; i < size; i ++) {
    readPending = false;
    pipeline.fireChannelRead(readBuf.get(i));
}

Internally, fireChannelRead walks all the inbound handlers on the pipeline, starting from the head context:

@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

After each handler runs, it relies on its channelRead calling ctx.fireChannelRead to keep the event moving down the pipeline:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.fireChannelRead(msg);
}
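User-defined handlers take part in this same chain. As a sketch (the handler name LoggingInbound is made up for illustration), a custom inbound handler must itself call ctx.fireChannelRead if later handlers, including the ServerBootstrapAcceptor discussed next, are to see the message.

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// Hypothetical handler: log the inbound message, then keep propagation going.
public class LoggingInbound extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("inbound: " + msg);
        ctx.fireChannelRead(msg); // without this, downstream handlers never see msg
    }
}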

The event eventually reaches ServerBootstrapAcceptor, whose job is quite clear:

  1. Add the childHandler.
  2. Apply the child options.
  3. Apply the child attributes.
  4. Assign an eventLoop thread to the channel.
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    for (Entry<ChannelOption<?>, Object> e: childOptions) {
        try {
            if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                logger.warn("Unknown channel option: " + e);
            }
        } catch (Throwable t) {
            logger.warn("Failed to set a channel option: " + child, t);
        }
    }

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
Adding the childHandler
 child.pipeline().addLast(childHandler);

This corresponds to the configuration code:

.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) {
        ch.pipeline().addLast(new ChildHandler());
        //..
    }
});
Applying the Child Options
for (Entry<ChannelOption<?>, Object> e: childOptions) {
    try {
        if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
            logger.warn("Unknown channel option: " + e);
        }
    } catch (Throwable t) {
        logger.warn("Failed to set a channel option: " + child, t);
    }
}

The options applied here are exactly the ones set through childOption when the ServerBootstrap was configured.

// start setting the various parameters
b.group(bossGroup, workerGroup)
 // set the I/O model
 .channel(NioServerSocketChannel.class)
 // set TCP parameters for every connection
 .childOption(ChannelOption.TCP_NODELAY, true)
Applying the Child Attributes
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
    child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}

This corresponds to the configuration code:

b.group(bossGroup, workerGroup)
 // set the I/O model
 .channel(NioServerSocketChannel.class)
 // set TCP parameters for every connection
 .childOption(ChannelOption.TCP_NODELAY, true)
 // set an attr on every connection
 .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
Assigning an eventLoop Thread to the Channel

This is the key step: childGroup.register(child) obtains an eventLoop for the child channel and performs the registration on it.
try {
    childGroup.register(child).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
                forceClose(child, future.cause());
            }
        }
    });
} catch (Throwable t) {
    forceClose(child, t);
}

Inside register, the channel remembers its eventLoop and register0 is executed on that thread (submitting the task also starts the thread if it is not running yet):

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

The thread-starting logic looks like this:

private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                for (;;) {
                    int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }

                // Check if confirmShutdown() was called at the end of the loop.
                if (success && gracefulShutdownStartTime == 0) {
                    logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                            SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                            "before run() implementation terminates.");
                }

                try {
                    // Run all remaining tasks and shutdown hooks.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } finally {
                    try {
                        cleanup();
                    } finally {
                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        threadLock.release();
                        if (!taskQueue.isEmpty()) {
                            logger.warn("An event executor terminated with " +
                                    "non-empty task queue (" + taskQueue.size() + ')');
                        }

                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });
}

Once the task runs on the event loop, register0 performs the actual registration; on the first registration of an already active channel, it fires channelActive:

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

channelActive reaches HeadContext, which calls readIfIsAutoRead:

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelActive();
    readIfIsAutoRead();
}

If autoRead is enabled (it is by default), the channel's read method is called:

private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        channel.read();
    }
}

channel.read() delegates to the pipeline:

@Override
public Channel read() {
    pipeline.read();
    return this;
}

which starts the read operation from the tail:

@Override
public final ChannelPipeline read() {
    tail.read();
    return this;
}

The call keeps moving outbound through the handler contexts until it reaches HeadContext:

@Override
public ChannelHandlerContext read() {
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeRead();
    } else {
        Runnable task = next.invokeReadTask;
        if (task == null) {
            next.invokeReadTask = task = new Runnable() {
                @Override
                public void run() {
                    next.invokeRead();
                }
            };
        }
        executor.execute(task);
    }

    return this;
}

HeadContext's read then delegates to unsafe.beginRead:

@Override
public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}

which internally calls doBeginRead:

@Override
public final void beginRead() {
    assertEventLoop();

    if (!isActive()) {
        return;
    }

    try {
        doBeginRead();
    } catch (final Exception e) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireExceptionCaught(e);
            }
        });
        close(voidPromise());
    }
}

doBeginRead finally registers interest in the read event on the selection key. With that, the whole flow has been traced:

@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

Common Interview Questions

Briefly explain the difference between the server channel and the client channel.

To answer this, it helps to look at the channel class hierarchy: the top-level Channel interface defines a channel's behavior, AbstractChannel lays out the general skeleton, and both the server-side and client-side channels derive from AbstractChannel. Let's look at the design through the code.

(Figure: Channel class hierarchy)

First, the top-level Channel interface. It defines the socket, read, write, and connect operations, as well as accessors for key components such as id, unsafe, and pipeline.

public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {

    /** Returns the globally unique identifier of this {@link Channel}. */
    ChannelId id();

    /** Return the {@link EventLoop} this {@link Channel} was registered to. */
    EventLoop eventLoop();

    /** Returns the parent of this channel, or {@code null} if it has no parent. */
    Channel parent();

    /** Returns the configuration of this channel. */
    ChannelConfig config();

    /** Returns {@code true} if the {@link Channel} is open and may get active later. */
    boolean isOpen();

    /** Returns {@code true} if the {@link Channel} is registered with an {@link EventLoop}. */
    boolean isRegistered();

    /** Return {@code true} if the {@link Channel} is active and so connected. */
    boolean isActive();

    /** Return the {@link ChannelMetadata} which describes the nature of the {@link Channel}. */
    ChannelMetadata metadata();

    /** Returns the local address this channel is bound to, or {@code null} if not bound. */
    SocketAddress localAddress();

    /** Returns the remote address this channel is connected to, or {@code null} if not connected. */
    SocketAddress remoteAddress();

    /** Returns the {@link ChannelFuture} which will be notified when this channel is closed. */
    ChannelFuture closeFuture();

    /** Returns {@code true} if and only if the I/O thread will perform the requested write immediately. */
    boolean isWritable();

    /** Get how many bytes can be written until {@link #isWritable()} returns {@code false}. */
    long bytesBeforeUnwritable();

    /** Get how many bytes must be drained until {@link #isWritable()} returns {@code true}. */
    long bytesBeforeWritable();

    /** Returns an <em>internal-use-only</em> object that provides unsafe operations. */
    Unsafe unsafe();

    /** Return the assigned {@link ChannelPipeline}. */
    ChannelPipeline pipeline();

    /** Return the assigned {@link ByteBufAllocator} which will be used to allocate {@link ByteBuf}s. */
    ByteBufAllocator alloc();

    @Override
    Channel read();

    @Override
    Channel flush();

    /**
     * <em>Unsafe</em> operations that should <em>never</em> be called from user-code. These methods
     * are only provided to implement the actual transport, and must be invoked from an I/O thread
     * (with a handful of documented exceptions such as closeForcibly() and voidPromise()).
     */
    interface Unsafe {

        /** Return the assigned {@link RecvByteBufAllocator.Handle} used to allocate {@link ByteBuf}s when receiving data. */
        RecvByteBufAllocator.Handle recvBufAllocHandle();

        /** Return the local {@link SocketAddress} bound, or {@code null} if none. */
        SocketAddress localAddress();

        /** Return the remote {@link SocketAddress}, or {@code null} if none is bound yet. */
        SocketAddress remoteAddress();

        /** Register the {@link Channel} and notify the {@link ChannelFuture} once the registration is complete. */
        void register(EventLoop eventLoop, ChannelPromise promise);

        /** Bind the {@link SocketAddress} to the {@link Channel} and notify the promise once done. */
        void bind(SocketAddress localAddress, ChannelPromise promise);

        /** Connect to the given remote {@link SocketAddress} and notify the promise once complete. */
        void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);

        /** Disconnect the {@link Channel} and notify the promise once the operation is complete. */
        void disconnect(ChannelPromise promise);

        /** Close the {@link Channel} and notify the promise once the operation is complete. */
        void close(ChannelPromise promise);

        /** Closes the {@link Channel} immediately without firing any events. */
        void closeForcibly();

        /** Deregister the {@link Channel} from its {@link EventLoop} and notify the promise once complete. */
        void deregister(ChannelPromise promise);

        /** Schedules a read operation that fills the inbound buffer of the first handler in the pipeline. */
        void beginRead();

        /** Schedules a write operation. */
        void write(Object msg, ChannelPromise promise);

        /** Flush out all write operations scheduled via {@link #write(Object, ChannelPromise)}. */
        void flush();

        /** Return a special reusable ChannelPromise which is never notified of success or error. */
        ChannelPromise voidPromise();

        /** Returns the {@link ChannelOutboundBuffer} where the pending write requests are stored. */
        ChannelOutboundBuffer outboundBuffer();
    }
}

AbstractChannel then defines the core fields, the constructor that initializes them, and implementations of most of the interface's core methods.

private final Channel parent;
private final ChannelId id;
private final Unsafe unsafe;
private final DefaultChannelPipeline pipeline;
private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
private final CloseFuture closeFuture = new CloseFuture(this);

private volatile SocketAddress localAddress;
private volatile SocketAddress remoteAddress;
private volatile EventLoop eventLoop;
private volatile boolean registered;

/** Cache for the string representation of this channel */
private boolean strValActive;
private String strVal;

/**
 * Creates a new instance.
 *
 * @param parent the parent of this channel. {@code null} if there's no parent.
 */
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

Next comes the server channel. Its constructor makes it plain that NioServerSocketChannel is interested in OP_ACCEPT, that is, in client connections, and that its configuration is managed by NioServerSocketChannelConfig.

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

The client side, AbstractNioByteChannel, registers interest in OP_READ, reflecting its focus on reading data.

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}

Briefly describe Netty's new-connection handling process.

From the source code above, the core steps are:

  1. The server's bossGroup polls and picks up an accept event, i.e. it detects a new connection.
  2. A NioSocketChannel is created for the connection, and its unsafe, pipeline, and other key components are initialized.
  3. The channel is assigned a NioEventLoop, and its read event is registered on that loop's selector; from this point the new connection can exchange data and perform reads and writes normally.

Where does Netty detect new incoming connections?

The bossGroup's NioEventLoop thread polls for accept events and then completes the connection through the JDK's underlying accept method.
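Stripped of Netty's machinery, this is the bare JDK pattern the boss event loop is built around. A runnable sketch (port 8888 matches the article's example; a real loop would register the accepted channel for OP_READ on a worker selector instead of closing it):

import java.net.InetSocketAddress;
import java.nio.channels.*;
import java.util.Iterator;

public final class AcceptLoop {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(8888));
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT); // what the boss loop polls for

        for (;;) {
            selector.select(); // blocks until at least one key is ready
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {
                    SocketChannel client = server.accept(); // the same JDK accept Netty ends up calling
                    System.out.println("accepted: " + client.getRemoteAddress());
                    client.close();
                }
            }
        }
    }
}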

How does a new connection get registered to a NioEventLoop thread?

After the bossGroup's NioEventLoop picks up an accept event and obtains the new connection, the pipeline propagates it until it reaches ServerBootstrapAcceptor's channelRead, where childGroup assigns a NioEventLoop thread to the client channel.

try {
    childGroup.register(child).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
                forceClose(child, future.cause());
            }
        }
    });
} catch (Throwable t) {
    forceClose(child, t);
}

Looking at the register method, we can see the call to next():

@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

Stepping into next(), the chooser picks a NioEventLoop from the childGroup using a round-robin algorithm, and register finally binds the channel to that NioEventLoop thread.

@Override
public EventExecutor next() {
    return chooser.next();
}
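For reference, the round-robin choice is implemented with an atomic counter. Below is a simplified sketch modeled on Netty's DefaultEventExecutorChooserFactory (the generic variant; the power-of-two variant replaces the modulo with a bitmask):

import io.netty.util.concurrent.EventExecutor;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified round-robin chooser sketch.
final class RoundRobinChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    RoundRobinChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    EventExecutor next() {
        // Math.abs guards against the counter wrapping negative past Integer.MAX_VALUE
        return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }
}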

