消息重复消费
影响消息正常发送和消费的重要原因是网络的不确定性。
引起重复消费的原因
- ACK
正常情况下在consumer真正消费完消息后应该发送ack,通知broker该消息已正常消费,从queue中剔除。当ack因为网络原因无法发送到broker,broker会认为此条消息没有被消费,此后会开启消息重投机制把消息再次投递到consumer
- 消费模式
在CLUSTERING模式下,消息在broker中会保证相同group的consumer消费一次,但是针对不同group的consumer会推送多次。
解决方案
数据库表
处理消息前,使用消息主键在表中带有约束的字段中insert
Map
单机时可以使用map ConcurrentHashMap -> putIfAbsent guava cache
Redis
分布式锁搞起来。
幂等
顺序消息
RocketMQ可以严格保证消息有序。
@RocketMQMessageListener
这个监听消息的注解,它的的配置是 @RocketMQMessageListener(nameserver="你的rocketmq的nameserver", topic="你要订阅的topic名字", consumerGroup="你的消费者的组名字", consumerMode=ConsumeMode.ORDERLY(这个是顺序消费,默认是ConsumeMode.CONCURRENT异步多线程消费), selectorExpression="你的tag名字")
如何让RocketMQ保证消息的顺序消费
你们线上业务用消息中间件的时候,是否需要保证消息的顺序性?
如果不需要保证消息顺序,为什么不需要?
假如我有一个场景要保证消息的顺序,你们应该如何保证?
首先多个queue只能保证单个queue里的顺序,queue是典型的FIFO,天然顺序。多个queue同时消费是无法绝对保证消息的有序性的。所以总结如下:
同一topic,同一个QUEUE,发消息的时候一个线程去发送消息,消费的时候 一个线程去消费一个queue里的消息。
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。
RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);
而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。
但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。
当发送和消费参与的queue只有一个,则是全局有序;
如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
怎么保证消息发到同一个queue?
Rocket MQ给我们提供了MessageQueueSelector接口,可以自己重写里面的接口,实现自己的算法,举个最简单的例子:判断i % 2 == 0
,那就都放到queue1里,否则放到queue2里。
RocketMQ如何保证消息不丢失
首先在如下三个部分都可能会出现丢失消息的情况:
- Producer端
- Broker端
- Consumer端
Producer端如何保证消息不丢失
同步发消息,发送结果是同步感知的。
同步发送失败后可以重试,设置重试次数。默认3次。
``` 在生产阶段,消息队列通过最常用的请求确认机制,来保证消息的可靠传递:当你的代码调用发消息方法时,消息队列的客户端会把消息发送到 Broker,Broker 收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常消息的发送。
只要 Producer 收到了 Broker 的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到发送确认响应后,会自动重试3次,如果重试再失败,就会以返回值或者异常的方式告知用户。
```
Broker端如何保证消息不丢失
- 修改刷盘策略为同步刷盘。默认情况下是异步刷盘的。
flushDiskType = SYNC_FLUSH
- 集群部署,主从模式,高可用。
在存储阶段正常情况下,只要 Broker 在正常运行,就不会出现丢失消息的问题,但是如果Broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。 如果对消息的可靠性要求非常高,可以通过配置 Broker 参数来避免因为宕机丢消息。对于单个节点的 Broker,需要配置 Broker 参数,在收到消息后,将消息写入磁盘后再给Producer 返回确认响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费。 例如,在 RocketMQ 中,需要将刷盘方式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘。如果是 Broker 是由多个节点组成的集群,需要将 Broker 集群配置成:至少将消息发送到2个以上的节点,再给客户端回复发送确认响应。 这样当某个 Broker 宕机时,其他的Broker 可以替代宕机的 Broker,也不会发生消息丢失。后面我会专门安排一节课,来讲解在集群模式下,消息队列是如何通过消息复制来确保消息的可靠性的。
Consumer端如何保证消息不丢失
- 完全消费正常后在进行手动ack确认。
消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。如果Broker 没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。 编写消费代码时需要注意的是,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。 正确的顺序是,先是把消息保存到数据库中,然后再发送消费确认响应。这样如果保存消息到数据库失败了,就不会执行消费确认的代码,下次拉到的还是这条消息,直到消费成功。
消费幂等的必要性
在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:
生产者重发导致相同,发送时消息重复
当一条消息已被成功发送到MQ服务端并完成持久化,此时MQ出现了网络闪断或者客户端宕机,导致MQ服务端对生产者客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
投递时消息重复
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
上面两条是RocketMQ本身存在的MessageID相同的问题,之前也有人说通过业务key来保证消息是不重复的。
负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)
当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
重复消息解决:幂等
利用数据库的唯一约束实现幂等
例如我们刚刚提到的那个不具备幂等特性的转账的例子:将账户 X 的余额加 100 元。在这 个例子中,我们可以通过改造业务逻辑,让它具备幂等性。 首先,我们可以限定,对于每个转账单每个账户只可以执行一次变更操作,在分布式系统中,这个限制实现的方法非常多,最简单的是我们在数据库中建一张转账流水表,这个表有三个字段:转账单 ID、账户 ID 和变更金额,然后给转账单 ID 和账户 ID 这两个字段联合起来创建一个唯一约束,这样对于相同的转账单 ID 和账户 ID,表里至多只能存在一条记录。 这样,我们消费消息的逻辑可以变为:“在转账流水表中增加一条转账记录,然后再根据转账记录,异步操作更新用户余额即可。”在转账流水表增加一条转账记录这个操作中,由于我们在这个表中预先定义了“账户 ID 转账单 ID”的唯一约束,对于同一个转账单同一个账户只能插入一条记录,后续重复的插入操作都会失败,这样就实现了一个幂等的操作。我们只要写一个 SQL,正确地实现它就可以了。
基于这个思路,不光是可以使用关系型数据库,只要是支持类似“INSERT IF NOT EXIST”语义的存储类系统都可以用于实现幂等,比如,你可以用 Redis 的 SETNX 命令来替代数据库中的唯一约束,来实现幂等消费。
为更新的数据设置前置条件
另外一种实现幂等的思路是,给数据变更设置一个前置条件,如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断的数据,不满足前置条件,则不会重复执行更新数据操作。 比如,刚刚我们说过,“将账户 X 的余额增加 100 元”这个操作并不满足幂等性,我们可以把这个操作加上一个前置条件,变为:“如果账户 X 当前的余额为 500 元,将余额加100 元,这个操作就具备了幂等性。对应到消息队列中的使用时,可以在发消息时在消息体中带上当前的余额,在消费的时候进行判断数据库中,当前余额是否与消息中的余额相等,只有相等才执行变更操作。
但是,如果我们要更新的数据不是数值,或者我们要做一个比较复杂的更新操作怎么办?用 什么作为前置判断条件呢?更加通用的方法是,给你的数据增加一个版本号属性,每次更数 据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更 新数据的同时将版本号 +1,一样可以实现幂等更新。
记录并检查操作
如果上面提到的两种实现幂等方法都不能适用于你的场景,我们还有一种通用性最强,适用 范围最广的实现幂等性方法:记录并检查操作,也称为“Token 机制或者 GUID(全局唯 一 ID)机制”,实现的思路特别简单:在执行数据更新操作之前,先检查一下是否执行过 这个更新操作。
具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据 这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置 为已消费。
优化性能来避免消息积压
在使用消息队列的系统中,对于性能的优化,主要体现在生产者和消费者这一收一发两部分的业务逻辑中。对于消息队列本身的性能,你作为使用者,不需要太关注。为什么这么说呢?
主要原因是,对于绝大多数使用消息队列的业务来说,消息队列本身的处理能力要远大于业务系统的处理能力。主流消息队列的单个节点,消息收发的性能可以达到每秒钟处理几万至几十万条消息的水平,还可以通过水平扩展 Broker 的实例数成倍地提升处理能力。
而一般的业务系统需要处理的业务逻辑远比消息队列要复杂,单个节点每秒钟可以处理几百到几千次请求,已经可以算是性能非常好的了。所以,对于消息队列的性能优化,我们更关注的是,在消息的收发两端,我们的业务代码怎么和消息队列配合,达到一个最佳的性能。
1. 发送端性能优化
发送端业务代码的处理性能,实际上和消息队列的关系不大,因为一般发送端都是先执行自己的业务逻辑,最后再发送消息。如果说,你的代码发送消息的性能上不去,你需要优先检查一下,是不是发消息之前的业务逻辑耗时太多导致的。
对于发送消息的业务逻辑,只需要注意设置合适的并发和批量大小,就可以达到很好的发送性能。为什么这么说呢?
我们之前的课程中讲过 Producer 发送消息的过程,Producer 发消息给 Broker,Broker 收到消息后返回确认响应,这是一次完整的交互。假设这一次交互的平均时延是 1ms,我们把这 1ms 的时间分解开,它包括了下面这些步骤的耗时:
- 发送端准备数据、序列化消息、构造请求等逻辑的时间,也就是发送端在发送网络请求之前的耗时;
- 发送消息和返回响应在网络传输中的耗时;
- Broker 处理消息的时延。
如果是单线程发送,每次只发送 1 条消息,那么每秒只能发送 1000ms / 1ms * 1 条 /ms = 1000 条 消息,这种情况下并不能发挥出消息队列的全部实力。
无论是增加每次发送消息的批量大小,还是增加并发,都能成倍地提升发送性能。至于到底是选择批量发送还是增加并发,主要取决于发送端程序的业务性质。简单来说,只要能够满足你的性能要求,怎么实现方便就怎么实现。
比如说,你的消息发送端是一个微服务,主要接受 RPC 请求处理在线业务。很自然的,微服务在处理每次请求的时候,就在当前线程直接发送消息就可以了,因为所有 RPC 框架都是多线程支持多并发的,自然也就实现了并行发送消息。并且在线业务比较在意的是请求响应时延,选择批量发送必然会影响 RPC 服务的时延。这种情况,比较明智的方式就是通过并发来提升发送性能。
如果你的系统是一个离线分析系统,离线系统在性能上的需求是什么呢?
它不关心时延,更注重整个系统的吞吐量。发送端的数据都是来自于数据库,这种情况就更适合批量发送,你可以批量从数据库读取数据,然后批量来发送消息,同样用少量的并发就可以获得非常高的吞吐量。
2. 消费端性能优化
使用消息队列的时候,大部分的性能问题都出现在消费端,如果消费的速度跟不上发送端生产消息的速度,就会造成消息积压。如果这种性能倒挂的问题只是暂时的,那问题不大,只要消费端的性能恢复之后,超过发送端的性能,那积压的消息是可以逐渐被消化掉的。
要是消费速度一直比生产速度慢,时间长了,整个系统就会出现问题,要么,消息队列的存储被填满无法提供服务,要么消息丢失,这对于整个系统来说都是严重故障。
所以,我们在设计系统的时候,一定要保证消费端的消费性能要高于生产端的发送性能,这样的系统才能健康的持续运行。
消费端的性能优化除了优化消费业务逻辑以外,也可以通过水平扩容,增加消费端的并发数来提升总体的消费性能。特别需要注意的一点是,在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的。 如果 Consumer 的实例数量超过分区数量,这样的扩容实际上是没有效果的。原因我们之前讲过,因为对于消费者来说,在每个分区上实际上只能支持单线程消费。
我见到过很多消费程序,他们是这样来解决消费慢的问题的:
如果收消息处理的业务逻辑可能比较慢,也很难再优化了,为了避免消息积压,在收到消息的 OnMessage 方法中,不处理任何业务逻辑,把这个消息放到一个内存队列里面就返回了。
然后它可以启动很多的业务线程,这些业务线程里面是真正处理消息的业务逻辑,这些线程从内存队列里取消息处理,这样它就解决了单个 Consumer 不能并行消费的问题。
这个方法是不是很完美地实现了并发消费? 请注意,这是一个非常常见的错误方法! 为什么错误?因为会丢消息。如果收消息的节点发生宕机,在内存队列中还没来及处理的这些消息就会丢失。
关于“消息丢失”问题,可以参考《如何确保消息不会丢失》。
所以应该是收到消息,先保存消息,然后开启多线程消费消息。
rocketMQ的消息堆积如何处理
下游消费系统如果宕机了,导致几百万条消息在消息中间件里积压,此时怎么处理?
你们线上是否遇到过消息积压的生产故障?如果没遇到过,你考虑一下如何应对?
首先要找到是什么原因导致的消息堆积,是Producer太多了,Consumer太少了导致的还是说其他情况,总之先定位问题。
导致积压突然增加,最粗粒度的原因,只有两种:要么是发送变快了,要么是消费变慢了。
如果是单位时间发送的消息增多,比如说是赶上大促或者抢购,短时间内不太可能优化消费端的代码来提升消费性能,唯一的方法是通过扩容消费端的实例数来提升总体的消费能力。
如果Consumer和Queue不对等,上线了多台也在短时间内无法消费完堆积的消息怎么办?
- 准备一个临时的topic
- queue的数量是堆积的几倍
- queue分布到多Broker中
- 上线一台Consumer做消息的搬运工,把原来Topic中的消息挪到新的Topic里,不做业务逻辑处理,只是挪过去。即订阅原来的Topic 发送到新Topic。
- 上线N台Consumer同时消费新的临时Topic中的数据
- 改bug
- 恢复原来的Consumer,继续消费之前的Topic
还有一种不太常见的情况,你通过监控发现,无论是发送消息的速度还是消费消息的速度和原来都没什么变化,这时候你需要检查一下你的消费端,是不是消费失败导致的一条消息反复消费这种情况比较多,这种情况也会拖慢整个系统的消费速度。
追问:堆积时间过长消息超时了?
RocketMQ中的消息只会在commitLog被删除的时候才会消失,不会超时。也就是说未被消费的消息不会存在超时删除这情况。
追问:堆积的消息会不会进死信队列?
不会,消息在消费失败后会进入重试队列(%RETRY%+ConsumerGroup),16次(默认16次)才会进入死信队列(%DLQ%+ConsumerGroup)。
RocketMQ在分布式事务支持这块机制的底层原理?
你们用的是RocketMQ?RocketMQ很大的一个特点是对分布式事务的支持,你说说他在分布式事务支持这块机制的底层原理?
消息队列中的“事务”,主要解决的是消息生产者和消息消费者的数据一致性问题。
分布式系统中的事务可以使用TCC(Try、Confirm、Cancel)、2pc来解决分布式系统中的消息原子性
RocketMQ 4.3+提供分布事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致
RocketMQ实现方式:
Half Message: 预处理消息,当broker收到此类消息后,会存储到RMQSYSTRANSHALFTOPIC的消息消费队列中
检查事务状态: Broker会开启一个定时任务,消费RMQSYSTRANSHALFTOPIC队列中的消息,每次执行任务会向消息发送者确认事务执行状态(提交、回滚、未知),如果是未知,Broker会定时去回调在重新检查。
超时: 如果超过回查次数,默认回滚消息。
也就是他并未真正进入Topic的queue,而是用了临时queue来放所谓的half message,等提交事务后才会真正的将half message转移到topic下的queue。
如果让你来动手实现一个分布式消息中间件,整体架构你会如何设计实现?
我个人觉得从以下几个点回答吧:
- 需要考虑能快速扩容、天然支持集群
- 持久化的姿势
- 高可用性
- 数据0丢失的考虑
- 服务端部署简单、client端使用简单
高吞吐量下如何优化生产者和消费者的性能?
开发
同一group下,多机部署,并行消费
单个Consumer提高消费线程个数
批量消费
- 消息批量拉取
- 业务逻辑批量处理
再说说RocketMQ 是如何保证数据的高容错性的?
- 在不开启容错的情况下,轮询队列进行发送,如果失败了,重试的时候过滤失败的Broker
- 如果开启了容错策略,会通过RocketMQ的预测机制来预测一个Broker是否可用
- 如果上次失败的Broker可用那么还是会选择该Broker的队列
- 如果上述情况失败,则随机选择一个进行发送
- 在发送消息的时候会记录一下调用的时间与是否报错,根据该时间去预测broker的可用时间
其实就是send消息的时候queue的选择。源码在如下:
org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue()
任何一台Broker突然宕机了怎么办?
Broker主从架构以及多副本策略。Master收到消息后会同步给Slave,这样一条消息就不止一份了,Master宕机了还有slave中的消息可用,保证了MQ的可靠性和高可用性。而且Rocket MQ4.5.0开始就支持了Dlegder模式,基于raft的,做到了真正意义的HA。
重置消费点位
用过rocketmq的管理控制平台的同学应该都会看到重置消费位点这个功能,由于消费消息也只是对应的消费者的指针的偏移,消息也并没有消失,所以这个重置消费位点也就是指针的来回移动偏移而已。如下就是消费位点的界面
也很好理解,首先选择消费组,就是你订阅了这个topic的组,下边的时间点就是你要讲指针移动到什么时候,这个好像不能精确到其中的一条消息,只能通过时间来定位。然后就是重置后你选的消费组就开始从你选择的时间点开始消费消息,不过呢这个好像有点小bug还是怎么的 我测试的时候它的时间点好像并没有多么精确 大家可以自行测试。
还有一点 最重要的一点,这个重置消费位点你点过重置后它不会立马重置,你的消费者也不会立马接收到消息,点过重置后,好像会导致消息短暂的失效(这个时间好像是0-3分钟的样子,所以得等一下)