RocketMQ源码剖析之createUniqID方法

目录

版本信息:

写在前面:

源码剖析:

总计:


版本信息:

RocketMQ-5.1.3

源码地址:https://github.com/apache/rocketmq

写在前面:

首先,笔者先吐槽一下RocketMQ的官方,源码中啥注释都没有,虽然文档给的多,但是很多都是版本过时不及时更新,阅读者只能靠自己的强硬的技术去理解~

回归正题,如今互联网的技术离不开微服务、分布式的体系,所以在分布式的体系中如何创建一个全局唯一的ID是大家所面对的问题。现大厂都提出了解决方案:Twitter的雪花算法(Snowflake)、美团的Leaf算法、以及Mysql、Redis 这种自带原子性操作的中间件。

当然RocketMQ为分布式而生的消息队列中间件肯定也需要有他的分布式ID解决方案(虽然笔者不知道该如何称呼,源码中也没有给出)~ 

源码剖析:

createUniqID 方法是本文章所论述的点,此方法在生产者往Broker 发送消息时,给发送的消息创建一个唯一KEY时调用。

public static void setUniqID(final Message msg) {// 如果用户自定义了唯一key,RocketMQ就不提供默认实现// 否则RocketMQ调用createUniqID 方法提供默认的实现if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID());}
}

在看createUniqID之前,我们先需要看一些变量的初始化作为看createUniqID 方法的铺垫~

org.apache.rocketmq.common.message MessageClientIDSetter类中。

public class MessageClientIDSetter {private static final int LEN;                   // 原有长度private static final char[] FIX_STRING;         // 变化后的char字符数组(其实就是字符串)private static final AtomicInteger COUNTER;     // 原子变量private static long startTime;                  // 记录开始时间private static long nextStartTime;              // 记录最后时间(用于更新)static {byte[] ip;try {// 获取到本机的IP地址。// 一共占用4个字节。ip = UtilAll.getIP();} catch (Exception e) {ip = createFakeIP();}// 4(ip) + 2(pid进程id) + 4(类加载器的HashCode) + 4(时间差值) + 2(自增位) LEN = ip.length + 2 + 4 + 4 + 2;// 拼接处理分布式体系的10字节// 处理 本机IP + JVM进程PID + HashCodeByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 2 + 4);tempBuffer.put(ip);tempBuffer.putShort((short) UtilAll.getPid());tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());// 把10字节中的内容 作为索引值 转换成16进制的字符串表示// 简单来说,这一步就是编码,因为ID不可能用负数或者二进制01表示。FIX_STRING = UtilAll.bytes2string(tempBuffer.array()).toCharArray();// 设置当前启动的时间(用来4字节的计算时间差值)// 并且设置末尾时间,末尾时间用来更新时间// 如果有小伙伴看过雪花算法,就明白,雪花算法的时间差值是41位,限制只能用多少年,而这里做了优化,动态更新时间。// 这里的起始时间是本月的1号。// 末尾时间是下月的1号。setStartTime(System.currentTimeMillis());// 原子性自增,用于最后2位的自增位。COUNTER = new AtomicInteger(0);}
}

这里是核心所在,所以在提供的源码中笔者有非常详细的注释,并且这里做一个总结:

  1. RocketMQ的分布式ID算法核心就在这里,用了16字节表示:4(本机IP) + 2(进程的PID) + 4(类加载器的HashCode) + 4(时间差值) + 2(自增位)
  2. 本机IP + 进程PID + 类加载器HashCode 解决了分布式环境下集群的重复可能性
  3. 最后2位的自增位,用于处理本机RocketMQ的并发重复可能性
  4. 时间差值用于解码时获得创建的时间

看到这里,有读者会问,那源码中FIX_STRING 变量是干啥的,这很简单,如上图所示总共16字节,因为byte用10进制可能会有负数,作为分布式ID总不能是一串负数或者二进制01表示把。所以RocketMQ用16字节的Byte数组转换成 16进制的字符串表示,存储在FIX_STRING中。

这里需要注意,在上文的初始化代码中,只对 本机IP + JVM进程PID + HashCode做了处理,后续的时间差值和自增位在createUniqID方法中做处理。

以上的铺垫已做完,直接看到org.apache.rocketmq.common.message MessageClientIDSetter类中createUniqID方法

public static String createUniqID() {// 在Java中byte占用一个字节,char占用2个字节// 所以这里需要创建LEN * 2 的char数组来存放完 16字节的数据。char[] sb = new char[LEN * 2];// 在上文的初始化中把 IP + PID + HashCode 16进制字符串放入到FIX_STRING// 这里把FIX_STRING拷贝到sb中。System.arraycopy(FIX_STRING, 0, sb, 0, FIX_STRING.length);long current = System.currentTimeMillis();// 是否需要更新时间。if (current >= nextStartTime) {setStartTime(current);}// 计算出运行时间差值。int diff = (int)(current - startTime);if (diff < 0 && diff > -1000_000) {diff = 0;}// 获取到长度,这个长度作为索引。int pos = FIX_STRING.length;// 这里填充了4字节的时间差值UtilAll.writeInt(sb, pos, diff);pos += 8;// 这里填充了2字节的自增位。UtilAll.writeShort(sb, pos, COUNTER.getAndIncrement());// char数组转换成字符串。return new String(sb);}
  1. 获取到初始化中初始的FIX_STRING字段,此字段已经处理了本机IP + JVM进程PID + HashCode,后续的时间差值 和 自增位还没做处理,下文会对其做处理
  2. 获取到当前时间,判断是否需要更新时间(没个月月初更新)
  3. 得到时间差值赋值给diff变量,并且转换成16进制的字符表示
  4. 获取到自增值,并且转换成16进制的字符表示
  5. 最终把16进制的 char数组转换成String对象
  6. 整个分布式的ID 创建过程完毕。

总计:

只需要记住三部分

  1. 第一部分用于处理分布式的重复可能性(IP + PID + HashCode)
  2. 第二部分用于记录创建时间
  3. 第三部分用于处理机器的并发创建ID的重复可能性(原子变量解决)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/229845.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AMIS【部署 01】amis前端低代码框架可视化编辑器amis-editor本地部署流程

amis-editor本地部署流程 1.amis-editor是什么1.1 amis是什么1.2 amis-editor是什么 2.amis-editor本地部署2.1 准备阶段2.2 源码修改2.3 构建项目2.4 nginx配置2.5 启动nginx 3.总结 官网仅贴出了本地运行这个项目的步骤&#xff1a; # 1.安装依赖 npm i # 2.等编译完成后本地…

springboot基础配置及maven运行

目录 1、spring快速开始&#xff1a; 2、通过idea工具打开导入包 3、maven打包 1、springboot快速开始&#xff1a; 环境依赖&#xff1a;jdk17 Spring | Quickstart spring初始化包下载&#xff1a; 点击generate&#xff0c;下载包 2、通过idea工具打开导入包 我之前写了…

多模态大模型总结2(主要2023年)

LLaVA-V1&#xff08;2023/04&#xff09; 论文&#xff1a;Visual Instruction Tuning 网络结构 如下图 所示为 LLaVA-v1 的模型结构&#xff0c;可以看出其简化了很多&#xff0c;但整体来说还是由三个组件构成&#xff1a; Vision Encoder&#xff1a;和 Flamingo 模型的 V…

基于Pix2Struct的文档信息提取【DocVQA】

文档信息提取涉及使用计算机算法从非结构化或半结构化文档&#xff08;例如报告、电子邮件和网页&#xff09;中提取结构化数据&#xff08;例如员工姓名、地址、职务、电话号码等&#xff09;。 提取的信息可用于各种目的&#xff0c;例如分析和分类。 DocVQA&#xff08;文档…

RHCSA---基本命令使用

文章目录 前言一、pandas是什么&#xff1f;二、使用步骤 1.引入库2.读入数据总结 前言 Linux中终端中的很多操作都是通过命令行实现的&#xff0c;最常用的输入命令的方法有以下两种。 (1).打开自带的终端&#xff0c;类似于Windows中的CMD (2).ssh远程连接&#xff0c;关于…

MYSQL存储

注意&#xff1a; 1.如果没有指定的SESSION/GLOBAL&#xff0c;默认是SESSION&#xff0c;会话变量。 2.mysql服务重新启动之后&#xff0c;所设置的全局参数会失效&#xff0c;要想不失效&#xff0c;可以在/etc/my.cnf中配置。 变量 用户定义变量是用户根据需要自己定义变量…

注解Annotation - Java

注解Annotation 一、介绍二、使用三、三个基本的Annotation四、JDK内置的基本注解类型1、Override2、Deprecated3、SuppressWarnings 五、JDK的元注解1、Retention2、Target3、Documented4、Inherited 一、介绍 注解&#xff08;Annotation&#xff09;也被称为元数据&#xf…

【分布式系统学习】CAP原理详解

CAP原理详解 前言CAP一张图 一、概念1.1 关键词解读1.2 关于CAP&#xff08;拆分解读&#xff09;1.3 CAP原理精髓 二、CAP模拟场景举例理解三、CAP原理证明为什么不能同时满足&#xff08;下面举例说明&#xff09;3.1 必须满足分区容错性P下的处理方式3.2 不是必须满足分区容…

如何通过“闻香”给葡萄酒分类?

有句话叫做“闻香识女人”&#xff0c;葡萄酒也如同美女&#xff0c;千娇百媚风情万种&#xff0c;所以通过“闻香”也可以给葡萄酒进行分类。 那么&#xff0c;云仓酒庄的品牌雷盛红酒分享葡萄酒都有哪些不同的香呢&#xff1f; 云仓酒庄是云仓酒庄的结合&#xff0c;也就是在…

可以免费使用的Axure在线版来了

Axure作为一种功能强大的原型设计工具&#xff0c;一直受到设计师的青睐。然而&#xff0c;其高昂的价格可能成为一个门槛&#xff0c;限制了一些设计师的选择。但不用担心&#xff0c;现在有一个免费的Axure在线工具即时设计&#xff0c;功能更完整&#xff0c;更划算&#xf…

Redis之秒杀系统

目录 Redis 秒杀 Mysql数据库设计 Mysql秒杀实现 MysqlRedis秒杀实现 秒杀是一种高并发场景&#xff0c;通常指的是在短时间内&#xff08;秒级别&#xff09;有大量用户同时访问某个商品或服务&#xff0c;争相抢购的情景。在这种情况下&#xff0c;系统需要处理大量并发请…

P27 C++this 关键字

目录 前言 01 this关键字的引入 02 this关键字 前言 本章的主题是 C 中的 this 关键字。 以前第一次学qt的时候就遇到了this关键字&#xff0c;那时候还不是很会C&#xff0c;所以有点懵&#xff0c;现在我们就来讲解以下C中的this关键字 C 中有一个关键字 this&#xff0…