RocketMQ源码 Broker-PullRequestHoldService 长轮询消息拉取组件源码分析

前言

PullRequestHoldService 继承了ServiceThread类,它本身是一个线程,以后台方式无线循环运行,支持长轮询(默认5秒)和短轮询(默认1秒)两种方式(CountDownlatch 方式控制)控制线程执行间隔。5秒钟后,线程内部会检查被挂起的请求(消费者建立连接后,会立即执行一次消息拉取服务,如果能拉到消息直接返回相应,如果拉不到,就会被挂起到 PullMessHoldService 服务),通知消息到达,调用 PullMessageProcessor 消息拉取处理组件,再进行一次拉取处理,如果能拉到消息,就直接返回,如果拉取不到再次挂起。

除了以上方式唤醒被挂起的消息拉取请求,还有一个 NotifyMessageArrivingListener 消息到达监听器,可以监听 topic-queueId-tagCode 维度的消息,进行通知唤醒被挂起的线程。

本次涉及三个核心组件:

  1. PullRequestHoldService 长轮询消息拉取组件;
  2. PullMessageProcessor 拉取消息处理组件;
  3. NotifyMessageArrivingListener 消息到达监听组件;

源码版本:4.9.3

源码架构图

核心数据结构

长轮询消息拉取组件数据结构,维护了一个拉取请求处理列表:Map<topic@queueId, 挂起的请求列表> pullRequestTable 。

public class PullRequestHoldService extends ServiceThread {// 系统时钟private final SystemClock systemClock = new SystemClock();// 长轮询消息拉取请求表,key为topic@queueId,value为PullRequest集合protected ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =new ConcurrentHashMap<String, ManyPullRequest>(1024);
}

pullRequestTable 拉取请求处理列表的 value对象(ManyPullRequest)维护的核心数据结构是被挂起的请求列表。

public class ManyPullRequest {// 拉取请求列表private final ArrayList<PullRequest> pullRequestList = new ArrayList<>();
}

核心行为流程

唤醒流程一

PullMessageProcessor 接受网络通信请求,处理消息拉取请求。从messageStore 消息存储组件拉取消息,如果拉不到就将请求,挂起到pullRequestHoldService长轮询消息拉取组件。

// pull方式拉取消息处理器组件
public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {....private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)throws RemotingCommandException {// 一堆校验...// 关键点:从消息存储组件拉取消息MessageStorefinal GetMessageResult getMessageResult =this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);....// 处理查询结果,没有拉到消息时,将请求挂起到hold服务case ResponseCode.PULL_NOT_FOUND:if (brokerAllowSuspend && hasSuspendFlag) {...
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);response = null;break;}}...}

PullRequestHoldService 长轮询消息拉取组件,接受挂起的请求,维护在内存中。且以后台线程方式,轮询pullRequestTable,唤醒所有请求,尝试拉取消息。

// 长轮询消息拉取组件
public class PullRequestHoldService extends ServiceThread {// 长轮询消息拉取请求表,key为topic@queueId,value为PullRequest集合protected ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =new ConcurrentHashMap<String, ManyPullRequest>(1024);// 如果没有拉取到请求,就挂起拉取请求,等待当前线程长轮询处理public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {String key = this.buildKey(topic, queueId);ManyPullRequest mpr = this.pullRequestTable.get(key);if (null == mpr) {mpr = new ManyPullRequest();ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);if (prev != null) {mpr = prev;}}mpr.addPullRequest(pullRequest);}@Overridepublic void run() {log.info("{} service started", this.getServiceName());// 循环,处理被挂起的拉取请求while (!this.isStopped()) {try {// 长轮询,默认5秒if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {this.waitForRunning(5 * 1000);} else {// 短轮询,默认1秒this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());}long beginLockTimestamp = this.systemClock.now();// 处理被挂起的拉取请求this.checkHoldRequest();long costTime = this.systemClock.now() - beginLockTimestamp;if (costTime > 5 * 1000) {log.info("[NOTIFYME] check hold request cost {} ms.", costTime);}} catch (Throwable e) {log.warn(this.getServiceName() + " service has exception. ", e);}}log.info("{} service end", this.getServiceName());}protected void checkHoldRequest() {// 遍历被挂起的拉取请求列表for (String key : this.pullRequestTable.keySet()) {String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);if (2 == kArray.length) {String topic = kArray[0];int queueId = Integer.parseInt(kArray[1]);// 从消息存储组件获取当前队列消费的最大偏移量final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);try {// 通知消息消费者this.notifyMessageArriving(topic, queueId, offset);} catch (Throwable e) {log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);}}}}public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {notifyMessageArriving(topic, queueId, maxOffset, null, 0, null, null);}public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {String key = this.buildKey(topic, queueId);// 获取指定topic-queueId被挂起的拉取请求列表ManyPullRequest mpr = this.pullRequestTable.get(key);if (mpr != null) {// 克隆被挂起的拉取请求列表List<PullRequest> requestList = mpr.cloneListAndClear();if (requestList != null) {List<PullRequest> replayList = new ArrayList<PullRequest>();// 遍历所有请求for (PullRequest request : requestList) {long newestOffset = maxOffset;// 如果拉取的最大偏移量小于当前拉取的起始偏移量,则从消息存储组件获取最大的偏移量开始拉取if (newestOffset <= request.getPullFromThisOffset()) {newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);}// 匹配消息过滤器if (newestOffset > request.getPullFromThisOffset()) {boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));// match by bit map, need eval again when properties is not null.if (match && properties != null) {match = request.getMessageFilter().isMatchedByCommitLog(null, properties);}if (match) {try {// 重点:如果消息到达filter匹配,则执行挂起请求唤醒服务this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());} catch (Throwable e) {log.error("execute request when wakeup failed.", e);}continue;}}// 重点:超时处理,如果超时,则执行挂起请求唤醒服务if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {try {this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());} catch (Throwable e) {log.error("execute request when wakeup failed.", e);}continue;}replayList.add(request);}// 没有执行的请求,重新放回被挂起的拉取请求列表if (!replayList.isEmpty()) {mpr.addPullRequest(replayList);}}}}
}

唤醒流程二

NotifyMessageArrivingListener 通知消息到达监听器,会监听指定topic-指定queueId的消息变化,通知 pullRequestHoldService 长轮询请求处理服务,实时唤醒挂起的对应请求,进行消息拉取。

// 通知消息到达监听器
public class NotifyMessageArrivingListener implements MessageArrivingListener {private final PullRequestHoldService pullRequestHoldService;public NotifyMessageArrivingListener(final PullRequestHoldService pullRequestHoldService) {this.pullRequestHoldService = pullRequestHoldService;}@Overridepublic void arriving(String topic, int queueId, long logicOffset, long tagsCode,long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {// 通知长轮询请求处理服务,消息到达,唤醒挂起的请求this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,msgStoreTime, filterBitMap, properties);}
}

完整源码文件

见资源包

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/283346.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

安卓端出现https请求失败(转)

背景# 某天早上&#xff0c;正在一个会议时&#xff0c;突然好几个同事被叫出去了&#xff1b;后面才知道&#xff0c;是有业务同事反馈到领导那里&#xff0c;我们app里面某个功能异常。 具体是这样&#xff0c;我们安卓版本的app是禁止截屏的&#xff08;应该是app里做了拦…

Oracle-应用会话集中在RAC集群一个节点问题

问题&#xff1a; 用户一套Oracle19c RAC集群&#xff0c;出现一个奇怪的现象&#xff0c;通过SCAN IP访问的连接会话都集中在节点一实例&#xff0c;而且用户并没有做任何的节点服务访问去控制会话的连接节点&#xff0c;比如常见的通过集群的高可用服务去控制应用访问连接集中…

【BIG_FG_CSDN】*VMware17pro*Linux*Redhit6网络管理(个人向——学习笔记)

物理机中的网络 查看物理网络的方法 “网络连接”—>单点选中网络的选项-->菜单栏中“查看此连接状态”-->“详细信息” “网络连接”中的VM网卡 在主机上对应的有VMware Network Adapter VMnet1和VMware Network Adapter VMnet8两块虚拟网卡&#xff0c;它们分别…

【SpringMVC】SpringMVC简介、过程分析、bean的加载和控制

文章目录 1. SpringMVC简介2. SpringMVC入门案例文件结构第一步&#xff1a;坐标导入第二步&#xff1a;创建SpringMVC容器的控制器类第三步&#xff1a;初始化SpringMVC环境&#xff0c;设定Spring加载对应的bean第四步&#xff1a;初始化Servlet容器&#xff0c;加载SpringMV…

【数据结构】(堆)Top-k|堆排序

目录 概念&#xff1a; 堆的实现 构建 初始化 销毁 插入元素 往上调整 删除堆顶元素 往下调整 返回堆顶元素 返回有效个数 是否为空 堆排序 Top-k问题 ​编辑 创建数据 堆top-k 概念&#xff1a; 堆是将数据按照完全二叉树存储方式存储到一维数组中&#xff…

C语言使用posix正则表达式库

在C语言中&#xff0c;你可以使用 POSIX 正则表达式库&#xff08;regex.h&#xff09;来进行正则表达式的模式匹配。POSIX 正则表达式库提供了一组函数来编译、执行和释放正则表达式。 下面是使用 POSIX 正则表达式库的基本步骤&#xff1a; 包含头文件 <regex.h>&…

Spring Boot 3 + Vue 3 整合 WebSocket (STOMP协议) 实现实时通信

&#x1f680; 作者主页&#xff1a; 有来技术 &#x1f525; 开源项目&#xff1a; youlai-mall &#x1f343; vue3-element-admin &#x1f343; youlai-boot &#x1f33a; 仓库主页&#xff1a; Gitee &#x1f4ab; Github &#x1f4ab; GitCode &#x1f496; 欢迎点赞…

【MySQL】数据库基础入门 安装MySQL

目录 介绍&#xff1a; 安装MySQL: 设置 root 账号密码 2.配置环境变量 2.找到 Path 系统变量, 点击 "编辑" 介绍&#xff1a; MySQL是一个开源的关系型数据库管理系统&#xff08;RDBMS&#xff09;&#xff0c;它是一种用于管理和存储数据的软件。 安装MySQL: …

前端登录界面网站设计模板--HTML+CSS

🎀登录表单 💖效果展示 💖HTML代码展示 <!DOCTYPE html> <html lang="en" > <head></

Linux驱动(中断、异步通知):红外对射,并在Qt StatusBus使用指示灯进行显示

本文工作&#xff1a; 1、Linux驱动与应用程序编写&#xff1a;使用了设备树、中断、异步通知知识点&#xff0c;实现了红外对射状态的异步信息提醒。 2、QT程序编写&#xff1a;自定义了一个“文本指示灯”类&#xff0c;并放置在QMainWidget的StatusBus中。 3、C与C混合编程与…

[Verilog] Verilog 数据类型

主页&#xff1a; 元存储博客 文章目录 前言1. bit 类型2. reg 类型3 wire类型4 integer类型5 real类型6 parameter类型7 enum类型8 array 类型9 向量类型10 time 类型11 string 类型 前言 在 Verilog 中&#xff0c;有几种不同的数据类型可以用于声明和操作变量。 在 Verilo…

Missing artifact org.wltea.analyzer:ik-analyzer:jar:5.0

没有找到【org.wltea.analyzer】 找到了【org.wltea.ik-analyzer】 https://github.com/wks/ik-analyzer https://github.com/wks/ik-analyzer.git https://code.google.com/archive/p/ik-analyzer/downloads?page2 C:\Users\Administrator\Desktop\ik-analyzer-master>m…