[RocketMQ] Broker 消息重放服务源码解析 (十三)

构建消息文件ConsumeQueue和IndexFile。

  1. ConsumeQueue: 看作是CommitLog的消息偏移量索引文件, 存储了它所属Topic的消息在Commit Log中的偏移量。消费者拉取消息的时候, 可以从Consume Queue中快速的根据偏移量定位消息在Commit Log中的位置。
  2. IndexFile索引文件: 看作是CommitLog的消息时间范围索引文件。IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。

ConsumeQueue和IndexFile, 加快了客户端的消费速度或者是查询效率。

文章目录

      • 1.ReputMessageService消息重放服务
      • 2.doReput执行重放
        • 2.1 isCommitLogAvailable是否需要重放
        • 2.2 getData获取重放数据
          • 2.2.1 selectMappedBuffer截取一段内存
        • 2.3 checkMessageAndReturnSize检查消息并构建请求
        • 2.4 doDispatch分发请求

1.ReputMessageService消息重放服务

ReputMessageService服务将会在循环中异步的每隔1ms对于写入CommitLog的消息进行重放, 将消息构建成为DispatchRequest对象, 然后将DispatchRequest对象分发给各个CommitLogDispatcher处理, 这些CommitLogDispatcher通常会尝试构建ConsumeQueue索引、IndexFile索引以及SQL92布隆过滤器。

在这里插入图片描述

/*** ReputMessageService的方法*/
@Override
public void run() {DefaultMessageStore.log.info(this.getServiceName() + " service started");/** 运行时逻辑* 如果服务没有停止,则在死循环中执行重放的操作*/while (!this.isStopped()) {try {//睡眠1msThread.sleep(1);//执行重放this.doReput();} catch (Exception e) {DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);}}DefaultMessageStore.log.info(this.getServiceName() + " service end");
}

每隔1ms执行一次duReput方法, 执行重放方法。

2.doReput执行重放

所谓的重放就是完成诸如ConsumeQueue索引、IndexFile索引、布隆过滤器、唤醒长轮询线程和被hold住的请求等操作。

  1. 如果重放偏移量reputFromOffset小于commitlog的最小物理偏移量, 那么设置为commitlog的最小偏移量, 如果重放偏移量小于commitlog的最大偏移量, 那么循环重放。
  2. 调用getData方法。根据reputFromOffset的物理偏移量找到mappedFileQueue中对应的CommitLog文件的MappedFile, 然后从该MappedFile中截取一段自reputFromOffset偏移量开始的ByteBuffer, 这段内存存储着将要重放的消息。
  3. 开始循环读取这段ByteBuffer中的消息, 依次重放。
    1. 如果存在消息, 调用checkMessageAndReturnSize, 检查当前消息的属性并且构建一个DispatchRequest对象返回。
    2. 调用doDispatch方法分发重放请求。
      1. CommitLogDispatcherBuildConsumeQueue: 根据DispatchRequest写ConsumeQueue文件, 构建ConsumeQueue索引。
      2. CommitLogDispatcherBuildIndex: 根据DispatchRequest写indexFile文件, 构建indexFile索引。
      3. CommitLogDispatcherCalcBitMap: 根据DispatchRequest构建布隆过滤器, 加速SQL92过滤效率, 避免每次都解析sql。
    3. 如果broker角色不是SLAVE, 且支持长轮询, 并且消息送达的监听器不为null, 那么通过该监听器的arriving方法触发调用pullRequestHoldService的pullRequestHoldService方法, 唤醒挂起的拉取消息请求, 表示有新的消息落盘, 可以进行拉取了。
    4. 如果读取到MappedFile文件尾, 那么获取下一个文件的起始索引继续重放。
/*** DefaultMessageStore的方法* <p>* 执行重放*/
private void doReput() {//如果重放偏移量reputFromOffset小于commitlog的最小物理偏移量,那么设置为commitlog的最小物理偏移量if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();}/*** 如果重放偏移量小于commitlog的最大物理偏移量,那么循环重放*/for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {//如果消息允许重复复制(默认为 false)并且reputFromOffset大于等于已确定的偏移量confirmOffset,那么结束循环if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {break;}/** 根据reputFromOffset的物理偏移量找到mappedFileQueue中对应的CommitLog文件的MappedFile* 然后从该MappedFile中截取一段自reputFromOffset偏移量开始的ByteBuffer,这段内存存储着将要重放的消息*/SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);if (result != null) {try {//将截取的起始物理偏移量设置为重放偏起始移量this.reputFromOffset = result.getStartOffset();/** 开始读取这段ByteBuffer中的消息,依次进行重放*/for (int readSize = 0; readSize < result.getSize() && doNext; ) {//检查消息的属性并且构建一个DispatchRequest对象返回DispatchRequest dispatchRequest =DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);//消息大小,如果是基于Dledger技术的高可用DLedgerCommitLog则取bufferSizeint size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();if (dispatchRequest.isSuccess()) {//如果大小大于0,表示有消息if (size > 0) {/** 分发请求* 1.  CommitLogDispatcherBuildConsumeQueue:根据DispatchRequest写ConsumeQueue文件,构建ConsumeQueue索引。* 2.  CommitLogDispatcherBuildIndex:根据DispatchRequest写IndexFile文件,构建IndexFile索引。* 3.  CommitLogDispatcherCalcBitMap:根据DispatchRequest构建布隆过滤器,加速SQL92过滤效率,避免每次都解析sql。*/DefaultMessageStore.this.doDispatch(dispatchRequest);//如果broker角色不是SLAVE,并且支持长轮询,并且消息送达的监听器不为nullif (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()&& DefaultMessageStore.this.messageArrivingListener != null) {//通过该监听器的arriving方法触发调用pullRequestHoldService的pullRequestHoldService方法//即唤醒挂起的拉取消息请求,表示有新的消息落盘,可以进行拉取了//这里涉及到RocketMQ的consumer消费push模式的实现,后面会专门讲解consumer消费DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());notifyMessageArrive4MultiQueue(dispatchRequest);}//设置重放偏起始移量加上当前消息大小this.reputFromOffset += size;//设置读取的大小加上当前消息大小readSize += size;//如果是SLAVE角色,那么存储数据的统计信息更新if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()).add(dispatchRequest.getMsgSize());}} else if (size == 0) {//如果等于0,表示读取到MappedFile文件尾//获取下一个文件的起始索引this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);//设置readSize为0,将会结束循环readSize = result.getSize();}} else if (!dispatchRequest.isSuccess()) {if (size > 0) {log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);this.reputFromOffset += size;} else {doNext = false;// If user open the dledger pattern or the broker is master node,// it will not ignore the exception and fix the reputFromOffset variableif (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",this.reputFromOffset);this.reputFromOffset += result.getSize() - readSize;}}}}} finally {result.release();}} else {//如果重做完毕,则跳出循环doNext = false;}}
}

2.1 isCommitLogAvailable是否需要重放

用于判断CommitLog是否需要执行重放。

/*** ReputMessageService的方法* CommitLog是否需要执行重放*/
private boolean isCommitLogAvailable() {//重放偏移量是否小于commitlog的最大物理偏移量return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
}

2.2 getData获取重放数据

根据reputFromOffset的物理偏移量找到mappedFileQueue中对应的CommitLog文件的MappedFile, 然后从该MappedFile中截取一段自reputFromOffset偏移量开始的ByteBuffer, 这段内存存储着将要重放的消息。

/*** CommitLog的方法** 获取CommitLog的数据*/
public SelectMappedBufferResult getData(final long offset) {return this.getData(offset, offset == 0);
}public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {//获取CommitLog文件大小,默认1Gint mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();//根据指定的offset从mappedFileQueue中对应的CommitLog文件的MappedFileMappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);if (mappedFile != null) {//通过指定物理偏移量,除以文件大小,得到指定的相对偏移量int pos = (int) (offset % mappedFileSize);//从指定相对偏移量开始截取一段ByteBuffer,这段内存存储着将要重放的消息。SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);return result;}return null;
}
2.2.1 selectMappedBuffer截取一段内存

从该MappedFile中截取一段自reputFromOffset偏移量开始的ByteBuffer, 这段内存存储着将要重放的消息。这段ByteBuffer和原mappedByteBuffer共享同一块内存, 但是拥有自己的指针。

然后根据起始物理索引、截取的ByteBuffer、截取的ByteBuffer大小以及当前CommitLog对象构建一个SelectMappedBufferResult对象返回。

/*** MappedFile的方法* @param pos 相对偏移量*/
public SelectMappedBufferResult selectMappedBuffer(int pos) {//获取写入位置,即最大偏移量int readPosition = getReadPosition();//如果指定相对偏移量小于最大偏移量并且大于等于0,那么截取内存if (pos < readPosition && pos >= 0) {if (this.hold()) {//从mappedByteBuffer截取一段内存ByteBuffer byteBuffer = this.mappedByteBuffer.slice();byteBuffer.position(pos);int size = readPosition - pos;ByteBuffer byteBufferNew = byteBuffer.slice();byteBufferNew.limit(size);//根据起始物理索引、新的ByteBuffer、ByteBuffer大小、当前CommitLog对象构建一个SelectMappedBufferResult对象返回return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);}}return null;
}

2.3 checkMessageAndReturnSize检查消息并构建请求

检查这段内存中的下一条消息, 读取消息的各种属性即可, 不需要读取消息body, 根据属性构建一个DispatchRequest对象。

/*** CommitLog的方法** @param byteBuffer 一段内存* @param checkCRC   是否校验CRC* @param readBody   是否读取消息体*/
public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC,final boolean readBody) {try {// 1 TOTAL SIZE//消息条目总长度int totalSize = byteBuffer.getInt();// 2 MAGIC CODE//消息的magicCode属性,魔数,用来判断消息是正常消息还是空消息int magicCode = byteBuffer.getInt();switch (magicCode) {case MESSAGE_MAGIC_CODE:break;case BLANK_MAGIC_CODE://读取到文件末尾return new DispatchRequest(0, true /* success */);default:log.warn("found a illegal magic code 0x" + Integer.toHexString(magicCode));return new DispatchRequest(-1, false /* success */);}byte[] bytesContent = new byte[totalSize];//消息体CRC校验码int bodyCRC = byteBuffer.getInt();//消息消费队列idint queueId = byteBuffer.getInt();//消息flagint flag = byteBuffer.getInt();//消息在消息消费队列的偏移量long queueOffset = byteBuffer.getLong();//消息在commitlog中的偏移量long physicOffset = byteBuffer.getLong();//消息系统flag,例如是否压缩、是否是事务消息int sysFlag = byteBuffer.getInt();//消息生产者调用消息发送API的时间戳long bornTimeStamp = byteBuffer.getLong();//消息发送者的IP和端口号ByteBuffer byteBuffer1;if ((sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0) {byteBuffer1 = byteBuffer.get(bytesContent, 0, 4 + 4);} else {byteBuffer1 = byteBuffer.get(bytesContent, 0, 16 + 4);}//消息存储时间long storeTimestamp = byteBuffer.getLong();//broker的IP和端口号ByteBuffer byteBuffer2;if ((sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {byteBuffer2 = byteBuffer.get(bytesContent, 0, 4 + 4);} else {byteBuffer2 = byteBuffer.get(bytesContent, 0, 16 + 4);}//消息重试次数int reconsumeTimes = byteBuffer.getInt();//事务消息物理偏移量long preparedTransactionOffset = byteBuffer.getLong();//消息体长度int bodyLen = byteBuffer.getInt();if (bodyLen > 0) {//读取消息体if (readBody) {byteBuffer.get(bytesContent, 0, bodyLen);if (checkCRC) {int crc = UtilAll.crc32(bytesContent, 0, bodyLen);if (crc != bodyCRC) {log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC);return new DispatchRequest(-1, false/* success */);}}} else {//不需要读取消息体,那么跳过这段内存byteBuffer.position(byteBuffer.position() + bodyLen);}}//Topic名称内容大小byte topicLen = byteBuffer.get();byteBuffer.get(bytesContent, 0, topicLen);//topic的值String topic = new String(bytesContent, 0, topicLen, MessageDecoder.CHARSET_UTF8);long tagsCode = 0;String keys = "";String uniqKey = null;//消息属性大小short propertiesLength = byteBuffer.getShort();Map<String, String> propertiesMap = null;if (propertiesLength > 0) {byteBuffer.get(bytesContent, 0, propertiesLength);//消息属性String properties = new String(bytesContent, 0, propertiesLength, MessageDecoder.CHARSET_UTF8);propertiesMap = MessageDecoder.string2messageProperties(properties);keys = propertiesMap.get(MessageConst.PROPERTY_KEYS);//客户端生成的uniqId,也被称为msgId,从逻辑上代表客户端生成的唯一一条消息uniqKey = propertiesMap.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);//tagString tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);//普通消息的tagsCode被设置为tag的hashCodeif (tags != null && tags.length() > 0) {tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);}/** 延迟消息处理* 对于延迟消息,tagsCode被替换为延迟消息的发送时间,主要用于后续判断消息是否到期*/{//消息属性中获取延迟级别DELAY字段,如果是延迟消息则生产者会在构建消息的时候设置进去String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);//如果topic是SCHEDULE_TOPIC_XXXX,即延迟消息的topicif (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) {int delayLevel = Integer.parseInt(t);if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();}if (delayLevel > 0) {//tagsCode被替换为延迟消息的发送时间,即真正投递时间tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,storeTimestamp);}}}}//读取的当前消息的大小int readLength = calMsgLength(sysFlag, bodyLen, topicLen, propertiesLength);//不相等则记录BUGif (totalSize != readLength) {doNothingForDeadCode(reconsumeTimes);doNothingForDeadCode(flag);doNothingForDeadCode(bornTimeStamp);doNothingForDeadCode(byteBuffer1);doNothingForDeadCode(byteBuffer2);log.error("[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}",totalSize, readLength, bodyLen, topicLen, propertiesLength);return new DispatchRequest(totalSize, false/* success */);}//根据读取的消息属性内容,构建为一个DispatchRequest对象并返回return new DispatchRequest(topic,queueId,physicOffset,totalSize,tagsCode,storeTimestamp,queueOffset,keys,uniqKey,sysFlag,preparedTransactionOffset,propertiesMap);} catch (Exception e) {}//读取异常return new DispatchRequest(-1, false /* success */);
}

2.4 doDispatch分发请求

ReputMessageService服务的核心代码, 循环调用DefaultMessageStore内部的dispatcherList中的CommitLogDispatcher的dispatch方法, 处理这个请求。

  1. CommitLogDispatcherBuildConsumeQueue: 根据DispatchRequest写ConsumeQueue文件, 构建ConsumeQueue索引。
  2. CommitLogDispatcherBuildIndex: 根据DispatchRequest写indexFile文件, 构建indexFile索引。
  3. CommitLogDispatcherCalcBitMap: 根据DispatchRequest构建布隆过滤器, 加速SQL92过滤效率, 避免每次都解析sql。
/*** DefaultMessageStore的方法** @param req 分发请求*/
public void doDispatch(DispatchRequest req) {//循环调用CommitLogDispatcher#dispatch处理for (CommitLogDispatcher dispatcher : this.dispatcherList) {dispatcher.dispatch(req);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/20657.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

UE4 像素流的一些使用技巧

一、测试像素流的三种方法&#xff0c;前提是熟悉官网像素流送那套流程&#xff0c;这里只是讲如何不用打包就能测试的方法 1.第一种方法是vs安装unrealvs扩展&#xff0c;因为安装这个拓展后加可以加命令行参数启动项目https://docs.unrealengine.com/4.26/zh-CN/ProductionP…

怎么用PDF24 Tools工具在线进行PDF文件合并

PDF文件是经常会被用到&#xff0c;它在我们的日常生活和工作中扮演着重要的角色。PDF文件合并是将多个PDF文件合并为单个文件&#xff0c;这个过程通常是为了方便管理多个PDF文件&#xff0c;或者将多个PDF文件合并为一个整体以便于共享或打印。既然如此&#xff0c;如何快速合…

Spring5学习笔记--详细一文通

Spring5学习笔记--详细一文通 1 Spring 框架概述1.1 Spring 5 简述1.2 Spring5入门案例1.2.1 Spring5下载1.1.2 打开 idea 工具&#xff0c;创建普通 Java 工程1.2.3 导入 Spring5 相关 jar 包1.2.4 创建普通类&#xff0c;在这个类创建普通方法1.2.5 创建 Spring 配置文件&…

迅捷录屏软件使用中的注意事项

取消勾选时就不会出现右侧的悬浮框滑动的窗口了。 取消鼠标高亮后&#xff0c;录制的视频就不会出现一个空心的小圆圈。

实战解决百度旋转验证码

1、效果演示 2、如何识别 2.1准备数据集 首先需要使用爬虫&#xff0c;对验证码图片进行采集&#xff0c;尽量每一种类型都要采集到。 2.2图像矫正 接下来对采集的数据进行人工校正 2.3数据清洗 &#xff08;1&#xff09;对数据进行进行旋转&#xff0c;达到增加数据量的目…

ubuntu RPM should not be used directly install RPM packages, use Alien instead!

ubuntu RPM should not be used directly install RPM packages, use Alien instead! 所以我们最好下载deb版本的安装包 安装 参考文章

多线程设计模式【线程安全、 Future 设计模式、Master-Worker 设计模式 】(一)-全面详解(学习总结---从入门到深化)

目录 Single Thread Execution 设计模式 线程安全 Future 设计模式 Master-Worker 设计模式 生产者消费者设计模式 定义不可变对象的策略 Single Thread Execution 设计模式 机场过安检 Single Thread Execution 模式是指在同一时刻只能有一个线程去访问共享资源&#xff…

Qt提取excel表单中数据

这是一个excel表单&#xff0c;目标是把其中的数据提取出来。 文章学习自&#xff1a;QT中将excel中的数据快速的读取出来显示在tablewidget中/将tablewidget中的数据快速的写入excel中_qt将excel表格中指定范围内容显示在界面中_Jessica_1409573408的博客-CSDN博客 程序如下&…

基于Javaweb实现ATM机系统开发实战(八)实时查询余额功能实现

老规矩&#xff0c;先看前端页面&#xff0c;把前端页面上没有的表达式都删掉&#xff1a; 创建servlet接受和处理请求&#xff1a; package com.atm.servlet;import com.atm.pojo.User; import com.atm.service.UserService; import com.atm.service.impl.UserServiceImpl;im…

Low-Light Image Enhancement via Stage-Transformer-Guided Network 论文阅读笔记

这是TCSVT 2023年的一篇暗图增强的论文 文章的核心思想是&#xff0c;暗图有多种降质因素&#xff0c;单一stage的model难以实现多降质因素的去除&#xff0c;因此需要一个multi-stage的model&#xff0c;文章中设置了4个stage。同时提出了用预设query向量来代表不同的降质因素…

Flink使用总结

本文主要是为Flink的java客户端使用和flink-sql使用的大致介绍&#xff0c;具体使用查看文档页面。 java client使用 文档 Apache Flink Documentation | Apache Flink 数据处理模型 maven依赖 <?xml version"1.0" encoding"UTF-8"?> <pro…

代价函数(Cost Function)

基本概念 代价函数也被称作平方误差函数&#xff0c;有时也被称为平方误差代价函数。我们之所以要求出误差的平方和&#xff0c;是因为误差平方代价函数&#xff0c;对于大多数问题&#xff0c;特别是回归问题&#xff0c;都是一个合理的选择。还有其他的代价函数也能很好地发挥…