RocketMQ源码阅读-Message顺序发送与消费

RocketMQ源码阅读-Message顺序发送与消费

  • 1 普通顺序消息
  • 2 严格顺序消息
    • 2.1 分配消息队列
    • 2.2 移除消息队列
    • 2.3 消费消息队列
      • 2.3.1 消费消息
      • 2.3.2 处理消费结果
      • 2.3.3 消息处理队列ProcessQueue
      • 2.3.4 小结
  • 3 总结

Message的拉取与消费逻辑,上一篇已经分析过。
这一篇看一下消息的消费顺序。
RocketMQ有两种顺序类型:

  1. 普通顺序消息:Producer会将相同的消息发送到相同的消息队列(性能好)
  2. 严格顺序消息:在普通顺序消息的基础上,Consumer严格顺序消费

一般情况下,只会使用到普通顺序消息,不必保证严格有序。
官方文档中说明目前已知只有一种情况会要求严格顺序,数据库 binlog 同步。

1 普通顺序消息

public static void main(String[] args) {try {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");producer.start();String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 100; i++) {int orderId = i % 10;String body = "Hello RocketMQ " + i;Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);}producer.shutdown();} catch (Exception e) {throw new RuntimeException(e);}
}

上面例子中,创建了一个Producer,并且发送消息时,创建了一个选择器,选择其中使用id % mqs.size() 进行消息队列的选择。传递了orderId作为参数,那么相同的orderId能够分配到同一个消息队列中。保证了消息的顺序。

看一下 MessageQueueSelector 接口:

public interface MessageQueueSelector {/*** 选择消息队列** @param mqs 消息队列* @param msg 消息* @param arg 参数* @return 消息队列*/MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

发送消息最终会调用到DefaultMQProducerImpl#sendSelectImpl:

private SendResult sendSelectImpl(Message msg,MessageQueueSelector selector,Object arg,final CommunicationMode communicationMode,final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {MessageQueue mq = null;try {List<MessageQueue> messageQueueList =mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());Message userMessage = MessageAccessor.cloneMessage(msg);String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());userMessage.setTopic(userTopic);// 选择消息mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));} catch (Throwable e) {throw new MQClientException("select message queue throwed exception.", e);}long costTime = System.currentTimeMillis() - beginStartTime;if (timeout < costTime) {throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");}if (mq != null) {// 发送消息return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);} else {throw new MQClientException("select message queue return null.", null);}}throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}

可以看到代码中选择消息和发送消息的位置。

2 严格顺序消息

Consumer能够实现严格顺序消息。
通过三把锁来实现严格顺序消费。

  1. Broker消息队列锁(分布式锁):
  • 集群模式下,Consumer从Broker获取到该锁后,才能进行消息拉取、消费
  • 广播模式下,Consumer不需要获取锁
  1. Consumer消息队列锁(本地锁):Consumer获得该锁才能操作消息队列
  2. Consumer消息处理队列消费锁(本地锁):Consumer获得该锁才能操作消息队列

2.1 分配消息队列

集群模式下,Consumer 更新属于自己的消息队列时,会向 Broker 锁定该消息队列(广播模式下不需要)。如果锁定失败,则更新失败,即该消息队列不属于自己,不能进行消费。

相关源代码如下RebalanceImpl#updateProcessQueueTableInRebalance:

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,final boolean isOrder) {boolean changed = false;Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();while (it.hasNext()) {Entry<MessageQueue, ProcessQueue> next = it.next();MessageQueue mq = next.getKey();ProcessQueue pq = next.getValue();if (mq.getTopic().equals(topic)) {if (!mqSet.contains(mq)) {pq.setDropped(true);if (this.removeUnnecessaryMessageQueue(mq, pq)) {it.remove();changed = true;log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);}} else if (pq.isPullExpired()) {switch (this.consumeType()) {case CONSUME_ACTIVELY:break;case CONSUME_PASSIVELY:pq.setDropped(true);if (this.removeUnnecessaryMessageQueue(mq, pq)) {it.remove();changed = true;log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",consumerGroup, mq);}break;default:break;}}}}List<PullRequest> pullRequestList = new ArrayList<PullRequest>();for (MessageQueue mq : mqSet) {if (!this.processQueueTable.containsKey(mq)) {// 顺序消息锁定消息队列if (isOrder && !this.lock(mq)) {log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);continue;}this.removeDirtyOffset(mq);ProcessQueue pq = new ProcessQueue();long nextOffset = this.computePullFromWhere(mq);if (nextOffset >= 0) {ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);if (pre != null) {log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);} else {log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);PullRequest pullRequest = new PullRequest();pullRequest.setConsumerGroup(consumerGroup);pullRequest.setNextOffset(nextOffset);pullRequest.setMessageQueue(mq);pullRequest.setProcessQueue(pq);pullRequestList.add(pullRequest);changed = true;}} else {log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);}}}this.dispatchPullRequest(pullRequestList);return changed;
}

代码第43行,可以看到,当是顺序消息时,需要锁定消息队列。
锁定操作调用方法为RebalanceImpl#lock:

public boolean lock(final MessageQueue mq) {FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);if (findBrokerResult != null) {LockBatchRequestBody requestBody = new LockBatchRequestBody();requestBody.setConsumerGroup(this.consumerGroup);requestBody.setClientId(this.mQClientFactory.getClientId());requestBody.getMqSet().add(mq);try {// 请求Broker获得指定消息队列的分布式锁Set<MessageQueue> lockedMq =this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);// 设置消息处理队列锁定成功。锁定消息队列成功,可能本地没有消息处理队列,设置锁定成功会在lockAll()方法。for (MessageQueue mmqq : lockedMq) {ProcessQueue processQueue = this.processQueueTable.get(mmqq);if (processQueue != null) {processQueue.setLocked(true);processQueue.setLastLockTimestamp(System.currentTimeMillis());}}boolean lockOK = lockedMq.contains(mq);log.info("the message queue lock {}, {} {}",lockOK ? "OK" : "Failed",this.consumerGroup,mq);return lockOK;} catch (Exception e) {log.error("lockBatchMQ exception, " + mq, e);}}return false;
}

可以看到,严格顺序消息,rebalance更新队列时,会获取队列锁,如果锁定失败,新增消息处理队列失败。

Broker 消息队列锁会过期,默认配置 30s。因此,Consumer 需要不断向 Broker 刷新该锁过期时间,默认配置 20s 刷新一次。源码ConsumeMessageOrderlyService#start:

public void start() {if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {ConsumeMessageOrderlyService.this.lockMQPeriodically();}}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);}
}

2.2 移除消息队列

集群模式下,Consumer 移除自己的消息队列时,会向 Broker 解锁该消息队列(广播模式下不需要)。
源码RebalancePushImpl#removeUnnecessaryMessageQueue:
该方法的作用是移除不需要的队列相关的信息。

@Override
public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {// 持久化队列的消费进度this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);// 移除this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);// 集群模式下,顺序消费移除时,解锁对队列的锁定if (this.defaultMQPushConsumerImpl.isConsumeOrderly()&& MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {try {if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {try {// 延迟解锁return this.unlockDelay(mq, pq);} finally {pq.getLockConsume().unlock();}} else {log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}",mq,pq.getTryUnlockTimes());pq.incTryUnlockTimes();}} catch (Exception e) {log.error("removeUnnecessaryMessageQueue Exception", e);}return false;}return true;
}

获取消息队列消费锁,避免和消息队列消费冲突。如果获取锁失败,则移除消息队列失败,等待下次重新分配消费队列时,再进行移除。如果未获得锁而进行移除,则可能出现另外的 Consumer 和当前 Consumer 同时消费该消息队列,导致消息无法严格顺序消费。

会调用延迟解锁方法RebalancePushImpl#unlockDelay:

private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) {if (pq.hasTempMessage()) {log.info("[{}]unlockDelay, begin {} ", mq.hashCode(), mq);this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable() {@Overridepublic void run() {log.info("[{}]unlockDelay, execute at once {}", mq.hashCode(), mq);RebalancePushImpl.this.unlock(mq, true);}}, UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS);} else {// 消息处理队列不存在,直接解锁this.unlock(mq, true);}return true;
}

解锁 Broker 消息队列锁。如果消息处理队列存在剩余消息,则延迟解锁 Broker 消息队列锁。

2.3 消费消息队列

源码DefaultMQPushConsumerImpl类中300行,创建了一个PullCallback,作为拉取消息成功的回调,325行调用ConsumeMessageOrderlyService(严格顺序消息处理类)的submitConsumeRequest方法

@Override
public void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispathToConsume) {if (dispathToConsume) {ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);this.consumeExecutor.submit(consumeRequest);}
}

方法创建了一个ConsumeRequest提交给了线程池。这个ConsumeRequest在下面进行分析。

2.3.1 消费消息

消费消息的流程如下图(来源https://www.iocoder.cn/RocketMQ/message-send-and-consume-orderly/?github&1601):
image.png
接着看ConsumeRequest的分析。
image.png
ConsumeRequest是ConsumeMessageOrderlyService的内部类:
image.png
类源码如下:

class ConsumeRequest implements Runnable {// 消息处理队列private final ProcessQueue processQueue;// 消息队列private final MessageQueue messageQueue;public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {this.processQueue = processQueue;this.messageQueue = messageQueue;}public ProcessQueue getProcessQueue() {return processQueue;}public MessageQueue getMessageQueue() {return messageQueue;}@Overridepublic void run() {if (this.processQueue.isDropped()) {log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);return;}// 获得 Consumer 消息队列锁final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);synchronized (objLock) {// (广播模式) 或者 (集群模式 && Broker消息队列锁有效)if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {final long beginTime = System.currentTimeMillis();for (boolean continueConsume = true; continueConsume; ) {if (this.processQueue.isDropped()) {log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);break;}// 消息队列分布式锁未锁定,提交延迟获得锁并消费请求if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())&& !this.processQueue.isLocked()) {log.warn("the message queue not locked, so consume later, {}", this.messageQueue);ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);break;}// 消息队列分布式锁已经过期,提交延迟获得锁并消费请求if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())&& this.processQueue.isLockExpired()) {log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);break;}// 当前周期消费时间超过连续时长,默认:60s,提交延迟消费请求。默认情况下,每消费1分钟休息10ms。long interval = System.currentTimeMillis() - beginTime;if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);break;}// 获取消费消息。此处和并发消息请求不同,并发消息请求已经带了消费哪些消息。final int consumeBatchSize =ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());if (!msgs.isEmpty()) {final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);ConsumeOrderlyStatus status = null;ConsumeMessageContext consumeMessageContext = null;if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());consumeMessageContext.setMq(messageQueue);consumeMessageContext.setMsgList(msgs);consumeMessageContext.setSuccess(false);// init the consume context typeconsumeMessageContext.setProps(new HashMap<String, String>());ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);}// 执行消费long beginTimestamp = System.currentTimeMillis();ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;boolean hasException = false;try {// 获取消息处理队列锁this.processQueue.getLockConsume().lock();if (this.processQueue.isDropped()) {log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",this.messageQueue);break;}status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageOrderlyService.this.consumerGroup,msgs,messageQueue);hasException = true;} finally {// 释放消息处理队列锁this.processQueue.getLockConsume().unlock();}if (null == status|| ConsumeOrderlyStatus.ROLLBACK == status|| ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",ConsumeMessageOrderlyService.this.consumerGroup,msgs,messageQueue);}long consumeRT = System.currentTimeMillis() - beginTimestamp;if (null == status) {if (hasException) {returnType = ConsumeReturnType.EXCEPTION;} else {returnType = ConsumeReturnType.RETURNNULL;}} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {returnType = ConsumeReturnType.TIME_OUT;} else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {returnType = ConsumeReturnType.FAILED;} else if (ConsumeOrderlyStatus.SUCCESS == status) {returnType = ConsumeReturnType.SUCCESS;}if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());}if (null == status) {status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.setStatus(status.toString());consumeMessageContext.setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);}ConsumeMessageOrderlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);// 处理消费结果continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);} else {continueConsume = false;}}} else {if (this.processQueue.isDropped()) {log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);return;}ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);}}}}

源码分析详细见上面代码片段中的注释。主要有以下流程:

  1. 获取Consumer消息队列锁
  2. 从消息队列顺序获取消息
  3. 获取消息处理队列ProcessQueue锁,(此锁相比Consumer消息队列锁,粒度更小)
  4. 执行消费
  5. 释放消息处理队列锁

2.3.2 处理消费结果

上一节源码第150行,执行处理消费结果的逻辑,调用的方法为ConsumeMessageOrderlyService#processConsumeResult方法:

// 处理消费结果,并返回是否继续消费
public boolean processConsumeResult(final List<MessageExt> msgs,final ConsumeOrderlyStatus status,final ConsumeOrderlyContext context,final ConsumeRequest consumeRequest
) {boolean continueConsume = true;long commitOffset = -1L;if (context.isAutoCommit()) {// 自动提交的消息switch (status) {case COMMIT:case ROLLBACK:log.warn("the message queue consume result is illegal, we think you want to ack these message {}",consumeRequest.getMessageQueue());case SUCCESS:// 消息消费成功,提交到消息处理队列commitOffset = consumeRequest.getProcessQueue().commit();// 统计this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());break;case SUSPEND_CURRENT_QUEUE_A_MOMENT:// 统计this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());// 计算是否暂时挂起消费N毫秒,默认10msif (checkReconsumeTimes(msgs)) {// 设置消息重新消费consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);// 提交延时消费请求this.submitConsumeRequestLater(consumeRequest.getProcessQueue(),consumeRequest.getMessageQueue(),context.getSuspendCurrentQueueTimeMillis());continueConsume = false;} else {commitOffset = consumeRequest.getProcessQueue().commit();}break;default:break;}} else {switch (status) {case SUCCESS:// 统计this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());break;case COMMIT:// 提交消息已消费成功到消息处理队列commitOffset = consumeRequest.getProcessQueue().commit();break;case ROLLBACK:// 设置消息重新消费consumeRequest.getProcessQueue().rollback();this.submitConsumeRequestLater(consumeRequest.getProcessQueue(),consumeRequest.getMessageQueue(),context.getSuspendCurrentQueueTimeMillis());continueConsume = false;break;case SUSPEND_CURRENT_QUEUE_A_MOMENT:// 统计this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());// 计算是否暂时挂起消费N毫秒,默认:10msif (checkReconsumeTimes(msgs)) {// 设置消息重新消费consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);// 提交延迟消费请求this.submitConsumeRequestLater(consumeRequest.getProcessQueue(),consumeRequest.getMessageQueue(),context.getSuspendCurrentQueueTimeMillis());continueConsume = false;}break;default:break;}}// 消息处理队列未dropped,提交有效消费进度if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);}return continueConsume;
}private int getMaxReconsumeTimes() {// default reconsume times: Integer.MAX_VALUEif (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {return Integer.MAX_VALUE;} else {return this.defaultMQPushConsumer.getMaxReconsumeTimes();}
}// 计算是否暂时挂起消费N毫秒,不暂停条件:存在消息都超过最大消费次数并且都发回broker成功,返回是否要暂停
private boolean checkReconsumeTimes(List<MessageExt> msgs) {boolean suspend = false;if (msgs != null && !msgs.isEmpty()) {for (MessageExt msg : msgs) {if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) {MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes()));if (!sendMessageBack(msg)) {suspend = true;msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);}} else {suspend = true;msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);}}}return suspend;
}// 消息发回Broker,对应的队列时死信队列
public boolean sendMessageBack(final MessageExt msg) {try {// 超出最大重新消耗次数(默认 :16次),然后发送到死信队列。Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());String originMsgId = MessageAccessor.getOriginMessageId(msg);MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);newMsg.setFlag(msg.getFlag());MessageAccessor.setProperties(newMsg, msg.getProperties());MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes()));MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);return true;} catch (Exception e) {log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);}return false;
}

此方法根据不同的消费结果状态执行不同的逻辑,消费结果状态共有四种:

public enum ConsumeOrderlyStatus {/*** 消费成功但不提交*/SUCCESS,/*** 消费失败,消费回滚(已过期,仅适用于binlog同步)*/@DeprecatedROLLBACK,/*** 消费成功提交并且提交(已过期,仅适用于binlog同步)*/@DeprecatedCOMMIT,/*** 消费失败,挂起一会,稍后继续消费*/SUSPEND_CURRENT_QUEUE_A_MOMENT;
}

四种状态中ROLLBACK和COMMIT仅用于binlog同步,所以已经标注了@Deprecated。
普通顺序消息,也就是并发消费时,消费失败,Consume会将消费失败的消息发回Broker,等下次拉取再消费。
严格顺序消息,为保证顺序,只能挂起队列(上面代码的第99行函数),延迟一会再次消费。
如果失败次数达到上限(默认 :16次),Consumer会将此消息发回Broker的死信队列。

2.3.3 消息处理队列ProcessQueue

类ProcessQueue的签名是:队列消费快照。
用来存储消息队列的消费进度,也叫作消息处理队列。
其中的关键属性和方法为:

// 消息银蛇,key为消息队列的位置
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();/**
* msgTreeMap 的子集,仅在有序使用时使用
*/
private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>();// 回滚消费中的消息
public void rollback() {try {this.lockTreeMap.writeLock().lockInterruptibly();try {// consumingMsgOrderlyTreeMap 子集中的消息全部放回msgTreeMapthis.msgTreeMap.putAll(this.consumingMsgOrderlyTreeMap);// 并清空 consumingMsgOrderlyTreeMapthis.consumingMsgOrderlyTreeMap.clear();} finally {this.lockTreeMap.writeLock().unlock();}} catch (InterruptedException e) {log.error("rollback exception", e);}
}// 提交消费中的消息为消费成功,返回消费进度
public long commit() {try {this.lockTreeMap.writeLock().lockInterruptibly();try {// 消费进度Long offset = this.consumingMsgOrderlyTreeMap.lastKey();msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {msgSize.addAndGet(0 - msg.getBody().length);}this.consumingMsgOrderlyTreeMap.clear();// 返回消费进度,偏移量+1if (offset != null) {return offset + 1;}} finally {this.lockTreeMap.writeLock().unlock();}} catch (InterruptedException e) {log.error("commit exception", e);}return -1;
}
// 指定消息重新消费
public void makeMessageToCosumeAgain(List<MessageExt> msgs) {try {this.lockTreeMap.writeLock().lockInterruptibly();try {for (MessageExt msg : msgs) {this.consumingMsgOrderlyTreeMap.remove(msg.getQueueOffset());this.msgTreeMap.put(msg.getQueueOffset(), msg);}} finally {this.lockTreeMap.writeLock().unlock();}} catch (InterruptedException e) {log.error("makeMessageToCosumeAgain exception", e);}
}
// 获取前batchSize条消息
public List<MessageExt> takeMessags(final int batchSize) {List<MessageExt> result = new ArrayList<MessageExt>(batchSize);final long now = System.currentTimeMillis();try {this.lockTreeMap.writeLock().lockInterruptibly();this.lastConsumeTimestamp = now;try {if (!this.msgTreeMap.isEmpty()) {for (int i = 0; i < batchSize; i++) {Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();if (entry != null) {result.add(entry.getValue());// 同时put进consumingMsgOrderlyTreeMap子集中,代表正在消费的消息consumingMsgOrderlyTreeMap.put(entry.getKey(), entry.getValue());} else {break;}}}if (result.isEmpty()) {consuming = false;}} finally {this.lockTreeMap.writeLock().unlock();}} catch (InterruptedException e) {log.error("take Messages exception", e);}return result;
}

2.3.4 小结

ProcessQueue是消息处理队列,记录消费的进度,封装对于消息的流程控制方法。

3 总结

本篇分析了RocketMQ对于普通顺序消息和严格顺序消息的消费流程。

  1. 普通顺序消息:Producer会将相同的消息发送到相同的消息队列(性能好),在消费失败时会放回队列重新消费,但是先消费其他的。
  2. 严格顺序消息:在普通顺序消息的基础上,Consumer严格顺序消费。此种消费方式,在消费失败时会暂定消费一会再重新消费,直到达到一定的失败次数限制(默认16次),会发回到Broker的死信队列,然后跳过,继续消费后面的消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/413683.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于springboot+vue的社区团购系统(前后端分离)

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容&#xff1a;毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 项目背景…

军事智能中的深度强化学习不同于传统的深度强化学习

在军事智能中&#xff0c;“诡”和“诈”是两个最重要的概念。 “诡”变指的是智能体通过采取一些不可预测或复杂的变化策略来获得优势。诡变可能包括逃避对手的观察或引诱对手采取不利的行动。智能体可以使用诡变来欺骗对手&#xff0c;使其做出错误的决策或暴露其策略。 “诈…

Python编辑开发---pycharm pro 2023 中文

PyCharm Pro 2023是一款功能强大的Python集成开发环境&#xff08;IDE&#xff09;&#xff0c;旨在提高Python开发人员的生产力。它提供了智能代码编辑、实时代码分析和调试工具&#xff0c;支持版本控制和数据库工具&#xff0c;以及可扩展的插件系统。PyCharm Pro 2023可在多…

医学图像的图像处理、分割、分类和定位-1

一、说明 本报告全面探讨了应用于医学图像的图像处理和分类技术。开展了四项不同的任务来展示这些方法的多功能性和有效性。任务 1 涉及读取、写入和显示 PNG、JPG 和 DICOM 图像。任务 2 涉及基于定向变化的多类图像分类。此外&#xff0c;我们在任务 3 中包括了胸部 X 光图像…

【PyTorch】PyTorch之Tensors索引切片篇

文章目录 前言一、ARGWHERE二、CAT、CONCAT、CONCATENATE三、CHUNK四、GATHER五、MOVEDIM和MOVEAXIS六、PERMUTE七、RESHAPE八、SELECT九、SPLIT十、SQUEEZE十一、T十二、TAKE十三、TILE十四、TRANSPOSE十五、UNBIND十六、UNSQUEEZE十七、WHERE 前言 介绍常用的PyTorch之Tenso…

pytest + allure(windows)安装

背景 软硬件环境&#xff1a; windows11&#xff0c;已安装anaconda&#xff0c;python&#xff0c;pycharm用途&#xff1a;使用pytest allure 生成报告allure 依赖java&#xff0c;点击查看java安装教程 allure 下载与安装 从 allure下载网址下载最新版本.zip文件 放在自…

Spring Web文件上传功能简述

文章目录 正文简单文件上传文件写入 总结 正文 在日常项目开发过程中&#xff0c;文件上传是一个非常常见的功能&#xff0c;当然正规项目都有专门的文件服务器保存上传的文件&#xff0c;实际只需要保存文件路径链接到数据库中即可&#xff0c;但在小型项目中可能没有专门的文…

汽车连接器接线端子和多芯线束连接界面

冷压接的开式压接和闭式压接以及热压接的超声波焊接对汽车连接器接线端子和多芯线束连接界面 连接器接线端子和多芯线束的连接是电子线束行业&#xff0c;特别是汽车行业常用的导线连接方式。汽车整车线束又由许多分支线束组成&#xff0c;而分支线束必须通过连接器实现连接&am…

kafka系列(二)

本章承接kafka一内容&#xff0c;文章在本人博客主页都有&#xff0c;可以自行点击浏览。 幂等性 请求执行多次&#xff0c;但执行的结果是一致的。 如果&#xff0c;某个系统是不具备幂等性的&#xff0c;如果用户重复提交了某个表格&#xff0c;就可能会造成不良影响。例如…

【React基础】– JSX语法

文章目录 认识JSX为什么React选择了JSXJSX的使用 React事件绑定this的绑定问题事件参数传递 React条件渲染React列表渲染列表中的key JSX的本质createElement源码Babel官网查看直接编写jsx代码 虚拟DOM的创建过程jsx – 虚拟DOM – 真实DOM声明式编程 阶段案例练习 认识JSX ◼ …

PPT大神带你飞!!!

1、OneKeyTools 官网&#xff1a;http://oktools.xyz/ OneKeyTools是一款免费开源的PowerPoint第三方平面设计辅助插件&#xff0c;功能涵盖了形状、调色、三维、图片处理、辅助功能等等方面。 插件功能&#xff1a; 插件从面世逐步受到广大PPT设计师和爱好者的追捧&#x…

2024美赛数学建模思路 - 案例:异常检测

文章目录 赛题思路一、简介 -- 关于异常检测异常检测监督学习 二、异常检测算法2. 箱线图分析3. 基于距离/密度4. 基于划分思想 建模资料 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 一、简介 – 关于异常…