13-RocketMQ主从同步(HA实现)源码原理

目录

HAClient端

Master端

AcceptSocketService实现原理

HAConnection实现原理

ReadSocketService

WriteSocketService

GroupTransferService实现原理

五大线程的协调关系


 

 

HAClient端

首先要去connect一下master,从而建立一个SocketChannel连接通道,并把这个通道对应的文件描述符,注册到多路复用器selector上,并注册一个针对该通道的READ读事件,从而能读取Master发过来的心跳包、Commitlog数据

 

c006e8c0ffcc4cc7885e5f6e43c7497c.png

一个HAClient就是一个服务线程,负责处理当前slave与master之间连接中的数据往来

一个HAClient就持有一个Selector,也就是一个IO多路复用器

 

8c15af42da0f425a979d032cccc20c77.png

b364245fba1e4c8494be609280bbd602.png

通过死循环,来每隔一段时间,往master的ReadSocketService线程,发送一次心跳,心跳数据就是当前slave的commitlog最大物理偏移

核心逻辑就是,不断循环select,一旦发现通道中有数据到达,则将通道中的数据读取到byteBufferRead字节数组中来,然后根据粘包/拆包,将byteBufferRead字节数组中的数据,切分成一个个的数据包,每个数据包中附带一段commitlog,每次往磁盘commitlog文件末尾append追加一个数据包中的一段commitlog

这一套流程,都在HAClient这一个服务单线程中完成的

 

处理读事件processReadEvent

6ba13ef256fa4e39be7f23590ed1a376.png

使用while循环,如果连续三次都没有从通道中读取到数据,则跳出循环。这样写,就是因为前面selector.select(1000) ,没有判断返回值,而是不管超时不超时,流程都继续往后走了

3b34930ed5e349db99dcb1c802eea000.png

 

解析Master过来的每个commitlog数据包

313905dab9d34ecca97b3f7bf478546f.png

36e6f46ee0954899a981569b4e8dbeca.png

6e3243cb4ecb453694763aefc6dd3279.png

478行,slave每次接收到master发过来的一批commitlog数据时,会看master传过来的这段commitlog的起始端对应的全局物理偏移量,和slave本地存储的批commitlog数据的最大物理偏移量,是否相等。如果相等,也说明master端没有给slave漏掉某一段commitlog,说明本次master传过来的这段commitlog片段,可以直接拼接在slave本地的commitlog之后。如果不相等,则说明master端给slave漏掉某一段commitlog

490行,执行当前commitlog数据包的append到本地磁盘的动作,同时还会同步唤醒reputMessageService线程

492行,比如当前byteBufferRead有3.7个数据包,我们第一次while循环处理完并将第一个数据包中的commitlog数据追加到本地磁盘后,开始进行第二次while循环前,先将dispatchPosition划动到下一个数据包的开端,将position恢复到byteBufferRead中已有数据的末端,然后开始第二次while循环,开始准备将第二个数据包append到磁盘,注意,每轮while循环都会往master report上报一次slave的maxPhyPosition

一直到某一轮循环发现diff小于8+4后,来开始执行503行,并break跳出循环

503行,两块byteBufferRead交换逻辑,如果当前的byteBufferRead写满后,也交换另一块空的byteBuffer作为新的byteBufferRead

0aa6ed913acd4f8f97d6c4f67a356c55.png

注意,在append的时候,还会同步唤醒reputMessageService线程,来现场构建ConsumeQueue和IndexFile c0f25744ab92440285ed9d5018c0839c.png

每个commitlog数据包,append成功后,都会给master上报一次当前commitlog最大位置

 

两块byteBufferRead交换逻辑

c951cfed6952426284bc892c0b34938a.png

因为应用层,如果只定义一个byteBuffer字节数组,不停的从内核中读数据往里面写,它总有被写满的时候,所以定义了两个byteBuffer字节数组,轮着来写,上一个写满了就转头去写另一个,并且把上一个因为粘包拆包解析读取还剩余的比如0.7个数据包,拷贝到另一个byteBuffer字节数组中去


 

74a8d6d0f23d4acc90a2648544394371.png

当HAClient端的selector多路复用器,监听到有READ事件进来,也就是master端有数据写过来时,就会通过socketChannel.read(byteBufferRead),来把通道中的master写过来的心跳数据包、commitlog数据包数据,从内核读取到应用层的byteBufferRead中来

byteBufferRead,每次dispatchPosition都是一个完整数据包的开始位置、offset是后面带的那段commitlog数据,在master中commitlog磁盘总数据的全局物理偏移量、size是后面带的那段commitlog数据的字节数

此时,应用层就需要自己对byteBufferRead中的数据进行切分,也就是粘包拆包处理,因为此时byteBufferRead中可能有3.7个数据包

byteBufferRead中,dispatchPosition一个完整数据包的开始位置,position是byteBuffer当前数据末端所在的位置,此时

  1. 如果dispatchPosition和position之间的差值diff大于8+4字节,则可以进行解析将offset和size解析出来
  2. 如果dispatchPosition和position之间的差值diff小于8+4字节,则说明当前这段数据还不够拼装解析成offset和size,所以,就需要多路复用器继续等待新的IO READ事件的到来,应用线程从而能从内核通道中读取更多的字节数据,并将之追加到position往后的位置,直至dispatchPosition和position之间的差值diff大于8+4字节,则可以开始第1步的动作
  3. 解析出offset和size后,又要看diff是不是大于8+4 + bodySize,如果大于,则开始slave真正的将master传过来的commitlog append到磁盘本地commitlog文件后,如果小于,则多路复用器又继续等待新的IO READ事件
  4. 将dispatchPosition更新到下一个数据包,在byteBufferRead中的位置

 

粘包/半包问题

整个网络通信传输,RocketMQ采用了java nio包中的selector io多路复用技术,应用这个技术读取数据时,是直接面对的tcp数据,而tcp是面向字节流的而不是面向数据包的,所以,我们无法保证tcp通道的接收缓冲区中的字节流数据一定就是一个数据包,可能是0.8个数据包,可能是2.3个数据包,当然也可能刚好就是一个或者两个完整的数据包,这是不确定的事情

我们应用程序猿,能做的就是把内核中的tcp通道的接收缓冲区中的已经接收到的字节流数据,从内核读取到应用层的byte buffer字节数组中,然后在应用层自己根据数据包协议头的offset + dataSize,来进行拆包粘包

 

Master端

 

c3cb1083b3234ef3999baa84f47284b0.png

 

AcceptSocketService实现原理

76108b85c1ec40baa7714202791b4e5c.png

064a4a5a928347f8a641a37db0c5ac03.png

48f2cb2327664622937511a91ef754d8.png

如果一个Master有两个slave,则235行的集合中就有有两个HAConnection

 

HAConnection实现原理

f04e2a4f19f84626af895ab358efd51e.png

686f512cd3604ca0ab07304750518900.png

 

ReadSocketService

bb083f09131c4afd9f07f91de375290c.png

这个服务线程,在99行初始化的时候,该selector就被初始化为了关注READ读事件,也就是关注从slave写过来的数据

08d7e565677a4aab88314597f8ca1fbd.png

 

处理读事件

497e6f15a0e141b4a0f8b88f9b34fb46.png

1278170ace2d47dc9459aeea4f55299a.png

185行,可以看出这段READ逻辑,既用来读取slave发过来的响应当前slave已经有的commitlog的全局offset,也用来读取slave发过来的要拉取的某段commitlog的起始offset

slave向master返回拉取偏移量。这里有两重意义,对于slave来说,是发送下一次待拉取的消息偏移量、而对于master来说,既可以认为是slave本地请求拉取消息偏移量,也可以理解为slave的消息同步ACK确认消息

e77b58828761436a9b15ee726de2b064.png

因为我们当前就是处理最后一个包,前面的都处理过了

fb3bcc55529244e08cf7b5fda670a7d6.png

由于可能存在多个slave,多个slave就可能有多个连接,来同时连接到同一个master,这是就可能涉及到一些并发处理,比如有多个线程同时去更改push2SlaveMaxOffset这个值,所以97行是一个原子的更新操作

这里如果更新push2SlaveMaxOffset成功,则会去唤醒GroupTransferService线程,因为GroupTransferService每wait 1s才会醒来自己执行一轮

9a82e28a9d7a4f7b83a7b8f4fa2d0479.png

4665fbfe67ec457ab5e978ecdf1b2b0b.png

 

WriteSocketService

74a8d8ca09734e69a492aa03f355a0c7.png

 

WRITE事件

        @Overridepublic void run() {HAConnection.log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {// OP_WRITE,等待写事件this.selector.select(1000);/** 说明,master还未收到slave的拉取请求,放弃本次事件处理* slaveRequestOffset在收到slave broker拉取请求时更新* */if (-1 == HAConnection.this.slaveRequestOffset) {Thread.sleep(10);continue;}/** nextTransferFromWhere等于-1,表示初次进行数据传输,需要计算需要传输的物理偏移量* 如果不是开机以后的第一次传输,而是第二次以后的,就不会再进这一段if逻辑了* */if (-1 == this.nextTransferFromWhere) {/** 如果slaveRequestOffset等于0,则从当前最后一个commitlog文件起始偏移量开始传输,* 否则,根据slave broker的拉取请求偏移量开始传输* */if (0 == HAConnection.this.slaveRequestOffset) {long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();masterOffset = masterOffset - (masterOffset %// commitlog文件的大小:1GHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getMappedFileSizeCommitLog());if (masterOffset < 0) {masterOffset = 0;}this.nextTransferFromWhere = masterOffset;} else {this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;}log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr+ "], and slave request " + HAConnection.this.slaveRequestOffset);}/** lastWriteOver 上一次写是否结束:* 因为通道是非阻塞的,所以不一定会一次写完,所以要有这个标志位** nio设置非阻塞时,并不是等所有数据写完再返回,而是写一部分就返回* */if (this.lastWriteOver) {long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;// 写的时间间隔,超过了预定心跳发送间隔,发送一个心跳包,包中没有commitlog数据// 心跳包,是为了避免长连接,由于commitlog一直没有新数据进来导致空闲而被关闭if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaSendHeartbeatInterval()) {// Build Headerthis.byteBufferHeader.position(0);this.byteBufferHeader.limit(headerSize);this.byteBufferHeader.putLong(this.nextTransferFromWhere);// offset 不变this.byteBufferHeader.putInt(0);// size为0,即包中没有commitlog数据this.byteBufferHeader.flip();// 传输数据this.lastWriteOver = this.transferData();if (!this.lastWriteOver)continue;}// 上次传输没有结束,继续传输数据} else {this.lastWriteOver = this.transferData();if (!this.lastWriteOver)continue;}// ---------------------------------- 上面都是校验,前戏,下面才是每一轮write的重头戏 --------------------------------------------------------------/** 下面是构造,并传送一个完整的commitlog数据包* */SelectMappedBufferResult selectResult = HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);if (selectResult != null) {int size = selectResult.getSize();if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {// 默认最多只能传32k,超过32k,需要做数据截断size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();}long thisOffset = this.nextTransferFromWhere;// nextTransferFromWhere,会在这里随着每增加一个数据包,这个nextTransferFromWhere就往后挪动32k的位置this.nextTransferFromWhere += size;/** 比如,* 第一次传送时,nextTransferFromWhere为0,则thisOffset也变成0,就传送0 -> (32k-1)之间的数据,然后置nextTransferFromWhere为32k* 第二次传送时,nextTransferFromWhere为32k,则thisOffset也变成32k,就传送32 -> (64k-1)之间的数据,然后置nextTransferFromWhere为64k* 第三次传送时,nextTransferFromWhere为64k* .....* 如果一直有数据传输,则每隔一秒,就会往slave传入32k的消息数据,无限循环* */selectResult.getByteBuffer().limit(size);this.selectMappedBufferResult = selectResult;// Build Headerthis.byteBufferHeader.position(0);this.byteBufferHeader.limit(headerSize);this.byteBufferHeader.putLong(thisOffset);this.byteBufferHeader.putInt(size);this.byteBufferHeader.flip();// 传输数据this.lastWriteOver = this.transferData();} else {// 若没有获取到commitlog数据,则等待应用层追加HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);}} catch (Exception e) {HAConnection.log.error(this.getServiceName() + " service has exception.", e);break;}}HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();if (this.selectMappedBufferResult != null) {this.selectMappedBufferResult.release();}this.makeStop();readSocketService.makeStop();haService.removeConnection(HAConnection.this);SelectionKey sk = this.socketChannel.keyFor(this.selector);if (sk != null) {sk.cancel();}try {this.selector.close();this.socketChannel.close();} catch (IOException e) {HAConnection.log.error("", e);}HAConnection.log.info(this.getServiceName() + " service end");}

 

GroupTransferService实现原理

主要是同步复制的场景下

4665fbfe67ec457ab5e978ecdf1b2b0b.png

push2SlaveMaxOffset大于等于request中的offset,则说明当前commitRequest中携带的消息,已经复制到了slave上,如果是小于的,则GroupTransferService线程会睡在WaitNotifyObject上,但是有睡眠超时时间,如果睡醒又判断一次push2SlaveMaxOffset是否大于等于request中的offset,重复了6次,都是小于,那么就会给producer端发回一个同步slave超时的状态

 

 

f024d2b7b1d5450c875796789d703bd9.png

678行,可能有多个slave的写线程,都因为没有新的消息进入commitlog而处于睡眠状态,所以此时如果有新的消息进来后,就需要通过wakeAll()唤醒所有的slave写线程

fc9e277dbd3e42a98fc89204604074ac.png

可以看到,不管同步slave是否成功,最终都会去唤醒producer写线程,只是会去给producer返回一个FLUSH_SLAVE_TIMEOUT

 

 

五大线程的协调关系

消息写入主线程:就是当同步slave未完成之前,自己睡在GroupCommitRequest上,多个GroupCommitRequest组成一批,由GroupTransferService统一管理

GroupTransferService:就是统一管理一批同时进来的,因为还未成功同步slave而阻塞住的消息写入主线程,每wait 1s后就自己醒来判断一次当前管理的这批阻塞线程,有谁完成了同步,则将它唤醒

ReadSocketService:每个M-S连接特有的线程,用于接收slave当前已同步的偏移量,既然收到slave的反馈,则需要唤醒某些消息发送主线程。ReadSocketService在成功更新push2SlaveMaxOffset后就会唤醒GroupTransferService,GroupTransferService再去比较主从位点来决定唤醒哪些消息发送主线程

WriteSocketService:每个M-S连接特有的线程,

HAClient:通过死循环,来每隔一段时间,往master的ReadSocketService线程,发送一次心跳,心跳数据就是当前slave的commitlog最大物理偏移

 

具体流程

当前同时有3个消息写入主线程进来,成功将消息写入commitlog后开始handleHA,此时一个wakeAll()将分别针对两台slave的WriteSocketService线程唤醒,然后3个主线程直接睡在各自的GroupCommitRequest上

当ReadSocketService每接收到一次slave上报过来的新的offset,并成功更新push2SlaveMaxOffset后,就会唤醒睡3个睡在各自的GroupCommitRequest上的主线程,主线程就可以给producer端发回消息写入并同步slave成功

WriteSocketService线程因为没有没有新的消息进来,当前正睡在HAService的WaitNotifyObject上

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/108736.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

不知道有用没用的Api

encodeURIComponent(https://www.baidu.com/?name啊啊啊) decodeURIComponent(https%3A%2F%2Fwww.baidu.com%2F%3Fname%3D%E5%95%8A%E5%95%8A%E5%95%8A) encodeURI(https://www.baidu.com/?name啊啊啊) decodeURI(https://www.baidu.com/?name%E5%95%8A%E5%95%8A%E5%95%8A) …

面试核心技巧--spring篇

答题技巧&#xff1a; 总&#xff1a;当前问题的是那些具体点 分&#xff1a;1、2、3、4突出重点&#xff0c; 避重就轻:没有重点 一个问题能占用面试官多少时间?问的越多可能露馅越多 当面试官问到一个你熟悉的点的时候&#xff0c;一定要尽量拖时间 什么是底层实现&#…

真空腔体的设计要点

真空腔体是保持内部为真空状态的容器&#xff0c;真空腔体设计制作要考虑容积、材质和形状。 1、根据应用需求选择腔体形状。几种代表性的真空腔体包括垂直真空腔体、水平真空腔体、立方真空腔体和球形真空腔体。 2、根据获得真空度选择腔体材质。钛用于极高真空&#xff1b;…

旋转偏心裁切刀切向跟踪及半径补偿

1 裁刀半径补偿问题的提出 偏心裁刀一般皮革和纸箱行业用的比较多&#xff0c;它适用于裁切比较厚的材料。对于如图1所示的偏心裁刀&#xff0c;它的刀尖和旋转轴(也就是刀心)存在一个距离&#xff0c;设为半径r。由于改刀刀刃有方向&#xff0c;所以用该刀去切割直线时&#…

Linux学习笔记-Ubuntu系统下配置用户ssh只能访问git仓库

目录 一、基本信息1.1 系统信息1.2 git版本[^1]1.2.1 服务器端git版本1.2.2 客户端TortoiseGit版本1.2.3 客户端Git for windows版本 二、创建git用户和群组[^2]2.1 使用groupadd创建群组2.2 创建git用户2.2.1 使用useradd创建git用户2.2.2 配置新建的git用户ssh免密访问 2.3 创…

Packet Tracer安装、汉化

Packet Tracer是一款由思科系统开发的网络模拟器&#xff0c;用于学习和实验网络配置、协议和拓扑结构。它提供了一个虚拟的网络环境&#xff0c;让用户能够在不需要实际硬件设备的情况下进行网络实验和模拟。 安装 Packet Tracer的安装注册&#xff1a;在Packet Tracer软件上…

基于SSM的快餐店点餐服务系统设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;采用JSP技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#x…

2023 CCF国际AIOps挑战赛,报名倒计时!|截止时间9月15日

智能运维领域最具影响力的专业赛事——2023 CCF国际AIOps挑战赛&#xff0c;自报名启动以来已收到230余支队伍报名&#xff0c;约600余位选手参与本次挑战赛。本次大赛的报名截止时间为9月15日&#xff0c;目前报名已经进入倒计时&#xff0c;请选手们抓紧最后时间报名参赛&…

Science adv | 转录因子SPIC连接胚胎干细胞中的细胞代谢与表观调控

代谢是生化反应网络的结果&#xff0c;这些反应吸收营养物质并对其进行处理&#xff0c;以满足细胞的需求&#xff0c;包括能量产生和生物合成。反应的中间体被用作各种表观基因组修饰酶的底物和辅助因子&#xff0c;因此代谢与表观遗传密切相关。代谢结合表观遗传涉及疾病&…

使用Puppeteer爬取地图上的用户评价和评论

导语 在互联网时代&#xff0c;获取用户的反馈和意见是非常重要的&#xff0c;它可以帮助我们了解用户的需求和喜好&#xff0c;提高我们的产品和服务质量。有时候&#xff0c;我们需要从地图上爬取用户对某些地点或商家的评价和评论&#xff0c;这样我们就可以分析用户对不同…

Linux下修改jar包中的配置文件application.conf

文件位置 jar包文件工程目录 打包后解压jar包目录 提取和上传 jar tf XXX.jar # 获取包内文件 application.conf是jar包的配置文件&#xff0c;如果修改需要 提取文件 jar xf my-app.jar application.conf 修改后上传文件 jar uf my-app.jar application.conf

NDK (ndk)报错 Unity requires NDK r19 (64-bit)(19.0.05232133)

一、介绍 在 Android 添加 NDK ndk 的时候&#xff0c;出现 Unity requires NDK r19 (64-bit)(19.0.05232133)。 二、环境 1、Unity 2020.3.48f1c1 2、Android NDK 配置 三、报错信息 NDK (ndk)报错 Unity requires NDK r19 (64-bit)(19.0.05232133) 四、解决方法 1、下…