前言
PullRequestHoldService 继承了ServiceThread类,它本身是一个线程,以后台方式无线循环运行,支持长轮询(默认5秒)和短轮询(默认1秒)两种方式(CountDownlatch 方式控制)控制线程执行间隔。5秒钟后,线程内部会检查被挂起的请求(消费者建立连接后,会立即执行一次消息拉取服务,如果能拉到消息直接返回相应,如果拉不到,就会被挂起到 PullMessHoldService 服务),通知消息到达,调用 PullMessageProcessor 消息拉取处理组件,再进行一次拉取处理,如果能拉到消息,就直接返回,如果拉取不到再次挂起。
除了以上方式唤醒被挂起的消息拉取请求,还有一个 NotifyMessageArrivingListener 消息到达监听器,可以监听 topic-queueId-tagCode 维度的消息,进行通知唤醒被挂起的线程。
本次涉及三个核心组件:
- PullRequestHoldService 长轮询消息拉取组件;
- PullMessageProcessor 拉取消息处理组件;
- NotifyMessageArrivingListener 消息到达监听组件;
源码版本:4.9.3
源码架构图
核心数据结构
长轮询消息拉取组件数据结构,维护了一个拉取请求处理列表:Map<topic@queueId, 挂起的请求列表> pullRequestTable 。
public class PullRequestHoldService extends ServiceThread {// 系统时钟private final SystemClock systemClock = new SystemClock();// 长轮询消息拉取请求表,key为topic@queueId,value为PullRequest集合protected ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =new ConcurrentHashMap<String, ManyPullRequest>(1024);
}
pullRequestTable 拉取请求处理列表的 value对象(ManyPullRequest)维护的核心数据结构是被挂起的请求列表。
public class ManyPullRequest {// 拉取请求列表private final ArrayList<PullRequest> pullRequestList = new ArrayList<>();
}
核心行为流程
唤醒流程一
PullMessageProcessor 接受网络通信请求,处理消息拉取请求。从messageStore 消息存储组件拉取消息,如果拉不到就将请求,挂起到pullRequestHoldService长轮询消息拉取组件。
// pull方式拉取消息处理器组件
public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {....private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)throws RemotingCommandException {// 一堆校验...// 关键点:从消息存储组件拉取消息MessageStorefinal GetMessageResult getMessageResult =this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);....// 处理查询结果,没有拉到消息时,将请求挂起到hold服务case ResponseCode.PULL_NOT_FOUND:if (brokerAllowSuspend && hasSuspendFlag) {...
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);response = null;break;}}...}
PullRequestHoldService 长轮询消息拉取组件,接受挂起的请求,维护在内存中。且以后台线程方式,轮询pullRequestTable,唤醒所有请求,尝试拉取消息。
// 长轮询消息拉取组件
public class PullRequestHoldService extends ServiceThread {// 长轮询消息拉取请求表,key为topic@queueId,value为PullRequest集合protected ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =new ConcurrentHashMap<String, ManyPullRequest>(1024);// 如果没有拉取到请求,就挂起拉取请求,等待当前线程长轮询处理public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {String key = this.buildKey(topic, queueId);ManyPullRequest mpr = this.pullRequestTable.get(key);if (null == mpr) {mpr = new ManyPullRequest();ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);if (prev != null) {mpr = prev;}}mpr.addPullRequest(pullRequest);}@Overridepublic void run() {log.info("{} service started", this.getServiceName());// 循环,处理被挂起的拉取请求while (!this.isStopped()) {try {// 长轮询,默认5秒if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {this.waitForRunning(5 * 1000);} else {// 短轮询,默认1秒this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());}long beginLockTimestamp = this.systemClock.now();// 处理被挂起的拉取请求this.checkHoldRequest();long costTime = this.systemClock.now() - beginLockTimestamp;if (costTime > 5 * 1000) {log.info("[NOTIFYME] check hold request cost {} ms.", costTime);}} catch (Throwable e) {log.warn(this.getServiceName() + " service has exception. ", e);}}log.info("{} service end", this.getServiceName());}protected void checkHoldRequest() {// 遍历被挂起的拉取请求列表for (String key : this.pullRequestTable.keySet()) {String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);if (2 == kArray.length) {String topic = kArray[0];int queueId = Integer.parseInt(kArray[1]);// 从消息存储组件获取当前队列消费的最大偏移量final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);try {// 通知消息消费者this.notifyMessageArriving(topic, queueId, offset);} catch (Throwable e) {log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);}}}}public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {notifyMessageArriving(topic, queueId, maxOffset, null, 0, null, null);}public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {String key = this.buildKey(topic, queueId);// 获取指定topic-queueId被挂起的拉取请求列表ManyPullRequest mpr = this.pullRequestTable.get(key);if (mpr != null) {// 克隆被挂起的拉取请求列表List<PullRequest> requestList = mpr.cloneListAndClear();if (requestList != null) {List<PullRequest> replayList = new ArrayList<PullRequest>();// 遍历所有请求for (PullRequest request : requestList) {long newestOffset = maxOffset;// 如果拉取的最大偏移量小于当前拉取的起始偏移量,则从消息存储组件获取最大的偏移量开始拉取if (newestOffset <= request.getPullFromThisOffset()) {newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);}// 匹配消息过滤器if (newestOffset > request.getPullFromThisOffset()) {boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));// match by bit map, need eval again when properties is not null.if (match && properties != null) {match = request.getMessageFilter().isMatchedByCommitLog(null, properties);}if (match) {try {// 重点:如果消息到达filter匹配,则执行挂起请求唤醒服务this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());} catch (Throwable e) {log.error("execute request when wakeup failed.", e);}continue;}}// 重点:超时处理,如果超时,则执行挂起请求唤醒服务if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {try {this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());} catch (Throwable e) {log.error("execute request when wakeup failed.", e);}continue;}replayList.add(request);}// 没有执行的请求,重新放回被挂起的拉取请求列表if (!replayList.isEmpty()) {mpr.addPullRequest(replayList);}}}}
}
唤醒流程二
NotifyMessageArrivingListener 通知消息到达监听器,会监听指定topic-指定queueId的消息变化,通知 pullRequestHoldService 长轮询请求处理服务,实时唤醒挂起的对应请求,进行消息拉取。
// 通知消息到达监听器
public class NotifyMessageArrivingListener implements MessageArrivingListener {private final PullRequestHoldService pullRequestHoldService;public NotifyMessageArrivingListener(final PullRequestHoldService pullRequestHoldService) {this.pullRequestHoldService = pullRequestHoldService;}@Overridepublic void arriving(String topic, int queueId, long logicOffset, long tagsCode,long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {// 通知长轮询请求处理服务,消息到达,唤醒挂起的请求this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,msgStoreTime, filterBitMap, properties);}
}
完整源码文件
见资源包