RabbitMQ学习二

RabbitMQ学习二

  • 发送者的可靠性
    • 生产者连接重试机制
    • 生产者确认机制
      • 开启生产者确认
      • 定义ReturnCallback
      • 定义confirmCallback
  • MQ的可靠性
    • 交换机和队列持久化
    • 消息持久化
    • LazyQueue
      • 控制台配置Lazy模式
      • 代码配置Lazy模式
  • 消费者的可靠性
    • 失败重试机制
    • 失败处理策略
    • 业务幂等性
      • 唯一消息ID
      • 业务判断
  • 学习代码获取方式

在使用消息队列的时候,最重要的问题就是,保证消息的安全性,也就是如何防止消息丢失。就必须从3个方面入手:

  • 确保生产者一定把消息发送到MQ
  • 确保MQ不会将消息弄丢
  • 确保消费者一定要处理消息

在这里插入图片描述

发送者的可靠性

生产者连接重试机制

第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断。为了解决这个问题,SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate与MQ连接超时后,多次重试
修改publisher模块的application.yml文件,添加下面的内容:

spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数

使用命令停掉docker中的rabbitMQ服务

docker stop mq

测试发送一条消息会发现会每隔1秒重试1次,总共重试了3次。消息发送的超时重试机制配置成功了!
在这里插入图片描述

注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,

生产者确认机制

一般情况下,发送者都会发送成功,在少数情况下,消息会发送失败,主要会出现以下几种现象:

  • MQ内部处理消息的进程发生了异常
  • 生产者发送消息到达MQ后未找到Exchange
  • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue,因此无法路由

RabbitMQ提供了生产者消息确认机制,包括Publisher Confirm和Publisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执。 具体如图所示:
在这里插入图片描述
总结如下:

  • 消息投递到MQ,但是会路由失败,此时会通过Publisher Return返回路由异常原因(一般是路由写错了),然后ACK,告知投递成功
  • 临时消息投递到MQ,并且入队成功,返回ACK,告知投递成功
  • 持久化消息投递到MQ,并且入队完成持久化,告知投递成功
  • 其他情况返回NACK,告知投递失败。

其中ack和nack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。 默认两种机制都是关闭状态,需要通过配置文件来开启。

开启生产者确认

在publisher模块的application.yaml中添加配置:

spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制

  • simple:同步阻塞等待MQ的回执(不适合)

  • correlated:MQ异步回调返回回执(推荐)

定义ReturnCallback

@Slf4j
@Configuration
public class RabbitmqConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {log.error("触发return callback,");log.info("message: {}", message.toString());log.info("err_code: {}", i);log.info("err_msg:{}", s);log.info("exchange: {}", s1);log.info("route: {}", s2);}});}
}

定义confirmCallback

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:CorrelationData
在这里插入图片描述

向交换机发送消息,并且添加confirmCallback

    @Testpublic void directProducer() throws InterruptedException {//创建CorrelationDataCorrelationData cd = new CorrelationData();// 2.给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future发生异常时的处理逻辑-spring处理的异常跟rabbitmq没关系,基本不会触发log.error("send message fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执log.info("消息成功发送 收到ack");}else{ // result.getReason(),String类型,返回nack时的异常描述log.error("发送消息失败,收到 nack, reason : {}", result.getReason());}}});// 交换机名称String exchangeName = "hmall.direct";// 消息Map<String, Object> msg = new HashMap<>();msg.put("姓名","produce");msg.put("routingKey","blue");rabbitTemplate.convertAndSend(exchangeName,"blue",msg,cd);//单元测试中 需要等待异步回调 否则接受不到回调消息 Thread.sleep(2000);}
  • 如果传递的RoutingKey是错误的,路由失败后,触发了return callback,同时也收到了ack(只是交换机收到消息)。
  • 当我们修改为正确的RoutingKey以后,就不会触发return callback了,只收到ack。
  • 而如果连交换机都是错误的,则只会收到nack。

注意: 开启生产者确认比较消耗MQ性能,一般不建议开启,而且大家思考一下触发确认的几种情况:

  • 路由失败:一般是因为RoutingKey错误导致,往往是编程导致
  • 交换机名称错误:同样是编程错误导致
  • MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了。

MQ的可靠性

消息到达MQ以后,如果MQ不能及时保存,也会导致消息丢失,所以MQ的可靠性也非常重要。

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:

 1 交换机持久化2 队列持久化3 消息持久化

交换机和队列持久化

交换机和队列的持久化是在界面创建时设置Durability参数为Durable模式,Transient就是临时模式。
![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/ba5f5912c1324749a8f143e7f2482ccb.png

消息持久化

在控制台发送消息时,消息的持久化是配置一个properties
在这里插入图片描述
说明:在开启消息持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。 不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在几毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。

持久化消息会同时写入磁盘和内存(加快读取速度),非持久化消息会暂存在内存中,只有当内存不够用时,此时RabbitMQ会将内存消息刷到磁盘上,这个行为成为PageOut. PageOut会耗费一段时间,并且会阻塞队列进程。

LazyQueue

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:

  • 消费者宕机或出现网络故障
  • 消息发送量激增,超过了消费者处理速度
  • 消费者处理业务发生阻塞

一旦出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这个行为成为PageOut. PageOut会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ不会再处理新的消息,生产者的所有请求都会被阻塞。

为了解决这个问题,从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
  • 支持数百万条的消息存储
    而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。因此官方推荐升级MQ为3.12版本或者所有队列都设置为LazyQueue模式。

控制台配置Lazy模式

在添加队列的时候,添加x-queue-mod=lazy参数即可设置队列为Lazy模式:
在这里插入图片描述

代码配置Lazy模式

在利用SpringAMQP声明队列的时候,添加x-queue-mod=lazy参数也可设置队列为Lazy模式:

@Bean
public Queue lazyQueue(){return QueueBuilder.durable("lazy.queue").lazy() // 开启Lazy模式.build();
}

可以基于注解来声明队列并设置为Lazy模式:

@RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){log.info("接收到 lazy.queue的消息:{}", msg);
}

消费者的可靠性

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:**当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。**回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
  • 一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack.

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
  • manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:
    1. 如果是业务异常,会自动返回nack;
    2. 如果是消息处理或校验异常,自动返回reject;

通过在消费者服务下面的配置可以修改SpringAMQP的ACK处理方式:

spring:rabbitmq:listener:simple:acknowledge-mode: auto # 自动模式

然后在队列消费者添加RuntimeException异常,auto模型就会一直返回nack 消息处理失败后,会回到RabbitMQ,并重新投递到消费者。

@RabbitListener(queuesToDeclare = @Queue(name = "simple.queue"))public void simpleQueueListener(String msg){System.out.println("simpleQueueListener 收到消息【"+msg+"】");throw new RuntimeException("故意的");}

失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。 极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力:

 当然,上述极端情况发生的概率还是非常低的,不过不怕一万就怕万一。为了应对上述情况Spring又提供了消费者失败重试机制:

在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

修改consumer服务的application.yml文件,添加内容:

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:

  • 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
  • 本地重试3次以后,抛出了AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject

失败处理策略

在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。 因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:(策略模式)

- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个专门存放异常消息的队列,后续由人工集中处理。
在这里插入图片描述
配置存放异常消息的队列的代码如下:

@Configuration
//属性满足时  生效
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {// 在consumer服务中定义处理失败消息的交换机和队列@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}//定义一个RepublishMessageRecoverer,关联队列和交换机@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

重新模拟上面的发送异常消息,消费者收到三次后,消息便投递投了error.queue
在这里插入图片描述

业务幂等性

在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:

  • 根据id删除数据
  • 查询数据
  • 新增数据

但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

  • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
  • 退款业务。重复退款对商家而言会有经济损失。

所以,我们要尽可能避免业务被重复执行。 然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:

  • 页面卡顿时频繁刷新导致表单重复提交
  • 服务间调用的重试
  • MQ消息的重复投递

因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案:

  • 唯一消息ID
  • 业务状态判断

唯一消息ID

给每个消息都设置一个唯一id,利用idl区分是否是重复消息

  • 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  • 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
  • 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。

以Jackson的消息转换器为例:

@Beanpublic MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;}

业务判断

业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。 例如 处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。

学习代码获取方式

学习代码地址springboot-rabbitmq

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/266153.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【MySQL系列】Centos安装MySQL

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

win 10 hp hotkey uwp service占用内存高解决方法

hp hotkey uwp service hp hotkey uwp service high cpu hp audio analytics service high cpu 我是惠普战66笔记本, 这个问题断断续续好久了都没有得到解决, 作为一个能折腾的人, 热键也就亮度和声音是常用的, 而且鼠标进行这些操作也很简单, 最后想了想干脆直接把该服务关闭了…

6个免费设计素材库,设计师都在用,赶紧收藏!

设计师应该都知道&#xff0c;在设计过程中找素材真的很费时间&#xff0c;有的时候全网翻遍都未必能找到自己想要的&#xff0c;以至于现在很多设计师都花钱去购买素材&#xff0c;你说要是拿去参赛或者商用还好&#xff0c;就拿平常设计来说你舍得花这个钱去买吗&#xff0c;…

Swift “黑魔法”之动态获取类实例隐藏属性的值

概览 在 Swift 代码的调试中,我们时常惊叹调试器的无所不能:对于大部分“黑盒”类实例的内容,调试器也都能探查的一清二楚。 想要自己在运行时也能轻松找到 Thread 实例“私有”属性的值吗(比如 seqNum)? 在本篇博文中您将学到如下内容: 概览1. 借我,借我,一双慧眼吧…

JDK多版本集成 Jacoco 配置指南

JDK多版本集成 Jacoco 配置指南 本篇相关 JDK 版本配置如下&#xff1a; JDK8 JDK11 JDK17 Jacoco 是什么 Jacoco 是一个用于Java程序的代码覆盖率报告工具。它通过动态分析&#xff08;在代码执行时收集数据&#xff09;来生成代码覆盖率报告文件。Jacoco 支持多种覆盖率标…

数据结构与算法-Rust 版读书笔记-2线性数据结构-队列

数据结构与算法-Rust 版读书笔记-2线性数据结构-队列 1、队列&#xff1a;先进先出 队列是项的有序集合&#xff0c;其中&#xff0c;添加新项的一端称为队尾&#xff0c;移除项的另一端称为队首。一个元素在从队尾进入队列后&#xff0c;就会一直向队首移动&#xff0c;直到…

N皇后,回溯【java】

问题描述 八皇后问题是十九世纪著名的数学家高斯于1850年提出的。 问题是&#xff1a;在88的棋盘上摆放八个皇后&#xff0c;使其不能互相攻击&#xff0c;即任意两个皇后都不能处于同一行、同一列或同一斜线上。可以把八皇后问题扩展到n皇后问题&#xff0c;即在nn的棋盘上摆…

Shell三剑客:正则表达式简介

前言 一、名称解释 正则表达式&#xff08;regular expression&#xff0c;RE&#xff09;是一种字符模式&#xff0c;用于在查找过程中匹配指定的字符。在大多数程序里&#xff0c;正则表达式都被置于两个正斜杠之间&#xff1b;例如/l[oO]ve/就是由正斜杠界定的正则表达式&am…

川崎ZX-6R确定引进,636它真的来了,3C认证已过。

最新消息&#xff0c;兄弟们&#xff0c;你们期待已久的川崎ZX6R&#xff08;636&#xff09;基本已经确定引进了&#xff0c;官方的3C认证已经通过&#xff0c;那么从3C里面我们可以看到哪几个信息&#xff1f;产品代号ZX636J就是心心念念的ZX-6R了。 有些小伙伴不太清楚3C认…

[算法基础 ~排序] Golang 实现

文章目录 排序什么是排序排序的分类1. 冒泡1.1 冒泡排序1.2. 快速排序 2. 选择2.1 简单选择排序2.2 堆排序 3. 插入3.1 直接插入3.2 折半插入3.3 希尔排序 4. 归并排序代码实现 5. 基数排序 排序图片就不贴了吧 排序 什么是排序 以下部分动图来自CSDN ::: tip 稳定性的概念 …

我的 CSDN 三周年创作纪念日:2020-12-12

本人大叔一枚&#xff0c;自1992年接触电脑&#xff0c;持续了30年的业余电脑发烧爱好者&#xff0c;2022年CSDN博客之星Top58&#xff0c;阿里云社区“乘风者计划”专家博主。自某不知名财校毕业后进入国有大行工作至今&#xff0c;先后任职于某分行信息科技部、电子银行部、金…

游戏中小地图的制作__unity基础开发教程

小地图的制作 Icon标识制作制作摄像机映射创建地图UI效果“不一样的效果” 在游戏中经常可以看到地图视角的存在&#xff0c;那么地图视角是如何让实现的呢&#xff1f; 这一期教大家制作一个简易的小地图。 &#x1f496;点关注&#xff0c;不迷路。 老样子&#xff0c;我们还…