RocketMQ源码 发送顺序消息源码分析

前言

rocketmq 发送顺序消息和普通消息的主流程区别大部分一致的,区别在于:普通消息发送时,从所有broker的队列集合中 轮询选择一个队列,而顺序队列可以提供用户自定义消息队列选择器,从NameServer 分配的顺序 broker集合中选择一个队列。

源码版本:4.9.3

源码架构图

源码分析

发送普通消息源码在另外一篇文章https://blog.csdn.net/hzwangmr/article/details/135411495,这里主要阅读和普通消息有差异的部分。

顺序消息源码入口

org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message, org.apache.rocketmq.client.producer.MessageQueueSelector, java.lang.Object)

可以看到系统提供了一个 MessageQueueSelector 消息队列选择器,用于自定义选择队列的逻辑。

    /*** Same to {@link #send(Message)} with message queue selector specified.** @param msg Message to send.* @param selector Message queue selector, through which we get target message queue to deliver message to.* @param arg Argument to work along with message queue selector.* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,* {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.* @throws MQClientException if there is any client error.* @throws RemotingException if there is any network-tier error.* @throws MQBrokerException if there is any error with broker.* @throws InterruptedException if the sending thread is interrupted.*/@Overridepublic SendResult send(Message msg, MessageQueueSelector selector, Object arg)throws MQClientException, RemotingException, MQBrokerException, InterruptedException {msg.setTopic(withNamespace(msg.getTopic()));return this.defaultMQProducerImpl.send(msg, selector, arg);}

触发队列选择器源码

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl

这里可以看到在调用发送消息核心 sendKernelImpl() 方法之前,会调用 selector.select() 函数,执行我们自定的选择逻辑。那么自定义的逻辑具体是什么呢?接着往下看

    private SendResult sendSelectImpl(Message msg,MessageQueueSelector selector,Object arg,final CommunicationMode communicationMode,final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);// 获取topic路由数据TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {MessageQueue mq = null;try {// 解析发布消息队列List<MessageQueue> messageQueueList =mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());// 克隆消息Message userMessage = MessageAccessor.cloneMessage(msg);String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());userMessage.setTopic(userTopic);// 利用消息队列选择器,选择一个队列mq = mQClientFactory.getClientConfig().queueWithNamespace(// 自定义选择队列selector.select(messageQueueList, userMessage, arg));} catch (Throwable e) {throw new MQClientException("select message queue threw exception.", e);}long costTime = System.currentTimeMillis() - beginStartTime;if (timeout < costTime) {throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");}if (mq != null) {// 如果选择出来的 MessageQueue 存在,这调用核心发送消息函数,发送消息return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);} else {throw new MQClientException("select message queue return null.", null);}}validateNameServerSetting();throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);}

源码工程自带发送顺序消息实例

可以看到,工程自带的发送顺序消息的example实例,针对 id相同的数据,选择了相同的消息队列,这样对于同一个实体的数据变化一定是有顺序的。

public class Producer {public static void main(String[] args) throws UnsupportedEncodingException {try {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 100; i++) {int orderId = i % 10;Message msg =new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// 自定义选择逻辑,可以理解为将相同的 orderId订单id,投递到相同的队列中Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);}producer.shutdown();} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}}
}

其他

topic对应的顺序队列怎么来的?

mq生产者客户端,在发送消息前,会从 NameServer中读取消息队里指定 topic对应的 topic路由信息,然后写到转换和缓存在内存多个数据结构里。其中,有一处就是下方的将 topic路由数据转换为topic 已发布信息,这里写入了messageQueueList (顺序消息待选择队列)。

转换逻辑是:

  1. 将路由信息写入topic发布信息;
  2. 判断当前路由信息是不是顺序 topic 配置;
  3. 解析顺序消息 topic配置;
  4. 遍历 broker,遍历broker 队列数量;
    1. 封装消息队列数据。topic -> broker ->队里id;
  5. 顺序队列数据,添加到 MessageQueueList 集合中;
    public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {TopicPublishInfo info = new TopicPublishInfo();// 写入路由数据info.setTopicRouteData(route);// 顺序消息配置,顺序消息分配了多少个brokerif (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {// 是顺序消息// 解析顺序消息topic配置String[] brokers = route.getOrderTopicConf().split(";");// 遍历顺序 broker,从namersrv拿到的broker数量for (String broker : brokers) {String[] item = broker.split(":");// 特定broker中的队列数量int nums = Integer.parseInt(item[1]);for (int i = 0; i < nums; i++) {// 分装队列数据,topic--brokerName--队列idMessageQueue mq = new MessageQueue(topic, item[0], i);info.getMessageQueueList().add(mq);}}// 是顺序topic,打标info.setOrderTopic(true);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/328040.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CRM市场营销管理功能,如何进行客户细分和数据分析?

CRM管理系统中的营销管理模块&#xff0c;它的锋芒常被销售管理所掩盖&#xff0c;但对于企业的业务来说同样重要。营销部门虽然不像销售人员一样直接面对客户&#xff0c;却是挖掘线索、商机的重要角色。CRM在市场营销领域的关键功能包括&#xff1a;营销漏斗、客户细分、营销…

二手买卖、废品回收小程序 在app.json中声明permission scope.userLocation字段 教程说明

处理二手买卖、废品回收小程序 在app.json中声明permission scope.userLocation字段 教程说明 sitemapLocation 指明 sitemap.json 的位置&#xff1b;默认为 ‘sitemap.json’ 即在 app.json 同级目录下名字的 sitemap.json 文件 找到app.json这个文件 把这段代码加进去&…

2024PMP考试新考纲-【人员领域】近期典型真题和超详细解析(5)

今天华研荟继续为您分享PMP新考纲下的【人员People领域】近年真题&#xff0c;帮助大家举一反三&#xff0c;一次性通过2024年的PMP考试。 2024年PMP考试新考纲-【人员领域】真题解析21 题&#xff1a;项目经理正在为一个项目工作。该项目由于人员流动&#xff0c;相关方登记册…

STM32入门教程-2023版【3-2】STM32如何使用库函数及几种方法

关注 点赞 不错过精彩内容 大家好&#xff0c;我是硬核王同学&#xff0c;最近在做免费的嵌入式知识分享&#xff0c;帮助对嵌入式感兴趣的同学学习嵌入式、做项目、找工作! 五、库函数的使用方法 &#xff08;1&#xff09;第一种 想使用库函数&#xff0c;可以先打开.h文件&…

C语言第三方库Melon开箱即用之词法分析器使用

之前的文章中&#xff0c;笔者介绍了Linux/UNIX C语言库Melon的基本功能及框架使用。 本文将介绍Melon中的词法分析器组件。 Melon的Github仓库为&#xff1a;https://github.com/Water-Melon/Melon 词法分析器在Melon中并不依赖于自身框架&#xff0c;因此可以在不初始化框…

2.3_7 生产者-消费者问题

2.3_7 生产者-消费者问题 系统中有一组生产者进程和一组消费者进程,生产者进程每次生产一个产品放入缓冲区,消费者进程每次从缓冲区中取出一个产品并使用。(注:这里的“产品”理解为某种数据) 生产者、消费者共享一个初始为空、大小为n的缓冲区。 只有缓冲区没满时,生产者才…

PHP在线sqlite转html表格小功能(sqlite2html)

6KB PHP实现在线sqlite转html表格小功能(支持大文件上传,得到一表一文件) 可自定义&#xff1a;上传限制大小&#xff1b;支持后缀格式!下载格式位压缩包&#xff0c;内含一表一个html文件。 作用&#xff1a;程序员实用工具&#xff0c;上传sqlite数据得到html表格数据供本地…

嵌入式培训机构四个月实训课程笔记(完整版)-Linux系统编程第四天-Linux管道(物联技术666)

更多配套资料CSDN地址:点赞+关注,功德无量。更多配套资料,欢迎私信。 物联技术666_嵌入式C语言开发,嵌入式硬件,嵌入式培训笔记-CSDN博客物联技术666擅长嵌入式C语言开发,嵌入式硬件,嵌入式培训笔记,等方面的知识,物联技术666关注机器学习,arm开发,物联网,嵌入式硬件,单片机…

课堂分享 | IT与OT是什么?

长期以来信息技术IT和操作运营技术OT是相互隔离的&#xff0c;随着大数据分析和边缘计算业务的对现场级实时数据的采集需求&#xff0c;IT和OT有了逐渐融合的趋势。IT与OT融合&#xff0c;它赋予工厂的管理者监控运行和过程的能力大为增强&#xff0c;甚至可以预测到可能发生的…

Leetcode2967. 使数组成为等数数组的最小代价

Every day a Leetcode 题目来源&#xff1a;2967. 使数组成为等数数组的最小代价 解法1&#xff1a;贪心 题目中要求将数组变成等数数组&#xff08;数组中的所有元素都等于一个小于 109 的回文数&#xff09;。因此&#xff0c;我们需要找到一个小于 109 的回文数&#xf…

Vue新手村(一)

目录 1、Vue简介——Vue的特点 2、Vue的第一个页面 3.Vue的简单使用介绍 3.1、{{ }}的使用 3.2、v-text和v-html 3.2.1、v-text和{{ }}的区别 3.2.2、v-html和v-text的区别 3.3、v-on【事件绑定】 3.3.1、绑定事件的语法 3.3.2、语法简化 3.3.3、传参 3.4、v-show和…

Java中请求生成唯一追溯TraceId

Java中请求生成唯一追溯TraceId 一&#xff1a;背景 因为是微服务架构,平常日志太多,看日志不太好查,所以想要从一整个链路当中获取一个唯一标识,比较好定位问题&#xff0c; 原理就是从gateway网关将标识传递到下游,下游服务拿到这个标识,响应结束后将traceId反向写入响应体…