MQ消息持久化解决方案

news/2025/3/24 15:13:17/文章来源:https://www.cnblogs.com/Abner-rudolf/p/18787474

消息持久化

1. RabbitMQ 发送与消费消息的模型

2. 消息丢失的几种情况?

  1. 生产者发送消息未到达交换机

  2. 消息到达交换机,没有正确路由到队列

  3. MQ 宕机,队列中的消息不见了

  4. 消费者收到消息,还没消费,消费者宕机

3. 如何保证消息不丢失?

3.1 生产者确认机制

  1. publisher-confirm

    1. 消息成功投递到交换机,返回 ack

    2. 消息未成功投递到交换机,返回 nack

      记录消息以及交换机等相关信息到数据库,后期可以编写任务去补偿发送

  2. publisher-return

    1. 未正确到达队列,返回 ack 及失败原因

      记录消息以及交换机等相关信息到数据库,后期可以编写任务去补偿发送

图示

实现

  1. 配置文件

    spring:rabbitmq:host: 192.168.200.130 # 虚拟机 IPport: 5672 # 端口virtual-host: / # MQ 的虚拟主机username: usernamepassword: passwordpublisher-confirm-type: correlatedpublisher-returns: true # 开启 publisher-returnstemplate:mandatory: true
    

    参数说明:

    • publish-confirm-type:开启 publisher-confirm
      • none:关闭 confirm 机制
      • simple:同步阻塞等待 MQ 的回执(回调方法)
      • correlated:MQ 异步回调返回回执
    • template.mandatory:定义消息路由失败时的策略。
      • true:调用 ReturnCallback
      • false:则直接丢弃消息
  2. 定义 ConfirmCallback

    ConfirmCallback 可以在发送消息时指定,因为每个业务处理 confirm 成功或失败的逻辑不一定相同。

    public void testSendMessage2SimpleQueue() throws InterruptedException {// 1 消息体String message = "hello, spring amqp!";// 2 全局唯一的消息 ID,需要封装到 CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 3 添加 callbackcorrelationData.getFuture().addCallback(result -> {if(result.isAck()) {log.debug("消息发送成功, ID:{}", correlationData.getId());} else {log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());}},ex -> log.error("消息发送异常, ID:{}, 原因{}", correlationData.getId(), ex.getMessage()));// 4 发送消息rabbitTemplate.convertAndSend("", "simple.queue", message, correlationData);// 休眠一会儿,等待 ack 回执Thread.sleep(2000);
    }
    
  3. 定义 Return 回调

    每个 RabbitTemplate 只能配置一个 ReturnCallback,因此需要在项目加载时配置。

    @Slf4j
    @Configuration
    public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取 RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置 ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 投递失败,记录日志log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
    replyCode, replyText, exchange, routingKey, message.toString());// 如果有业务需要,可以重发消息});}
    }
    

3.2 持久化机制

  1. 交换机持久化:

    默认就是持久化,durable 默认就是 true

  2. 队列持久化

    默认就是持久化,durable 默认就是true

  3. 消息持久化

    默认就是持久化。在发送消息时,使用 Message 对象,并设置 delivery-mode 为持久化

3.3 消费者 ack 机制

ack 取值情况:

  1. none:只要消息到达消费者,消费者直接返回 ack 给 MQ

    MQ 收到 ack,会把队列中的消息删除,消息可能会丢失

    • 消费者配置

      spring:rabbitmq:listener:simple:acknowledge-mode: none # 关闭 ack
      
  2. manual:手动 ack

    1. 消费成功,调用 API 给 MQ 返回 ack
    2. 消费失败,调用 API 给 MQ 返回 nack,并且让消息重回队列

    消费者配置

    spring:rabbitmq:listener:simple:acknowledge-mode: manual  # 手动 ack
    

    测试代码:

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) {try {// 从 redis 获取一个 retry_count >= 3 直接记录日志,不重回队列,中断操作 returnlog.warn("消费者接收到 simple.queue 的消息:{}", msg);int i = 1 / 0;log.info("消息成功消费了 ---> SUCCESS");// 手动 ack// 可以使用 org.springframework.amqp.core.Messagee 拿到 deLiveryTagchannel.basicAck(deliveryTag, false);} catch (Exception e) {e.printStackTrace();try {// 返回 nack,并且让消息重回队列channel.basicNack(deliveryTag, false, true);Thread.sleep(1000);log.error("消息消费失败,重回队列-->");// 向 redis 中设置值// redisTemplate.opsForValue().incr(retry_count)} catch (Exception ex) {ex.printStackTrace();}}
    }
    
  3. auto:自动 ack。消费消息不出异常,返回 ack 给 MQ。消费消息出异常了,返回 nack,把消息重回队列

    1. 本地重试

      spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初始的失败等待时长为1秒multiplier: 2 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true 无状态;false 有状态。如果业务中包含事务,这里改为 false
      

      达到重试次数后,还是失败,则返回 ack,不 requeue。MQ 会删除队列消息

    2. 失败策略

      1. RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject,丢弃消息。默认方式

      2. ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack,消息重新入队

      3. RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

    3. 使用 RepublishMessageRecoverer

      需求:把消息投递到失败的交换机,路由队列。记录日志,将来人工干预

      实现

      1. 定义错误交换机、队列、绑定关系。定义 RepublishMessageRecoverer

      2. 监听错误队列

4. 总结

  1. 创建交换机、队列、消息进行持久化

    1. 交换机、队列默认就是持久化的

    2. 消息持久化

  2. 生产者开启确认机制

    1. 开启消息发送失败的重试策略

      1. 设置重试次数和重试间隔比例

      2. 耗尽重试次数后,依旧失败,记录失败消息到数据库失败消息表,用于后期执行补偿错误。如使用定时任务去扫描这个表,重新发送消息

    2. 开启 confirm 机制:保证消息正确到达交换机

      • 返回 ack,正确到达

      • 返回 nack,没有到达交换机,写入数据库,后期重试

    3. 开启 return 机制

      • 保证消息正确到达队列

      • 没有到达队列,会调用ReturnCallback,写入数据库,后期重试

  3. 消费者确认机制

    1. 开机自动确认机制

    2. 开启重试策略

      重试次数耗尽后,定义RepublishMessageRecoverer策略来让消息路由到错误队列,落库


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/904052.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

『Plotly实战指南』--柱状图绘制高级篇

在数据可视化的世界里,柱状图是一种直观且强大的工具,用于展示数据的分布、比较和趋势。 从基础的柱状图出发,我们可以进一步探索更复杂的图表类型,如分组柱状图和堆积柱状图,它们在处理多维数据和复杂关系时具有独特的优势。 本文将深入探讨如何使用Plotly库绘制这些高级…

读DAMA数据管理知识体系指南28文件和内容管理概念(下)

读DAMA数据管理知识体系指南28文件和内容管理概念(下)1. 文件和档案 1.1. 文件(Document)是包含任务说明,对执行任务或功能的方式和时间的要求以及任务执行和决策的日志等的电子或纸质对象 1.2. 只有部分文件才能称为档案(Record)1.2.1. 档案可用于证明所做的决策和所采取的…

AMD Instinct™MI300系列微架构

AMD Instinct™MI300系列微架构 AMD Instinct MI300系列加速器基于AMD CDNA 3架构,旨在为HPC、人工智能(AI)和机器学习(ML)工作负载提供领先性能。AMD Instinct MI300系列加速器非常适合极端的可扩展性和计算性能,可以在单个服务器到世界上最大的EB级超级计算机的所有设备…

节点级架构与MI300和MI200系列性能计数器和指标

节点级架构 MI300系列节点级架构,显示了8个完全互连的MI300X OAM模块,通过重定时器和HGX连接器连接到(可选)PCIEe交换机。 如图5-9所示,显示了具有双插槽配置的AMD EPYC处理器和八个AMD Instinct MI300X加速器的系统的节点级架构。MI300X OAM通过PCIe Gen 5 x16链路(黄线…

推荐专著《AI芯片开发核心技术详解》(1)、《智能汽车传感器:原理设计应用》(2)、《TVM编译器原理与实践》(3)、《LLVM编译器原理与实践》(4)

4本书推荐《AI芯片开发核心技术详解》、《智能汽车传感器:原理设计应用》、《TVM编译器原理与实践》、《LLVM编译器原理与实践》由清华大学出版社资深编辑赵佳霓老师策划编辑的新书《AI芯片开发核心技术详解》已经出版,京东、淘宝天猫、当当等网上,相应陆陆续续可以购买。该…

GPU到GPU通信选项

GPU到GPU通信选项 将讨论使用AMD Instinct™MI250和AMD InstinctTM MI250X GPU的系统中的GPU到GPU通信选项。每个MI250(X)GPU由两个图形计算芯片(GCD)组成。如图4-20所示,显示了具有4个MI250 GPU(8个GCD)的节点的示意图。每个绿色框代表一个MI250 GPU和两个GCD。GCD通过…

2025年3月月记

2025.3.1 新的一月到来啦!今天干了个啥呢?好像没干啥也是把昨天编程学习的作业做了,待会又要去学S组的知识了,我先去刷题了。。。 OK啊,也是把课学完了,待会我又要去打atcoder了,今天学的是差分约束,其实就是图上的知识,主要的表达形式是:u <= v + w或者u >= v…

Linux版本的MAT(Eclipse Memory Analyzer)内存分析工具使用

首先先下载对应平台的工具 官方地址:https://eclipse.dev/mat/download/ 因为我是arm的架构 所以下载的是arm64的安装包 下载完成后解压 得到以下内容 先修改初始化的启动的内存大小 vim MemoryAnalyzer.ini 主要修改这个值 这个要尽量大点 不然我们的内存分析文件很大 会执行…

【CodeForces训练记录】Codeforces Round 1011 (Div. 2)

训练情况赛后反思 B题因为分讨的问题WA了一发,异或还是不大会做 A题 猜猜题,显然对于字符串全部都是一个字母的,无论怎么换字典序都不可能更小,对于其他情况因为可以选择两个字母互换,我们容易观察到对于某一个字符串一定存在一种换法能让字典序更小(无非就是换头或者换尾…

集美大学课程实验报告-实验3:栈、队列与递归

集美大学课程实验报告-实验3:栈、队列与递归项目名称 内容课程名称 数据结构班级 网安2413指导教师 郑如滨学生姓名 林沁茹学号 202421336067实验项目名称 实验3:栈、队列与递归上机实践日期上机实践时间 2学时一、目的(本次实验所涉及并要求掌握的知识点) 以下内容请根据实…

2025-03-22 闲话

2025-03-22 闲话有些闲话是纪实的,它们可能只是平淡的文字。它们可能没有感受,不带思考。你看不到装饰,只有琐碎、补也补不到自圆其说的细节。柴米油盐大抵是这样的。 来北京独居后的生活着实安逸。每天执行一个蛮正常的作息,保证三顿饮食、偶尔晚上和网友去搓搓夜宵。睡觉…

3.22 三重积分计算方法

三重积分的实际意义:计算一个立体的质量(可以) 1 投影法(先一后二)(一个土豆切成土豆丝,最后再累加Dxy平面) 一个立体图形可以看成是两个曲面拼接而成,z=(x,y)可表示一个曲面假设x和y都是确定的,然后就累加z,最后再算面积分 先假设有一条竖线,注意竖线是从哪里进入…