RabbitMQ-如何保证消息不丢失

RabbitMQ常用于 异步发送,mysql,redis,es之间的数据同步 ,分布式事务,削峰填谷等.....

在微服务中,rabbitmq是我们经常用到的消息中间件。它能够异步的在各个业务之中进行消息的接受和发送,那么如何保证rabbitmq的消息不丢失就显得尤为重要。

首先要分析问题,我们就要明确rabbitmq在什么时候可能会出现消息丢失的情况呢?

我们直接说结果

RabbitMQ在每个阶段都有可能使消息发生丢失

我们在这里把他们简单归结为三个层面

层面一 :生产者发送消息没有到达交换机或者没有到达绑定的队列。

层面二:RabbitMQ宕机可能导致的消息的丢失。

层面三:消费者宕机导致消息丢失。

层面一的解决方法常见的是

1.生产者确认机制

RabbitMQ提供了publisher confirm机制来避免消息发送到Mq的过程中丢失,消息发送到Mq以后,会返回一个结果给发送者,表示消息的发送成功。

情况一:发送成功 生产者正常发送消息到队列之后会返回一个publish-confirm ack 这个意思是告诉生产者已经接收到消息了。

情况二:发送失败 这里的发送失败有两种,一种是生产者发送到交换机失败 此时返回 publish-confirm nack  。第二种是生产者发送到队列失败 返回 publish-return ack。

开启生产者确认机制的代码如下 ,在生产者的配置文件中加入以下配置
 

spring:rabbitmq:publisher-confirm-type: correlated #开启生产者确认机制publisher-returns: true

这里的

publisher-confirm-type:有三种模式可以选择:

第一种是none:代表关闭confirm机制

第二种是 simple:表示同步阻塞并等待mq的回执消息,即发送完消息后不能干其他的事情,只能等待mq的回执,很显然这样效率很低。

第三种是correlated:MQ异步回调方式返回回执消息,即生产者发送完消息后可以干其他的事情,直到接收到mq的回执。很明显这种效率要优于第二种。

配置return callback的代码如下,每个RabbitTemplate只能配置一个 代码如下
 

package com.itheima.publisher.com.it.heima.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;/*** @Auther: QuJingChuan* @Date: 2024/1/13 10:34* @Description:*/
@Slf4j
@Configuration
public class MqConfirmConfig implements ApplicationContextAware {@Autowiredprivate RabbitTemplate rabbitTemplate;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {//配置回调rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.debug("收到消息return的callback,  {},{},{},{},{}",returnedMessage.getExchange(),returnedMessage.getRoutingKey(),returnedMessage.getMessage(),returnedMessage.getReplyCode(),returnedMessage.getReplyText());}});}
}

Confirm Callback需要每次发消息的时候都要配置(要制定发消息的id方便回执的时候直到是谁发的消息)这里写一个测试类方便大家看。

 @Testvoid testConfirmCallback() throws InterruptedException {//创建cd 参数为每次发送消息的idCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());//添加confirmCallBackcorrelationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {//这种情况一般是运行出现bug,一般不会发生。log.error("消息回调失败",ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {log.debug("收到confirm callback 回执");if (result.isAck()){//消息发送成功log.debug("消息发送成功收到ack");}else {//消息发送失败log.debug("消息发送失败收到nack,原因:{}",result.getReason());//TODO 重发消息等业务}}});rabbitTemplate.convertAndSend("amqp.test","amqptest","hello qjc",correlationData);Thread.sleep(2000);}

那么我们如何解决这个问题呢
方案一:重发消息 

方案二:记录日志

方案三:保存到数据库中定时发送,发送成功后删除表中的数据。

方案四:交给人工处理。

~生产者确认机制需要额外的网络和系统的资源开销,尽量不要使用。

~如果业务需要,那么无需开启publisher-return机制,因为一般路由失败都是自己业务的原因。

~对于nack消息可以有限次数的重试,依然失败则记录异常消息。

层面二的解决方法常见的是

2.消息持久化

由于mq是基于内存存储消息的,那么在mq服务宕机等一些情况下可能导致消息的丢失。同时内存空间有限,当消费者出现故障或者处理过慢,会导致消息积压,mq会对消息做迁移(page out 写入磁盘)从而引发mq阻塞。我们将消息存储在磁盘上就避免了这个问题。

一 :持久化交换机。

这里要选择Durable,因为Transient是临时交换机,当mq宕机后会消失。

代码展示
 

 @Beanpublic DirectExchange simpleExchange(){//分别是三个参数 交换机名称 是否持久化 当没有队列绑定时是否自动删除return new DirectExchange("qjc.exchange",true,false);}

二 :持久化队列。

这个与交换机类似,在此不做赘述。

代码展示

@Beanpublic Queue simpleQueue(){//springamqp在使用QueueBuilder来创建队列的时候,默认就是持久化的return QueueBuilder.durable("qjc.queue").build();}

三 :持久化消息。

这里选择delivery mode 选择2 ,1是不持久的。

代码展示

 Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
如果不选择持久化队列,交换机,消息的话我们还有另一种方案

Lazy Queue(惰性队列)

惰性队列的特征如下

~接受到消息的时候直接存入磁盘而非内存(内存中只保留最近的消息)

~消费者需要消息的时候才会从磁盘中取出数据加载到内存

~支持数百万条的消息存储

在mq3.12版本后,所有的队列都是Lazy Queue模式,无法更改。

如果各位小伙伴的版本低于3.12那我这里提供了两种方式创建惰性队列

或用注解声明

    @RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode",value = "lazy")))public void listenLazyQueue(String msg){log.debug("接收到lazyqueue的消息" + msg);}

3.消费者确认机制

RabbitMQ支持消费者确认机制,即:当消费者处理消息后可以向mq发送ack回执,mq收到消息后会在队列中删除该消息。

SpringAMQP已经实现了消息确认的功能,并且允许我们通过配置文件选择ack的处理方式,有三种方式。

- none: 不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用  
- manual: 手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活  
- auto: 自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack.  
当业务出现异常时,根据异常判断返回不同结果:  
- 如果是业务异常,会自动返回nack  
- 如果是消息处理或校验异常,自动返回reject

注意我们需要再消费者的配置文件中加入参数

这就是mq保证消息不丢失的一些方式和解决方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/438381.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python根据Excel表进行文件重命名

一、问题背景 在日常办公过程中&#xff0c;批量重命名是经常使用的操作。之前我们已经进行了初步探索&#xff0c;主要是通过批处理文件、renamer软件或者Python中的pathlib等模块对当前目录下的文件进行批量重命名。 而今天我们要使用的是PythonExcel的方法对指定目录下的文…

Thymeleaf基础教程

系列文章目录 文章目录 系列文章目录一、Thymeleaf 语法规则二、Thymeleaf 语法分为以下 2 类标准表达式语法th 属性2.1 基础语法2.1.1 变量表达式 ${}2.1.2 选择变量表达式 *{}2.1.3 链接表达式 {} 2.1.4 消息表达式 三、常用的 th 标签四、迭代循环 一、Thymeleaf 语法规则 …

C++类和对象引入以及类的介绍使用

文章目录 一、面向过程和面向对象的初步认识二、类的引入2.2 类的引入 三、类的访问限定符及封装3.3 访问限定符3.4 【面试题】C中struct和class的区别3.5 类的两种定义方式 四、封装【面试题】面向对象的三大特性 五、类的作用域六、类的实例化七、类对象模型7.1 类对象的存储…

网络通信实现

【 一 】网络通信实现 【 1 】实现网络通信的四要素 本机的ip地址 子网掩码 网关的IP地址 DNS的IP地址( 域名系统) DNS服务器是指提供域名解析服务的服务器。它负责将域名转换为相应的IP地址&#xff0c;以便计算机可以通过IP地址与其他设备进行通信。 通过使用DNS服务器…

ESP8266采用AT指令连接华为云服务器方法(MQTT固件)

一、前言 本篇文章主要介绍3个内容&#xff1a; &#xff08;1&#xff09;ESP8266-WIFI模块常用的AI指令功能介绍 &#xff08;2&#xff09;ESP8266烧写MQTT固件连接华为云IOT服务器。 &#xff08;3&#xff09;介绍华为云IOT服务器的配置过程。 ESP8266是一款功能强大…

LVGL部件

一.标签部件 1.如何创建标签部件以及设置文本 ![2024-01-28T09:54:08.png][3] void my_lvgl(void) {lv_obj_t *lablelv_label_create(lv_scr_act()); //创建一个标签lv_label_set_text(lable,"hello"); //普通更改文字lv_label_set_text_fmt(lab…

统计学-认识数据

数据 如&#xff1a; 定性数据&#xff1a; 性别&#xff1a;男、女 颜色&#xff1a;红、绿、青、蓝、紫 教育程度&#xff1a;高中、本科、硕士、博士 评价&#xff1a;好评、中评、差评 定量数据&#xff1a; 年份&#xff1a;2019、2018、2017、2016 温度&#xff1a;10、…

专业133总分400+上海交通大学819考研经验分享上交819电子信息与通信工程

今年专业819信号系统与信号处理133&#xff0c;总分400&#xff0c;如愿考上梦中上海交通大学&#xff0c;通过自己将近一年的复习&#xff0c;实现了人生中目前为止最大的逆袭&#xff08;自己本科学校很普通&#xff09;&#xff0c;总结自己的复习经历&#xff0c;希望可以给…

C++ 数论相关题目 扩展欧几里得算法(裴蜀定理)

给定 n 对正整数 ai,bi &#xff0c;对于每对数&#xff0c;求出一组 xi,yi &#xff0c;使其满足 aixibiyigcd(ai,bi) 。 输入格式 第一行包含整数 n 。 接下来 n 行&#xff0c;每行包含两个整数 ai,bi 。 输出格式 输出共 n 行&#xff0c;对于每组 ai,bi &#xff0c;求…

多只动物3D姿态估计与行为识别系统

动物社会行为的量化是动物科学研究的重要步骤。虽然现有的深度学习方法已经实现了对常见动物的精确姿态估计、识别和行为分类&#xff0c;但由于缺乏注释良好的数据集&#xff0c;其应用依然受到挑战。因此该研究展示了一个计算框架&#xff0c;即社会行为图谱&#xff08;SBeA…

MkDocs 部署指南

简介 MkDocs 可以同时编译多个 markdown 文件&#xff0c;形成书籍一样的文件。有多种主题供你选择&#xff0c;很适合项目使用。 MkDocs 是快速&#xff0c;简单和华丽的静态网站生成器&#xff0c;可以构建项目文档。文档源文件在 Markdown 编写&#xff0c;使用单个 YAML …

大数据StarRocks(八):资源隔离实战

前言 自 2.2 版本起&#xff0c;StarRocks 支持资源组管理&#xff0c;集群可以通过设置资源组&#xff08;Resource Group&#xff09;的方式限制查询对资源的消耗&#xff0c;实现多租户之间的资源隔离与合理利用。在 2.3 版本中&#xff0c;StarRocks 支持限制大查询&#…