前言
rocketmq 发送顺序消息和普通消息的主流程区别大部分一致的,区别在于:普通消息发送时,从所有broker的队列集合中 轮询选择一个队列,而顺序队列可以提供用户自定义消息队列选择器,从NameServer 分配的顺序 broker集合中选择一个队列。
源码版本:4.9.3
源码架构图
源码分析
发送普通消息源码在另外一篇文章https://blog.csdn.net/hzwangmr/article/details/135411495,这里主要阅读和普通消息有差异的部分。
顺序消息源码入口
org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message, org.apache.rocketmq.client.producer.MessageQueueSelector, java.lang.Object)
可以看到系统提供了一个 MessageQueueSelector 消息队列选择器,用于自定义选择队列的逻辑。
/*** Same to {@link #send(Message)} with message queue selector specified.** @param msg Message to send.* @param selector Message queue selector, through which we get target message queue to deliver message to.* @param arg Argument to work along with message queue selector.* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,* {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.* @throws MQClientException if there is any client error.* @throws RemotingException if there is any network-tier error.* @throws MQBrokerException if there is any error with broker.* @throws InterruptedException if the sending thread is interrupted.*/@Overridepublic SendResult send(Message msg, MessageQueueSelector selector, Object arg)throws MQClientException, RemotingException, MQBrokerException, InterruptedException {msg.setTopic(withNamespace(msg.getTopic()));return this.defaultMQProducerImpl.send(msg, selector, arg);}
触发队列选择器源码
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl
这里可以看到在调用发送消息核心 sendKernelImpl() 方法之前,会调用 selector.select() 函数,执行我们自定的选择逻辑。那么自定义的逻辑具体是什么呢?接着往下看
private SendResult sendSelectImpl(Message msg,MessageQueueSelector selector,Object arg,final CommunicationMode communicationMode,final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);// 获取topic路由数据TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {MessageQueue mq = null;try {// 解析发布消息队列List<MessageQueue> messageQueueList =mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());// 克隆消息Message userMessage = MessageAccessor.cloneMessage(msg);String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());userMessage.setTopic(userTopic);// 利用消息队列选择器,选择一个队列mq = mQClientFactory.getClientConfig().queueWithNamespace(// 自定义选择队列selector.select(messageQueueList, userMessage, arg));} catch (Throwable e) {throw new MQClientException("select message queue threw exception.", e);}long costTime = System.currentTimeMillis() - beginStartTime;if (timeout < costTime) {throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");}if (mq != null) {// 如果选择出来的 MessageQueue 存在,这调用核心发送消息函数,发送消息return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);} else {throw new MQClientException("select message queue return null.", null);}}validateNameServerSetting();throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);}
源码工程自带发送顺序消息实例
可以看到,工程自带的发送顺序消息的example实例,针对 id相同的数据,选择了相同的消息队列,这样对于同一个实体的数据变化一定是有顺序的。
public class Producer {public static void main(String[] args) throws UnsupportedEncodingException {try {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 100; i++) {int orderId = i % 10;Message msg =new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// 自定义选择逻辑,可以理解为将相同的 orderId订单id,投递到相同的队列中Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);}producer.shutdown();} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}}
}
其他
topic对应的顺序队列怎么来的?
mq生产者客户端,在发送消息前,会从 NameServer中读取消息队里指定 topic对应的 topic路由信息,然后写到转换和缓存在内存多个数据结构里。其中,有一处就是下方的将 topic路由数据转换为topic 已发布信息,这里写入了messageQueueList (顺序消息待选择队列)。
转换逻辑是:
- 将路由信息写入topic发布信息;
- 判断当前路由信息是不是顺序 topic 配置;
- 解析顺序消息 topic配置;
- 遍历 broker,遍历broker 队列数量;
- 封装消息队列数据。topic -> broker ->队里id;
- 顺序队列数据,添加到 MessageQueueList 集合中;
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {TopicPublishInfo info = new TopicPublishInfo();// 写入路由数据info.setTopicRouteData(route);// 顺序消息配置,顺序消息分配了多少个brokerif (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {// 是顺序消息// 解析顺序消息topic配置String[] brokers = route.getOrderTopicConf().split(";");// 遍历顺序 broker,从namersrv拿到的broker数量for (String broker : brokers) {String[] item = broker.split(":");// 特定broker中的队列数量int nums = Integer.parseInt(item[1]);for (int i = 0; i < nums; i++) {// 分装队列数据,topic--brokerName--队列idMessageQueue mq = new MessageQueue(topic, item[0], i);info.getMessageQueueList().add(mq);}}// 是顺序topic,打标info.setOrderTopic(true);}}