RocketMQ快速实战以及集群架构原理详解
组成部分
- 启动Rocket服务之前要先启动NameServer
- NameServer
- 提供轻量级Broker路由服务,主要是提供服务注册
- Broker
- 实际处理消息存储、转发等服务的核心组件
- Producer
- 消息生产者集群,通常为业务系统中的一个功能模块
- Consumer
- 消息消费者集群,通常是业务系统中的一个功能模块
- Topic
- 区分消息的种类,生产端可以发送消息给一个或多个topic,消费端可以进行一个或多个消息进行消费
集群中的角色
- Producer
- 消息发送者(寄信人),在生产者中会把同一类生产者组成一个集合,称之为生产者组,这类生产者发送同一类消息且发送逻辑一致,如果发送的是事务消息是原始生产者在发送之后崩溃,则Broker服务会联系同一生产组的其它生产者来提交或回溯消费
- Consumer
- 消息接受者(收信人),消费者同样会把一类消费者组成一个集合,称之为消费者组,这类消费者消费同一类消息且消费逻辑一致,消费者组在消息消费方面,实现负载均衡和容错非常容易,消费组中的消费者必须订阅相同的topic
RocketMQ支持两种消息模式
- 集群消费模式
- 相同消费者组下的每个消费者平摊消息
- 广播消费模式
- 相同消费者组的每个消费者接受全量消息
- 集群消费模式
- Broker Server
- 暂存和传输消息(邮局),也存储消息相关的元数据信息(包括消费者组、消费进度偏移、主题、队列消息等),Broker Server是RocketMQ真正的业务核心
- 子模块
- Remoting Module
- 整个Broker的实体,负责处理来自Client端的请求
- Client Manager
- 负责管理客户端以及维护消费者订阅的topic信息
- Store Service
- 提供方便简单的API接口处理消息存储到物理硬盘的查询功能
- HA Service
- 高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能
- Index Service
- 根据特定的Message Key对投递到Broker的消息进行索引服务,以提供消息的快速查询
- Remoting Module
- Broker架构模式
- 普通集群
- 每个节点分配一个固定的角色,master负责响应客户端的写以及存储消息,slave只负责对master的消息进行同步以及响应客户端的读
- 消息同步方式分为同步和异步
- 每个节点分配一个固定的角色,master负责响应客户端的写以及存储消息,slave只负责对master的消息进行同步以及响应客户端的读
- Dledger高可用集群
- 基于Raft协议随机选举出一个master,而master挂了之后,会从slage中自动选举出一个节点作为新master
Dledger的职责
- 接管Broker的Commitlog的消息存储
- 从集群中选举出master节点
- 完成master节点往slave节点的消息同步
- 普通集群
- 子模块
- 暂存和传输消息(邮局),也存储消息相关的元数据信息(包括消费者组、消费进度偏移、主题、队列消息等),Broker Server是RocketMQ真正的业务核心
- Name Server
- 管理Broker(邮局的管理机构),Broker Server启动时会向所有的Name Server注册自己的服务信息,并且后续通过心跳来保证服务信息的实时性,生产者或消费者可以通过名称服务查找各个topic响应的Broker IP列表,多个Name Server实例组成集群(AP模式),但相互独立,没有信息互换,意味着Name Server中任意的节点挂了,只要有一台服务节点正常,整个路由服务不会受影响
- Topic
- 区分消息的种类,一个发送者可以发送消息给一个或多个topic,一个消息的接受者可以订阅一个或多个topic消息,同一个topic下的数据,会分片存储到不同的Broker上,而每一个分片单位MessageQueue(类似于Kafka中的Partition)
- MessageQueue
- 相当于Topic中的分区,用于并行发送和接收消息,每个Topic默认有4个MessageQueue
消息确认机制
-
消息生产端采用消息确认多次重试的机制来保证消息能发送到MQ
-
3种发送消息的方式
-
同步发送
- 必须等到Broker反馈之后才能继续发,安全性最高但发消息最慢
-
单向发送
- 不管消息是否发成功都能继续发,所以吞吐量最高,但是安全性低,容易丢消息
-
异步发送
- 发送消息的同时回注册一个回调去处理响应,安全性低,容易丢消息
-
-
-
消息消费者端采⽤状态确认机制保证消费者⼀定能正常处理对应的消息
- Broker会通过记录重试次数,为了不影响topic下其它正常的消息,会给每个消费组设计对应的重试topic,在消息重试时,会将原topic的消息移动到对应的重试topic中去,当重试达到一定阈值会将失败的消息推入到死信topic中
- 消费者组由多个消费者实例组成,Broker只需要向某一个实例推送消息即可,保障消息重试机制正常运行,并且Broker只通过消费者返回的状态来判断是否处理成功,但是业务执行是否正确是无法知道的
-
消费者也可以⾃⾏指定起始消费位点
- Broker通过消费者返回的状态来推进消费者组对应的消息offset,虽然offset是Broker来维护,但是消费者可以自己指定offset进行消费
消息模型
顺序消息
- 只能保证局部消息有序,不能保证全局有序,要保证全局有序需要从生产端、Broker、消费端三个角度同时考虑才行
- 生产端
- 默认情况下,生产端采用轮询将消息投递到不同的MessageQueue种,而消费端会从多个MessageQueue中拉取消息,所以这种情况下是无法保证顺序的,所以
只有让一组有序的消息发送到同一个MessageQueue上时,才能利用MessageQueue先进先出的特性保证这组消息有序
- 默认情况下,生产端采用轮询将消息投递到不同的MessageQueue种,而消费端会从多个MessageQueue中拉取消息,所以这种情况下是无法保证顺序的,所以
- Broker
- Broker中的一个MessageQueue是可以保证有序的
- 消费端
- 消费端会从多个MessageQueue上拉取消息,此时每个MessageQueue的消息是有序的,但是多个MessageQueue直接混合到一起却是乱序的,如果想要保证消费有序,可以通过锁MessageQueue的方式,消费完一个MessageQueue再去消费下一个来保证
- MessageListenerOrderly会锁队列,取完一个才能下一个
- MessageListenerConcurrently不会锁队列,每次从多个MessageQueue取出一批数据(默认不超过32条)
- 消费端会从多个MessageQueue上拉取消息,此时每个MessageQueue的消息是有序的,但是多个MessageQueue直接混合到一起却是乱序的,如果想要保证消费有序,可以通过锁MessageQueue的方式,消费完一个MessageQueue再去消费下一个来保证
- 生产端
实现思路简概
- 生产者只有将一批有序的消息放到同一个MessageQueue上,Broker才有可能保持这一批消息的顺序
- 消费者只有一次锁定一个MessageQueue,拿到MessageQueue上消息
注意点
- 大部分业务场景下只要保证局部有序,如果要保证全局有序,只能保留一个MessageQueue,性能较低
- 生产者端尽可能将有序消息打散到不同的MessageQueue上,避免数据热点竞争
- 消费者端只能使用同步方式处理消息,不要使用异步处理,更不能自行批量处理
- 消费者端只进行有限次数的重试,如果一条消息处理失败,RocketMQ会将后续消息阻塞,让消费者进行重试,但是如果消费者一直处理失败,超过最大重试次数,RocketMQ会跳过这条消息,直接处理后面的消息,导致消费乱序
- 消费者端如果处理逻辑中出现问题,不建议抛出异常,可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT作为替代
广播消息
- 广播消息并没有特定的消费者,因为这涉及到消费者的集群消费模式,默认是集群模式
实现思路简概
- 集群模式
- 一个消息只会被一个消费组中的多个消费者共同处理一次(
平摊
)- Broker端会给每个消费者组维护一个统一的offset来保证同一个消费组内只会被消费一次
- 一个消息只会被一个消费组中的多个消费者共同处理一次(
- 广播模式
- 一个消息会被推送给所有消费者消费,不再关心消费组
- Broker端只管推消息,消费端自己维护offset
- 一个消息会被推送给所有消费者消费,不再关心消费组
- 集群模式
注意点
- Broker端不维护消费进度,如果消费者处理消息失败了,将⽆法进⾏消息重试
- 消费端自己维护offset可以在服务重启后继续之前的进度,消息丢了也不影响服务稳定性
延迟消息
- RocketMQ给消息定制了18个默认的延迟级别,延迟消息的难点其实是性能,需要不断进⾏定时轮询,全部扫描所有消息是不可能的
实现思路简概
- RocketMQ预设一个系统topic(SCHEDULE_TOPIC_XXXX),在这个topic下有18个延迟队列,每次只针对这些队列里的消息进行延迟操作
注意点
- 预设延迟时间导致不灵活,后续版本已经支持预设一个具体的时间戳,不建议调整延迟级别对应的延迟时间
批量消息
-
生产者端发送的消息过多时,可以将多条消息合并进行批量发送,减少网络IO,提升消息发送的吞吐量
-
注意点
- 只能对同一topic下的消息进行批量发送,不支持延迟消息,以及批量消息的大小不超过1MB(超过了自行拆分)
过滤消息
- 同一topic下不同的消息,消费者只关注某一类消息,有简单过滤和SQL过滤方式
实现思路简概
- 简单过滤
- ⽣产者端需要在发送消息时,增加Tag属性,消费者端/Broker端就可以通过这个Tag属性过滤出需要的消息
- SQL过滤
- ⽣产者端需要在发送消息时,增加Tag属性以及自定义的属性,消费者端/Broker端可以指定一个SQL进行过滤
- 简单过滤
注意点
- 消息过滤在消费者端和Broker端都可以做,消费者端进行过滤可以保障消息过滤的可控性,而Broker端过滤可以减少不必要数数据网络IO(只把消费者端需要的消息发送出去就行)
- 在实际生产中,被过滤的消息并不会直接丢弃,会交给其它需要的消费者进行消费,如果一直没有消费者进行消费,Broker端会继续推进offset
事务消息
-
通过RocketMQ的事务机制,来保障本地事务(比如数据库)与MQ消息发送的事务一致性(上下游的数据一致性)
-
实现思路简概
- 生产者端将消息发送到MQ服务端
- MQ服务端将消息持久化成功之后,向生产者端反馈已发送成功,此时消息处于半事务消息状态(暂不能投递)
- 生产者端开始执行本地事务逻辑
- 生产者端根据本地事务执行结果向MQ服务端
提交二次确认结果来判断是否提交或回滚
提交
- 服务端将半事务消息标记为可投递,将半事务消息转交给消费端
回滚
- 服务端将回滚事务,放弃将半事务消息转交给消费端
- 当出现网络故障或生产者端重启时,若果MQ服务端未收到二次确认消息结果或收到的结果为未知状态,经过一定时间后,MQ服务端将对生产者组的任一生产者发送消息回查,生产者收到消息回查后,需要检查对应消息的本地事务执行最终结果,然后生产者端根据检查到的最终结果
再次提交二次确认来判断是否提交或回滚
-
注意点
- 半消息是对消费者不可⻅的⼀种消息,RocketMQ的做法是将消息转到了⼀个系统Topic(RMQ_SYS_TRANS_HALF_TOPIC)
- 事务消息中,本地事务默认回查次数15次,本地事务回查的默认间隔60秒,超过回查次数后,消息将会被丢弃
- 事务消息不支持延迟消息和批量消息
最佳实战注意点
- 合理分配Topic、Tag
- ⼀个应⽤尽可能⽤⼀个Topic,⽽消息⼦类型则可以⽤tags来标识,tags可以由应⽤⾃由设置,只有⽣产者在发送消息设置了tags,消费⽅在订阅消息时才可以利⽤tags通过broker做消息过滤
- 使⽤Key加快消息索引
- 分配好Topic和Tag之后,⾃然就需要优化Key属性了,因为Key也可以参与消息过滤,通常建议每个消息要分配 ⼀个在业务层⾯的唯⼀标识码,设置到Key属性中
作用
- 可以配合Tag进⾏更精确的消息过滤
- Broker端会为每个消息创建⼀个hash索引,应⽤可以通过topic、key来查询某⼀条历史的消息内容,以及消息在集群内的处理情况,为了避免哈希冲突问题,客户端要尽量保证key的唯⼀性
- 分配好Topic和Tag之后,⾃然就需要优化Key属性了,因为Key也可以参与消息过滤,通常建议每个消息要分配 ⼀个在业务层⾯的唯⼀标识码,设置到Key属性中
- 关注错误消息重试
- RocketMQ消费者端,如果处理消息失败了,Broker是会将消息重新进⾏投送,⽽在重试时,每个消费者组创建⼀个对应的重试队列(“%RETRY%”+ConsumeGroup),多关注重试队列,可以及时了解消费者端的运⾏情况,如果这个队列中出现了⼤量的消息,就意味着消费者的运⾏出现了问题,要及时跟踪进⾏⼲预
- RocketMQ默认允许每条消息最多重试16次(可以定制),如果消息重试16次后仍然失败,消息将不再投递。转为进⼊死信队列
- ⼿动处理死信队列
- 当⼀条消息消费失败,RocketMQ就会⾃动进⾏消息重试。⽽如果消息超过最⼤重试次数,RocketMQ就会认为这个消息有问题,RocketMQ不会⽴刻将这个有问题的消息丢弃,⽽会将其发送到这个消费者组对应的死信队列,此时需要人工去查看死信队列(%DLQ%+ConsumGroup)中的消息,对错误原因进行排查以及对死信进行处理(转发到正常的tipic进行重新消费或者丢弃)
死信队列的特征
- ⼀个死信队列对应⼀个ConsumGroup,⽽不是对应某个消费者实例
- 如果⼀个ConsumeGroup没有产⽣死信,RocketMQ就不会为其创建相应的死信队列
- 死信队列中的消息不会再被消费者正常消费
- 死信队列的有效期跟正常消息相同,默认3天(可配置),超过这个最⻓时间的消息都会被删除,⽽不管消息是否消费过
- 消费者端进⾏幂等控制
- 在MQ系统中,对于消息幂等有三种实现语义
- at most once 最多⼀次:每条消息最多只会被消费⼀次
- 可以⽤异步发送、sendOneWay等⽅式就可以保证
- at least once ⾄少⼀次:每条消息⾄少会被消费⼀次
- 可以⽤同步发送、事务消息等很多⽅式能够保证
- exactly once 刚刚好⼀次:每条消息都只会确定的消费⼀次
- RocketMQ只能保证at least once,保证不了exactly once
- 云上版本支持
- RocketMQ只能保证at least once,保证不了exactly once
- at most once 最多⼀次:每条消息最多只会被消费⼀次
- 消息幂等的必要性
- 出现重复的情况
- 发送时消息重复
- 当⼀条消息已被成功发送到服务端并完成持久化,此时出现了⽹络闪断或者客户端宕机,导致服务端对客户端
应答失败, 如果此时⽣产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且消息ID也相同的消息
- 当⼀条消息已被成功发送到服务端并完成持久化,此时出现了⽹络闪断或者客户端宕机,导致服务端对客户端
- 投递时消息重复
- 消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候⽹络闪断,为
了保证消息⾄少被消费⼀次,Broker端将在⽹络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且消息ID也相同的消息
- 消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候⽹络闪断,为
- 负载均衡时消息重复(不限于⽹络抖动、Broker 重启以及订阅⽅应⽤重启)
- 当 Broke端 或客户端重启、扩容或缩容时,会触发Rebalance,此时消费者可能会收到重复消息
- 发送时消息重复
- 处理方式
- 在RocketMQ中,是⽆法保证每个消息只被投递⼀次的,所以要在业务上⾃⾏来保证消息消费的幂等性,RocketMQ的每条消息都有⼀个唯⼀的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以⽤这个MessageId来作为判断幂等的关键依据,但是最好使用分布式ID来避免出现冲突
- 出现重复的情况
- 在MQ系统中,对于消息幂等有三种实现语义