消息持久化
1. RabbitMQ 发送与消费消息的模型

2. 消息丢失的几种情况?
-
生产者发送消息未到达交换机
-
消息到达交换机,没有正确路由到队列
-
MQ 宕机,队列中的消息不见了
-
消费者收到消息,还没消费,消费者宕机
3. 如何保证消息不丢失?
3.1 生产者确认机制
-
publisher-confirm
-
消息成功投递到交换机,返回 ack
-
消息未成功投递到交换机,返回 nack
记录消息以及交换机等相关信息到数据库,后期可以编写任务去补偿发送
-
-
publisher-return
-
未正确到达队列,返回 ack 及失败原因
记录消息以及交换机等相关信息到数据库,后期可以编写任务去补偿发送
-
图示

实现
-
配置文件
spring:rabbitmq:host: 192.168.200.130 # 虚拟机 IPport: 5672 # 端口virtual-host: / # MQ 的虚拟主机username: usernamepassword: passwordpublisher-confirm-type: correlatedpublisher-returns: true # 开启 publisher-returnstemplate:mandatory: true
参数说明:
publish-confirm-type
:开启publisher-confirm
none
:关闭 confirm 机制simple
:同步阻塞等待 MQ 的回执(回调方法)correlated
:MQ 异步回调返回回执
template.mandatory
:定义消息路由失败时的策略。- true:调用 ReturnCallback
- false:则直接丢弃消息
-
定义 ConfirmCallback
ConfirmCallback 可以在发送消息时指定,因为每个业务处理 confirm 成功或失败的逻辑不一定相同。
public void testSendMessage2SimpleQueue() throws InterruptedException {// 1 消息体String message = "hello, spring amqp!";// 2 全局唯一的消息 ID,需要封装到 CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 3 添加 callbackcorrelationData.getFuture().addCallback(result -> {if(result.isAck()) {log.debug("消息发送成功, ID:{}", correlationData.getId());} else {log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());}},ex -> log.error("消息发送异常, ID:{}, 原因{}", correlationData.getId(), ex.getMessage()));// 4 发送消息rabbitTemplate.convertAndSend("", "simple.queue", message, correlationData);// 休眠一会儿,等待 ack 回执Thread.sleep(2000); }
-
定义 Return 回调
每个 RabbitTemplate 只能配置一个 ReturnCallback,因此需要在项目加载时配置。
@Slf4j @Configuration public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取 RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置 ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 投递失败,记录日志log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}", replyCode, replyText, exchange, routingKey, message.toString());// 如果有业务需要,可以重发消息});} }
3.2 持久化机制
-
交换机持久化:
默认就是持久化,durable 默认就是 true
-
队列持久化
默认就是持久化,durable 默认就是true
-
消息持久化
默认就是持久化。在发送消息时,使用 Message 对象,并设置 delivery-mode 为持久化
3.3 消费者 ack 机制

ack 取值情况:
-
none:只要消息到达消费者,消费者直接返回 ack 给 MQ
MQ 收到 ack,会把队列中的消息删除,消息可能会丢失
-
消费者配置
spring:rabbitmq:listener:simple:acknowledge-mode: none # 关闭 ack
-
-
manual:手动 ack
- 消费成功,调用 API 给 MQ 返回 ack
- 消费失败,调用 API 给 MQ 返回 nack,并且让消息重回队列
消费者配置
spring:rabbitmq:listener:simple:acknowledge-mode: manual # 手动 ack
测试代码:
@RabbitListener(queues = "simple.queue") public void listenSimpleQueue(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) {try {// 从 redis 获取一个 retry_count >= 3 直接记录日志,不重回队列,中断操作 returnlog.warn("消费者接收到 simple.queue 的消息:{}", msg);int i = 1 / 0;log.info("消息成功消费了 ---> SUCCESS");// 手动 ack// 可以使用 org.springframework.amqp.core.Messagee 拿到 deLiveryTagchannel.basicAck(deliveryTag, false);} catch (Exception e) {e.printStackTrace();try {// 返回 nack,并且让消息重回队列channel.basicNack(deliveryTag, false, true);Thread.sleep(1000);log.error("消息消费失败,重回队列-->");// 向 redis 中设置值// redisTemplate.opsForValue().incr(retry_count)} catch (Exception ex) {ex.printStackTrace();}} }
-
auto:自动 ack。消费消息不出异常,返回 ack 给 MQ。消费消息出异常了,返回 nack,把消息重回队列
-
本地重试
spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初始的失败等待时长为1秒multiplier: 2 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true 无状态;false 有状态。如果业务中包含事务,这里改为 false
达到重试次数后,还是失败,则返回 ack,不 requeue。MQ 会删除队列消息
-
失败策略
-
RejectAndDontRequeueRecoverer
:重试耗尽后,直接 reject,丢弃消息。默认方式 -
ImmediateRequeueMessageRecoverer
:重试耗尽后,返回 nack,消息重新入队 -
RepublishMessageRecoverer
:重试耗尽后,将失败消息投递到指定的交换机
-
-
使用 RepublishMessageRecoverer
需求:把消息投递到失败的交换机,路由队列。记录日志,将来人工干预
实现
-
定义错误交换机、队列、绑定关系。定义 RepublishMessageRecoverer
-
监听错误队列
-
-
4. 总结
-
创建交换机、队列、消息进行持久化
-
交换机、队列默认就是持久化的
-
消息持久化
-
-
生产者开启确认机制
-
开启消息发送失败的重试策略
-
设置重试次数和重试间隔比例
-
耗尽重试次数后,依旧失败,记录失败消息到数据库失败消息表,用于后期执行补偿错误。如使用定时任务去扫描这个表,重新发送消息
-
-
开启 confirm 机制:保证消息正确到达交换机
-
返回 ack,正确到达
-
返回 nack,没有到达交换机,写入数据库,后期重试
-
-
开启 return 机制
-
保证消息正确到达队列
-
没有到达队列,会调用ReturnCallback,写入数据库,后期重试
-
-
-
消费者确认机制
-
开机自动确认机制
-
开启重试策略
重试次数耗尽后,定义RepublishMessageRecoverer策略来让消息路由到错误队列,落库
-
