[RocketMQ] Broker asyncPutMessage处理消息以及存储的高性能设计措施 (十一)

asyncPutMessage方法真正的用来存储消息。

文章目录

      • 1.asyncPutMessage存储普通消息
        • 1.1 checkStoreStatus检查存储状态
        • 1.2 checkMessage检查消息
      • 2.CommitLog#asyncPutMessage异步存储消息
        • 2.1 处理延迟消息
        • 2.2 获取最新mappedFile
          • 2.2.1 tryCreateMappedFile创建新的MappedFile
          • 2.2.2 putRequestAndReturnMappedFile异步创建MappedFile
          • 2.2.3 AllocateMappedFileService创建MappedFile
          • 2.2.4 mmapOperation执行mmp操作
            • 2.2.4.1 mmap方法
            • 2.2.4.2 采用堆外内存
          • 2.2.5 warmMappedFile文件预热
          • 2.2.6 mlock锁定内存
        • 2.3 appendMessage追加存储消息
          • 2.3.1 doAppend执行追加
          • 2.3.2 消息序列化
      • 3.存储高性能设计总结

1.asyncPutMessage存储普通消息

DefaultMessageStore#asyncPutMessage()

  1. checkStoreStatus, checkMessage, checkLmqMessage校验。
  2. CommitLog#asyncPutMessage存储消息, 更新耗时时间和失败次数。

在这里插入图片描述

    @Overridepublic CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {PutMessageStatus checkStoreStatus = this.checkStoreStatus();if (checkStoreStatus != PutMessageStatus.PUT_OK) {return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));}PutMessageStatus msgCheckStatus = this.checkMessage(msg);if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));}long beginTime = this.getSystemClock().now();CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);putResultFuture.thenAccept((result) -> {long elapsedTime = this.getSystemClock().now() - beginTime;if (elapsedTime > 500) {log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);}this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);if (null == result || !result.isOk()) {this.storeStatsService.getPutMessageFailedTimes().add(1);}});return putResultFuture;}

1.1 checkStoreStatus检查存储状态

  1. 如果DefaultMessageStore是shutdown状态, 返回SERVICE_NOT_AVAILABLE。
  2. 如果broker是SLAVE角色, 返回SERVICE_NOT_AVAILABLE, 不能将消息写入SLAVE角色。
  3. 如果不支持写入, 返回SERVICE_NOT_AVAILABLE, 可能因为broker的磁盘已满、写入逻辑队列错误、写入索引文件错误等等原因。‘
  4. 如果操作系统页缓存繁忙, 返回OS_PAGECACHE_BUSY, 如果broker持有锁的时间超过osPageCacheBusyTimeOutMills, 算作操作系统页缓存繁忙。
  5. 最后返回PUT_OK, 表示可用存储消息。

在这里插入图片描述

/*** DefaultMessageStore的方法* <p>* 检查存储状态*/
private PutMessageStatus checkStoreStatus() {//如果DefaultMessageStore是shutdown状态,返回SERVICE_NOT_AVAILABLEif (this.shutdown) {log.warn("message store has shutdown, so putMessage is forbidden");return PutMessageStatus.SERVICE_NOT_AVAILABLE;}//如果broker是SLAVE角色,则返回SERVICE_NOT_AVAILABLE,不能将消息写入SLAVE角色if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {long value = this.printTimes.getAndIncrement();if ((value % 50000) == 0) {log.warn("broke role is slave, so putMessage is forbidden");}return PutMessageStatus.SERVICE_NOT_AVAILABLE;}//如果不支持写入,那么返回SERVICE_NOT_AVAILABLE//可能因为broker的磁盘已满、写入逻辑队列错误、写入索引文件错误等等原因if (!this.runningFlags.isWriteable()) {long value = this.printTimes.getAndIncrement();if ((value % 50000) == 0) {log.warn("the message store is not writable. It may be caused by one of the following reasons: " +"the broker's disk is full, write to logic queue error, write to index file error, etc");}return PutMessageStatus.SERVICE_NOT_AVAILABLE;} else {this.printTimes.set(0);}//如果操作系统页缓存繁忙,则返回OS_PAGECACHE_BUSY//如果broker持有锁的时间超过osPageCacheBusyTimeOutMills,则算作操作系统页缓存繁忙if (this.isOSPageCacheBusy()) {return PutMessageStatus.OS_PAGECACHE_BUSY;}//返回PUT_OK,表示可以存储消息return PutMessageStatus.PUT_OK;
}

1.2 checkMessage检查消息

  1. 如果topic长度大于127, 返回MESSAGE_ILLEGAL。
  2. 如果设置的属性长度大于32767, 返回MESSAGE_ILLEGAL, properties过长。
  3. 返回PUT_OK, 检查通过。

在这里插入图片描述

/*** DefaultMessageStore的方法* <p>* 检查消息*/
private PutMessageStatus checkMessage(MessageExtBrokerInner msg) {//如果topic长度大于127,则返回MESSAGE_ILLEGAL,表示topic过长了if (msg.getTopic().length() > Byte.MAX_VALUE) {log.warn("putMessage message topic length too long " + msg.getTopic().length());return PutMessageStatus.MESSAGE_ILLEGAL;}//如果设置的属性长度大于32767,则返回MESSAGE_ILLEGAL,表示properties过长了if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());return PutMessageStatus.MESSAGE_ILLEGAL;}return PutMessageStatus.PUT_OK;
}

2.CommitLog#asyncPutMessage异步存储消息

在这里插入图片描述

  1. 处理延迟消息的逻辑。
    1. 如果是延迟消息, 即DelayTimeLevel大于0, 替换topic为SCHEDULE_TOPIC_XXXX, 替换queueId为延迟队列id, id = level - 1, 如果延迟级别大于最大级别, 则设置为最大级别18, 默认延迟2h, 这些参数可以在broker端配置类MessageStoreConfig中配置。
    2. 最后保存真实topic到消息的REAL_TOPIC属性, 保存queueId到消息的REAL_QID属性, 方便后面恢复。
  2. 消息编码。获取线程的本地变量, 包含一个线程独立的encoder和keyBuilder对象。将消息内容编码, 存储到encoder中的encoderBuffer中, 它是通过ByteBuffer.allocateDirect(size)得到的一个直接缓冲区。消息写入之后, 调用encoderBuffer.flip(), 将Buffer从写模式切换到读模式, 可以读取到数据。
  3. 加锁并写入消息。
    1. 一个broker将所有的消息都追加到同一个逻辑CommitLog日志文件中, 需要通过获取putMessageLock锁来控制并发。一种锁是ReentrantLock可重入锁, 一种是CAS锁, 根据StoreConfig的useReentrantLockWhenPutMessage决定是否是ReentrantLock锁, 默认为true, 使用ReentrantLock。
    2. 从mappedFileQueue中的mappedFiles集合中获取最后一个MappedFile。如果最新的mappedFile为null, 或者mappedFile满了, 会新建mappedFile。
    3. 通过mappedFile调用appendMessage方法追加消息, 仅仅是追加消息到byteBuffer的内存中。如果是writeBuffer则表示消息写入了堆外内存中, 如果是mappedByteBuffer, 则表示消息写入了page chache中。
    4. 追加成功之后解锁。如果是剩余空间不足, 则会重新初始化一个MappedFile并再次尝试追加。
  4. 如果存在写满的MappedFile并且启用了文件内存预热, 那么这里调用unlockMappedFile对MappedFile执行解锁。
  5. 更新消息统计信息。随后调用submitFlushRequest方法提交刷盘请求, 将会根据刷盘策略进行刷盘。随后调用submitReplicaRequest方法提交副本请求, 用于主从主从同步。
    在这里插入图片描述
/*** CommitLog的方法* <p>* 异步存储消息** @param msg* @return*/public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {// Set the storage time//设置存储时间msg.setStoreTimestamp(System.currentTimeMillis());// Set the message body BODY CRC (consider the most appropriate setting// on the client)//设置消息正文CRCmsg.setBodyCRC(UtilAll.crc32(msg.getBody()));// Back to ResultsAppendMessageResult result = null;StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();String topic = msg.getTopic();
//        int queueId msg.getQueueId();/** 1 处理延迟消息的逻辑** 替换topic和queueId,保存真实topic和queueId*///根据sysFlag获取事务状态,普通消息的sysFlag为0final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());//如果不是事务消息,或者commit提交事务小iif (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Delivery//获取延迟级别,判断是否是延迟消息if (msg.getDelayTimeLevel() > 0) {//如果延迟级别大于最大级别,则设置为最大级别if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}//获取延迟队列的topic,固定为 SCHEDULE_TOPIC_XXXXtopic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;//根据延迟等级获取对应的延迟队列id, id = level - 1int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueId//使用扩展属性REAL_TOPIC 记录真实topicMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());//使用扩展属性REAL_QID 记录真实queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));//更改topic和queueId为延迟队列的topic和queueIdmsg.setTopic(topic);msg.setQueueId(queueId);}}//发送消息的地址InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();if (bornSocketAddress.getAddress() instanceof Inet6Address) {msg.setBornHostV6Flag();}//存储消息的地址InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();if (storeSocketAddress.getAddress() instanceof Inet6Address) {msg.setStoreHostAddressV6Flag();}/** 2 消息编码*///获取线程本地变量,其内部包含一个线程独立的encoder和keyBuilder对象PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();//将消息内容编码,存储到encoder内部的encoderBuffer中,它是通过ByteBuffer.allocateDirect(size)得到的一个直接缓冲区//消息写入之后,会调用encoderBuffer.flip()方法,将Buffer从写模式切换到读模式,可以读取到数据PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);if (encodeResult != null) {return CompletableFuture.completedFuture(encodeResult);}//编码后的encoderBuffer暂时存入msg的encodedBuff中msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);//存储消息上下文PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));/** 3 加锁并写入消息* 一个broker将所有的消息都追加到同一个逻辑CommitLog日志文件中,因此需要通过获取putMessageLock锁来控制并发。*///持有锁的时间long elapsedTimeInLock = 0;MappedFile unlockMappedFile = null;/** 有两种锁,一种是ReentrantLock可重入锁,另一种spin则是CAS锁* 根据StoreConfig的useReentrantLockWhenPutMessage决定是否使用可重入锁,默认为true,使用可重入锁。*/putMessageLock.lock(); //spin or ReentrantLock ,depending on store configtry {/** 从mappedFileQueue中的mappedFiles集合中获取最后一个MappedFile*/MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();//加锁后的起始时间long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();this.beginTimeInLock = beginLockTimestamp;// Here settings are stored timestamp, in order to ensure an orderly// global//设置存储的时间戳为加锁后的起始时间,保证有序msg.setStoreTimestamp(beginLockTimestamp);/** 如果最新mappedFile为null,或者mappedFile满了,那么会新建mappedFile并返回*/if (null == mappedFile || mappedFile.isFull()) {mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise}if (null == mappedFile) {log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));}/**  追加存储消息*/result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);switch (result.getStatus()) {case PUT_OK:break;case END_OF_FILE://文件剩余空间不足,那么初始化新的文件并尝试再次存储unlockMappedFile = mappedFile;// Create a new file, re-write the messagemappedFile = this.mappedFileQueue.getLastMappedFile(0);if (null == mappedFile) {// XXX: warn and notify melog.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));}result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);break;case MESSAGE_SIZE_EXCEEDED:case PROPERTIES_SIZE_EXCEEDED:return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));case UNKNOWN_ERROR:return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));default:return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));}//加锁的持续时间elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;} finally {//重置开始时间,释放锁beginTimeInLock = 0;putMessageLock.unlock();}if (elapsedTimeInLock > 500) {log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);}//如果存在写满的MappedFile并且启用了文件内存预热,那么这里对MappedFile执行解锁if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {this.defaultMessageStore.unlockMappedFile(unlockMappedFile);}PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);// Statistics//存储数据的统计信息更新storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());/** 4 提交刷盘请求,将会根据刷盘策略进行刷盘*/CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);/** 5 提交副本请求,用于主从同步*/CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {if (flushStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(flushStatus);}if (replicaStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(replicaStatus);}return putMessageResult;});}

2.1 处理延迟消息

在这里插入图片描述

  • 如果DelayTimeLevel大于0, 则表示延迟消息, topic换为"SCHEDULE_TOPIC_XXXX", queueId为延迟队列id, id = level - 1, 保存真实topic到消息的REAL_TOPIC属性, 保存queueId到消息的REAL_QID属性。
/** 1 处理延迟消息的逻辑** 替换topic和queueId,保存真实topic和queueId*/
//根据sysFlag获取事务状态,普通消息的sysFlag为0
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
//如果不是事务消息,或者commit提交事务消息
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Delivery//获取延迟级别,判断是否是延迟消息if (msg.getDelayTimeLevel() > 0) {//如果延迟级别大于最大级别,则设置为最大级别if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}//获取延迟队列的topic,固定为 SCHEDULE_TOPIC_XXXXtopic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;//根据延迟等级获取对应的延迟队列id, id = level - 1int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueId//使用扩展属性REAL_TOPIC 记录真实topicMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());//使用扩展属性REAL_QID 记录真实queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));//更改topic和queueId为延迟队列的topic和queueIdmsg.setTopic(topic);msg.setQueueId(queueId);}
}

2.2 获取最新mappedFile

在这里插入图片描述

  • 从mappedFileQueue中的mappedFiles集合中获取最后一个MappedFile。
/*** MappedFileQueue的方法* <p>* 获取最新的MappedFile*/
public MappedFile getLastMappedFile() {MappedFile mappedFileLast = null;while (!this.mappedFiles.isEmpty()) {try {//从mappedFiles中获取最后一个mappedFilemappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);break;} catch (IndexOutOfBoundsException e) {//continue;} catch (Exception e) {log.error("getLastMappedFile has exception.", e);break;}}return mappedFileLast;
}
  • 最新mappedFile为null, 或者mappedFile满了, 创建mappedFile。
/*** MappedFileQueue的方法* <p>* 创建新的MappedFile** @param startOffset 指定起始offset*/
public MappedFile getLastMappedFile(final long startOffset) {return getLastMappedFile(startOffset, true);
}/*** MappedFileQueue的方法* <p>* 创建或者获取最新的MappedFile** @param startOffset 起始offset* @param needCreate  是否创建*/
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {long createOffset = -1;//从mappedFiles集合中获取最后一个MappedFileMappedFile mappedFileLast = getLastMappedFile();//如果为null,那么设置创建索引,默认为0,即新建的文件为第一个mappedFile文件,从0开始if (mappedFileLast == null) {createOffset = startOffset - (startOffset % this.mappedFileSize);}//如果满了,那么设置新mappedFile文件的创建索引 = 上一个文件的起始索引(即文件名) + mappedFileSizeif (mappedFileLast != null && mappedFileLast.isFull()) {createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;}//如果需要创建新mappedFile,那么根据起始索引创建新的mappedFileif (createOffset != -1 && needCreate) {return tryCreateMappedFile(createOffset);}return mappedFileLast;
}
2.2.1 tryCreateMappedFile创建新的MappedFile

在这里插入图片描述

在这里插入图片描述

  • 获取下两个MappedFile的路径nextFilePath和nextNextFilePath, 然后调用doCreateMappedFile真正创建, 一次请求对于2个mappedFile, 2个commitlog。

  • commitlog文件预创建或者文件预分配, 如果启用了MappedFile预分配服务, 那么在创建MappedFile时会同时创建两个MappedFile, 一个用于同步创建并返回用于本次实际使用, 一个后台异步创建用于下次取用。避免等到当前文件真正用完了才创建下一个文件, 提供性能。

/*** MappedFileQueue的方法* <p>* 创建commitlog文件,映射MappedFile** @param createOffset 起始索引,即新文件的文件名*/
protected MappedFile tryCreateMappedFile(long createOffset) {//下一个文件路径 {storePathCommitLog}/createOffset,即文件名为createOffset,即起始物理offsetString nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);//下下一个文件路径 {storePathCommitLog}/createOffset+mappedFileSize,即文件名为createOffset + mappedFileSize,即起始offsetString nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset+ this.mappedFileSize);//真正创建文件return doCreateMappedFile(nextFilePath, nextNextFilePath);
}

在这里插入图片描述

判断如果allocateMappedFileService不为null, 那么异步的创建MappedFile, 否则同步创建MappedFile。

/*** MappedFileQueue的方法* <p>* 创建commitlog文件,映射MappedFile** @param nextFilePath     要创建的下一个文件路径* @param nextNextFilePath 要创建的下下一个文件路径*/
protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {MappedFile mappedFile = null;//如果allocateMappedFileService不为null,那么异步的创建MappedFile//CommitLog的MappedFileQueue初始化时会初始化allocateMappedFileService,因此一般都不为nullif (this.allocateMappedFileService != null) {//添加两个请求到处理任务池,然后阻塞等待异步创建默认1G大小的MappedFilemappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,nextNextFilePath, this.mappedFileSize);} else {try {//同步创建MappedFilemappedFile = new MappedFile(nextFilePath, this.mappedFileSize);} catch (IOException e) {log.error("create mappedFile exception", e);}}if (mappedFile != null) {//如果是第一次创建,那么设置标志位firstCreateInQueue为trueif (this.mappedFiles.isEmpty()) {mappedFile.setFirstCreateInQueue(true);}//将创建的mappedFile加入mappedFiles集合中this.mappedFiles.add(mappedFile);}return mappedFile;
}
2.2.2 putRequestAndReturnMappedFile异步创建MappedFile

在这里插入图片描述

  • MappedFile作为一个RocketMQ的物理文件在Java中的映射类。commitLog consumerQueue、indexFile3种文件磁盘的读写都是通过MappedFile操作的。它的构造器中会对当前文件进行mmap内存映射操作。

  • putRequestAndReturnMappedFile用于创建MappedFile, 会同时创建两个MappedFile, 一个同步创建并返回用于本次实际使用, 一个后台异步创建用于下次取用。可以避免等到当前文件真正用完了才创建下一个文件, 提升性能。

  • 同步和异步实际上都是通过一个服务线程执行的, 该方法只是提交两个映射文件创建请求AllocateRequest, 提交到requestTable和requestQueue中。随后当前线程只会同步等待第一个映射文件的创建, 最多等待5s, 如果创建成功则返回, 较大的offset那一个映射文件则会异步的创建, 不会等待。

  • 这里线程等待使用的是倒计数器CountDownLatch, 一个请求一个AllocateRequest对象, 其内部还持有一个CountDownLatch对象, 当请求对应的MappedFile创建之后, 会调用内部的CountDownLatch#countDown, 唤醒等待的线程。

/*** AllocateMappedFileService的方法* 添加两个请求到处理任务池,然后阻塞等待异步创建并返回MappedFile** @param nextFilePath* @param nextNextFilePath* @param fileSize 文件大小默认1G* @return*/
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {//可以提交的请求int canSubmitRequests = 2;//如果当前节点不是从节点,并且是异步刷盘策略,并且transientStorePoolEnable参数配置为true,并且fastFailIfNoBufferInStorePool为true//那么重新计算最多可以提交几个文件创建请求if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()&& BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in poolcanSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();}}//根据nextFilePath创建一个请求对象,并将请求对象存入requestTable这个map集合中AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;//如果存入成功if (nextPutOK) {if (canSubmitRequests <= 0) {log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());this.requestTable.remove(nextFilePath);return null;}//将请求存入requestQueue中boolean offerOK = this.requestQueue.offer(nextReq);if (!offerOK) {log.warn("never expected here, add a request to preallocate queue failed");}//可以提交的请求数量自减canSubmitRequests--;}//根据nextNextFilePath创建另一个请求对象,并将请求对象存入requestTable这个map集合中AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;if (nextNextPutOK) {if (canSubmitRequests <= 0) {log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());this.requestTable.remove(nextNextFilePath);} else {//将请求存入requestQueue中boolean offerOK = this.requestQueue.offer(nextNextReq);if (!offerOK) {log.warn("never expected here, add a request to preallocate queue failed");}}}//有异常就直接返回if (hasException) {log.warn(this.getServiceName() + " service has exception. so return null");return null;}//获取此前存入的nextFilePath对应的请求AllocateRequest result = this.requestTable.get(nextFilePath);try {if (result != null) {//同步等待最多5sboolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);if (!waitOK) {//超时log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());return null;} else {//如果nextFilePath对应的MappedFile创建成功,那么从requestTable移除对应的请求this.requestTable.remove(nextFilePath);//返回创建的mappedFilereturn result.getMappedFile();}} else {log.error("find preallocate mmap failed, this never happen");}} catch (InterruptedException e) {log.warn(this.getServiceName() + " service has exception. ", e);}return null;
}
2.2.3 AllocateMappedFileService创建MappedFile

在这里插入图片描述

  • AllocateMappedFileService继承了ServiceThread, ServiceThread实现了Runnable接口。
/*** ServiceThread的方法* 启动一个线程执行线程任务*/
public void start() {log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);//只能启动一次if (!started.compareAndSet(false, true)) {return;}stopped = false;//新建线程this.thread = new Thread(this, getServiceName());//后台线程this.thread.setDaemon(isDaemon);//启动线程this.thread.start();
}

AllocateMappedFileService#run()\

/*** AllocateMappedFileService的方法* 创建mappedFile*/
public void run() {log.info(this.getServiceName() + " service started");//死循环//如果服务没有停止,并且没有被线程中断,那么一直循环执行mmapOperation方法while (!this.isStopped() && this.mmapOperation()) {}log.info(this.getServiceName() + " service end");
}
2.2.4 mmapOperation执行mmp操作

在这里插入图片描述

创建MappedFile:

  1. 从requestQueue中获取优先级最高的一个请求, 即文件名最小或者说起始offset最小的请求。requestQueue是一个优先级队列。
  2. 判断是否需要通过堆外内存创建MappedFile, 如果当前节点不是从节点, 而且是异步刷盘策略, transientStorePoolEnable参数为true, 那么使用堆外内存, 默认不使用。
    1. RocketMQ中引入的 transientStorePoolEnable 能缓解 pagecache 的压力, 原理是基于DirectByteBuffer和MappedByteBuffer的读写分离。
    2. 消息先写入DirectByteBuffer(堆外内存), 随后从MappedByteBuffer(pagecache)读取。
  3. 如果没有启动堆外内存, 采用普通方式创建mappedFile, 使用mmap操作。
  4. 如果mappedFile大小大于等于1G并且warmMapedFileEnable参数为true, 那么预热, 就是所谓的内存预热或者文件预热。注意warmMapedFileEnable参数默认为false, 默认不开启预热。
  5. 创建成功后, 将请求对象中的countDownLatch释放计数, 可以唤醒putRequestAndReturnMappedFile方法中阻塞的线程。
/*** AllocateMappedFileService的方法* <p>* mmap 操作,只有被外部线程中断,才会返回false*/
private boolean mmapOperation() {boolean isSuccess = false;AllocateRequest req = null;try {//从requestQueue中获取优先级最高的一个请求,即文件名最小或者说起始offset最小的请求//requestQueue是一个优先级队列req = this.requestQueue.take();//从requestTable获取对应的请求AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());if (null == expectedRequest) {log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "+ req.getFileSize());return true;}if (expectedRequest != req) {log.warn("never expected here,  maybe cause timeout " + req.getFilePath() + " "+ req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);return true;}//获取对应的mappedFile,如果为null则创建if (req.getMappedFile() == null) {//起始时间long beginTime = System.currentTimeMillis();MappedFile mappedFile;//如果当前节点不是从节点,并且是异步刷盘策略,并且transientStorePoolEnable参数配置为true,那么使用堆外内存,默认不使用//RocketMQ中引入的 transientStorePoolEnable 能缓解 pagecache 的压力,其原理是基于DirectByteBuffer和MappedByteBuffer的读写分离//消息先写入DirectByteBuffer(堆外内存),随后从MappedByteBuffer(pageCache)读取。if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {try {//可以基于SPI机制获取自定义的MappedFilemappedFile = ServiceLoader.load(MappedFile.class).iterator().next();//初始化mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());} catch (RuntimeException e) {log.warn("Use default implementation.");mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());}} else {//普通方式创建mappedFile,并且进行mmapmappedFile = new MappedFile(req.getFilePath(), req.getFileSize());}//创建mappedFile消耗的时间long elapsedTime = UtilAll.computeElapsedTimeMilliseconds(beginTime);if (elapsedTime > 10) {int queueSize = this.requestQueue.size();log.warn("create mappedFile spent time(ms) " + elapsedTime + " queue size " + queueSize+ " " + req.getFilePath() + " " + req.getFileSize());}// pre write mappedFile//如果mappedFile大小大于等于1G并且warmMapedFileEnable参数为true,那么预写mappedFile,也就是所谓的内存预热或者文件预热//注意warmMapedFileEnable参数默认为false,即默认不开启文件预热,因此选哟手动开启if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog()&&this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {//预热文件mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());}req.setMappedFile(mappedFile);this.hasException = false;isSuccess = true;}} catch (InterruptedException e) {log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");this.hasException = true;return false;} catch (IOException e) {log.warn(this.getServiceName() + " service has exception. ", e);this.hasException = true;if (null != req) {requestQueue.offer(req);try {Thread.sleep(1);} catch (InterruptedException ignored) {}}} finally {//如果创建成功,那么将请求对象中的countDownLatch释放计数,这样就可以唤醒在putRequestAndReturnMappedFile方法中被阻塞的线程了if (req != null && isSuccess)req.getCountDownLatch().countDown();}return true;
}
2.2.4.1 mmap方法

在这里插入图片描述

使用普通构建mappedFile的时候, 会初始化参数, 会把commitlog文件从磁盘空间完全的映射到虚拟内存, 即内存映射, 为mmap操作。

在这里插入图片描述

在这里插入图片描述

public MappedFile(final String fileName, final int fileSize) throws IOException {
//调用init初始化init(fileName, fileSize);
}private void init(final String fileName, final int fileSize) throws IOException {//文件名。长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0this.fileName = fileName;//文件大小。默认1G=1073741824this.fileSize = fileSize;//构建file对象this.file = new File(fileName);//构建文件起始索引,就是取自文件名this.fileFromOffset = Long.parseLong(this.file.getName());boolean ok = false;//确保文件目录存在ensureDirOK(this.file.getParent());try {//对当前commitlog文件构建文件通道fileChannelthis.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();//把commitlog文件完全的映射到虚拟内存,也就是内存映射,即mmap,提升读写性能this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);//记录数据TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);TOTAL_MAPPED_FILES.incrementAndGet();ok = true;} catch (FileNotFoundException e) {log.error("Failed to create file " + this.fileName, e);throw e;} catch (IOException e) {log.error("Failed to map file " + this.fileName, e);throw e;} finally {//释放fileChannel,注意释放fileChannel不会对之前的mappedByteBuffer映射产生影响if (!ok && this.fileChannel != null) {this.fileChannel.close();}}
}
2.2.4.2 采用堆外内存

判断是否开启了堆外内存, 相对于mmap方法, 会多设置一个writeBuffer。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

public void init(final String fileName, final int fileSize,final TransientStorePool transientStorePool) throws IOException {//普通初始化init(fileName, fileSize);//设置写buffer,采用堆外内存this.writeBuffer = transientStorePool.borrowBuffer();this.transientStorePool = transientStorePool;
}
  • borrowBuffer方法中会返回TransientStorePool中的 availableBuffer, 如果堆外内存开启, 那么在broker启动创建DefaultMessageStore的时候将会执行TransientStorePool#init方法, 该方法会初始化5个1G大小的堆外内存并且锁定住。是个重量级初始化方法, 会延长broker启动时间。

  • 堆外内存就是通过ByteBuffer.allocateDirect方法分配的, 这5块内存可以反复使用。

TransientStorePool#init():

在这里插入图片描述

/*** TransientStorePool的方法** It's a heavy init method.*/
public void init() {//默认5个for (int i = 0; i < poolSize; i++) {//分配堆外内存,默认大小1GByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);final long address = ((DirectBuffer) byteBuffer).address();Pointer pointer = new Pointer(address);//锁定堆外内存,确保不会被置换到虚拟内存中去LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));//存入队列中availableBuffers.offer(byteBuffer);}
}

如果是普通模式, 那么采用mmap方法, 如果是读写分离模式, 那么broker会将消息写入writeBuffer, 先写入DirectByteBuffer(堆外内存), 直接返回。然后异步服务CommitRealTimeService不断从堆外内存批量Commit到Page Cache中, 消费者始终从mappedByteBuffer(page Cache)读取数据。

高并发下写入 page cache 可能会造成刷脏页时磁盘压力较高, 写入时发生毛刺现象。读写分离可以缓解page Cache压力, 但会影响消息不一致性, 数据一致性会降低。

2.2.5 warmMappedFile文件预热

在这里插入图片描述

  • mmap操作对于OS来说只是建立虚拟内存地址至物理地址的映射关系, 即将进程使用的虚拟内存地址映射到物理地址上。并不会加载任何MappedFile数据至内存中, 也并不会分配指定的大小的内存。当程序要访问数据时, 如果发现这部分数据页并没有实际加载到内存中, 则处理器自动触发一个缺页异常, 使得进入内核空间再分配物理内存, 一次默认为4k。一个G大小的commitLog, 如果靠着缺页中断来分配实际内存, 会触发26w多次缺页中断, 会造成很大开销。

  • RocketMQ避免频繁发生却也异常的做法是采用文件预热, 提前让os分配物理内存空间, 防止在写入消息时发生缺页异常才进行分配。

/*** MappedFile的方法** 建立了进程虚拟地址空间映射之后,并没有分配虚拟内存对应的物理内存,这里进行内存预热** @param type  消息刷盘类型,默认 FlushDiskType.ASYNC_FLUSH;* @param pages 一页大小,默认4k*/
public void warmMappedFile(FlushDiskType type, int pages) {long beginTime = System.currentTimeMillis();// 创建一个新的字节缓冲区ByteBuffer byteBuffer = this.mappedByteBuffer.slice();int flush = 0;long time = System.currentTimeMillis();//每隔4k大小写入一个0for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {//每隔4k大小写入一个0byteBuffer.put(i, (byte) 0);// force flush when flush disk type is sync//如果是同步刷盘,则每次写入都要强制刷盘if (type == FlushDiskType.SYNC_FLUSH) {if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {flush = i;mappedByteBuffer.force();}}// prevent gc//调用Thread.sleep(0)当前线程主动放弃CPU资源,立即进入就绪状态//防止因为多次循环导致该线程一直抢占着CPU资源不释放,if (j % 1000 == 0) {log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);time = System.currentTimeMillis();try {Thread.sleep(0);} catch (InterruptedException e) {log.error("Interrupted", e);}}}// force flush when prepare load finished//把剩余的数据强制刷新到磁盘中if (type == FlushDiskType.SYNC_FLUSH) {log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",this.getFileName(), System.currentTimeMillis() - beginTime);mappedByteBuffer.force();}log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),System.currentTimeMillis() - beginTime);//锁定内存this.mlock();
}
  • RocketMQ对于MappedFile每隔OS_PAGE_SIZE大小写入一个0, 来让操作系统预先分配1G大小的全额物理内存, 预先分配内存。

在这里插入图片描述

2.2.6 mlock锁定内存
  • 虽然预热了文件, 短时间不会读取数据不会引发缺页异常, 但是内存不足时, 一部分不常使用的内存还是会被交换到swap空间中, 程序读取交换出的数据时候会产生缺页异常。

  • mlock方法调用系统mlock函数, 锁定文件的page cache, 防止把预热的文件交换到swap空间。还会调用系统madvise函数, 尝试一次性将一段数据读入到映射内存区域, 减少了缺页异常。

在这里插入图片描述

/*** MappedFile的方法* 锁定内存*/
public void mlock() {final long beginTime = System.currentTimeMillis();final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();Pointer pointer = new Pointer(address);{//mlock调用int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);}{//madvise调用int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);}
}

2.3 appendMessage追加存储消息

在这里插入图片描述

在这里插入图片描述

  • 当获取到mappedFile之后, 调用mappedFile#appendMessage方法追加消息。
/*** MappedFile的方法* <p>* 追加消息** @param msg               消息* @param cb                回调函数* @param putMessageContext 存放消息上下文*/
public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb,PutMessageContext putMessageContext) {//调用appendMessagesInner方法return appendMessagesInner(msg, cb, putMessageContext);
}

首先获取当前文件的写指针, 如果写指针小于文件的大小, 进行消息追加, doAppend方法。最后更新写指针位置, 和存储时间。

/*** MappedFile的方法* <p>* 追加消息** @param messageExt        消息* @param cb                回调函数* @param putMessageContext 存放消息上下文*/
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,PutMessageContext putMessageContext) {assert messageExt != null;assert cb != null;//获取写入指针的位置int currentPos = this.wrotePosition.get();//如果小于文件大小,那么可以写入if (currentPos < this.fileSize) {//如果存在writeBuffer,即支持堆外缓存,那么则使用writeBuffer进行读写分离,否则使用mmap的方式写ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();//设置写入位置byteBuffer.position(currentPos);AppendMessageResult result;/** 通过回调函数执行实际写入*/if (messageExt instanceof MessageExtBrokerInner) {//单条消息result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,(MessageExtBrokerInner) messageExt, putMessageContext);} else if (messageExt instanceof MessageExtBatch) {//批量消息result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,(MessageExtBatch) messageExt, putMessageContext);} else {return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}//更新写指针的位置this.wrotePosition.addAndGet(result.getWroteBytes());//更新存储实时间this.storeTimestamp = result.getStoreTimestamp();return result;}log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
2.3.1 doAppend执行追加

在这里插入图片描述

  • 通过AppendMessageCallback回调函数的doAppend方法执行的。回调函数的具体实现是DefaultAppendMessageCallback, 是CommitLog里面的一个内部类的实现。
  1. 获取消息物理偏移量, 创建服务端消息Id生成器, 4个字节ip+4个字节端口+8个消息偏移量。从topicQueueTable中获取Queue队列的最大相对偏移量。
  2. 判断如果消息的长度加上文件结束符子节数大于maxBlank, 表示commitlog剩余大小不足存储消息, 返回END_OF_FILE。在asyncPutMessage方法中判断到该code之后将会新建一个MappedFile并尝试再次存储。
  3. 如果空间足够, 消息进行编码, 编码后的消息写入到byteBuffer中, byteBuffer可能是writeBffer, 即直接缓冲区, 也可能为普通缓冲区mappedByteBuffer。
  4. 返回AppendMessageResult对象, 内部包含消息追加状态, 消息写入物理偏移量, 消息写入长度, 消息id生成器, 、消息开始追加的时间戳、消息队列偏移量、消息开始写入的时间戳等属性。
/*** DefaultAppendMessageCallback的方法* <p>* 追加消息回调** @param fileFromOffset    文件起始索引* @param byteBuffer        缓冲区* @param maxBlank          最大空闲区* @param msgInner          消息* @param putMessageContext 上下文*/
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>// PHY OFFSET//获取物理偏移量索引long wroteOffset = fileFromOffset + byteBuffer.position();/** 构建msgId,也就是broker端的唯一id,在发送消息的时候,在客户端producer也会生成一个唯一id是的。*/Supplier<String> msgIdSupplier = () -> {//系统标识int sysflag = msgInner.getSysFlag();//长度16int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;//分配16字节的缓冲区ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);//ip4个字节、host4个字节MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);//清除缓冲区,因为因为socketAddress2ByteBuffer会翻转缓冲区msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer//8个字节存储commitLog的物理偏移量msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);return UtilAll.bytes2string(msgIdBuffer.array());};// Record ConsumeQueue information//记录ConsumeQueue信息//key = "topic-queueId"String key = putMessageContext.getTopicQueueTableKey();//获取该队列的最大相对偏移量Long queueOffset = CommitLog.this.topicQueueTable.get(key);if (null == queueOffset) {//如果为null则设置为0,并且存入topicQueueTablequeueOffset = 0L;CommitLog.this.topicQueueTable.put(key, queueOffset);}//light message queue(LMQ)支持boolean multiDispatchWrapResult = CommitLog.this.multiDispatch.wrapMultiDispatch(msgInner);if (!multiDispatchWrapResult) {return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}// Transaction messages that require special handling//需要特殊处理的事务消息final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());switch (tranType) {// Prepared and Rollback message is not consumed, will not enter the// consumer queuec//准备和回滚消息不会被消费,不会进入消费队列case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:queueOffset = 0L;break;//非事务消息和提交消息会被消费case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE:default:break;}/** 消息编码序列化*///获取编码的ByteBufferByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();final int msgLen = preEncodeBuffer.getInt(0);// Determines whether there is sufficient free space//消息编码if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {this.msgStoreItemMemory.clear();// 1 TOTALSIZEthis.msgStoreItemMemory.putInt(maxBlank);// 2 MAGICCODEthis.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);// 3 The remaining space may be any value// Here the length of the specially set maxBlankfinal long beginTimeMills = CommitLog.this.defaultMessageStore.now();byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */msgIdSupplier, msgInner.getStoreTimestamp(),queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);}int pos = 4 + 4 + 4 + 4 + 4;// 6 QUEUEOFFSETpreEncodeBuffer.putLong(pos, queueOffset);pos += 8;// 7 PHYSICALOFFSETpreEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;// 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMPpos += 8 + 4 + 8 + ipLen;// refresh store time stamp in lockpreEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());//存储消息起始时间final long beginTimeMills = CommitLog.this.defaultMessageStore.now();// Write messages to the queue buffer/** 将消息写入到byteBuffer中,这里的byteBuffer可能是writeBuffer,即直接缓冲区,也有可能是普通缓冲区mappedByteBuffer*/byteBuffer.put(preEncodeBuffer);msgInner.setEncodedBuff(null);//返回AppendMessageResult,包括消息追加状态、消息写入偏移量、消息写入长度、消息ID生成器、消息开始追加的时间戳、消息队列偏移量、消息开始写入的时间戳AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);switch (tranType) {case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:break;case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE:// The next update ConsumeQueue informationCommitLog.this.topicQueueTable.put(key, ++queueOffset);CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner);break;default:break;}return result;
}
2.3.2 消息序列化

Broker的commitlog只会存储序列化后的消息。

在这里插入图片描述

3.存储高性能设计总结

通过获取mappedFile, RocketMQ对于commitlog的性能采取了很多措施:

  1. commitlog文件预创建和文件预分配
  2. mmap
  3. 文件预热和内存预热
  4. 内存锁定
  5. 读写分离

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/19165.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【LeetCode热题100】打卡第33天:环形链表LRU缓存

文章目录 【LeetCode热题100】打卡第33天&#xff1a;环形链表&LRU缓存⛅前言 环形链表&#x1f512;题目&#x1f511;题解 LRU缓存&#x1f512;题目&#x1f511;题解 【LeetCode热题100】打卡第33天&#xff1a;环形链表&LRU缓存 ⛅前言 大家好&#xff0c;我是知…

Vue 组件化开发

文章目录 前言组件化开发父子组件相互传数据父传子&#xff1a;自定义属性子传父&#xff1a;自定义事件父子组件互传案例 插槽 slot多个插槽 总结组件化开发总结Vue组件的基本组成子组件使用的三个步骤父子组件相互传递数据 前言 提示&#xff1a;这里可以添加本文要记录的大…

Android-jar包方式连接本地sqlite并操作返回数据

背景: 数据库的创建及字段都是后端人员维护,Android端只是映射相关数据库到本地来操作。为了统一管理操作方式方法,所以提出,后端打jar包的方式封装对Android端数据库sqllite的连接、操作。 说明: 因为之前是后端打jar包,JDBC连接的驱动及方法有差异,导致连接Android…

VMware将虚拟机网络设置为NAT模式

虚拟机有vmware和desktop&#xff0c;本人一直使用的是vmware。安装好vmware并激活后&#xff0c;创建完虚拟机。(需要vmware和激活码的可留言) 进入虚拟机设置&#xff0c;网络适配器选择NAT模式 在虚拟机工具栏->菜单栏点击编辑&#xff0c;选择“虚拟网络编辑器”。 选择…

[C语言][小游戏][猜拳游戏]

C语言的奇妙旅行 一、模块化编程二、游戏基本设计2.1 确定计算机要出的手势2.2 显示“石头剪刀布”&#xff0c;然后玩家输入自己要出的手势2.3进行输赢判断&#xff0c;显示结果2.4询问是否继续2.5 基本程序 三、游戏实现的过程3.1将玩家的手势和电脑的手势显示出来 三、总代码…

metersphere主从节点部署

metersphere主从节点关系 环境搭建 docker 环境准备 检查内存是否大于8G free -m 安装docker服务 安装docker&#xff0c;使用yum -y install docker&#xff1b; 启动docker&#xff0c;使用systemctl start docker&#xff1b; 设置开机启动&#xff0c;使用systemctl en…

基于SpringBoot的网上订餐系统【附ppt和开题|万字文档(LW)和搭建文档】

主要功能 前台登录&#xff1a;前台登录&#xff1a; ①首页&#xff1a;菜品信息推荐、菜品信息展示、查看更多 ②菜品信息&#xff1a;菜品分类、菜品名称查询、食材查询、菜品详情、下单提交 ③个人中心&#xff1a;可以查看自己的信息、我的订单、我的地址 后台登录&#…

哪款3D虚拟人物建模软件好用?

3D虚拟人物建模软件一直以来受到许多人的关注和追捧。现在&#xff0c;随着智能手机的普及&#xff0c;3D虚拟人物手机建模软件也开始走进大家的视野。那么&#xff0c;市面上3D虚拟人物建模软件这么多&#xff0c;究竟哪款3D虚拟人物建模软件是好用的呢&#xff1f; 首先&…

Vision Transformer推理中线性-角度注意转换压缩自注意

文章目录 Castling-ViT: Compressing Self-Attention via Switching Towards Linear-Angular Attention at Vision Transformer Inference摘要本文方法实验结果 Castling-ViT: Compressing Self-Attention via Switching Towards Linear-Angular Attention at Vision Transform…

Blender--原理化体积

“原理化体积 着色器将所有体积着色组件组合到一个易于使用的节点中。该节点含有散射&#xff0c;吸收和黑体辐射属性&#xff0c;因此&#xff0c;可以仅仅使用该着色器节点对烟雾和火焰等进行渲染。” 官方文档介绍&#xff1a;原理化体积 — Blender Manual 可以用于实现丁…

Docker入门

目录&#xff1a; 常见概念评价指标单机架构应用数据分离架构应用服务集群架构读写分离 / 主从分离架构引入缓存 —— 冷热分离架构垂直分库业务拆分 —— 微服务容器化引入——容器编排架构总结 1.常见概念&#xff1a; 应用&#xff08;Application&#xff09; / 系统&am…

mac使用conda(anaconda和miniconda一样)安装新版本的torch

使用pytorch给的命令行下载会很慢&#xff0c;因此我们应该修改镜像源&#xff0c;然后再下载torch 1.添加镜像 在终端输入以下命令&#xff0c;添加镜像&#xff1a; conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/pytorch/ conda conf…