MQ如何保证可靠性

       📝个人主页:五敷有你      

 🔥系列专栏:MQ

⛺️稳中求进,晒太阳

消息到达MQ以后,如果MQ不能及时保存,也会导致消息丢失,所以MQ的可靠性也非常重要。

2.数据持久化

为了提高性能,默认情况下,MQ的数据都是再内存存储的临时数据,重启后就会消失,为了保证数据的可靠性,必须配置数据持久化,包括:

  • 交换机持久化

  • 队列持久化

  • 消息持久化

我们以控制台界面为例来说明。

2.1.交换机持久化

在控制台的Exchanges页面,添加交换机时可以配置交换机的Durability参数:

image.png

设置为Durable就是持久化模式,Transient就是临时模式。

2.2.队列持久化

在控制台的Queues页面,添加队列时,同样可以配置队列的Durability参数:

image.png

除了持久化以外,你可以看到队列还有很多其它参数,有一些我们会在后期学习。

2.3.消息持久化

在控制台发送消息的时候,可以添加很多参数,而消息的持久化是要配置一个properties

image.png

说明

        在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。

        不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。

 代码层次实现:

 @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "msg.queue",durable ="true"),exchange = @Exchange(name = "msg.topic",type = ExchangeTypes.TOPIC,durable = "true"),key = "msg"))public void listenMsg(String jsonStr){log.info("收到消息{}", jsonStr);Map<String,Object> map = JSONUtil.toBean(jsonStr, Map.class);JSONObject object=new JSONObject(map);String actionName =object.getString(Action.ACTION);Action action = getAction(actionName);action.doMessage(getWebSocketManager(),object);}

3.消费者的可靠性

当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,比如:

  • 消息投递的过程中出现了网络故障

  • 消费者接收到消息后突然宕机

  • 消费者接收到消息后,因处理不当导致异常

RabbitMQ如何得知消费者的处理状态呢?

3.1消费者确认机制

        为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息

  • nack:消息处理失败,RabbitMQ需要再次投递消息

  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

        一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack.

        由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用

测试下面代码验证: 在none情况下,异常产生后,消息队列中的消息被删除了

  @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "msg.queue",durable ="true"),exchange = @Exchange(name = "msg.topic",type = ExchangeTypes.TOPIC,durable = "true"),key = "msg"))public void listenMsg(String jsonStr){log.info("收到消息{}", jsonStr);throw new RuntimeException("异常");}

  • manual:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活

  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:

    • 如果是业务异常,会自动返回nack

    • 如果是消息处理或校验异常,自动返回reject;

测试下面代码验证: 在auto情况下,异常产生后,消息一直在被重复投递,

在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unacked(未确定状态):放行以后,由于抛出的是业务异常,所以Spring返回ack,最终消息恢复至Ready状态,并且没有被RabbitMQ删除: 这个一直Unack的状态。当我们把配置改为auto时,消息处理失败后,会回到RabbitMQ,并重新投递到消费者。

如果是消息转换异常,spring会返回reject

刚刚发现,当消费者出现异常后,消息会不断的requeue(重入队)到队列,再重新发送给消费者,如果消费者执行依然出错,消息会再次投递到队列,直到处理成功为止。

极端情况下,消费之一直无法执行成功,那么消息requeue就会无限循环,导致mq的处理消息飙升,带来不必要的压力。

3.2.失败重试机制

为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:

  • 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次

  • 本地重试3次以后,抛出了AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject

试了三次,失败就停下来了。

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试

  • 重试达到最大次数后,Spring会返回reject,消息会被丢弃

3.2失败处理策略

本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

1)在consumer服务中定义处理失败消息的交换机和队列

@Bean
public DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

2)定义一个RepublishMessageRecoverer,关联队列和交换机

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
package com.aqiuo.config;import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {@Beanpublic TopicExchange errorMessageExchange(){return new TopicExchange("error.topic",true,false);}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, TopicExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/670203.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在uniapp里面使用 mp-html 并且开启 latex 功能

在uniapp里面使用 mp-html 并且开启 latex 功能 默认情况下 mp-html 是不会开启 latex 功能的, 如果需要开启 latex 功能是需要到代码操作拉取代码自行打包的。 这里说一下 mp-html 里面的 latex 功能是由 https://github.com/rojer95/katex-mini 提供的技术实现&#xff0c;…

与 Apollo 共创生态:Apollo 7 周年大会的启示与心得

文章目录 前言Apollo X 全新征程Application X 企业预制套件总结 前言 在过去的七年中&#xff0c;Apollo 开放平台经历了一段令人瞩目的发展历程。从最初的构想到如今的成熟阶段&#xff0c;Apollo 已经推出了 13 个版本&#xff0c;吸引了来自全球 170 多个国家和地区的 16 …

大数据技术主要学什么,有哪些课程

大数据技术是指在海量数据的环境下&#xff0c;采集、存储、处理、分析和管理数据的一系列技术与方法。随着互联网、物联网以及各种智能设备的普及&#xff0c;数据量呈爆炸性增长&#xff0c;传统数据处理手段已难以应对&#xff0c;因此大数据技术应运而生&#xff0c;旨在从…

[Collection与数据结构] 七大排序算法汇总

&#x1f338;个人主页:https://blog.csdn.net/2301_80050796?spm1000.2115.3001.5343 &#x1f3f5;️热门专栏:&#x1f355; Collection与数据结构 (90平均质量分)https://blog.csdn.net/2301_80050796/category_12621348.html?spm1001.2014.3001.5482 &#x1f9c0;Java …

PyTorch中 DataLoader 和 TensorDataset 的详细解析

DataLoader 和 TensorDataset PyTorch DataLoader 和 TensorDataset 的详细解析DataLoader 介绍DataLoader 的核心功能 TensorDataset 介绍TensorDataset 的核心功能 使用 DataLoader 和 TensorDataset 加载数据关键内容解析 结论 PyTorch DataLoader 和 TensorDataset 的详细解…

VTK —— 三、图形格式 - 示例1 - 读取.vtp文件并输出.ply文件(附完整源码)

代码效果&#xff1a;演示程序读取.vtp后输出.ply文件&#xff0c;使用paraview打开该输出的.ply文件 本代码编译运行均在如下链接文章生成的库执行成功&#xff0c;若无VTK库则请先参考如下链接编译vtk源码&#xff1a; VTK —— 一、Windows10下编译VTK源码&#xff0c;并用V…

堡垒机——网络技术手段

目录 一、简介 1.什么是跳板机 2.跳板机缺陷 3.什么是堡垒机 4.为什么要使用堡垒机 4.1堡垒机设计理念 4.2堡垒机的建设目标 4.3堡垒机的价值 4.4总结 5.堡垒机的分类 6.堡垒机的原理 7.堡垒机的身份认证 8.堡垒机的运维方式常见有以下几种 9.堡垒机其他常见功能…

低代码优于无代码?

从1804年打孔式编程出现&#xff0c;编程语言至今已经存在了200多年。而从50年代以来&#xff0c;新的编程语言也不断涌现&#xff0c;现在已经有250多种了。这就意味着&#xff0c;开发人员最需要习惯的事情就是不断改变。 编程界最近的一个变化是集成开发环境&#xff08;IDE…

如何使用摇摆交易?fpmarkets实例讲解

各位投资者五一节后快乐&#xff01;祝愿投资者在接下来的日子里每次交易都以盈利结算。 五一节日也是劳动节&#xff0c;在这个特殊的日子里fpmarkets澳福和各位勤劳的投资者一起学习如何使用摇摆交易策略进行交易&#xff1f; 其实很简单&#xff0c;首先判断出买卖点&#x…

FANUC机器人故障诊断—报警代码(五)

FANUC机器人故障诊断中关于报警代码的介绍更新如下&#xff1a; 一、报警代码&#xff08;SRVO-214&#xff09; SRVO-214 6轴放大器保险丝熔断 [原因]6轴伺服放大器上的保险丝(FS2,FS3)已熔断。括号内的数字表示在第几台6轴伺服放大器上检测出了保险丝熔断。 [对策] 1.保险…

省公派出国|社科类普通高校教师限期内赴英国访学交流

在国外访问学者申请中&#xff0c;人文社科类相对难度更大&#xff0c;尤其是英语语言学&#xff0c;作为非母语研究并不被国外高校看重。经过努力&#xff0c;最终我们帮助Z老师申请到英国坎特伯雷基督教会大学的访学职位&#xff0c;并在限期内出国。 Z老师背景&#xff1a; …

开源之夏 2024 学生报名通道现已正式开启!奖金都是12000元,冲啊!!!

Apache SeaTunnel作为数据集成平台的先行者&#xff0c;数以千计的开发者活跃在这个开源社区&#xff0c;我们深知开源社区就像是“众人拾柴火焰高”&#xff0c;希望有更多的青年力量能参与到社区的建设中来&#xff01; 在前段时间&#xff0c;我们不仅成功入选中科院软件所主…