RocketMQ Source Code Reading: Ordered Message Sending and Consumption
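- 1 Normal ordered messages
- 2 Strictly ordered messages
- 2.1 Assigning message queues
- 2.2 Removing message queues
- 2.3 Consuming message queues
- 2.3.1 Consuming messages
- 2.3.2 Processing the consume result
- 2.3.3 The process queue ProcessQueue
- 2.3.4 Summary
- 3 Conclusion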
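The previous article analyzed how messages are pulled and consumed.
This article looks at the order in which messages are consumed.
RocketMQ has two kinds of ordering:
- Normal ordered messages: the Producer sends related messages (for example, messages with the same order ID) to the same message queue (good performance)
- Strictly ordered messages: on top of normal ordering, the Consumer consumes strictly in order
In most cases normal ordered messages are enough, and strict ordering does not have to be guaranteed.
The official documentation states that only one known scenario requires strict ordering: database binlog synchronization.
1 Normal ordered messages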
public static void main(String[] args) {
    try {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.start();

        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 100; i++) {
            int orderId = i % 10;
            String body = "Hello RocketMQ " + i;
            Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            }, orderId);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
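The example above creates a Producer and supplies a selector with every send. The selector picks the message queue with id % mqs.size(). Because orderId is passed as the argument, messages with the same orderId are assigned to the same message queue, which preserves their relative order.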
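The routing rule can be tried without a broker. The following is a minimal sketch, not RocketMQ code: OrderQueueSelectorSketch and selectIndex are hypothetical names, and the queue count of 4 is an assumption. It uses Math.floorMod instead of the plain % from the example so that a negative id cannot produce a negative index.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: route messages that share an order id to the same queue index. */
final class OrderQueueSelectorSketch {

    /** Same idea as id % mqs.size(), but floorMod keeps the index non-negative. */
    static int selectIndex(int orderId, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        return Math.floorMod(orderId, queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed number of queues for the topic
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < 12; i++) {
            int orderId = i % 3;
            lines.add("msg " + i + " orderId=" + orderId
                + " -> queue " + selectIndex(orderId, queueCount));
        }
        lines.forEach(System.out::println);
    }
}
```

The mapping is stable only while the number of queues stays the same; if queues are added or removed, the same orderId may land on a different queue.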
Now look at the MessageQueueSelector interface:
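public interface MessageQueueSelector {
    /**
     * Select a message queue.
     *
     * @param mqs the candidate message queues
     * @param msg the message
     * @param arg the user-supplied argument
     * @return the selected message queue
     */
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}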
Sending eventually calls DefaultMQProducerImpl#sendSelectImpl:
private SendResult sendSelectImpl(
    Message msg,
    MessageQueueSelector selector,
    Object arg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);

    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        try {
            List<MessageQueue> messageQueueList =
                mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
            Message userMessage = MessageAccessor.cloneMessage(msg);
            String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
            userMessage.setTopic(userTopic);

            // Select the message queue
            mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
        } catch (Throwable e) {
            throw new MQClientException("select message queue throwed exception.", e);
        }

        long costTime = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTime) {
            throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
        }
        if (mq != null) {
            // Send the message
            return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
        } else {
            throw new MQClientException("select message queue return null.", null);
        }
    }

    throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}
The code shows where the message queue is selected (selector.select) and where the message is sent (sendKernelImpl).
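2 Strictly ordered messages
Strict ordering is implemented on the Consumer side.
Three locks together make strictly ordered consumption possible:
- Broker message queue lock (distributed lock):
  - In clustering mode, the Consumer can pull and consume messages only after obtaining this lock from the Broker
  - In broadcasting mode, the Consumer does not need to obtain the lock
- Consumer message queue lock (local lock): the Consumer must hold this lock to operate on the message queue
- Consumer process queue consume lock (local lock): the Consumer must hold this lock to consume from the process queue
2.1 Assigning message queues
In clustering mode, when the Consumer updates the set of message queues it owns, it locks each new message queue on the Broker (not needed in broadcasting mode). If locking fails, the update fails: the message queue does not belong to this Consumer and cannot be consumed by it.
The relevant source is RebalanceImpl#updateProcessQueueTableInRebalance: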
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
    final boolean isOrder) {
    boolean changed = false;

    Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<MessageQueue, ProcessQueue> next = it.next();
        MessageQueue mq = next.getKey();
        ProcessQueue pq = next.getValue();

        if (mq.getTopic().equals(topic)) {
            if (!mqSet.contains(mq)) {
                pq.setDropped(true);
                if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                    it.remove();
                    changed = true;
                    log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                }
            } else if (pq.isPullExpired()) {
                switch (this.consumeType()) {
                    case CONSUME_ACTIVELY:
                        break;
                    case CONSUME_PASSIVELY:
                        pq.setDropped(true);
                        if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                            it.remove();
                            changed = true;
                            log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                                consumerGroup, mq);
                        }
                        break;
                    default:
                        break;
                }
            }
        }
    }

    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    for (MessageQueue mq : mqSet) {
        if (!this.processQueueTable.containsKey(mq)) {
            // For ordered messages, lock the message queue first
            if (isOrder && !this.lock(mq)) {
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                continue;
            }

            this.removeDirtyOffset(mq);
            ProcessQueue pq = new ProcessQueue();
            long nextOffset = this.computePullFromWhere(mq);
            if (nextOffset >= 0) {
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {
                    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                } else {
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            } else {
                log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
            }
        }
    }

    this.dispatchPullRequest(pullRequestList);

    return changed;
}
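The isOrder && !this.lock(mq) check shows that, for ordered messages, the message queue must be locked before it is added.
The locking is done by RebalanceImpl#lock: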
public boolean lock(final MessageQueue mq) {
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
    if (findBrokerResult != null) {
        LockBatchRequestBody requestBody = new LockBatchRequestBody();
        requestBody.setConsumerGroup(this.consumerGroup);
        requestBody.setClientId(this.mQClientFactory.getClientId());
        requestBody.getMqSet().add(mq);

        try {
            // Ask the Broker for the distributed lock on the given message queue
            Set<MessageQueue> lockedMq =
                this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
            // Mark the process queues as locked. The queue may be locked before a local
            // process queue exists; in that case the flag is set later by lockAll().
            for (MessageQueue mmqq : lockedMq) {
                ProcessQueue processQueue = this.processQueueTable.get(mmqq);
                if (processQueue != null) {
                    processQueue.setLocked(true);
                    processQueue.setLastLockTimestamp(System.currentTimeMillis());
                }
            }

            boolean lockOK = lockedMq.contains(mq);
            log.info("the message queue lock {}, {} {}",
                lockOK ? "OK" : "Failed",
                this.consumerGroup,
                mq);
            return lockOK;
        } catch (Exception e) {
            log.error("lockBatchMQ exception, " + mq, e);
        }
    }

    return false;
}
So for strictly ordered messages, rebalance acquires the queue lock while updating its queues; if locking fails, the new process queue is not added.
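The add step of the rebalance can be modelled in a few lines. This is a simplified sketch under stated assumptions, not the RocketMQ implementation: queues are plain strings, OrderlyRebalanceSketch and addAllocatedQueues are hypothetical names, and the brokerLock predicate stands in for the remote lock request made by RebalanceImpl#lock.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Predicate;

/** Sketch: a newly allocated queue is accepted only if the broker lock succeeds. */
final class OrderlyRebalanceSketch {

    static Set<String> addAllocatedQueues(Set<String> owned, Set<String> allocated,
                                          boolean isOrder, Predicate<String> brokerLock) {
        Set<String> result = new LinkedHashSet<>(owned);
        for (String mq : allocated) {
            if (result.contains(mq)) {
                continue; // already consumed locally, nothing to add
            }
            if (isOrder && !brokerLock.test(mq)) {
                continue; // lock failed: skip for now, a later rebalance will retry
            }
            result.add(mq);
        }
        return result;
    }

    public static void main(String[] args) {
        Set<String> allocated = new LinkedHashSet<>(Arrays.asList("q0", "q1", "q2"));
        // Assumption for the demo: another consumer still holds the lock on q1.
        Predicate<String> brokerLock = mq -> !mq.equals("q1");
        System.out.println(addAllocatedQueues(new LinkedHashSet<>(), allocated, true, brokerLock));
        System.out.println(addAllocatedQueues(new LinkedHashSet<>(), allocated, false, brokerLock));
    }
}
```

With isOrder set to false the lock is never requested, which matches the short-circuit in the isOrder && !this.lock(mq) condition.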
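The Broker message queue lock expires: by default the Consumer treats it as expired 30s after it was last obtained. The Consumer therefore has to keep refreshing the lock on the Broker, by default every 20s. The source is ConsumeMessageOrderlyService#start: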
public void start() {
    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                ConsumeMessageOrderlyService.this.lockMQPeriodically();
            }
        }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
    }
}
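The relationship between the 30s expiry and the 20s refresh can be checked with a small model. This is a sketch only: QueueLockLeaseSketch and its members are hypothetical names, the two constants simply restate the defaults quoted above, and time is passed in explicitly so the behaviour is deterministic.

```java
/** Sketch: a locally tracked lock lease that must be refreshed before it expires. */
final class QueueLockLeaseSketch {
    // Values taken from the defaults described in the text (30s expiry, 20s refresh).
    static final long LOCK_MAX_LIVE_MILLIS = 30_000L;
    static final long LOCK_REFRESH_INTERVAL_MILLIS = 20_000L;

    private boolean locked;
    private long lastLockTimestamp;

    /** Called after a successful lock or refresh request. */
    synchronized void markLocked(long nowMillis) {
        this.locked = true;
        this.lastLockTimestamp = nowMillis;
    }

    synchronized boolean isLockExpired(long nowMillis) {
        return nowMillis - lastLockTimestamp > LOCK_MAX_LIVE_MILLIS;
    }

    /** Consumption is allowed only while the lock is held and not expired. */
    synchronized boolean canConsume(long nowMillis) {
        return locked && !isLockExpired(nowMillis);
    }

    public static void main(String[] args) {
        QueueLockLeaseSketch lease = new QueueLockLeaseSketch();
        lease.markLocked(0L);
        System.out.println("t=25s, no refresh: " + lease.canConsume(25_000L));
        System.out.println("t=31s, no refresh: " + lease.canConsume(31_000L));
        lease.markLocked(LOCK_REFRESH_INTERVAL_MILLIS); // refresh at t=20s
        System.out.println("t=45s, refreshed at 20s: " + lease.canConsume(45_000L));
    }
}
```

Because the refresh interval is shorter than the expiry, one delayed refresh can be absorbed before the lease lapses.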
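2.2 Removing message queues
In clustering mode, when the Consumer removes one of its message queues, it unlocks that queue on the Broker (not needed in broadcasting mode).
RebalancePushImpl#removeUnnecessaryMessageQueue removes the information associated with a queue that is no longer needed. Its source: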
@Override
public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
    // Persist the consume offset of the queue
    this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
    // Remove the offset from the local store
    this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
    // Clustering mode with orderly consumption: release the lock on the queue when removing it
    if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
        && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
        try {
            if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
                try {
                    // Delayed unlock
                    return this.unlockDelay(mq, pq);
                } finally {
                    pq.getLockConsume().unlock();
                }
            } else {
                log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}",
                    mq,
                    pq.getTryUnlockTimes());

                pq.incTryUnlockTimes();
            }
        } catch (Exception e) {
            log.error("removeUnnecessaryMessageQueue Exception", e);
        }

        return false;
    }
    return true;
}
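The method first acquires the process queue's consume lock so that removal does not conflict with consumption in progress. If the lock cannot be acquired, removal fails and is retried at the next rebalance. If the queue were removed without holding the lock, another Consumer and the current Consumer could consume the same message queue at the same time, and strict ordering would be lost.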
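The effect of the timed tryLock can be reproduced with plain JDK classes. The following sketch assumes nothing about RocketMQ beyond what the method above shows: RemoveQueueGuardSketch, consume, tryRemove and demo are hypothetical names, and the 50 ms timeout is chosen only to keep the demo fast.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/** Sketch: removal backs off while a consumer thread still holds the consume lock. */
final class RemoveQueueGuardSketch {
    private final ReentrantLock consumeLock = new ReentrantLock();

    /** A consumer thread holds the lock for as long as the listener runs. */
    void consume(Runnable listener) {
        consumeLock.lock();
        try {
            listener.run();
        } finally {
            consumeLock.unlock();
        }
    }

    /** Returns false when the lock is not obtained in time; the caller retries later. */
    boolean tryRemove(long timeoutMillis, Runnable unlockOnBroker) throws InterruptedException {
        if (consumeLock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
            try {
                unlockOnBroker.run();
                return true;
            } finally {
                consumeLock.unlock();
            }
        }
        return false;
    }

    /** Runs both cases and returns {resultWhileConsuming, resultAfterConsuming}. */
    static boolean[] demo() {
        RemoveQueueGuardSketch guard = new RemoveQueueGuardSketch();
        CountDownLatch consuming = new CountDownLatch(1);
        CountDownLatch finish = new CountDownLatch(1);
        Thread consumer = new Thread(() -> guard.consume(() -> {
            consuming.countDown();
            try {
                finish.await(); // keep "consuming" until the main thread lets go
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        consumer.start();
        try {
            consuming.await();
            boolean whileConsuming = guard.tryRemove(50, () -> { });
            finish.countDown();
            consumer.join();
            boolean afterConsuming = guard.tryRemove(50, () -> { });
            return new boolean[] {whileConsuming, afterConsuming};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] results = demo();
        System.out.println("remove while consuming: " + results[0]);
        System.out.println("remove after consuming: " + results[1]);
    }
}
```

A timed tryLock rather than a blocking lock keeps the rebalance thread from stalling behind a slow listener; the price is that removal may need more than one rebalance round.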
Once the lock is held, the delayed unlock method RebalancePushImpl#unlockDelay is called:
private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) {
    if (pq.hasTempMessage()) {
        // The process queue still holds messages: unlock after a delay
        log.info("[{}]unlockDelay, begin {} ", mq.hashCode(), mq);
        this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable() {
            @Override
            public void run() {
                log.info("[{}]unlockDelay, execute at once {}", mq.hashCode(), mq);
                RebalancePushImpl.this.unlock(mq, true);
            }
        }, UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS);
    } else {
        // No messages remain in the process queue: unlock immediately
        this.unlock(mq, true);
    }
    return true;
}
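This releases the Broker message queue lock. If the process queue still has remaining messages, the release is delayed.
2.3 Consuming message queues
Around line 300 of the DefaultMQPushConsumerImpl source, a PullCallback is created as the callback for a successful pull; around line 325 it calls the submitConsumeRequest method of ConsumeMessageOrderlyService, the class that handles strictly ordered messages: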
@Override
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispathToConsume) {
    if (dispathToConsume) {
        ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
        this.consumeExecutor.submit(consumeRequest);
    }
}
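The method creates a ConsumeRequest and submits it to the thread pool. Note that the request carries only the two queues, not the msgs list. ConsumeRequest is analyzed below.
2.3.1 Consuming messages
The consumption flow is illustrated by the diagram at https://www.iocoder.cn/RocketMQ/message-send-and-consume-orderly/?github&1601.
Now for ConsumeRequest itself.
ConsumeRequest is an inner class of ConsumeMessageOrderlyService. Its source is as follows: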
class ConsumeRequest implements Runnable {
    // The process queue
    private final ProcessQueue processQueue;
    // The message queue
    private final MessageQueue messageQueue;

    public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
        this.processQueue = processQueue;
        this.messageQueue = messageQueue;
    }

    public ProcessQueue getProcessQueue() {
        return processQueue;
    }

    public MessageQueue getMessageQueue() {
        return messageQueue;
    }

    @Override
    public void run() {
        if (this.processQueue.isDropped()) {
            log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
            return;
        }

        // Acquire the Consumer message queue lock
        final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
        synchronized (objLock) {
            // (broadcasting mode) or (clustering mode && the Broker message queue lock is valid)
            if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                final long beginTime = System.currentTimeMillis();
                for (boolean continueConsume = true; continueConsume; ) {
                    if (this.processQueue.isDropped()) {
                        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                        break;
                    }

                    // The distributed queue lock is not held: retry locking and consuming later
                    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                        && !this.processQueue.isLocked()) {
                        log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                        ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                        break;
                    }

                    // The distributed queue lock has expired: retry locking and consuming later
                    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                        && this.processQueue.isLockExpired()) {
                        log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                        ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                        break;
                    }

                    // This round has consumed for longer than the continuous limit (default 60s):
                    // submit a delayed consume request. By default, every minute of consuming
                    // is followed by a 10ms pause.
                    long interval = System.currentTimeMillis() - beginTime;
                    if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                        ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                        break;
                    }

                    // Take the messages to consume. Unlike a concurrent consume request,
                    // which already carries its messages, they are taken from the process queue here.
                    final int consumeBatchSize =
                        ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

                    List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                    defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
                    if (!msgs.isEmpty()) {
                        final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

                        ConsumeOrderlyStatus status = null;

                        ConsumeMessageContext consumeMessageContext = null;
                        if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                            consumeMessageContext = new ConsumeMessageContext();
                            consumeMessageContext.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
                            consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
                            consumeMessageContext.setMq(messageQueue);
                            consumeMessageContext.setMsgList(msgs);
                            consumeMessageContext.setSuccess(false);
                            // init the consume context type
                            consumeMessageContext.setProps(new HashMap<String, String>());
                            ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                        }

                        // Run the consumption
                        long beginTimestamp = System.currentTimeMillis();
                        ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                        boolean hasException = false;
                        try {
                            // Acquire the process queue consume lock
                            this.processQueue.getLockConsume().lock();
                            if (this.processQueue.isDropped()) {
                                log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                    this.messageQueue);
                                break;
                            }

                            status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                        } catch (Throwable e) {
                            log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                                RemotingHelper.exceptionSimpleDesc(e),
                                ConsumeMessageOrderlyService.this.consumerGroup,
                                msgs,
                                messageQueue);
                            hasException = true;
                        } finally {
                            // Release the process queue consume lock
                            this.processQueue.getLockConsume().unlock();
                        }

                        if (null == status
                            || ConsumeOrderlyStatus.ROLLBACK == status
                            || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                            log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",
                                ConsumeMessageOrderlyService.this.consumerGroup,
                                msgs,
                                messageQueue);
                        }

                        long consumeRT = System.currentTimeMillis() - beginTimestamp;
                        if (null == status) {
                            if (hasException) {
                                returnType = ConsumeReturnType.EXCEPTION;
                            } else {
                                returnType = ConsumeReturnType.RETURNNULL;
                            }
                        } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                            returnType = ConsumeReturnType.TIME_OUT;
                        } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                            returnType = ConsumeReturnType.FAILED;
                        } else if (ConsumeOrderlyStatus.SUCCESS == status) {
                            returnType = ConsumeReturnType.SUCCESS;
                        }

                        if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                            consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
                        }

                        if (null == status) {
                            status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                        }

                        if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                            consumeMessageContext.setStatus(status.toString());
                            consumeMessageContext.setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
                            ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                        }

                        ConsumeMessageOrderlyService.this.getConsumerStatsManager()
                            .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

                        // Process the consume result
                        continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                    } else {
                        continueConsume = false;
                    }
                }
            } else {
                if (this.processQueue.isDropped()) {
                    log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                    return;
                }

                ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
            }
        }
    }
}
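The comments in the code above give the details. The main flow is:
- Acquire the Consumer message queue lock
- Take messages in order from the process queue
- Acquire the ProcessQueue consume lock (finer-grained than the Consumer message queue lock)
- Run the consumption
- Release the process queue consume lock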
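The two local locks in this flow can be sketched with JDK primitives. This is an illustrative model, not the RocketMQ classes: OrderlyConsumeLockSketch, QueueState, offer, consume and demo are hypothetical names, messages are plain strings, and the Broker lock checks are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

/** Sketch: per-queue monitor plus a consume lock around the listener call. */
final class OrderlyConsumeLockSketch {
    /** Per-queue state: pending messages and the consume lock. */
    static final class QueueState {
        final Deque<String> pending = new ConcurrentLinkedDeque<>();
        final ReentrantLock consumeLock = new ReentrantLock();
    }

    private final ConcurrentMap<String, Object> queueLocks = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, QueueState> states = new ConcurrentHashMap<>();

    void offer(String mq, String msg) {
        states.computeIfAbsent(mq, k -> new QueueState()).pending.addLast(msg);
    }

    /** Drains the queue in batches; returns how many messages this call consumed. */
    int consume(String mq, int batchSize, Consumer<List<String>> listener) {
        QueueState state = states.computeIfAbsent(mq, k -> new QueueState());
        Object queueLock = queueLocks.computeIfAbsent(mq, k -> new Object());
        int consumed = 0;
        synchronized (queueLock) { // step 1: one thread per queue at a time
            while (true) {
                List<String> batch = new ArrayList<>();
                String msg;
                // step 2: take the next messages in order
                while (batch.size() < batchSize && (msg = state.pending.pollFirst()) != null) {
                    batch.add(msg);
                }
                if (batch.isEmpty()) {
                    break;
                }
                state.consumeLock.lock(); // step 3: consume lock
                try {
                    listener.accept(batch); // step 4: run the listener
                } finally {
                    state.consumeLock.unlock(); // step 5: release
                }
                consumed += batch.size();
            }
        }
        return consumed;
    }

    /** Four threads compete for one queue; returns the order the listener saw. */
    static List<String> demo() {
        OrderlyConsumeLockSketch sketch = new OrderlyConsumeLockSketch();
        for (int i = 0; i < 5; i++) {
            sketch.offer("q0", "m" + i);
        }
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        for (int t = 0; t < 4; t++) {
            Thread thread = new Thread(() -> sketch.consume("q0", 2, seen::addAll));
            threads.add(thread);
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The consume lock looks redundant inside the monitor, but it is the lock that a queue-removal path can wait on with a timeout, which a synchronized block does not offer.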
2.3.2 Processing the consume result
Near the end of the loop in the previous section, the consume result is handled by a call to ConsumeMessageOrderlyService#processConsumeResult:
// Process the consume result and return whether to keep consuming
public boolean processConsumeResult(
    final List<MessageExt> msgs,
    final ConsumeOrderlyStatus status,
    final ConsumeOrderlyContext context,
    final ConsumeRequest consumeRequest
) {
    boolean continueConsume = true;
    long commitOffset = -1L;
    if (context.isAutoCommit()) {
        // Auto-commit mode
        switch (status) {
            case COMMIT:
            case ROLLBACK:
                log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
                    consumeRequest.getMessageQueue());
                // falls through
            case SUCCESS:
                // Consumed successfully: commit on the process queue
                commitOffset = consumeRequest.getProcessQueue().commit();
                // Statistics
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                break;
            case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                // Statistics
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                // Decide whether to suspend consumption for a while
                if (checkReconsumeTimes(msgs)) {
                    // Mark the messages for re-consumption
                    consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                    // Submit a delayed consume request
                    this.submitConsumeRequestLater(
                        consumeRequest.getProcessQueue(),
                        consumeRequest.getMessageQueue(),
                        context.getSuspendCurrentQueueTimeMillis());
                    continueConsume = false;
                } else {
                    commitOffset = consumeRequest.getProcessQueue().commit();
                }
                break;
            default:
                break;
        }
    } else {
        switch (status) {
            case SUCCESS:
                // Statistics
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                break;
            case COMMIT:
                // Commit the consumed messages on the process queue
                commitOffset = consumeRequest.getProcessQueue().commit();
                break;
            case ROLLBACK:
                // Mark the messages for re-consumption
                consumeRequest.getProcessQueue().rollback();
                this.submitConsumeRequestLater(
                    consumeRequest.getProcessQueue(),
                    consumeRequest.getMessageQueue(),
                    context.getSuspendCurrentQueueTimeMillis());
                continueConsume = false;
                break;
            case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                // Statistics
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                // Decide whether to suspend consumption for a while
                if (checkReconsumeTimes(msgs)) {
                    // Mark the messages for re-consumption
                    consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                    // Submit a delayed consume request
                    this.submitConsumeRequestLater(
                        consumeRequest.getProcessQueue(),
                        consumeRequest.getMessageQueue(),
                        context.getSuspendCurrentQueueTimeMillis());
                    continueConsume = false;
                }
                break;
            default:
                break;
        }
    }

    // If the process queue has not been dropped, record the valid consume offset
    if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
    }

    return continueConsume;
}

private int getMaxReconsumeTimes() {
    // default reconsume times: Integer.MAX_VALUE
    if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
        return Integer.MAX_VALUE;
    } else {
        return this.defaultMQPushConsumer.getMaxReconsumeTimes();
    }
}

// Decide whether to suspend consumption for a while. Returns false (do not suspend) only when
// every message has reached the max reconsume count and was sent back to the Broker successfully.
private boolean checkReconsumeTimes(List<MessageExt> msgs) {
    boolean suspend = false;
    if (msgs != null && !msgs.isEmpty()) {
        for (MessageExt msg : msgs) {
            if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) {
                MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes()));
                if (!sendMessageBack(msg)) {
                    suspend = true;
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                }
            } else {
                suspend = true;
                msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
            }
        }
    }
    return suspend;
}

// Send the message back to the Broker; its destination is the dead-letter queue
public boolean sendMessageBack(final MessageExt msg) {
    try {
        // Max reconsume times exceeded, so send to the dead-letter queue
        Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
        String originMsgId = MessageAccessor.getOriginMessageId(msg);
        MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
        newMsg.setFlag(msg.getFlag());
        MessageAccessor.setProperties(newMsg, msg.getProperties());
        MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
        MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes()));
        MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
        newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

        this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);
        return true;
    } catch (Exception e) {
        log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
    }

    return false;
}
This method runs different logic depending on the consume result status. There are four statuses:
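public enum ConsumeOrderlyStatus {
    /**
     * Consumed successfully (the offset is committed only in auto-commit mode)
     */
    SUCCESS,
    /**
     * Consumption failed, roll back (deprecated, only for binlog synchronization)
     */
    @Deprecated
    ROLLBACK,
    /**
     * Consumed successfully and commit (deprecated, only for binlog synchronization)
     */
    @Deprecated
    COMMIT,
    /**
     * Consumption failed, suspend the queue for a moment and consume again later
     */
    SUSPEND_CURRENT_QUEUE_A_MOMENT;
}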
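Of the four statuses, ROLLBACK and COMMIT are only for binlog synchronization and are therefore marked @Deprecated.
With normal ordered messages, which are consumed concurrently, a failed message is sent back to the Broker by the Consumer and consumed again after a later pull.
With strictly ordered messages, order can only be preserved by suspending the queue (the checkReconsumeTimes and submitConsumeRequestLater calls above) and consuming again after a short delay.
Once the failure count reaches the limit returned by getMaxReconsumeTimes, the Consumer sends the message back to the Broker, where it goes to the dead-letter queue. As the getMaxReconsumeTimes code above shows, when maxReconsumeTimes is left at -1 the limit for orderly consumption is Integer.MAX_VALUE, so the message is effectively retried without limit unless a maximum is configured.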
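The suspend decision can be isolated from the rest of the service. The following sketch mirrors the branching of checkReconsumeTimes under stated assumptions: ReconsumeCheckSketch, Msg and shouldSuspend are hypothetical names, and the sendBack predicate stands in for sendMessageBack (true means the Broker accepted the message).

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Sketch: decide whether a failed batch suspends the queue or is handed to the broker. */
final class ReconsumeCheckSketch {
    /** Minimal stand-in for a message with a retry counter. */
    static final class Msg {
        final String id;
        int reconsumeTimes;

        Msg(String id, int reconsumeTimes) {
            this.id = id;
            this.reconsumeTimes = reconsumeTimes;
        }
    }

    /** Returns true when the queue must be suspended and the batch retried locally. */
    static boolean shouldSuspend(List<Msg> msgs, int maxReconsumeTimes, Predicate<Msg> sendBack) {
        boolean suspend = false;
        for (Msg msg : msgs) {
            if (msg.reconsumeTimes >= maxReconsumeTimes) {
                // Limit reached: try to hand the message to the broker.
                if (!sendBack.test(msg)) {
                    suspend = true; // hand-off failed, keep retrying locally
                    msg.reconsumeTimes++;
                }
            } else {
                suspend = true; // below the limit, retry locally
                msg.reconsumeTimes++;
            }
        }
        return suspend;
    }

    public static void main(String[] args) {
        Msg fresh = new Msg("a", 0);
        Msg worn = new Msg("b", 3);
        System.out.println("fresh only: " + shouldSuspend(Arrays.asList(fresh), 3, m -> true));
        System.out.println("worn only, send-back ok: " + shouldSuspend(Arrays.asList(worn), 3, m -> true));
        System.out.println("mixed batch: " + shouldSuspend(Arrays.asList(fresh, worn), 3, m -> true));
    }
}
```

A single message below the limit is enough to suspend the whole batch, so the batch is skipped only when every message in it has been handed over successfully.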
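2.3.3 The process queue ProcessQueue
The class comment of ProcessQueue describes it as a queue consumption snapshot.
It tracks the consumption progress of a message queue and is also called the process queue.
Its key fields and methods are: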
// Message map; the key is the message's offset in the message queue
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();

/**
 * A subset of msgTreeMap, only used for orderly consumption
 */
private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>();

// Roll back the messages that are being consumed
public void rollback() {
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            // Put every message in consumingMsgOrderlyTreeMap back into msgTreeMap
            this.msgTreeMap.putAll(this.consumingMsgOrderlyTreeMap);
            // Then clear consumingMsgOrderlyTreeMap
            this.consumingMsgOrderlyTreeMap.clear();
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("rollback exception", e);
    }
}

// Mark the messages being consumed as consumed and return the consume offset
public long commit() {
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            // Consume offset
            Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
            msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
            for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
                msgSize.addAndGet(0 - msg.getBody().length);
            }
            this.consumingMsgOrderlyTreeMap.clear();
            // Return the consume offset: last consumed offset + 1
            if (offset != null) {
                return offset + 1;
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("commit exception", e);
    }

    return -1;
}

// Mark the given messages for re-consumption
public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            for (MessageExt msg : msgs) {
                this.consumingMsgOrderlyTreeMap.remove(msg.getQueueOffset());
                this.msgTreeMap.put(msg.getQueueOffset(), msg);
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("makeMessageToCosumeAgain exception", e);
    }
}

// Take the first batchSize messages
public List<MessageExt> takeMessags(final int batchSize) {
    List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
    final long now = System.currentTimeMillis();
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        this.lastConsumeTimestamp = now;
        try {
            if (!this.msgTreeMap.isEmpty()) {
                for (int i = 0; i < batchSize; i++) {
                    Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
                    if (entry != null) {
                        result.add(entry.getValue());
                        // Also put it into consumingMsgOrderlyTreeMap to mark it as being consumed
                        consumingMsgOrderlyTreeMap.put(entry.getKey(), entry.getValue());
                    } else {
                        break;
                    }
                }
            }

            if (result.isEmpty()) {
                consuming = false;
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("take Messages exception", e);
    }

    return result;
}
2.3.4 Summary
ProcessQueue is the process queue: it records consumption progress and encapsulates the methods that control the flow of messages.
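The interplay of the two sorted maps is easier to see in a reduced model. This is a sketch rather than the real class: ProcessQueueSketch is a hypothetical name, message bodies are strings, synchronized methods replace the read-write lock, and commit returns -1 on an empty in-flight map as a simplification of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Sketch: pending and in-flight messages kept in two offset-ordered maps. */
final class ProcessQueueSketch {
    private final TreeMap<Long, String> pending = new TreeMap<>();
    private final TreeMap<Long, String> consuming = new TreeMap<>();

    synchronized void put(long offset, String body) {
        pending.put(offset, body);
    }

    /** Moves up to batchSize messages, lowest offset first, into the in-flight map. */
    synchronized List<String> take(int batchSize) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            Map.Entry<Long, String> entry = pending.pollFirstEntry();
            if (entry == null) {
                break;
            }
            result.add(entry.getValue());
            consuming.put(entry.getKey(), entry.getValue());
        }
        return result;
    }

    /** Drops the in-flight messages and returns the next offset to consume, or -1. */
    synchronized long commit() {
        if (consuming.isEmpty()) {
            return -1L;
        }
        long next = consuming.lastKey() + 1;
        consuming.clear();
        return next;
    }

    /** Puts the in-flight messages back so the next take returns them again. */
    synchronized void rollback() {
        pending.putAll(consuming);
        consuming.clear();
    }

    public static void main(String[] args) {
        ProcessQueueSketch pq = new ProcessQueueSketch();
        pq.put(10L, "a");
        pq.put(11L, "b");
        pq.put(12L, "c");
        System.out.println("take: " + pq.take(2));
        pq.rollback();
        System.out.println("take after rollback: " + pq.take(2));
        System.out.println("commit: " + pq.commit());
        System.out.println("take: " + pq.take(2));
        System.out.println("commit: " + pq.commit());
    }
}
```

Because both maps are ordered by offset, a rollback restores the exact order, and the committed offset is always one past the highest in-flight offset.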
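3 Conclusion
This article analyzed how RocketMQ consumes normal ordered messages and strictly ordered messages.
- Normal ordered messages: the Producer sends related messages to the same message queue (good performance). When consumption fails, the message is sent back to the Broker to be consumed again later, and other messages are consumed first.
- Strictly ordered messages: on top of normal ordering, the Consumer consumes strictly in order. When consumption fails, consumption of the queue is paused briefly and the message is consumed again. Once the failure count reaches the configured limit (Integer.MAX_VALUE when maxReconsumeTimes is left at -1, as getMaxReconsumeTimes shows), the message is sent back to the Broker's dead-letter queue and skipped, and consumption continues with the messages after it.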