RabbitMQ解决消息丢失以及重复消费问题

文章目录

    • 1、概念
    • 2、基于ACK/NACK机制
      • 2.1 基于Spring AMQP框架整合ACK/NACK机制
      • 2.2 测试消费失败1.0
      • 2.3 测试结果1.0
      • 2.4 测试MQ宕机
      • 2.5 测试结果2.0
    • 3、RabbitMQ 如何实现幂等性设计
      • 3.1 幂等服务设计思路
        • 3.1.1 通过雪花算法生成分布式唯一ID
        • 3.1.2 通过枚举类,设计Message消费状态
        • 3.1.3 生产者
        • 3.1.4 消费者
        • 3.1.5 测试结果

1、概念

RabbitMQ作为一款消息中间件,其设计目标之一就是保证消息的可靠性。要实现RabbitMQ消息不丢失,可以从以下几个方面进行配置和优化:

  1. 生产者确认机制(Publisher Confirms): 生产者在发布消息时,可以开启publisher confirms机制。当消息投递到RabbitMQ Broker后,Broker会返回一个确认信息给生产者。如果Broker没有正确接收到消息或存储失败,则不会发送确认。这样生产者可以根据是否收到确认来决定是否需要重新发送消息。
  2. 持久化消息(Message Durability)
    • 对于队列(Queue),设置其为持久化的(durable)。即使RabbitMQ服务器重启,持久化的队列也会被恢复。
    • 对于消息(Message),在发布时设置delivery mode为2,这将使得消息在队列中持久化。持久化消息会在磁盘上存储备份,即使RabbitMQ服务重启也能保持消息不丢失。
  3. 消费者ACK确认机制: 消费者在消费消息后,需要发送ACK确认给RabbitMQ。如果消费者在处理完消息之前意外终止(如进程崩溃),RabbitMQ会认为该消息未被正确处理,从而重新将消息投入队列等待其他消费者消费。
  4. 集群部署: 通过集群部署的方式提高RabbitMQ服务的可用性和容灾能力,即使部分节点出现问题,其他节点依然能保证消息的正常收发。
  5. 预拉取策略调整: 避免因消费者的消费速度慢于生产者的发送速度而导致的消息积压无法持久化的问题,可以通过调整prefetch count限制消费者预拉取消息的数量。
  6. 监控与告警: 建立完善的监控系统,实时关注RabbitMQ的各项指标,包括队列深度、磁盘使用率等,及时发现可能造成消息丢失的风险点并采取措施。

以上这些方法综合应用,可以在很大程度上确保RabbitMQ消息的不丢失。但需要注意的是,完全避免消息丢失在分布式系统中往往难以做到,只能尽可能地降低这种可能性。

2、基于ACK/NACK机制

在Java中使用RabbitMQ的ACK/NACK机制时,通常会利用Channel对象来进行消息确认。

使用Spring AMQP框架,可以结合Acknowledgment注解或者容器级别的配置来更方便地管理ACK/NACK操作。

在这里插入图片描述

在这里插入图片描述

2.1 基于Spring AMQP框架整合ACK/NACK机制

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Service;/*** RabbitMqConsumer :** @author zyw* @create 2024-01-08  14:48*/@Slf4j
@Service
public class RabbitMqConsumer implements ChannelAwareMessageListener {@Override@RabbitListener(queues = "direct.queue", ackMode = "MANUAL")public void onMessage(Message message, Channel channel) throws Exception {try {// 处理消息逻辑processMessage(message);// 成功处理后手动确认消息long deliveryTag = message.getMessageProperties().getDeliveryTag();channel.basicAck(deliveryTag, false);} catch (Exception e) {// 处理失败,可以选择重新入队列(取决于业务需求)if (shouldRequeueOnFailure()) {long deliveryTag = message.getMessageProperties().getDeliveryTag();channel.basicNack(deliveryTag, false, true);} else {long deliveryTag = message.getMessageProperties().getDeliveryTag();channel.basicReject(deliveryTag, false);}}}private boolean shouldRequeueOnFailure() {// 根据业务需求决定是否重新入队列return true; // 或者 false}/*** 消费逻辑* @param message* @throws Exception*/private void processMessage(Message message) throws Exception {System.out.println("Processing message: " + new String(message.getBody()));System.out.println("Processing : " + n);}
}

2.2 测试消费失败1.0

这里我基于RabbitMq的direct交换机模式,通过循环发送三条消息

    public void sendQueueBatch(String message) {for (int i = 0; i < 3; i++) {rabbitTemplate.convertAndSend("direct.exchange", "direct.key", message + "{}i:" + i);}log.info("3个消息都发送成功");}

消费的业务逻辑中,我模拟第三次消费的时候会报错

    //消费计数    private int n = 0;/*** 消费逻辑* @param message* @throws Exception*/private void processMessage(Message message) throws Exception {n++;if (n==3){throw new Exception("模拟消费失败");}System.out.println("Processing message: " + new String(message.getBody()));System.out.println("Processing : " + n);}

2.3 测试结果1.0

在这里插入图片描述

如图我们可以看到第三次消费失败后,系统自动再次尝试执行了第四次消费

2.4 测试MQ宕机

这里我们模拟每个消息的执行耗时4秒钟,在这期间我们手动关闭RabbitMq服务,模拟MQ宕机/网络波动。之后再手动重启MQ服务,查看之前未完成消费的消息是否能重新执行成功。

   //计数    private int n = 0;/*** 消费逻辑* @param message* @throws Exception*/private void processMessage(Message message) throws Exception {n++;//模拟MQ宕机Thread.sleep(4000);System.out.println("Processing message: " + new String(message.getBody()));System.out.println("Processing : " + n);}

2.5 测试结果2.0

这里我们可以看到消费第二个消息的过程中,MQ宕机了

在这里插入图片描述

MQ重启之后,第二个和第三个消息都被执行了。通过我们设置的变量计数n以及消息的标识i我们可以发现,第二个消息被重复执行了。

在这里插入图片描述

RabbitMq宕机时已经开始消费但还未消费结束的消息,重启MQ之后会重复执行

在RabbitMQ中,如果消费者在消费消息时宕机或者网络故障导致服务器没有接收到确认(acknowledgement),那么这条消息可能会被重新投递。具体来说:

  1. 当消费者从队列中接收一条消息后,默认情况下RabbitMQ会将消息标记为“不可见”(除非使用了manual acknowledgment模式)。
  2. 消费者在处理完消息并发送ack给RabbitMQ之前,若发生宕机或网络中断等情况,RabbitMQ无法得知该消息是否已经被正确处理。
  3. RabbitMQ会在一个称为prefetch count(预取数量)限制范围内持续尝试重新投递未被确认的消息。

因此,在RabbitMQ服务重启之后,那些之前已经开始消费但未被确认的消息会被认为是没有被正确处理,从而重新放回队列等待被其他消费者获取并处理,这就可能导致消息重复执行。为了避免这种情况造成的影响,通常需要在业务逻辑层面实现幂等性设计,即确保消息无论被消费多少次,其结果都是相同的,并且只产生一次有效操作。此外,可以使用事务、发布确认和高级消息确认机制来更好地控制消息的可靠性。

3、RabbitMQ 如何实现幂等性设计

在RabbitMQ中实现幂等性设计,确保消息无论被消费多少次都不会对业务状态造成重复影响,需要结合消息队列的机制以及业务逻辑的设计。以下是一些建议和方法:

  1. 业务层幂等处理

    • 每个消息携带一个全局唯一ID,在业务处理过程中,首先检查这个ID是否已经被处理过。例如,将已处理消息的ID记录到数据库的“已处理消息表”中,下次收到同样ID的消息时直接返回成功而不进行实际操作。
    • 对于更新型操作,可以使用乐观锁或分布式锁来保证同一事务多次执行结果相同,例如通过版本号(version)控制更新操作,只有当版本号未变时才执行更新。
    • 对于创建型操作,确保即使多次调用也不会生成多个资源,例如通过查询是否存在相同的唯一键来决定是否创建新的资源。
  2. 确认模式选择

    • 使用acknowledgement模式,消费者接收到消息后必须发送确认给RabbitMQ,只有收到确认后RabbitMQ才会从队列中移除消息,否则会在连接恢复后重新投递。
    • 设置publisher confirms,生产者可以得到消息发布的确认,确保消息确实到达了MQ服务器并持久化存储。
  3. 死信队列与重试策略

    • 配置死信交换机和死信队列,对于那些重复投递依然无法正确处理的消息,可以转移到死信队列,并设置相应的重试策略及最大重试次数,超过限制则记录日志、报警或手动介入处理。
  4. 幂等服务设计

    • 设计能够应对重复调用的服务接口,这些接口内部应该包含足够的逻辑判断以识别重复请求并作出正确的响应。
  5. 事务与补偿机制

    • 对于涉及多个系统的分布式事务场景,可以考虑采用TCC(Try-Confirm-Cancel)模式或其他分布式事务解决方案,使得整个流程具有幂等性。

总结来说,在RabbitMQ中实现幂等性主要依赖于业务逻辑层面的改造和优化,同时配合RabbitMQ自身的消息确认机制来确保消息不会因为异常情况而重复处理。

3.1 幂等服务设计思路

我们可以给每一个消息绑定一个分布式唯一ID,在通过Redis记录该消息的消费状态,保证每条消息只能被消费一次

在这里插入图片描述

3.1.1 通过雪花算法生成分布式唯一ID

我们可以将雪花算法的工具类抽出到微服务分布式系统的公共组件中,通过maven的依赖引用来使用。

在每个服务的配置文件中去配置专属的工作节点ID和数据中心ID,不同的服务去引用雪花算法工具类时,读取自身配置文件中的工作节点ID和数据中心ID。

zyw:# 工作节点ID(0~31)workerId: 0# 数据中心ID(0~31)datacenterId: 0

通过专属工作节点ID和数据中心ID构建专属的雪花算法工具类SnowflakeIdWorker

import org.springframework.beans.factory.annotation.Value;
import java.util.concurrent.atomic.AtomicLong;/*** SnowflakeIdWorker : 雪花算法** @author zyw* @create 2024-01-09  10:46*/public class SnowflakeIdWorker {// 起始的时间戳 (2010-01-01)private final long twepoch = 1288834974657L;// 机器标识位数private final long workerIdBits = 5L;private final long datacenterIdBits = 5L;// 序列号位数private final long sequenceBits = 12L;// 工作机器ID最大值private final long maxWorkerId = -1L ^ (-1L << workerIdBits);// 数据中心ID最大值private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);// 每一部分向左的偏移量private final long workerIdShift = sequenceBits;private final long datacenterIdShift = sequenceBits + workerIdBits;private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;// 时间戳边界值private long lastTimestamp = -1L;// 工作节点ID(0~31)@Value("${zyw.workerId}")private long workerId;// 数据中心ID(0~31)@Value("${zyw.datacenterId}")private long datacenterId;// 每个节点每毫秒内的序列号private AtomicLong sequence = new AtomicLong(0L);/*** 通过专属工作节点ID和数据中心ID构建专属的雪花算法工具类*/public SnowflakeIdWorker() {if (this.workerId > maxWorkerId || this.workerId < 0) {throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));}if (this.datacenterId > maxDatacenterId || this.datacenterId < 0) {throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));}}/*** 分布式唯一ID生成* @return*/public synchronized long nextId() {long timestamp = timeGen();// 如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常if (timestamp < lastTimestamp) {throw new RuntimeException(String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));}// 如果是同一时间生成的,则进行序列号的自增if (lastTimestamp == timestamp) {sequence.incrementAndGet();// 判断是否溢出if (sequence.get() > (-1L ^ (-1L << sequenceBits))) {// 阻塞到下一个时间戳timestamp = tilNextMillis(lastTimestamp);}} else {// 时间戳改变,重置序列号sequence.set(0L);}// 上次生成ID的时间截lastTimestamp = timestamp;// 移位并通过或运算拼到一起组成64位的IDreturn ((timestamp - twepoch) << timestampLeftShift) |(datacenterId << datacenterIdShift) |(workerId << workerIdShift) | sequence.get();}/*** 从给定的最后时间戳中获取下一个时间戳** @param lastTimestamp 最后时间戳* @return 下一个时间戳*/protected long tilNextMillis(long lastTimestamp) {long timestamp = timeGen();while (timestamp <= lastTimestamp) {timestamp = timeGen();}return timestamp;}/*** 生成当前时间的毫秒数。** @return 当前时间的毫秒数。*/protected long timeGen() {return System.currentTimeMillis();}
}
3.1.2 通过枚举类,设计Message消费状态
import java.util.Arrays;
import java.util.List;/*** RabbitStatusEnum :** @author zyw* @create 2024-01-09  11:18*/public enum RabbitStatusEnum {CONSUME(0, "待消费"),BEGIN(1, "开始消费"),SUCCESS(2, "成功"),FAIL(3, "失败"),;private Integer code;private String message;RabbitStatusEnum(Integer code, String message) {this.code = code;this.message = message;}public int getCode() {return code;}public void setCode(Integer code) {this.code = code;}public String getMessage() {return message;}/*** 获取需要执行的状态集合* @return*/public static List<Integer> getNeedExecuteList(){return Arrays.asList(CONSUME.getCode(),FAIL.getCode());}/*** 获取不需要执行的状态集合* @return*/public static List<Integer> getCompletionExecuteList(){return Arrays.asList(CONSUME.getCode(),FAIL.getCode());}}
3.1.3 生产者

生产者发送消息时,生成专属分布式唯一业务ID,通过Redis记录消息的消费状态

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.example.demo.config.mq.RabbitStatusEnum;
import com.example.demo.config.redis.RedisKeyEnum;
import com.example.demo.uitls.RedisUtils;
import com.example.demo.uitls.SnowflakeIdWorker;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.HashMap;
import java.util.Map;/*** MqService :** @author zyw* @create 2023-12-19  16:26*/@Service
@Slf4j
public class MqService {@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate SnowflakeIdWorker snowflakeIdWorke;@Resourceprivate RedisUtils redisUtils;/*** 批量发送消息** @param message*/public void sendQueueBatch(String message) {//请求头设置消息id(messageId)Map<String, Object> map = new HashMap<>();map.put("message", message);for (int i = 0; i < 3; i++) {long id = snowflakeIdWorker.nextId();map.put("id", id);JSONObject entries = JSONUtil.parseObj(map);redisUtils.setCacheObject(RedisKeyEnum.MQ_STATUS.getKey() + id, RabbitStatusEnum.CONSUME.getCode());rabbitTemplate.convertAndSend("direct.exchange", "direct.key", entries);}log.info("3个消息都发送成功");}}
3.1.4 消费者

我定义了一个实现ChannelAwareMessageListener接口的消费者类,并在@RabbitListener注解中设置了ackMode="MANUAL",这意味着消息确认将由开发者手动完成。当接收到消息时,可以通过获取的Channel对象调用basicAck()basicNack()basicReject()方法来进行消息确认或者拒绝操作。

  • 消息开始消费时,记录开始消费的状态
  • 消息成功完成后,记录成功消费的状态

这里是为了避免在消息开始消费后,RabbitMq宕机了,此时MQ并不知道这个消息最终有没有消费完成,因此重启MQ之后,MQ会重新消费这条消息。

因此我们只运行执行“待消费”和“消费失败”状态的消息。

  • 如果在执行消费的过程中,出错了(抛出Exception),则记录消费失败的状态,MQ会再次尝试去进行消费
  • 我们可以设置最多重试次数,以及两次重试消费的间隔时间
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.example.demo.config.mq.RabbitStatusEnum;
import com.example.demo.config.redis.RedisKeyEnum;
import com.example.demo.uitls.RedisUtils;
import com.rabbitmq.client.Channel;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Service;/*** RabbitMqConsumer : 消费者** @author zyw* @create 2024-01-08  14:48*/@Slf4j
@Service
public class RabbitMqConsumer implements ChannelAwareMessageListener {@Resourceprivate RedisUtils redisUtils;/*** 记录消费次数*/private int n = 0;@Override@RabbitListener(queues = "direct.queue", ackMode = "MANUAL")public void onMessage(Message message, Channel channel) throws Exception {JSONObject entries = JSONUtil.parseObj(new String(message.getBody()));Integer status = redisUtils.getCacheObject(RedisKeyEnum.MQ_STATUS.getKey() + entries.get("id"));try {//只有代消费和消费失败的能进行消费if (RabbitStatusEnum.getNeedExecuteList().contains(status)) {//记录开始消费redisUtils.setCacheObject(RedisKeyEnum.MQ_STATUS.getKey() + entries.get("id"), RabbitStatusEnum.BEGIN.getCode());// 处理消息逻辑processMessage(entries);System.out.println("执行成功了:" + entries.get("id"));//记录消费成功redisUtils.setCacheObject(RedisKeyEnum.MQ_STATUS.getKey() + entries.get("id"), RabbitStatusEnum.SUCCESS.getCode());// 成功处理后手动确认消息long deliveryTag = message.getMessageProperties().getDeliveryTag();channel.basicAck(deliveryTag, false);}} catch (Exception e) {// 处理失败,可以选择重新入队列(取决于业务需求)if (shouldRequeueOnFailure()) {long deliveryTag = message.getMessageProperties().getDeliveryTag();channel.basicNack(deliveryTag, false, true);System.out.println("执行失败了:" + entries.get("id"));//记录消费失败redisUtils.setCacheObject(RedisKeyEnum.MQ_STATUS.getKey() + entries.get("id"), RabbitStatusEnum.FAIL.getCode());} else {long deliveryTag = message.getMessageProperties().getDeliveryTag();channel.basicReject(deliveryTag, false);}}}/*** 根据业务需求决定是否重新入队列* @return*/private boolean shouldRequeueOnFailure() {return true;}/*** 消费逻辑** @param entries* @throws Exception*/private void processMessage(JSONObject entries) throws Exception {n++;//模拟MQ消费时长Thread.sleep(4000);//消费System.out.println("Processing id: " + RedisKeyEnum.MQ_STATUS.getKey() + entries.get("id"));System.out.println("Processing message: " + entries.get("message"));System.out.println("第" + n + "次消费");}
}
3.1.5 测试结果

这里我在第二条消息的执行消费过程中,手动关闭了RabbitMQ服务(模拟RabbitMQ宕机/网络波动),等待几秒后,重启RabbitMQ服务。

可以看到三条消息都被正常消费完成,解决了之前MQ重启后,重复消费的问题,解决了RabbitMQ消息不丢失的问题。

在这里插入图片描述

在这里插入图片描述
Redis中记录了每条消息消费的状态
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/345237.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何使用“通义听悟”提高工作和学习效率

如何使用通义听悟提高工作和学习效率 通义听悟是一款利用人工智能技术&#xff0c;自动为音频和视频内容提供转写、翻译、总结、检索等功能的在线工具。它可以在会议、学习、访谈、培训等场景下&#xff0c;帮助您记录、阅读、整理、复习音视频信息&#xff0c;成为您的工作和…

【目标检测】YOLOv5算法实现(七):模型训练

本系列文章记录本人硕士阶段YOLO系列目标检测算法自学及其代码实现的过程。其中算法具体实现借鉴于ultralytics YOLO源码Github&#xff0c;删减了源码中部分内容&#xff0c;满足个人科研需求。   本系列文章主要以YOLOv5为例完成算法的实现&#xff0c;后续修改、增加相关模…

机器人模仿学习之动作分块ACT算法的代码剖析、部署训练

前言 本文最早是属于《斯坦福Mobile ALOHA背后的关键技术&#xff1a;动作分块ACT算法的原理解析》的第二、第三部分&#xff0c;涉及到动作分块ACT的代码剖析与部署训练 但因为想把ACT的代码逐行剖析的更细致些&#xff0c;加之为避免上一篇文章太过于长&#xff0c;故把动作…

vue3 - 自定义弹框组件

写了一个弹框组件 <template><transition name"modal-fade"><div v-if"showFlag" class"myModal"><div class"content"><div class"topBox"><div class"leftTitle"><spa…

AUTO SEG-LOSS SEARCHING METRIC SURROGATES FOR SEMANTIC SEGMENTATION

AUTO SEG-LOSS: 搜索度量替代语义分割 论文链接&#xff1a;https://arxiv.org/abs/2010.07930 项目链接&#xff1a;https://github.com/fundamentalvision/Auto-Seg-Loss ABSTRACT 设计合适的损失函数是训练深度网络的关键。特别是在语义分割领域&#xff0c;针对不同的场…

苹果手机怎么恢复备份?详细攻略为你整理好了!

随着智能手机和互联网的普及&#xff0c;手机中存储的个人信息、照片、视频、聊天记录等数据会变得越来越多。一旦手机丢失、损坏或系统出现问题&#xff0c;我们很可能会面临数据丢失的风险。因此&#xff0c;越来越多的人开始意识到保护手机数据的重要性。 当苹果手机数据丢…

爬虫—响应页面乱码问题解决方法

爬虫—响应页面乱码问题解决方法 案例&#xff1a;腾牛网图片抓取 源代码如下&#xff1a; import requestsurl https://www.qqtn.com/wm/meinvtp_1.html headers {user-agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) …

[易语言]易语言部署yolox的onnx模型

【官方框架地址】 https://github.com/Megvii-BaseDetection/YOLOX 【算法介绍】 YOLOX是YOLO系列目标检测算法的进一步演变和优化。它由Megvii Technology的研究团队开发&#xff0c;是一个高性能、可扩展的对象检测器。YOLOX在保留快速处理速度的同时&#xff0c;通过引入一…

NX二次开发PK获取对象类型

PK_ENTITY_ask_class(),获取对象类型建议用这个函数&#xff0c;比较通用&#xff0c;包含所有对象类型&#xff0c;可以替代UF_MODL_ask_edge_type(),UF_MODL_ask_body_type(),UF_MODL_ask_face_type()等函数 PK_ENTITY_t entity; PK_CLASS_t PK_TYPE; PK_ENTITY_ask_class(e…

html的全选反选

一、实验题目 html实现选择框的全选和反选 二、实验代码 <!DOCTYPE html> <html><head><meta charset"utf-8"><title>全选和反选</title></head><body><ul>兴趣爱好</ul><input id"all"…

怎么取消开机密码?4个必备方法!

“每次我开机都要输入密码&#xff0c;感觉有点麻烦&#xff0c;有什么方可以快速取消开机密码的吗&#xff1f;快给我推荐推荐吧&#xff01;” 为电脑设置开机密码&#xff0c;可以更好地保护电脑中的重要数据。但是用户需要在每次开机时都输入密码。这对于部分用户来说可能是…