RocketMQ5.0Pop消费模式

前言

RocketMQ 5.0 消费者引入了一种新的消费模式:Pop 消费模式,目的是解决 Push 消费模式的一些痛点。
RocketMQ 4.x 之前,消费模式分为两种:

  • Pull:拉模式,消费者自行拉取消息、上报消费结果
  • Push:推模式,消费者主动拉取消息,看起来像是 Broker 主动推送消息,主动上报消费结果

Pop 消费模式可以看作是 Push 的升级版,Push 消费模式存在以下痛点:

  • 客户端负责队列重平衡、消息拉取、消费位点上报、消费失败重试等功能,逻辑太重了,不利于多语言客户端发展
  • 队列数/消费者数量变更会触发重平衡操作,期间消息无法消费,容易造成消息堆积
  • 队列和消费者强绑定,消费者数量超过队列数后,无法再水平扩容
  • 队列和消费者强绑定,消费者僵死状态下导致消息堆积

Pop 消费模式就是要解决这些痛点的,它的设计目标:

  • 消费者只管消息拉取、消息消费、上报 ACK,客户端 SDK 轻量级
  • 队列不再绑定消费者,消费者可以消费所有队列消息
  • 消费者可以很方便的水平扩容

要实现这个目标还是有不小的挑战,看看 RocketMQ 是如何做到的吧。

设计难点

Pop 消费模式是存在一些设计难点的。
Push 模式下队列为什么要和消费者绑定?也就是一个队列同一时间只允许被一个消费者消费,因为这样实现起来简单啊,主要体现在:

  • Broker 消息投递处理简单,根据消费者请求的拉取位点投递
  • Broker 消费位点管理简单,无需记录消息是否被消费,只需要记录消费位点,位点前的都消费了,位点后的都没消费

如果改成 Pop 模式,首先面临的挑战有:
1、Broker 消息投递时,要记录哪些消息已经投递了,哪些消息还没投递
Pop 模式下一个队列可以被多个消费者消费,假设现在队列里面有 1~10 号消息,消费者A 拉取了 1~3 号消息,消费者B 再拉取的时候,Broker 必须知道 1~3 号消息已经投递过在消费中了,不能再投递给消费者B了,得投递 4 号及之后的消息给消费者B才行。
image.png

2、Broker 要记录哪些消息消费成功了,哪些消息消费失败了,不能单纯记录消费位点了
因为队列可以被多个消费者消费,大家都在上报队列粒度的消费位点,Broker 没法管理了。

你看,为了可以让队列被多个消费者消费,Broker 已经不能再按队列维度去管理信息了,必须精确到消息粒度,这就需要额外的数据结构来支撑。

设计实现

取消客户端队列 Rebalance
客户端重平衡,不仅限制了消费者的消费能力,还增加了客户端复杂度,重平衡期间消息无法被消费,还容易造成消息堆积。Pop 消费模式下,消费者可以消费所有队列,也就不再需要客户端重平衡了。消费者查询给自己分配的队列时,Proxy 返回的是 Broker 维度且 queueId=-1 的逻辑队列,Broker 端会投递所有队列的消息。
image.png

invisibleTime
Pop 消费模式下,消息投递后会有一个invisibleTime的概念,即消息的不可见时间,默认是 60 秒。比如 M1 投递给消费者A后,在不可见时间段内,其它消费者是无法消费这条消息的。
image.png
另外 Broker 还提供了changeInvisibleTime()接口修改单条消息的不可见时间,比如消息消费失败后,会根据重试次数来设置新的不可见时间。
image.png

CK & ACK 消息
RocketMQ 为了记录消息是否被消费成功,引入了 CK 和 ACK 消息,以及一个专属的 Topic:rmq_sys_REVIVE_LOG_{clusterName}
消费者在拉取消息时,Broker 端会给拉取到的这一批消息发一个 CK 消息,CK 消息记录了各消息的偏移量可以定位到具体消息,同时用位图记录了各消息的 ACK 情况。Consumer 消费成功后会调用ackMessage接口,Broker 会发送一个对应的 ACK 消息;消费失败后会调用changeInvisibleTime接口,延长消息不可见时间,底层是先发一个旧消息的 ACK 消息,再发一个新消息的 CK 消息。
image.png
上述流程的运转是基于消费者正常处理消费结果的前提下的,如果消费者挂了,既不发 ack 也不发 nack,Broker 又该怎么处理这些消息呢?其实也能正常处理,因为 CK 和 ACK 消息均是延时消息,延迟的时间即消息的不可见时间,CK 消息会提前一秒消费,目的是匹配 ACK 消息。
RocketMQ 会启动八个线程消费 REVIVE Topic 对应的八个队列,匹配 CK 消息里还有哪些消息没被 ack,再将这些没被 ack 的消息发送到 RETRY Topic,消费者就可以重新消费了。

源码

先通过流程图熟悉下 Pop 消费模式的大体流程:
image.png

ProcessQueue

PushConsumer 启动后,会定时向 Proxy 查询分配的队列:

protected void startUp() throws Exception {......scanAssignmentsFuture = scheduler.scheduleWithFixedDelay(() -> {try {// 扫描分配的队列scanAssignments();} catch (Throwable t) {log.error("Exception raised while scanning the load assignments, clientId={}", clientId, t);}}, 1, 5, TimeUnit.SECONDS);......
}

Pop 消费模式下,客户端无须知道具体的队列数据,因为 Broker 会投递所有队列消息,所以 Proxy 只返回 Broker 维度且 queueId=-1 的一个逻辑队列即可。
消费者拿到分配的队列后,紧接着调用syncProcessQueue()方法处理队列,有两种情况:

  • 队列不再分配给自己,停止拉取消息,移除队列
  • 新分配的队列,立即拉取消息

对于新分配的队列,消费者会创建ProcessQueue对象开始拉取队列消息:

private void receiveMessageImmediately() {// Broker端点列表 gRPC负载均衡调用final Endpoints endpoints = mq.getBroker().getEndpoints();// 构建请求final ReceiveMessageRequest request = consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression);// 发请求final ListenableFuture<ReceiveMessageResult> future = consumer.receiveMessage(request, mq,consumer.getPushConsumerSettings().getLongPollingTimeout());Futures.addCallback(future, new FutureCallback<ReceiveMessageResult>() {@Overridepublic void onSuccess(ReceiveMessageResult result) {// 处理消息onReceiveMessageResult(result);}}
}

PopMessageProcessor

Proxy 端会开启 gRPC 服务,GrpcMessagingApplication 用来处理客户端的请求,拉取消息对应的处理方法是receiveMessage,最终会交给 Broker 的 PopMessageProcessor 处理,主要步骤:

  • 校验请求参数、队列写权限等等
  • 构建消息过滤器 ExpressionMessageFilter
  • 按照 1/5 的概率先尝试从重试队列里拉取消息
  • 遍历所有队列拉取消息,直到拉取最大消息数
  • 如果普通队列拉取的消息数没达到最大消息数,再尝试拉取重试队列
  • 如果还有剩余消息,通知其它被长轮询挂起的请求继续拉取
  • 返回结果
private RemotingCommand processRequest(final Channel channel, RemotingCommand request)throws RemotingCommandException {......// 参数、权限校验// Topic配置TopicConfig topicConfig =this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());// 消费组订阅配置SubscriptionGroupConfig subscriptionGroupConfig =this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());// 消息过滤ExpressionMessageFilter messageFilter = ......// POP模式下,消费者要遍历所有队列,随机一个下标开始读取,都从0开始读的话,存在热点队列,竞争锁int randomQ = random.nextInt(100);int reviveQid; // REVEIE队列ID 顺序消息999 普通消息:0~7轮询if (requestHeader.isOrder()) {reviveQid = KeyBuilder.POP_ORDER_REVIVE_QUEUE;} else {reviveQid = (int) Math.abs(ckMessageNumber.getAndIncrement() % this.brokerController.getBrokerConfig().getReviveQueueNum());}int commercialSizePerMsg = this.brokerController.getBrokerConfig().getCommercialSizePerMsg();GetMessageResult getMessageResult = new GetMessageResult(commercialSizePerMsg);long restNum = 0; // 剩余消息数// 平均每5次请求,读取一下重试队列boolean needRetry = randomQ % 5 == 0;if (needRetry && !requestHeader.isOrder()) {restNum = popMsgFromQueue(true, getMessageResult, requestHeader, queueId, restNum, reviveQid,channel, popTime, messageFilter,startOffsetInfo, msgOffsetInfo, orderCountInfo);}// POP模式下,请求的queueId=-1,读所有队列if (requestHeader.getQueueId() < 0) {for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {int queueId = (randomQ + i) % topicConfig.getReadQueueNums();restNum = popMsgFromQueue(false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, messageFilter,startOffsetInfo, msgOffsetInfo, orderCountInfo);}}// 拉取到的消息数量不满足消费者期望的数量,接着拉取重试队列:%RETRY%{consumerGroup}_{topic}......if (!getMessageResult.getMessageBufferList().isEmpty()) {// 拉取到消息,且队列里面还有消息,通知其它拉取请求if (restNum > 0) {notifyMessageArriving(requestHeader.getTopic(), requestHeader.getConsumerGroup(),requestHeader.getQueueId());}} else {// 没有消息,进入长轮询状态 挂起int pollingResult = polling(channel, request, requestHeader);}......
}

通过队列获取消息的方法是popMsgFromQueue,虽然队列可以被多个消费者消费,但是同一个消费组下,针对同一个队列拉取消息的行为必须保证串行,所以 Broker 首先会构建一个topic@group@queueId格式的字符串作为 lockKey 保证加锁成功,然后再获取消息拉取位点 offset,调用 MessageStore 获取消息,最终给这批消息记录 CK 消息。

private long popMsgFromQueue(boolean isRetry, GetMessageResult getMessageResult,PopMessageRequestHeader requestHeader, int queueId, long restNum, int reviveQid,Channel channel, long popTime,ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo,StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) {String topic = isRetry ? KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),requestHeader.getConsumerGroup()) : requestHeader.getTopic();// 给队列加锁 同一消费组下的队列串行读取 topic@group@queueIdString lockKey =topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + queueId;// 加锁if (!queueLockManager.tryLock(lockKey)) {restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;return restNum;}// 拉取位点offset = getPopOffset(topic, requestHeader, queueId, true, lockKey);// 获取消息GetMessageResult getMessageTmpResult = this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), topic, queueId, offset,requestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), messageFilter);if (!getMessageTmpResult.getMessageMapedList().isEmpty()) {// 追加CheckPointappendCheckPoint(requestHeader, topic, reviveQid, queueId, offset, getMessageTmpResult, popTime, this.brokerController.getBrokerConfig().getBrokerName());}......queueLockManager.unLock(lockKey);
}

image.png
这里的锁实现很简单,通过 AtomicBoolean CAS 修改上锁标记位来判断是否加锁成功:

static class TimedLock {private final AtomicBoolean lock;private volatile long lockTime;
}

加锁成功后,Broker 得知道该从哪里开始给消费者投递消息,这就是拉取位点的获取:

  • 首先取已提交的消费位点
  • 如果 CK 缓冲区有已投递的 PopCheckPoint,则取缓冲区的拉取位点,避免已经投递过的消息重复投递
private long getPopOffset(String topic, PopMessageRequestHeader requestHeader, int queueId, boolean init,String lockKey) {// 已提交的消费位点long offset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(),topic, queueId);if (offset < 0) {// 还没消费过 判断是从最旧还是最新的开始消费if (ConsumeInitMode.MIN == requestHeader.getInitMode()) {offset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId);} else {// pop last one,then commit offset.offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 1;// max & no consumer offsetif (offset < 0) {offset = 0;}if (init) {this.brokerController.getConsumerOffsetManager().commitOffset("getPopOffset",requestHeader.getConsumerGroup(), topic,queueId, offset);}}}// CK缓冲区的位点,这些是已投递、还没提交消费的位点long bufferOffset = this.popBufferMergeService.getLatestOffset(lockKey);if (bufferOffset < 0) {return offset;} else {return bufferOffset > offset ? bufferOffset : offset;}
}

有了拉取位点,接下来就是通过 MessageStore 查询消息了,底层会读取 consumequeue 和 commitlog 文件,这里不赘述。拉取到消息列表后,Broker 会给这一批消息记录一个 CK 消息,用于后续匹配 ACK 消息。

PopCheckPoint

CK 消息对应的类是 PopCheckPoint,主要记录了:消息拉取时间、消息偏移量、不可见时间、消息 ACK 位图等等。

public class PopCheckPoint {// 起始偏移量@JSONField(name = "so")private long startOffset;// 消息拉取时间@JSONField(name = "pt")private long popTime;@JSONField(name = "it")private long invisibleTime;// 位图 收到ACK消息则把对应位设为1@JSONField(name = "bm")private int bitMap;// 消息数量@JSONField(name = "n")private byte num;@JSONField(name = "q")private byte queueId;@JSONField(name = "t")private String topic;// 消费组@JSONField(name = "c")private String cid;@JSONField(name = "ro")private long reviveOffset;// 消息增量偏移量@JSONField(name = "d")private List<Integer> queueOffsetDiff;@JSONField(name = "bn")String brokerName;
}

构建好 PopCheckPoint 对象,Broker 会把它作为一个普通消息写入 commitlog 持久化,消息内容:

topic: rmq_sys_REVIVE_LOG_{clusterName}
queueId: 0~7轮询
tag: ck
body: json(PopCheckPoint)
deliverTimeMs: invisibleTime-1s

CK 消息存储完毕后,就可以正常返回消息了。需要注意的是,消息在返回给消费者前,Broker 会给消息设置一个很重要的属性:POP_CK。它是消息关联的 CK 消息的句柄字符串,消费者基于该属性来 ack 消息。

messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK,ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(key), responseHeader.getPopTime(), responseHeader.getInvisibleTime(),responseHeader.getReviveQid(), messageExt.getTopic(), messageQueue.getBrokerName(), messageExt.getQueueId(), msgQueueOffset)
);

POP_CK由以下八个属性构成,空格连接,通过该属性可以快速定位到 CK 消息。

{ckQueueOffset} {popTime} {invisibleTime} {reviveQid} {0/1} {brokerName} {queueId} {msgQueueOffset}# 示例值
2 1698656741635 60000 0 0 broker-a 3 2

AckMessageProcessor

消费者消费完消息后,会调用eraseMessage()擦除消息,也就是根据消费结果判断是 ack 还是 nack。

public void eraseMessage(MessageViewImpl messageView, ConsumeResult consumeResult) {statsConsumptionResult(consumeResult);ListenableFuture<Void> future = ConsumeResult.SUCCESS.equals(consumeResult) ? ackMessage(messageView) :nackMessage(messageView);future.addListener(() -> evictCache(messageView), MoreExecutors.directExecutor());
}

如果消费成功,则调用ackMessage接口,Broker 的处理方式也很简单,就是构建一个 AckMsg 对象,然后把它作为消息体发一个 ACK 消息。

private RemotingCommand processRequest(final Channel channel, RemotingCommand request,boolean brokerAllowSuspend) throws RemotingCommandException {......AckMsg ackMsg = new AckMsg();ackMsg.setAckOffset(requestHeader.getOffset());ackMsg.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfo));ackMsg.setConsumerGroup(requestHeader.getConsumerGroup());ackMsg.setTopic(requestHeader.getTopic());ackMsg.setQueueId(requestHeader.getQueueId());ackMsg.setPopTime(ExtraInfoUtil.getPopTime(extraInfo));ackMsg.setBrokerName(ExtraInfoUtil.getBrokerName(extraInfo));if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {顺序消息处理}// 构建消息存储MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(reviveTopic);msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.charset));msgInner.setQueueId(rqId);msgInner.setTags(PopAckConstants.ACK_TAG);msgInner.setBornTimestamp(System.currentTimeMillis());msgInner.setBornHost(this.brokerController.getStoreHost());msgInner.setStoreHost(this.brokerController.getStoreHost());// 延时消息 拉取时间+不可见时间msgInner.setDeliverTimeMs(ExtraInfoUtil.getPopTime(extraInfo) + ExtraInfoUtil.getInvisibleTime(extraInfo));msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genAckUniqueId(ackMsg));msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));PutMessageResult putMessageResult = this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);return response;
}

ACK 消息内容如下:

topic: rmq_sys_REVIVE_LOG_{clusterName}
queueId: 0~7轮询
tag: ack
body: json(AckMsg)
deliverTimeMs: invisibleTime

注意:CK & ACK 消息同属一个Topic

ChangeInvisibleTimeProcessor

如果消费失败,会根据重试次数调用changeInvisibleDuration接口延长消息不可见时间。Broker 最终会由 ChangeInvisibleTimeProcessor 处理请求,因为消息是按照紧凑的方式顺序写入 commitlog,所以写入后就不支持修改了,所谓的修改消息不可见时间,其实是先发一个新的 CK 消息,再发一个旧消息的 ACK 消息。

private RemotingCommand processRequest(final Channel channel, RemotingCommand request,boolean brokerAllowSuspend) throws RemotingCommandException {......// add new cklong now = System.currentTimeMillis();PutMessageResult ckResult = appendCheckPoint(requestHeader, ExtraInfoUtil.getReviveQid(extraInfo), requestHeader.getQueueId(), requestHeader.getOffset(), now, ExtraInfoUtil.getBrokerName(extraInfo));// ack old msgackOrigin(requestHeader, extraInfo);
}

所以,无论消费者是调用 ack 还是 nack,消息一定会被 ack 掉。另外还有一种场景就是消费者异常,没有上报消费结果,Broker 在消息不可见时间到期后会把这些没被 ack 掉的消息发到重试队列里,让其它消费者消费。

PopReviveService

CK & ACK 消息存储下来以后,由谁来处理呢?AckMessageProcessor 会开启八个线程,消费 REVIVE Topic 的八个队列,线程对应的服务是 PopReviveService。
PopReviveService 线程每秒执行一次,消费 REVIVE Topic 里的消息。如果是 CK 消息则重新构建 PopCheckPoint 对象;如果是 ACK 消息则更新 PopCheckPoint 里的位图,通过 tag 来区分消息类型。
最后处理 PopCheckPoint,已经被 ack 的消息不做处理,未被 ack 的消息会构建一条新消息重新发送到重试队列,Topic 规则是:%RETRY%{consumerGroup}_{topic}
image.png

public void run() {......ConsumeReviveObj consumeReviveObj = new ConsumeReviveObj();// 消费Revive队列消息,构建PopCheckPoint放入mapconsumeReviveMessage(consumeReviveObj);// 处理PopCheckPointmergeAndRevive(consumeReviveObj);
}
protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {HashMap<String, PopCheckPoint> map = consumeReviveObj.map;// 消费位点long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(PopAckConstants.REVIVE_GROUP, reviveTopic, queueId);while (true) {// 查询REVIVE队列消息List<MessageExt> messageExts = getReviveMessage(offset, queueId);for (MessageExt messageExt : messageExts) {//ck消息if (PopAckConstants.CK_TAG.equals(messageExt.getTags())) {String raw = new String(messageExt.getBody(), DataConverter.charset);PopCheckPoint point = JSON.parseObject(raw, PopCheckPoint.class);map.put(point.getTopic() + point.getCId() + point.getQueueId() + point.getStartOffset() + point.getPopTime(), point);} else if (PopAckConstants.ACK_TAG.equals(messageExt.getTags())) {// ack消息String raw = new String(messageExt.getBody(), DataConverter.charset);AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class);PopCheckPoint point = map.get(ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime());// 设置CheckPoint ACK位图int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());if (indexOfAck > -1) {point.setBitMap(DataConverter.setBit(point.getBitMap(), indexOfAck, true));} else {POP_LOGGER.error("invalid ack index, {}, {}", ackMsg, point);}}        }}
}

CK 消息到期后,会触发reviveMsgFromCk恢复没有被 ack 的消息,处理方式是构建新消息发到重试队列。

private void reviveMsgFromCk(PopCheckPoint popCheckPoint) throws Throwable {for (int j = 0; j < popCheckPoint.getNum(); j++) {if (DataConverter.getBit(popCheckPoint.getBitMap(), j)) {// 已经ack了,跳过continue;}// 查询实际消息long msgOffset = popCheckPoint.ackOffsetByIndex((byte) j);MessageExt messageExt = getBizMessage(popCheckPoint.getTopic(), msgOffset, popCheckPoint.getQueueId(), popCheckPoint.getBrokerName());if (messageExt == null) {POP_LOGGER.warn("reviveQueueId={},can not get biz msg topic is {}, offset is {} , then continue ",queueId, popCheckPoint.getTopic(), msgOffset);continue;}//skip ck from last epochif (popCheckPoint.getPopTime() < messageExt.getStoreTimestamp()) {POP_LOGGER.warn("reviveQueueId={},skip ck from last epoch {}", queueId, popCheckPoint);continue;}// 重试 构建一条消息重新Put到commitlog topic:%RETRY%{consumerGroup}_{topic}reviveRetry(popCheckPoint, messageExt);}
}

PopBufferMergeService

为了记录消息的 ack 状态,RocketMQ 需要存储额外的 CK & ACK 消息,开销还是比较大的。而消息存储的目的是为了匹配消息的 ack 状态,把未被 ack 的消息发送到重试队列里。
消息消费一般是很快的,意味着正常情况下,很快就能收到消费者发过来的 ack 和 nack 请求,那为什么不直接在内存里完成匹配呢?这样做可以大幅提升性能,消息落盘只作为一个兜底方案。
RocketMQ 是支持优先在内存里匹配 ack 消息的,默认是关闭状态,需要手动把enablePopBufferMerge打开。
开启内存匹配后,PopCheckPoint 会优先只追加到缓冲区,只有当缓冲区添加失败才会落地磁盘。

public boolean addCk(PopCheckPoint point, int reviveQueueId, long reviveQueueOffset, long nextBeginOffset) {// key: point.getT() + point.getC() + point.getQ() + point.getSo() + point.getPt()if (!brokerController.getBrokerConfig().isEnablePopBufferMerge()) {// 启用内存匹配return false;}if (!serving) {return false;}long now = System.currentTimeMillis();if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() + 1500) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.warn("[PopBuffer]add ck, timeout, {}, {}", point, now);}return false;}if (this.counter.get() > brokerController.getBrokerConfig().getPopCkMaxBufferSize()) {POP_LOGGER.warn("[PopBuffer]add ck, max size, {}, {}", point, this.counter.get());return false;}PopCheckPointWrapper pointWrapper = new PopCheckPointWrapper(reviveQueueId, reviveQueueOffset, point, nextBeginOffset);if (!checkQueueOk(pointWrapper)) {// 队列默认不超过20000个return false;}// 添加到commitOffsetsputOffsetQueue(pointWrapper);// 添加到缓冲区this.buffer.put(pointWrapper.getMergeKey(), pointWrapper);this.counter.incrementAndGet();if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.info("[PopBuffer]add ck, {}", pointWrapper);}return true;
}

同样的,Broker 在接收到消息 ack 请求时,也会优先只改缓冲区的 PopCheckPoint ack 位图,无需存储 ACK 消息。

 public boolean addAk(int reviveQid, AckMsg ackMsg) {if (!brokerController.getBrokerConfig().isEnablePopBufferMerge()) {return false;}if (!serving) {return false;}try {PopCheckPointWrapper pointWrapper = this.buffer.get(ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime() + ackMsg.getBrokerName());if (pointWrapper == null) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, no ck, {}", reviveQid, ackMsg);}return false;}if (pointWrapper.isJustOffset()) {return false;}PopCheckPoint point = pointWrapper.getCk();long now = System.currentTimeMillis();if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() + 1500) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, almost timeout for revive, {}, {}, {}", reviveQid, pointWrapper, ackMsg, now);}return false;}if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime() - 1500) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, stay too long, {}, {}, {}", reviveQid, pointWrapper, ackMsg, now);}return false;}// 直接更改内存里的位图int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());if (indexOfAck > -1) {markBitCAS(pointWrapper.getBits(), indexOfAck);} else {POP_LOGGER.error("[PopBuffer]Invalid index of ack, reviveQid={}, {}, {}", reviveQid, ackMsg, point);return true;}if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.info("[PopBuffer]add ack, rqId={}, {}, {}", reviveQid, pointWrapper, ackMsg);}return true;} catch (Throwable e) {POP_LOGGER.error("[PopBuffer]add ack error, rqId=" + reviveQid + ", " + ackMsg, e);}return false;}

内存资源有限,一直往里堆 PopCheckPoint 也不行啊,这时候就需要做两件事:

  • 把内存里已经匹配完 ack 的 PopCheckPoint 移除掉
  • 隔太久还没 ack 掉的 PopCheckPoint,必须要落盘存储了

PopBufferMergeService 本身也是个线程,会每隔 5ms 扫描一次缓冲区,执行上述操作。
image.png
内存里的 PopCheckPoint 移除前需要满足两个条件:

  • PopCheckPoint 消息已经持久化了
  • PopCheckPoint 在内存里就已经全被 ack 掉了

还没有完全被 ack 掉的 PopCheckPoint 移除前需要做两件事:

  • PopCheckPoint 消息持久化
  • 已经被 ack 掉的消息也要持久化
private void scan() {long startTime = System.currentTimeMillis();int count = 0, countCk = 0;// 迭代缓冲区的PopCheckPointIterator<Map.Entry<String, PopCheckPointWrapper>> iterator = buffer.entrySet().iterator();while (iterator.hasNext()) {Map.Entry<String, PopCheckPointWrapper> entry = iterator.next();PopCheckPointWrapper pointWrapper = entry.getValue();// CheckPoint已持久化,或已被ACK,从缓冲区删除// just process offset(already stored at pull thread), or buffer ck(not stored and ack finish)if (pointWrapper.isJustOffset() && pointWrapper.isCkStored() || isCkDone(pointWrapper)|| isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored()) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.info("[PopBuffer]ck done, {}", pointWrapper);}iterator.remove();counter.decrementAndGet();continue;}// 把超时或停留时间超10s的CheckPoint从缓冲区删除,删除前要先持久化PopCheckPoint point = pointWrapper.getCk();long now = System.currentTimeMillis();boolean removeCk = !this.serving;// ck will be timeoutif (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut()) {removeCk = true;}// 内存停留时间超过10秒,也要移除掉if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime()) {removeCk = true;}if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime() * 2L) {POP_LOGGER.warn("[PopBuffer]ck finish fail, stay too long, {}", pointWrapper);}// double checkif (isCkDone(pointWrapper)) {continue;} else if (pointWrapper.isJustOffset()) {// just offset should be in store.if (pointWrapper.getReviveQueueOffset() < 0) {// reviveQueueOffset<0代表还没存储,要先存储putCkToStore(pointWrapper, false);countCk++;}continue;} else if (removeCk) {// put buffer ak to storeif (pointWrapper.getReviveQueueOffset() < 0) {putCkToStore(pointWrapper, false);countCk++;}if (!pointWrapper.isCkStored()) {continue;}for (byte i = 0; i < point.getNum(); i++) {// reput buffer ak to store// 存储ack消息if (DataConverter.getBit(pointWrapper.getBits().get(), i)&& !DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) {if (putAckToStore(pointWrapper, i)) {count++;markBitCAS(pointWrapper.getToStoreBits(), i);}}}if (isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored()) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.info("[PopBuffer]ck finish, {}", pointWrapper);}iterator.remove();counter.decrementAndGet();continue;}}}// 扫描commitOffsets,提交消费位点int offsetBufferSize = scanCommitOffset();long eclipse = System.currentTimeMillis() - startTime;if (eclipse > brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() - 1000) {POP_LOGGER.warn("[PopBuffer]scan stop, because eclipse too long, PopBufferEclipse={}, " +"PopBufferToStoreAck={}, PopBufferToStoreCk={}, PopBufferSize={}, PopBufferOffsetSize={}",eclipse, count, countCk, counter.get(), offsetBufferSize);this.serving = false;} else {if (scanTimes % countOfSecond1 == 0) {POP_LOGGER.info("[PopBuffer]scan, PopBufferEclipse={}, " +"PopBufferToStoreAck={}, PopBufferToStoreCk={}, PopBufferSize={}, PopBufferOffsetSize={}",eclipse, count, countCk, counter.get(), offsetBufferSize);}}scanTimes++;if (scanTimes >= countOfMinute1) {counter.set(this.buffer.size());scanTimes = 0;}
}

内存里的 PopCheckPoint 除了会根据唯一键,构建一个 Map,还会根据topic@cid@queueId构建一个 QueueWithTime 队列连接起来。

ConcurrentHashMap<String/*topic@cid@queueId*/, QueueWithTime<PopCheckPointWrapper>> commitOffsets =new ConcurrentHashMap<>();

QueueWithTime 队列的用途有两个:

  • 内存里的 PopCheckPoint 处理完毕后,提交消费位点
  • 消息拉取时,获取拉取位点,避免已经投递的消息重复投递

内存里的 PopCheckPoint 只要持久化了或者全被 ack 掉了,就可以提交消费位点了。因为这些消息要么被成功消费了,要么后续在处理 CK 消息时也会被发送到重试队列里。提交消费位点的方法是commitOffset()

private boolean commitOffset(final PopCheckPointWrapper wrapper) {if (wrapper.getNextBeginOffset() < 0) {return true;}final PopCheckPoint popCheckPoint = wrapper.getCk();final String lockKey = wrapper.getLockKey();// 加锁if (!queueLockManager.tryLock(lockKey)) {return false;}try {// 旧的消费位点final long offset = brokerController.getConsumerOffsetManager().queryOffset(popCheckPoint.getCId(), popCheckPoint.getTopic(), popCheckPoint.getQueueId());if (wrapper.getNextBeginOffset() > offset) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.info("Commit offset, {}, {}", wrapper, offset);}} else {// maybe store offset is not correct.POP_LOGGER.warn("Commit offset, consumer offset less than store, {}, {}", wrapper, offset);}// 提交消费位点brokerController.getConsumerOffsetManager().commitOffset(getServiceName(),popCheckPoint.getCId(), popCheckPoint.getTopic(), popCheckPoint.getQueueId(), wrapper.getNextBeginOffset());} finally {queueLockManager.unLock(lockKey);}return true;
}

尾巴

RocketMQ 5.0 的 Pop 消费模式是 Push 模式的升级版,它解决了原先 Push 模式下队列只能由一个消费者消费的问题、去除了客户端繁重的重平衡逻辑、降低了消息堆积的风险。核心逻辑是 Broker 给每次拉取的一批消息发一个 CK 延时消息,客户端 ack 时再发一个 ACK 延时消息,消息到期后对 CK 消息做 ACK 匹配,把未被 ack 掉的消息发到重试独立里。这种模式下,不可避免的会带来额外开销,所以 RocketMQ 也支持优先在内存里完成匹配,CK 和 ACK 消息就不用存储了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/317852.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

L1-083:谁能进图书馆

题目描述 为了保障安静的阅读环境&#xff0c;有些公共图书馆对儿童入馆做出了限制。例如“12 岁以下儿童禁止入馆&#xff0c;除非有 18 岁以上&#xff08;包括 18 岁&#xff09;的成人陪同”。现在有两位小/大朋友跑来问你&#xff0c;他们能不能进去&#xff1f;请你写个程…

机器学习笔记 - 偏最小二乘回归 (PLSR)

一、偏最小二乘回归:简介 PLS 方法构成了一个非常大的方法族。虽然回归方法可能是最流行的 PLS 技术,但它绝不是唯一的一种。即使在 PLSR 中,也有多种不同的算法可以获得解决方案。PLS 回归主要由斯堪的纳维亚化学计量学家 Svante Wold 和 Harald Martens 在 20 世纪 80 年代…

智能安全帽定制_基于联发科MT6765安卓核心板方案

智能安全帽&#xff1a;解放双手&#xff0c;实现远程指导和可视化管理 智能安全帽在安全帽的基础功能上&#xff0c;高度集成了摄像头、语音、通信主板等模块&#xff0c;具有高清视频采集、语音通讯、对讲、本地视频存储等功能。它是一款真正意义上解放现场操作人员双手的穿戴…

《Linux系列》Linux磁盘MBR分区扩容

文章目录 Linux磁盘MBR分区扩容1.前言2.控制台磁盘扩容3.分区扩容3.1 fdisk3.2 lsblk3.3 扩容分区 4.扩容文件系统4.1 df4.2 扩容文件系统 Linux磁盘MBR分区扩容 1&#xff09;参考阿里云扩容分区文档&#xff0c;整理MBR分区扩容 2&#xff09;本文档适用于MBR分区(fdisk -lu查…

数据挖掘中的数据属性特点、描述性统计度量与相似度计算

目录 1. 引言 2. 数据挖掘中的数据属性 2.1 数值属性 2.2 标称属性 2.3 有序属性 2.4 无序属性 3. 描述性统计度量 3.1 中心趋势度量 3.2 离散程度度量 3.3 分布形状度量 4. 相似度计算 4.1 欧氏距离 4.2 余弦相似度 4.3 Jaccard 5. 数据挖掘中的案例应用 5.1 …

本科毕业四年,跳槽3次,从外包到年入20W,谁还没点绝活呢?

本人本科就读于某普通院校&#xff0c;毕业后通过同学的原因加入软件测试这个行业&#xff0c;角色也从测试小白到了目前的资深工程师&#xff0c;从功能测试转变为测试开发&#xff0c;并顺利拿下了某二线城市互联网企业Offer&#xff0c;年薪20W 。 选择和努力哪个重要&#…

Java经典面试题笔记

一&#xff0c;Java基础 1&#xff0c;说说你对面向对象的理解。 什么是面向对象呢&#xff1f;在所其是什么时&#xff0c;不妨我们先来说说以其不同的一个概念面向过程。面向过程是一个更加注重事情的每一个步骤即顺序&#xff0c;即是强调过程的。而面向对象更加注重有哪些…

Http状态:net::ERR_INCOMPLETE_CHUNKED_ENCODING

一、问题描述&#xff1a; 今天前端的小伙伴遇到一个js文件加载报错&#xff1a;net::ERR_INCOMPLETE_CHUNKED_ENCODING&#xff0c;不论如何刷新页面始终只有该文件加载失败&#xff0c;Chrome开发者工具中响应内容显示此请求没有可用的响应数据。 二、原因调查 排除非前端发…

Twincat中PLC编程的ST语言

在Twincat中&#xff0c;PLC编程使用的是Structured Text&#xff08;ST&#xff09;语言。ST语言是一种类似于Pascal的高级编程语言&#xff0c;专为工业自动化领域的程序开发而设计。它提供了结构化的控制流和数据操作&#xff0c;使得PLC编程更加高效和可靠。 https://kunal…

YApi怎么测试接口?简单实用教程

接口测试 为什么要接口测试&#xff1f; 你想想&#xff0c;你们后端团队写了几百个接口&#xff0c;兴高采烈地&#xff0c;直接部署上线&#xff0c;你们开开心心下班去。 等到晚上的时候&#xff0c;你发现你们的接口好几个都崩了&#xff0c;这导致了你们产品损失了一大…

[DevOps-05] Jenkins实现CI/CD操作

一、简要说明 基于Jenkins拉取GitLab的SpringBoot代码进行构建发布到测试环境实现持续集成 基于Jenkins拉取GitLab指定发行版本的SpringBoot代码进行构建发布到生产环境实现CD实现持续部署 二、准备Springboot工程 1、IDEA新建工程 2、填写项目工程信息 3、选择Springboot版本…

3dmax渲染全景图参数设置 3dmax云渲染插件使用

家经常在互联网上看到制作360度全景图的各种教程&#xff0c;但这些教程往往是片段的&#xff0c;并且细节解释并不充分。为此&#xff0c;以下是一些从业者常用的优良做法&#xff0c;涉及到3ds Max中的场景布局和V-Ray渲染设置&#xff0c;这些建议旨在提供一个更全面和详尽的…