2、RocketMQ源码分析(二)

RocketMQ的底层通信模块remoting

remoting是RocketMQ的底层通信模块,RocketMQ底层通讯是使用Netty来实现的。本文通过对remoting源码进行分析,来说明remoting如何实现高性能通信的。

二、Remoting 通信模块结构
remoting 的网络通信是基于 Netty 实现,模块中类的继承关系如下:

在这里插入图片描述
可见通信的类继承自类RemotingService,RemotingService的定义如下:

RemotingService是RPC 远程服务基础类。主要定义所有的远程服务类的基础方法:

public interface RemotingService {// 服务启动void start();// 服务停止void shutdown();//注册RPC钩子函数void registerRPCHook(RPCHook rpcHook);
}

抽象出服务端与客户端都需要的三个方法,其中注册hook是为了能够在远程通讯之后或调用之前执行用户自定义的逻辑。

RemotingServer/RemotingClient
远程服务器/客户端基础接口,两者中的方法基本类似,故这里重点介绍一下 RemotingServer,定位 RPC 远程操作的相关“业务方法”。

public interface RemotingServer extends RemotingService {/***  注册处理器* @param requestCode   请求码* @param processor     处理器* @param executor      线程池*    这三者是绑定关系:*       根据请求的code  找到处理对应请求的处理器与线程池 并完成业务处理。*/void registerProcessor(final int requestCode, final NettyRequestProcessor processor,final ExecutorService executor);/***  注册缺省处理器* @param processor  缺省处理器* @param executor   线程池*/void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);// 获取服务端口int localListenPort();/***  根据 请求码 获取 处理器和线程池* @param requestCode  请求码* @return*/Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);/***  同步调用* @param channel   通信通道* @param request   业务请求对象* @param timeoutMillis   超时时间* @return  响应结果封装*/RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,RemotingTimeoutException;/***  异步调用* @param channel  通信通道* @param request  业务请求对象* @param timeoutMillis  超时时间* @param invokeCallback  响应结果回调对象*/void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,final InvokeCallback invokeCallback) throws InterruptedException,RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;/***  单向调用 (不关注返回结果)* @param channel   通信通道* @param request   业务请求对象* @param timeoutMillis  超时时间*/void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,RemotingSendRequestException;
}

从上面的代码可以看出,RemotingServer的主要功能是注册请求协议处理器、请求调用方法。

NettyRemotingServer::服务端的实现类,实现了 RemotingServer 接口,继承 NettyRemotingAbstract 抽象类


public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);// Netty服务端启动器private final ServerBootstrap serverBootstrap;// worker组private final EventLoopGroup eventLoopGroupSelector;// boss组 private final EventLoopGroup eventLoopGroupBoss;// Netty服务端配置信息类private final NettyServerConfig nettyServerConfig;// 公共线程池   (在注册协议处理器的时候,若未给处理器指定线程池,那么就是用该公共线程池)private final ExecutorService publicExecutor;//  Netty Channel 特殊状态监听器private final ChannelEventListener  channelEventListener;// 定时器  (功能: 扫描 responseTable表,将过期的responseFuture移除)private final Timer timer = new Timer("ServerHouseKeepingService", true);// 用于在pipeline指定handler中 执行任务的线程池private DefaultEventExecutorGroup defaultEventExecutorGroup;// 服务端绑定的端口private int port = 0;private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";private static final String TLS_HANDLER_NAME = "sslHandler";private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";// 用于处理 SSL 握手连接的处理器private HandshakeHandler handshakeHandler;// 协议编码 处理器private NettyEncoder encoder;// 连接管理 处理器private NettyConnectManageHandler connectionManageHandler;// 核心业务 处理器private NettyServerHandler serverHandler;// 参数1: nettyServerConfig  Netty服务端配置信息// 参数2: channelEventListener  channel特殊状态监听器public NettyRemotingServer(final NettyServerConfig nettyServerConfig,final ChannelEventListener channelEventListener) {// 调用父类  就是通过 Semaphore 设置请求并发限制// 1. 设置 单行请求的并发限制// 2. 设置 异步请求的并发限制super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());this.serverBootstrap = new ServerBootstrap();this.nettyServerConfig = nettyServerConfig;this.channelEventListener = channelEventListener;// 创建公共线程池 publicExecutor   线程数量为:4int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();if (publicThreadNums <= 0) {publicThreadNums = 4;}this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());}});// 下面就是根据操作系统平台来选择创建 bossGroup 和 workGroup的逻辑if (useEpoll()) {this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});} else {this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});}// 加载SSL连接的相关方法 (不在本篇的分析范围内)loadSslContext();}
}

NettyRemotingServer当中重要的参数:

父类的属性 semaphoreOneway , **semaphoreAsync ** 用来控制请求并发量的
serverBootstrap Netty服务器启动器
nettyServerConfig Netty服务器配置信息
channelEventListener Netty Channel状态监听器
eventLoopGroupSelector worker组
eventLoopGroupBoss boss组

NettyRemotingServer的启动

  // 启动Netty 服务器@Overridepublic void start() {// Netty pipeline中的指定 handler 采用该线程池执行this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());}});// 初始化 处理器 handler// 1. handshakeHandler  SSL连接// 2. encoder  编码器// 3. connectionManageHandler 连接管理器处理器// 4. serverHandler 核心业务处理器prepareSharableHandlers();// 下面就是 Netty 创建服务端启动器的固定流程 ServerBootstrap childHandler =// 配置服务端 启动对象// 配置工作组 boss 和 worker 组this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)// 设置服务端ServerSocketChannel 类型.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)// 设置服务端ch选项.option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_KEEPALIVE, false)// 设置客户端ch选项.childOption(ChannelOption.TCP_NODELAY, true)// 设置 接收缓冲区 和 发送缓冲区的 大小.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()).childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())// 设置服务器端口.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {// 初始化 客户端ch pipeline 的逻辑, 同时指定了线程池为 defaultEventExecutorGroupch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}});if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {// 客户端开启 内存池,使用的内存池 是 PooledByteBufAllocator.DEFAULTchildHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}try {//  服务器 绑定端口ChannelFuture sync = this.serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();this.port = addr.getPort();} catch (InterruptedException e1) {throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);}// 条件成立: channel状态监听器不为空, 则创建 网络异常事件执行器if (this.channelEventListener != null) {this.nettyEventExecutor.start();}// 提交定时任务,每一秒 执行一次// 扫描 responseTable 表, 将过期的 responseFuture 移除this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);}

上述代码 基本上就是 模板Netty创建服务端的代码,主要做了如下几件事:

启动Netty服务器
开启 channel状态监听线程
开启 扫描 responseFuture 的定时任务
在这里插入图片描述

通过这个结构图可以看出,RocketMQ 在 Netty 原生的多线程 Reactor 模型上做了一系列的扩展和优化,使用多个线程池来处理数据

1、一个 Reactor 主线程(eventLoopGroupBoss,即为上面的1)负责监听 TCP 网络连接请求,建立好连接,创建 SocketChannel,并注册到 selector 上。RocketMQ 的源码中会自动根据 OS 的类型选择 NIO 和 Epoll,也可以通过参数配置,然后监听真正的网络数据。
2、拿到网络数据后,再丢给 Worker 线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),
3、在真正执行业务逻辑之前需要进行 SSL 验证、编解码、空闲检查、网络连接管理,这些工作交给 defaultEventExecutorGroup(即为上面的“M1”,源码中默认设置为 8 )去做。
4、而处理业务操作放在业务线程池中执行,根据 RomotingCommand 的业务请求码 code 去 processorTable 这个本地缓存变量中找到对应的 processor,然后封装成 task 任务后,提交给对应的业务 processor 处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的 “M2”)。从入口到业务逻辑的几个步骤中线程池一直再增加,这跟每一步逻辑复杂性相关,越复杂,需要的并发通道越宽。

NettyRemotingAbstract:抽象类NettyRemotingAbstract是NettyRemotingServer的父类,主要定义了请求并发量、控制响应对象和各种请求处理函数。

public abstract class NettyRemotingAbstract {// 控制 单向请求的 并发量protected final Semaphore semaphoreOneway;// 控制 异步请求的 并发量protected final Semaphore semaphoreAsync;// 响应对象映射表  (key: opaque  value:responseFuture)protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =new ConcurrentHashMap<Integer, ResponseFuture>(256);// 请求处理器映射表 (key: requestCode  value:(processor,executor)  )protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);// Netty事件监听线程池protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();// 默认的请求处理器对  包含(processor,executor) protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;// SSL相关protected volatile SslContext sslContext;// 扩展钩子protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>();
}

结合RocketMQ源码分析

01初始化流程
在Broker创建的时候会去初始化NettyRemotingServer类,调用其构造方法。

public NettyRemotingServer(final NettyServerConfig nettyServerConfig,final ChannelEventListener channelEventListener) {super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());this.serverBootstrap = new ServerBootstrap();this.nettyServerConfig = nettyServerConfig;this.channelEventListener = channelEventListener;int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();if (publicThreadNums <= 0) {publicThreadNums = 4;}// 初始化用于接收客户端的TCP连接this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());}});// 根据配置设置NIO还是Epoll来作为Selector线程池// 默认在Linux环境为true,windows环境下都是NIO,相对来说Linux更快。if (useEpoll()) {this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});} else {this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});}// 加载SSLloadSslContext();
}

首先会初始化ServerBootstrap,NettyServerConfig等相关参数,同时会判断是在哪个操作系统,如果是在Linux平台则会使用EpollEventLoopGroup作为线程池,如果不是则会使用NioEventLoopGroup作为线程池。

02启动流程
在Broker作为服务端启动与NameServer作为服务端启动的时候都会来初始化,启动一个Netty的服务端进行相关的通讯。


@Override
public void start() {// 默认的处理线程池组,用于处理后面多个NettyHandle的操作this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());}});prepareSharableHandlers();// 正常的一种Netty的服务端的启动逻辑// 其中包括解码器,编码器,心跳管理器,连接管理器,消息类型处理分发ServerBootstrap childHandler =this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog()).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_KEEPALIVE, false).childOption(ChannelOption.TCP_NODELAY, true).localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}});if (nettyServerConfig.getServerSocketSndBufSize() > 0) {log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize());childHandler.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize());}if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {log.info("server set SO_RCVBUF to {}", nettyServerConfig.getServerSocketRcvBufSize());childHandler.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize());}if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && nettyServerConfig.getWriteBufferHighWaterMark() > 0) {log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark());childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()));}if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}try {ChannelFuture sync = this.serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();this.port = addr.getPort();} catch (InterruptedException e1) {throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);}if (this.channelEventListener != null) {this.nettyEventExecutor.start();}// 定时扫描responseTable,获取返回结果,并处理超时业务。this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {// 超时请求扫描NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);
}

初始化线程池组处理多个NettyHandle操作。
初始化ServerBootStrap,设置tcp参数,编解码器,心跳处理器,连接管理器,请求分发。
定时扫描ResponseTable,获取返回结果并处理超时业务。

NettyEncoder类
将RemotingCommand对象序列化为ByteBuffer对象。根据serializerType的不同,Header会编码为JSON或者二进制。

 @Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)throws Exception {try {remotingCommand.fastEncodeHeader(out);byte[] body = remotingCommand.getBody();if (body != null) {out.writeBytes(body);}} catch (Exception e) {log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);if (remotingCommand != null) {log.error(remotingCommand.toString());}RemotingUtil.closeChannel(ctx.channel());}
}

将RemotingCommand对象编码成ByteBuf进行数据的传递,先将Header部分进行编码,然后将body追加到ByteBuf中,Header的编码相对复杂因为在传递过程中我们得让被传递方知道该ByteBuf如何进行拆解,同时我们也要防止各种不正常的情况方法,这是就需要我们规定一些内容,这些都会在Netty里面去详讲,这里就提及一下。(第一个字节表示编解码类型)

NettyDecoder类
NettyEncoder继承自LengthFieldBasedFrameDecoder,主要有用于解码入站数据流,并将数据流解码为RemotingCommand对象。

@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {ByteBuf frame = null;try {frame = (ByteBuf) super.decode(ctx, in);if (null == frame) {return null;}return RemotingCommand.decode(frame);} catch (Exception e) {log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);RemotingUtil.closeChannel(ctx.channel());} finally {if (null != frame) {frame.release();}}return null;
}

首先会调用LengthFieldBasedFrameDecoder.decoder方法进行首次解码,然后调用NettyDecoder.decoder进行二次解码,完成Header与body的解码并转换成RemotingCommand对象。而之所以需要两次第一个就是为了解出对应的长度,即数据流中前4个字节的值表示有效数据域的长度,除开前4个字节外的内容都是有效数据域的内容,不存在偏移量。

IdleStateHandler类
进行心跳检查类,客户端与服务端之间需要保持长连接则需要通过一个心跳机制来保证有效性,当其中一者处于宕机或者网络延迟的情况下判断是否有效,如果无效及时进行释放资源。(该类三个参数,第一个为读空闲时间,第二个为写空闲时间,第三个为读或写空闲时间,如果在规定时间内没有触发对应事件就会触发定时任务的执行)

NettyConnectManageHandler类
用于监听pipeline中入站/出站的事件,主要进行日志记录等操作。

NettyServerHandler类
核心业务处理器,处理的时候会去调用channelRead0进行相关逻辑,最终会去执行processMessageReceived方法。


public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {final RemotingCommand cmd = msg;if (cmd != null) {switch (cmd.getType()) {// 处理请求的过程case REQUEST_COMMAND:processRequestCommand(ctx, cmd);break;case RESPONSE_COMMAND:// 处理响应的过程processResponseCommand(ctx, cmd);break;default:break;}}
}

TYPE类型为REQUEST_COMMAND表示请求消息,则调用processRequestCommand方法进行处理。

public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {// 从processorTable中找到对应的Processor,如果不存在则使用defaultRequestProcessor处理器final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;final int opaque = cmd.getOpaque();if (pair != null) {Runnable run = new Runnable() {@Overridepublic void run() {try {// 获取nameServerString remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());doBeforeRpcHooks(remoteAddr, cmd);final RemotingResponseCallback callback = new RemotingResponseCallback() {@Override// 服务端处理完请求之后调用public void callback(RemotingCommand response) {doAfterRpcHooks(remoteAddr, cmd, response);// 不是单向if (!cmd.isOnewayRPC()) {if (response != null) {response.setOpaque(opaque);response.markResponseType();response.setSerializeTypeCurrentRPC(cmd.getSerializeTypeCurrentRPC());try {// 往客户端输出结果ctx.writeAndFlush(response);} catch (Throwable e) {log.error("process request over, but response failed", e);log.error(cmd.toString());log.error(response.toString());}} else {}}}};// 如果是异步处理的Processif (pair.getObject1() instanceof AsyncNettyRequestProcessor) {AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();processor.asyncProcessRequest(ctx, cmd, callback);} else {// 不是异步知道拿到Processor进行相关处理NettyRequestProcessor processor = pair.getObject1();RemotingCommand response = processor.processRequest(ctx, cmd);callback.callback(response);}} catch (Throwable e) {log.error("process request exception", e);log.error(cmd.toString());if (!cmd.isOnewayRPC()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,RemotingHelper.exceptionSimpleDesc(e));response.setOpaque(opaque);ctx.writeAndFlush(response);}}}};if (pair.getObject1().rejectRequest()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,"[REJECTREQUEST]system busy, start flow control for a while");response.setOpaque(opaque);ctx.writeAndFlush(response);return;}try {final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);pair.getObject2().submit(requestTask);} catch (RejectedExecutionException e) {if ((System.currentTimeMillis() % 10000) == 0) {log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())+ ", too many requests and system thread pool busy, RejectedExecutionException "+ pair.getObject2().toString()+ " request code: " + cmd.getCode());}if (!cmd.isOnewayRPC()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,"[OVERLOAD]system busy, start flow control for a while");response.setOpaque(opaque);ctx.writeAndFlush(response);}}} else {String error = " request type " + cmd.getCode() + " not supported";final RemotingCommand response =RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);response.setOpaque(opaque);ctx.writeAndFlush(response);log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);}
}

首先从processorTable获取对应的Processor,而processorTable的内容都是通过启动Broker或者是NameServer进行存入的,存储的结构为map以Code作为key以Processor作为值,当我们使用的时候只需要根据code查询即可,如果没有找到就直接使用默认的defaultRequestProcessor处理器。
如果获取到的Processor为异步的则调用异步处理请求,如果不是则直接调用Processor对应的processRequest方法进行处理。
TYPE类型为RESPONSE_COMMAND表示响应消息,则调用processResponseCommand进行处理。


public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {final int opaque = cmd.getOpaque();// 从map缓存获取正在进行的其中一个请求final ResponseFuture responseFuture = responseTable.get(opaque);if (responseFuture != null) {responseFuture.setResponseCommand(cmd);// 移除本次请求responseTable.remove(opaque);// 执行回调if (responseFuture.getInvokeCallback() != null) {executeInvokeCallback(responseFuture);} else {responseFuture.putResponse(cmd);responseFuture.release();}} else {log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));log.warn(cmd.toString());}
}

超时请求扫描
在每次请求时将ResponseFuture放入Map中,通过定时扫描来判断记录时间与超时时间来比较判断是否超时。

public void scanResponseTable() {final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();while (it.hasNext()) {Entry<Integer, ResponseFuture> next = it.next();ResponseFuture rep = next.getValue();// 当前时间大于请求开始时间+超时等待时间+1秒,则任务超时了if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {rep.release();it.remove();rfList.add(rep);log.warn("remove timeout request, " + rep);}}// 执行被移除 ResponseFuture对象的executeInvokeCallback方法  for (ResponseFuture rf : rfList) {try {executeInvokeCallback(rf);} catch (Throwable e) {log.warn("scanResponseTable, operationComplete Exception", e);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/239512.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

有两个篮子,分别为A 和 B,篮子A里装有鸡蛋,篮子B里装有苹 果,请用面向对象的思想实现两个篮子里的物品交换

问题&#xff1a; 有两个篮子&#xff0c;分别为A 和 B&#xff0c;篮子A里装有鸡蛋&#xff0c;篮子B里装有苹 果&#xff0c;请用面向对象的思想实现两个篮子里的物品交换 代码 package cn.ljh.algorithmic;/*** author JH*/ public class Demo07 {public static void main…

LangChain(0.0.340)官方文档三:Prompts上——自定义提示模板、使用实时特征或少量示例创建提示模板

文章目录 一、 Prompt templates1.1 langchain_core.prompts1.2 PromptTemplate1.2.1 简介1.2.2 ICEL1.2.3 Validate template 1.3 ChatPromptTemplate1.3.1 使用role创建1.3.2 使用MessagePromptTemplate创建1.3.3 自定义MessagePromptTemplate1.3.3.1 自定义消息角色名1.3.3.…

DCCK“启航计划“3+2第一课机器视觉导论

用相机代替人眼去获取图像&#xff0c;然后处理图像&#xff0c;给出指令。 如&#xff1a;相机获取可口可乐的液面高度图片&#xff0c;通过连接线床给图像处理程序&#xff0c;程序给出合格不合格的判断再执行后续操作 作用&#xff1a;机器不会疲劳&#xff0c;机器判断标准…

推荐3个完美替代 Navicat 的工具

现在企业&#xff0c;mysql数据库用的比较多&#xff0c;mysql数据库客户端的需求也就比较大&#xff0c;navicat就被大家所熟知。 这个工具&#xff0c;确实好用&#xff0c;功能也非常强大&#xff0c;但是&#xff0c;它的强大&#xff0c;是需要付费&#xff0c;或者用一些…

11.29 知识回顾(视图层、模板层)

一、视图层 1.1 响应对象 响应---》本质都是 HttpResponse -HttpResponse---》字符串 -render----》放个模板---》模板渲染是在后端完成 -js代码是在客户端浏览器里执行的 -模板语法是在后端执行的 -redirect----》重定向 -字符串参数不是…

Hdoop学习笔记(HDP)-Part.15 安装HIVE

目录 Part.01 关于HDP Part.02 核心组件原理 Part.03 资源规划 Part.04 基础环境配置 Part.05 Yum源配置 Part.06 安装OracleJDK Part.07 安装MySQL Part.08 部署Ambari集群 Part.09 安装OpenLDAP Part.10 创建集群 Part.11 安装Kerberos Part.12 安装HDFS Part.13 安装Ranger …

DataGtip如何跳过试用设置永久使用?(Windows2021-2023版本通用)

文章目录 一.打开DataGrip二.下载DataGrip的设置安装包三.最终步骤&#xff0c;输入Activation code 一.打开DataGrip 出现以下界面即表示需要用户激活&#xff0c;否则无法试用&#xff0c;这里打开后点击Exit退出。 二.下载DataGrip的设置安装包 1.下载安装包 点击跳转到百…

JVM 内存回收算法

文章目录 JVM 内存回收算法有哪些&#xff1a;一、分代收集1.分代收集理论2.垃圾收集 二、垃圾收集算法1. 标记-清除算法2. 复制算法3. 标记-整理算法 JVM就是Java虚拟机&#xff0c;JVM的内回收对其原理的认识也是很有必要的&#xff0c;当底层的系统出现内存溢出或者内存泄漏…

无脑018——win11部署whisper,语音转文字

1.conda创建环境 conda create -n whisper python3.9 conda activate whisper安装pytorch pip install torch1.8.1cu101 torchvision0.9.1cu101 torchaudio0.8.1 -f https://download.pytorch.org/whl/torch_stable.html安装whisper pip install -U openai-whisper2.准备模型…

服务器和Linux ,安装R rstudio ,常用软件

服务器的基本概念&#xff1a; 如服务器的基本结构&#xff0c;节点&#xff0c;端口的概念等。 服务器的基本设置和管理&#xff1a; 如何配置新服务器&#xff0c; 如何管理服务器&#xff0c;如何分配账户并确保他们互不访问&#xff0c; 如何全局安装软件让所有人都可以…

【深度优先】LeetCode1932:合并多棵二叉搜索树

作者推荐 动态规划LeetCode2552&#xff1a;优化了6版的1324模式 题目 给你 n 个 二叉搜索树的根节点 &#xff0c;存储在数组 trees 中&#xff08;下标从 0 开始&#xff09;&#xff0c;对应 n 棵不同的二叉搜索树。trees 中的每棵二叉搜索树 最多有 3 个节点 &#xff0…

树莓派搭建开发环境

背景 自从上次心血来潮给树莓派装完系统&#xff0c;一直没想好怎么具体使用它的场景&#xff0c;它就这样默默地躺在抽屉吃灰了一年 再次想起它&#xff0c;是一个周日的下午&#xff1a;收到之前在腾讯云买的云服务器快过期的提醒&#xff0c;一个4核8G内存的ubuntu&#x…