如何保证消息队列的消息只能被消费一次,首先先保证消息不会丢失
首先先生产者到消费者到消费者有哪些场景会消息丢失
一、问题场景
场景一、生产者发送到消息队列失败
场景二、消息队列接受到消息磁盘化失败
场景三、消费者接受到消息消费失败
二、场景原因,如何解决
1、场景一失败的原因:可能出现在生产者发送给消息队列消息时可能会出现网络抖动,导致发送失败。也有可能消息队列服务挂掉导致发送失败。
解决方法:可以采用消息重传的方式,先可以采用在内存中重试几次,如果重试次数达到最大重试次数,将该消息放到一张记录表中,延时重发即可。当然这种做法可能会有重复消息,
这里只需要确保消息发送到消息队列即可,如何保证消息发送到消息队列,以rocketMQ为例,确保rocketMQ开启ack确认机制,当rocketMQ接受到消息会返回给生产者一条
ack,当生产者收到了ack才说明此条消息到达了rocketMQ,若没有收到一律按照重传处理
2、场景二失败的原因:会出现这种原因是因为消息队列将消息持久化机制有关。持久化机制有两种,一种是异步刷盘,一种是同步刷盘。
异步刷盘是生产者将消息发送给消息队列,消息队列并不是将消息立即将消息存到磁盘,而是将消息消息存到内存或者操作系统的缓存里,消息累计到一定的数量才会将消息持久化到磁盘上,这时消息在内存到中消息队列就会发送给生产者一条消息确认消息,在单机的状态下,如果消息没有消费消息队列重启或者挂掉会导致消息丢失。在集群的情况下也有可能会出现消息丢失,当主节点收到生产者的消息,在主节点和从节点消息同步时,主节点还没有消息持久化,没有同步给从节点,这个时候主节点挂点,从节点晋升为主节点,这个时候会出现消息丢失
同步刷盘是将消息持久化到磁盘中才会返回给生产者ack。
解决方法:将消息队列消息持久化设置成同步刷盘就可以解决这个问题
3、场景三失败原因:以上两点只需要保证消息不丢失即可。那为什么消费者也会出现消息的丢失呢?有几个原因,第一个原因当消费者还有没有消费完消费者出现了宕机或者异常,提前将进度更新到消息队列中,消息队列就不会重复的消费了。第二个原因生产者生产重复消息到了消费者,消费者无法重复消费消息。
解决方法:
首先消费者先插入一个消费记录表,插入成功就执行业务代码,业务代码程序成功就更新记录表状态,业务代码未执行成功,删除记录表中的数据重新执行,如果是bug可以人工干预一下。如果插入记录表失败,检查记录表的状态。如果说已经完成,直接返回消费成功,如果是未完成,延迟消费即可。
这个时候会有一个问题,如果在执行业务代码时出现了宕机等状况,没有来得及删除删除记录表数据,会出现消息开始 -> 插入状态 ->延迟消费的死循环,这个时候就需要人工干预,或者记录消息消费的消费次数时候还是不是需要再消费