构建消息文件ConsumeQueue和IndexFile。
- ConsumeQueue: 看作是CommitLog的消息偏移量索引文件, 存储了它所属Topic的消息在Commit Log中的偏移量。消费者拉取消息的时候, 可以从Consume Queue中快速的根据偏移量定位消息在Commit Log中的位置。
- IndexFile索引文件: 看作是CommitLog的消息时间范围索引文件。IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。
ConsumeQueue和IndexFile, 加快了客户端的消费速度或者是查询效率。
文章目录
- 1.ReputMessageService消息重放服务
- 2.doReput执行重放
- 2.1 isCommitLogAvailable是否需要重放
- 2.2 getData获取重放数据
- 2.2.1 selectMappedBuffer截取一段内存
- 2.3 checkMessageAndReturnSize检查消息并构建请求
- 2.4 doDispatch分发请求
1.ReputMessageService消息重放服务
ReputMessageService服务将会在循环中异步的每隔1ms对于写入CommitLog的消息进行重放, 将消息构建成为DispatchRequest对象, 然后将DispatchRequest对象分发给各个CommitLogDispatcher处理, 这些CommitLogDispatcher通常会尝试构建ConsumeQueue索引、IndexFile索引以及SQL92布隆过滤器。
/*** ReputMessageService的方法*/
@Override
public void run() {DefaultMessageStore.log.info(this.getServiceName() + " service started");/** 运行时逻辑* 如果服务没有停止,则在死循环中执行重放的操作*/while (!this.isStopped()) {try {//睡眠1msThread.sleep(1);//执行重放this.doReput();} catch (Exception e) {DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);}}DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
每隔1ms执行一次duReput方法, 执行重放方法。
2.doReput执行重放
所谓的重放就是完成诸如ConsumeQueue索引、IndexFile索引、布隆过滤器、唤醒长轮询线程和被hold住的请求等操作。
- 如果重放偏移量reputFromOffset小于commitlog的最小物理偏移量, 那么设置为commitlog的最小偏移量, 如果重放偏移量小于commitlog的最大偏移量, 那么循环重放。
- 调用getData方法。根据reputFromOffset的物理偏移量找到mappedFileQueue中对应的CommitLog文件的MappedFile, 然后从该MappedFile中截取一段自reputFromOffset偏移量开始的ByteBuffer, 这段内存存储着将要重放的消息。
- 开始循环读取这段ByteBuffer中的消息, 依次重放。
- 如果存在消息, 调用checkMessageAndReturnSize, 检查当前消息的属性并且构建一个DispatchRequest对象返回。
- 调用doDispatch方法分发重放请求。
- CommitLogDispatcherBuildConsumeQueue: 根据DispatchRequest写ConsumeQueue文件, 构建ConsumeQueue索引。
- CommitLogDispatcherBuildIndex: 根据DispatchRequest写indexFile文件, 构建indexFile索引。
- CommitLogDispatcherCalcBitMap: 根据DispatchRequest构建布隆过滤器, 加速SQL92过滤效率, 避免每次都解析sql。
- 如果broker角色不是SLAVE, 且支持长轮询, 并且消息送达的监听器不为null, 那么通过该监听器的arriving方法触发调用pullRequestHoldService的pullRequestHoldService方法, 唤醒挂起的拉取消息请求, 表示有新的消息落盘, 可以进行拉取了。
- 如果读取到MappedFile文件尾, 那么获取下一个文件的起始索引继续重放。
/*** DefaultMessageStore的方法* <p>* 执行重放*/
private void doReput() {//如果重放偏移量reputFromOffset小于commitlog的最小物理偏移量,那么设置为commitlog的最小物理偏移量if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();}/*** 如果重放偏移量小于commitlog的最大物理偏移量,那么循环重放*/for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {//如果消息允许重复复制(默认为 false)并且reputFromOffset大于等于已确定的偏移量confirmOffset,那么结束循环if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {break;}/** 根据reputFromOffset的物理偏移量找到mappedFileQueue中对应的CommitLog文件的MappedFile* 然后从该MappedFile中截取一段自reputFromOffset偏移量开始的ByteBuffer,这段内存存储着将要重放的消息*/SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);if (result != null) {try {//将截取的起始物理偏移量设置为重放偏起始移量this.reputFromOffset = result.getStartOffset();/** 开始读取这段ByteBuffer中的消息,依次进行重放*/for (int readSize = 0; readSize < result.getSize() && doNext; ) {//检查消息的属性并且构建一个DispatchRequest对象返回DispatchRequest dispatchRequest =DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);//消息大小,如果是基于Dledger技术的高可用DLedgerCommitLog则取bufferSizeint size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();if (dispatchRequest.isSuccess()) {//如果大小大于0,表示有消息if (size > 0) {/** 分发请求* 1. CommitLogDispatcherBuildConsumeQueue:根据DispatchRequest写ConsumeQueue文件,构建ConsumeQueue索引。* 2. CommitLogDispatcherBuildIndex:根据DispatchRequest写IndexFile文件,构建IndexFile索引。* 3. CommitLogDispatcherCalcBitMap:根据DispatchRequest构建布隆过滤器,加速SQL92过滤效率,避免每次都解析sql。*/DefaultMessageStore.this.doDispatch(dispatchRequest);//如果broker角色不是SLAVE,并且支持长轮询,并且消息送达的监听器不为nullif (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()&& DefaultMessageStore.this.messageArrivingListener != null) {//通过该监听器的arriving方法触发调用pullRequestHoldService的pullRequestHoldService方法//即唤醒挂起的拉取消息请求,表示有新的消息落盘,可以进行拉取了//这里涉及到RocketMQ的consumer消费push模式的实现,后面会专门讲解consumer消费DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());notifyMessageArrive4MultiQueue(dispatchRequest);}//设置重放偏起始移量加上当前消息大小this.reputFromOffset += size;//设置读取的大小加上当前消息大小readSize += size;//如果是SLAVE角色,那么存储数据的统计信息更新if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()).add(dispatchRequest.getMsgSize());}} else if (size == 0) {//如果等于0,表示读取到MappedFile文件尾//获取下一个文件的起始索引this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);//设置readSize为0,将会结束循环readSize = result.getSize();}} else if (!dispatchRequest.isSuccess()) {if (size > 0) {log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);this.reputFromOffset += size;} else {doNext = false;// If user open the dledger pattern or the broker is master node,// it will not ignore the exception and fix the reputFromOffset variableif (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",this.reputFromOffset);this.reputFromOffset += result.getSize() - readSize;}}}}} finally {result.release();}} else {//如果重做完毕,则跳出循环doNext = false;}}
}
2.1 isCommitLogAvailable是否需要重放
用于判断CommitLog是否需要执行重放。
/*** ReputMessageService的方法* CommitLog是否需要执行重放*/
private boolean isCommitLogAvailable() {//重放偏移量是否小于commitlog的最大物理偏移量return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
}
2.2 getData获取重放数据
根据reputFromOffset的物理偏移量找到mappedFileQueue中对应的CommitLog文件的MappedFile, 然后从该MappedFile中截取一段自reputFromOffset偏移量开始的ByteBuffer, 这段内存存储着将要重放的消息。
/*** CommitLog的方法** 获取CommitLog的数据*/
public SelectMappedBufferResult getData(final long offset) {return this.getData(offset, offset == 0);
}public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {//获取CommitLog文件大小,默认1Gint mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();//根据指定的offset从mappedFileQueue中对应的CommitLog文件的MappedFileMappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);if (mappedFile != null) {//通过指定物理偏移量,除以文件大小,得到指定的相对偏移量int pos = (int) (offset % mappedFileSize);//从指定相对偏移量开始截取一段ByteBuffer,这段内存存储着将要重放的消息。SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);return result;}return null;
}
2.2.1 selectMappedBuffer截取一段内存
从该MappedFile中截取一段自reputFromOffset偏移量开始的ByteBuffer, 这段内存存储着将要重放的消息。这段ByteBuffer和原mappedByteBuffer共享同一块内存, 但是拥有自己的指针。
然后根据起始物理索引、截取的ByteBuffer、截取的ByteBuffer大小以及当前CommitLog对象构建一个SelectMappedBufferResult对象返回。
/*** MappedFile的方法* @param pos 相对偏移量*/
public SelectMappedBufferResult selectMappedBuffer(int pos) {//获取写入位置,即最大偏移量int readPosition = getReadPosition();//如果指定相对偏移量小于最大偏移量并且大于等于0,那么截取内存if (pos < readPosition && pos >= 0) {if (this.hold()) {//从mappedByteBuffer截取一段内存ByteBuffer byteBuffer = this.mappedByteBuffer.slice();byteBuffer.position(pos);int size = readPosition - pos;ByteBuffer byteBufferNew = byteBuffer.slice();byteBufferNew.limit(size);//根据起始物理索引、新的ByteBuffer、ByteBuffer大小、当前CommitLog对象构建一个SelectMappedBufferResult对象返回return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);}}return null;
}
2.3 checkMessageAndReturnSize检查消息并构建请求
检查这段内存中的下一条消息, 读取消息的各种属性即可, 不需要读取消息body, 根据属性构建一个DispatchRequest对象。
/*** CommitLog的方法** @param byteBuffer 一段内存* @param checkCRC 是否校验CRC* @param readBody 是否读取消息体*/
public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC,final boolean readBody) {try {// 1 TOTAL SIZE//消息条目总长度int totalSize = byteBuffer.getInt();// 2 MAGIC CODE//消息的magicCode属性,魔数,用来判断消息是正常消息还是空消息int magicCode = byteBuffer.getInt();switch (magicCode) {case MESSAGE_MAGIC_CODE:break;case BLANK_MAGIC_CODE://读取到文件末尾return new DispatchRequest(0, true /* success */);default:log.warn("found a illegal magic code 0x" + Integer.toHexString(magicCode));return new DispatchRequest(-1, false /* success */);}byte[] bytesContent = new byte[totalSize];//消息体CRC校验码int bodyCRC = byteBuffer.getInt();//消息消费队列idint queueId = byteBuffer.getInt();//消息flagint flag = byteBuffer.getInt();//消息在消息消费队列的偏移量long queueOffset = byteBuffer.getLong();//消息在commitlog中的偏移量long physicOffset = byteBuffer.getLong();//消息系统flag,例如是否压缩、是否是事务消息int sysFlag = byteBuffer.getInt();//消息生产者调用消息发送API的时间戳long bornTimeStamp = byteBuffer.getLong();//消息发送者的IP和端口号ByteBuffer byteBuffer1;if ((sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0) {byteBuffer1 = byteBuffer.get(bytesContent, 0, 4 + 4);} else {byteBuffer1 = byteBuffer.get(bytesContent, 0, 16 + 4);}//消息存储时间long storeTimestamp = byteBuffer.getLong();//broker的IP和端口号ByteBuffer byteBuffer2;if ((sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {byteBuffer2 = byteBuffer.get(bytesContent, 0, 4 + 4);} else {byteBuffer2 = byteBuffer.get(bytesContent, 0, 16 + 4);}//消息重试次数int reconsumeTimes = byteBuffer.getInt();//事务消息物理偏移量long preparedTransactionOffset = byteBuffer.getLong();//消息体长度int bodyLen = byteBuffer.getInt();if (bodyLen > 0) {//读取消息体if (readBody) {byteBuffer.get(bytesContent, 0, bodyLen);if (checkCRC) {int crc = UtilAll.crc32(bytesContent, 0, bodyLen);if (crc != bodyCRC) {log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC);return new DispatchRequest(-1, false/* success */);}}} else {//不需要读取消息体,那么跳过这段内存byteBuffer.position(byteBuffer.position() + bodyLen);}}//Topic名称内容大小byte topicLen = byteBuffer.get();byteBuffer.get(bytesContent, 0, topicLen);//topic的值String topic = new String(bytesContent, 0, topicLen, MessageDecoder.CHARSET_UTF8);long tagsCode = 0;String keys = "";String uniqKey = null;//消息属性大小short propertiesLength = byteBuffer.getShort();Map<String, String> propertiesMap = null;if (propertiesLength > 0) {byteBuffer.get(bytesContent, 0, propertiesLength);//消息属性String properties = new String(bytesContent, 0, propertiesLength, MessageDecoder.CHARSET_UTF8);propertiesMap = MessageDecoder.string2messageProperties(properties);keys = propertiesMap.get(MessageConst.PROPERTY_KEYS);//客户端生成的uniqId,也被称为msgId,从逻辑上代表客户端生成的唯一一条消息uniqKey = propertiesMap.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);//tagString tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);//普通消息的tagsCode被设置为tag的hashCodeif (tags != null && tags.length() > 0) {tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);}/** 延迟消息处理* 对于延迟消息,tagsCode被替换为延迟消息的发送时间,主要用于后续判断消息是否到期*/{//消息属性中获取延迟级别DELAY字段,如果是延迟消息则生产者会在构建消息的时候设置进去String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);//如果topic是SCHEDULE_TOPIC_XXXX,即延迟消息的topicif (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) {int delayLevel = Integer.parseInt(t);if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();}if (delayLevel > 0) {//tagsCode被替换为延迟消息的发送时间,即真正投递时间tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,storeTimestamp);}}}}//读取的当前消息的大小int readLength = calMsgLength(sysFlag, bodyLen, topicLen, propertiesLength);//不相等则记录BUGif (totalSize != readLength) {doNothingForDeadCode(reconsumeTimes);doNothingForDeadCode(flag);doNothingForDeadCode(bornTimeStamp);doNothingForDeadCode(byteBuffer1);doNothingForDeadCode(byteBuffer2);log.error("[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}",totalSize, readLength, bodyLen, topicLen, propertiesLength);return new DispatchRequest(totalSize, false/* success */);}//根据读取的消息属性内容,构建为一个DispatchRequest对象并返回return new DispatchRequest(topic,queueId,physicOffset,totalSize,tagsCode,storeTimestamp,queueOffset,keys,uniqKey,sysFlag,preparedTransactionOffset,propertiesMap);} catch (Exception e) {}//读取异常return new DispatchRequest(-1, false /* success */);
}
2.4 doDispatch分发请求
ReputMessageService服务的核心代码, 循环调用DefaultMessageStore内部的dispatcherList中的CommitLogDispatcher的dispatch方法, 处理这个请求。
- CommitLogDispatcherBuildConsumeQueue: 根据DispatchRequest写ConsumeQueue文件, 构建ConsumeQueue索引。
- CommitLogDispatcherBuildIndex: 根据DispatchRequest写indexFile文件, 构建indexFile索引。
- CommitLogDispatcherCalcBitMap: 根据DispatchRequest构建布隆过滤器, 加速SQL92过滤效率, 避免每次都解析sql。
/*** DefaultMessageStore的方法** @param req 分发请求*/
public void doDispatch(DispatchRequest req) {//循环调用CommitLogDispatcher#dispatch处理for (CommitLogDispatcher dispatcher : this.dispatcherList) {dispatcher.dispatch(req);}
}