RabbitMQ高阶使用

1. 问题

img

2. 延时任务

2.1 什么是延时任务

在当前时间往后延迟多少时间执行的任务

2.1.1 和定时任务区别
  1. 定时任务有明确的触发时间,延时任务没有
  2. 定时任务有执行周期,而延时任务在某事件触发后一段时间内执行,没有执行周期
  3. 定时任务一般执行的是批处理操作是多个任务,而延时任务一般是单个任务

2.2 延时队列使用场景

  1. 订单在十分钟之内未支付则自动取消。
  2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
  3. 账单在一周内未支付,则自动结算。
  4. 用户注册成功后,如果三天内没有登陆则进行短信提醒。
  5. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  6. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

如美团点餐,超时时间

2.3 常见方案

2.3.1 数据库轮询

该方案通常是在小型项目中使用,即通过一个线程定时的去扫描数据库,通过订单时间来判断是否有超时的订单,然后进行update或delete等操作

优点

代码简单,复杂度小

缺点
  1. 对服务器内存消耗大
  2. 存在延迟,比如你每隔3分钟扫描一次,那最坏的延迟时间就是3分钟
  3. 假设你的订单有几千万条,每隔几分钟这样扫描一次,数据库损耗极大
2.3.1 JDK的延迟队列

该方案是利用JDK自带的DelayQueue来实现,这是一个无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素,放入DelayQueue中的对象,是必须实现Delayed接口的。

'xxs'

优点

效率高,任务触发时间延迟低。

缺点
  1. 服务器重启后,数据全部消失,怕宕机
  2. 集群扩展相当麻烦
  3. 因为内存条件限制的原因,比如下单未付款的订单数太多,那么很容易就出现OOM异常
  4. 代码复杂度较高
2.3.3 netty时间轮算法

时间轮算法可以类比于时钟,如图箭头(指针)按某一个方向按固定频率轮动,每一次跳动称为一个 tick

'xxs'

这样可以看出定时轮由3个重要的属性参数,ticksPerWheel(一轮的tick数),tickDuration(一个tick的持续时间)以及 timeUnit(时间单位),例如当ticksPerWheel=60,tickDuration=1,timeUnit=秒,这就和现实中的始终的秒针走动完全类似了。

如果当前指针指在1上面,我有一个任务需要4秒以后执行,那么这个执行的线程回调或者消息将会被放在5上。那如果需要在20秒之后执行怎么办,由于这个环形结构槽数只到8,如果要20秒,指针需要多转2圈,位置是在2圈之后的5上面(20 % 8 + 1)

优点

效率高,任务触发时间延迟时间比delayQueue低,代码复杂度比delayQueue低。

缺点
  • 服务器重启后,数据全部消失,怕宕机
  • 集群扩展相当麻烦
  • 因为内存条件限制的原因,比如下单未付款的订单数太多,那么很容易就出现OOM异常
2.3.4 使用消息队列

'xxs'

可以采用RabbitMQ的延时队列,RabbitMQ具有以下两个特性,可以实现延迟队列

  • RabbitMQ可以针对Queue和Message设置 x-message-ttl,来控制消息的生存时间,如果超时,则消息变为dead letter
  • RabbitMQ的Queue可以配置x-dead-letter-exchangex-dead-letter-routing-key(可选)两个参数,用来控制队列内出现了dead letter,则按照这两个参数重新路由。
优点

高效,可以利用rabbitmq的分布式特性轻易的进行横向扩展,消息支持持久化增加了可靠性。

缺点

本身的易用度要依赖于RabbitMq的运维,因为要引用RabbitMq,所以复杂度和成本变高

2.4 延时队列

RabbitMQ中没有对消息延迟进行实现,但是可以通过TTL以及死信路由来实现消息延迟。
还有一种使用官方自带的插件, 插件的方式参考: 跳转

'xxs'

2.4.1 TTL(消息过期时间)

在介绍延时队列之前,还需要先介绍一下RabbitMQ中的一个高级特性——TTL(Time To Live)

TTL是RabbitMQ中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒,换句话说,如果一条消息设置了TTL属性或者进入了设置TTL属性的队列,那么这条消息如果在TTL设置的时间内没有被消费,则会成为“死信”,如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

2.4.1.1 配置队列TTL

一种是在创建队列的时候设置队列的“x-message-ttl”属性

@Bean
public Queue taxiOverQueue() {Map<String, Object> args = new HashMap<>(2);args.put("x-message-ttl", 30000);return QueueBuilder.durable(TAXI_OVER_QUEUE).withArguments(args).build();
}

这样所有被投递到该队列的消息都最多不会存活超过30s,如果没有任何处理,消息会被丢弃,如果配置有死信队列,超时的消息会被投递到死信队列

2.5 死信队列

2.5.1 什么是死信队列

顾名思义就是无法被消费的消息

一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列;

2.5.2 死信队列使用场景

RabbitMQ中的死信交换器(dead letter exchange)可以接收下面三种场景中的消息:

  • 消费者对消息使用了basicReject或者basicNack回复,并且requeue参数设置为false,即不再将该消息重新在消费者间进行投递
  • 消息在队列中超时,RabbitMQ可以在单个消息或者队列中设置TTL属性
  • 队列中的消息已经超过其设置的最大消息个数
2.5.3 死信队列如何使用

死信交换器不是默认的设置,这里是被投递消息被拒绝后的一个可选行为,是在创建队列的时进行声明的,往往用在对问题消息的诊断上。

死信交换器仍然只是一个普通的交换器,创建时并没有特别要求和操作,在创建队列的时候,声明该交换器将用作保存被拒绝的消息即可,相关的参数是x-dead-letter-exchange

'xxs'

2.5.4 相关代码
@Bean
public Queue taxiOverQueue() {Map<String, Object> args = new HashMap<>(2);// x-dead-letter-exchange    这里声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", TAXI_DEAD_QUEUE_EXCHANGE);// x-dead-letter-routing-key  这里声明当前队列的死信路由keyargs.put("x-dead-letter-routing-key", TAXI_DEAD_KEY);return QueueBuilder.durable(TAXI_OVER_QUEUE).withArguments(args).build();
}

2.6 延迟消息处理

2.6.1 延迟消息实现

在创建队列的时候配置死信交换器并设置队列的“x-message-ttl”属性

@Bean
public Queue taxiDeadQueue() {return new Queue(TAXI_DEAD_QUEUE,true);
}@Bean
public Queue taxiOverQueue() {Map<String, Object> args = new HashMap<>(2);// x-dead-letter-exchange    这里声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", TAXI_DEAD_QUEUE_EXCHANGE);// x-dead-letter-routing-key  这里声明当前队列的死信路由keyargs.put("x-dead-letter-routing-key", TAXI_DEAD_KEY);// x-message-ttl  声明队列的TTLargs.put("x-message-ttl", 30000);return QueueBuilder.durable(TAXI_OVER_QUEUE).withArguments(args).build();
}

这样所有被投递到该队列的消息都最多不会存活超过30s,超时后的消息会被投递到死信交换器

3. RabbitMQ消息可靠性保障

消息的可靠性投递是使用消息中间件不可避免的问题

'xxs'

从上面的图可以看到,消息的投递有三个对象参与:

  • 生产者
  • broker
  • 消费者

3.1 生产者保证

生产者发送消息到broker时,要保证消息的可靠性,主要的方案有以下2种

  • 失败通知
  • 发送方确认
3.1.1 RabbitMQ流程

生产者通过指定一个 exchange 和 routingkey 把消息送达到某个队列中去,然后消费者监听队列,进行消费处理, 但是在某些情况下,如果在发送消息时,当前的 exchange 不存在或者指定的 routingkey 路由不到,这个时候如果要监听这种不可达的消息,这个时候就需要失败通知。

'xxs'

不做任何配置的情况下,生产者是不知道消息是否真正到达RabbitMQ,也就是说消息发布操作不返回任何消息给生产者。

3.1.2 失败通知

如果出现消息无法投递到队列会出现失败通知

可以启动失败通知,在原生编程中在发送消息时设置mandatory标志,即可开启故障检测模式。

'xxs'

注意:它只会通知失败,而不会通知成功,如果消息正确路由到队列,则发布者不会受到任何通知,带来的问题是无法确保发布消息一定是成功的,因为通知失败的消息可能会丢失

3.1.2.1 实现方式

spring配置

spring:rabbitmq:# 消息在未被队列收到的情况下返回publisher-returns: true

关键代码,注意需要发送者实现ReturnCallback接口方可实现失败通知

/*** 失败通知* 队列投递错误应答* 只有投递队列错误才会应答*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {//消息体为空直接返回if (null == message) {return;}TaxiBO taxiBO = JSON.parseObject(message.getBody(), TaxiBO.class);if (null != taxiBO) {//删除rediskeyredisHelper.handelAccountTaxi(taxiBO.getAccountId());//记录错误日志recordErrorMessage(taxiBO, replyText, exchange, routingKey, message, replyCode);}
}
3.1.2.2 遇到的问题问题

如果消息正确路由到队列,则发布者不会受到任何通知,带来的问题是无法确保发布消息一定是成功的,因为路由到队列的消息可能会丢失

3.1.3 发送发确认

发送方确认是指生产者投递消息后,如果 Broker 接收到消息,则会给生产者一个应答,生产者进行接收应答,用来确认这条消息是否正常的发送到 Broker,这种方式也是消息可靠性投递的核心保障

rabbitmq消息发送分为两个阶段:

  • 将消息发送到broker,即发送到exchage交换机
  • 消息通过交换机exchange被路由到队列queue

一旦消息投递到队列,队列则会向生产者发送一个通知,如果设置了消息持久化到磁盘,则会等待消息持久化到磁盘之后再发送通知

注意:发送发确认只有出现RabbitMQ内部错误无法投递才会出现发送发确认失败。

发送方确认模式需要分两种情况下列来看

3.1.3.1 不可路由

当前消息到达交换器后对于发送者确认是成功的

'xxs'

首先当RabbitMQ交换器不可路由时,消息也根本不会投递到队列中,所以这里只管到交换器的路径,当消息成功送到交换器后,就会进行确认操作

另外在这过程中,生产者收到了确认消息后,那么因为消息无法路由,所以该消息也是无效的,无法投递到队列,所以一般情况下这里会结合失败通知来一同使用,这里一般会进行设置mandatory模式,失败则会调用addReturnListener监听器来进行处理。

发送方确认模式的另一种情况肯定就是消息可以进行路由

3.1.3.2 可以路由

只要消息能够到达队列即可进行确认,一般是RabbitMQ发生内部错误才会出现失败

'xxs'

可以路由的消息,要等到消息被投递到所有匹配的队列之后,broker会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了。

如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号。

3.1.3.3 使用方式

spring配置

spring:rabbitmq:    # 开启消息确认机制publisher-confirm-type: correlated

关键代码,注意需要发送者实现ConfirmCallback接口方可实现失败通知

/*** 发送发确认* 交换器投递后的应答* 正常异常都会进行调用** @param correlationData* @param ack* @param cause*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {//只有异常的数据才需要处理if (!ack) {//关联数据为空直接返回if (correlationData == null) {return;}//检查返回消息是否为nullif (null != correlationData.getReturnedMessage()) {TaxiBO taxiBO = JSON.parseObject(correlationData.getReturnedMessage().getBody(), TaxiBO.class);//处理消息还原用户未打车状态redisHelper.handelAccountTaxi(taxiBO.getAccountId());//获取交换器String exchange = correlationData.getReturnedMessage().getMessageProperties().getHeader("SEND_EXCHANGE");//获取队列信息String routingKey = correlationData.getReturnedMessage().getMessageProperties().getHeader("SEND_ROUTING_KEY");//获取当前的消息体Message message = correlationData.getReturnedMessage();//记录错误日志recordErrorMessage(taxiBO, cause, exchange, routingKey, message, -1);}}
}
3.1.4 Broker丢失消息

如何在mq挂掉重启之后还能保证消息是存在的?

开启RabbitMQ的持久化,也即消息写入后会持久化到磁盘,此时即使mq挂掉了,重启之后也会自动读取之前存储的额数据

3.1.4.1 持久化队列
@Bean
public Queue queue(){return new Queue(queueName,true);
}
3.1.4.2 持久化交换器
@Bean
DirectExchange directExchange() {return new DirectExchange(exchangeName,true,false);
}
3.1.4.3 发送持久化消息

发送消息时,设置消息的deliveryMode=2

注意:如果使用SpringBoot的话,发送消息时自动设置deliveryMode=2,不需要人工再去设置

3.1.4.4 Broker总结

失败通知和发送方确认结合使用, 确保消息发送成功

3.2 消费方消息可靠性

3.2.1 消费者手动确认

RabbitMQ提供的ack机制,RabbitMQ默认是自动ack的,此时需要将其修改为手动ack,也即自己的程序确定消息已经处理完成后,手动提交ack,此时如果再遇到消息未处理进程就挂掉的情况,由于没有提交ack,RabbitMQ就不会删除这条消息,而是会把这条消息发送给其他消费者处理,但是消息是不会丢的。

3.2.1.1 配置文件
spring:rabbitmq:listener:simple:acknowledge-mode: manual  # 手动ack
3.2.1.2 参数介绍

acknowledge-mode: manual就表示开启手动ack,该配置项的其他两个值分别是none和auto

  • auto:消费者根据程序执行正常或者抛出异常来决定是提交ack或者nack
  • manual: 手动ack,用户必须手动提交ack或者nack
  • none: 没有ack机制

默认值是auto,如果将ack的模式设置为auto,此时如果消费者执行异常的话,就相当于执行了nack方法,消息会被放置到队列头部,消息会被无限期的执行,从而导致后续的消息无法消费。

3.3.1.3 消费者实现
@RabbitListener(bindings ={@QueueBinding(value = @Queue(value = RabbitConfig.TAXI_DEAD_QUEUE, durable = "true"),exchange = @Exchange(value = RabbitConfig.TAXI_DEAD_QUEUE_EXCHANGE), key = RabbitConfig.TAXI_DEAD_KEY)})@RabbitHandlerpublic void processOrder(Message massage, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {TaxiBO taxiBO = JSON.parseObject(massage.getBody(), TaxiBO.class);try {//开始处理订单logger.info("处理超时订单,订单详细信息:" + taxiBO.toString());taxiService.taxiTimeout(taxiBO);//手动确认机制 参数二: 是否批量进行确认channel.basicAck(tag, false);} catch (Exception e) {e.printStackTrace();}}

3.3 业务可靠性分析

3.3.1 消息丢失

结合上面尽量减少消息的丢失, 如果丢失可以写入失败日志, 对业务进行回滚操作

3.3.2 幂等性校验

使用redis进行幂等性校验, 对key设置有效期, 或者MessageId入库

3.3.3 数据回滚

虽然无需做到消息完全不丢失以及消息的幂等性,但是需要考虑如果出现问题,需要将插入Redis的的key值回滚掉,防止影响业务正常判断

3.3.4 限流QOS

因为RabbitMQ是消息推送的模式, 大量消息服务器可能崩溃, 设置QOS解决

spring:rabbitmq:host: 192.168.153.130port: 5672username: guestpassword: guest#virtual-host:listener:simple:prefetch: 2  # 代表多少消息未被ack时,rabbitmq不会给消费者发送新的消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/21521.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

二、RocketMQ消息存储源码分析

RocketMQ源码深入剖析 6 Broker源码分析 Broker模块涉及到的内容非常多&#xff0c;本课程重点讲解以下技术点&#xff1a; 1、Broker启动流程分析 2、消息存储设计 3、消息写入流程 4、亮点分析&#xff1a;NRS与NRC的功能号设计 5、亮点分析&#xff1a;同步双写数倍性…

洛谷P1059 [NOIP2006 普及组] 明明的随机数

(一)Question 1. 问题描述 2. Input 输入有两行,第 1 行为 1 个正整数,表示所生成的随机数的个数 N。第 2 行有 N 个用空格隔开的正整数,为所产生的随机数。 3. Output 输出也是两行,第 1 行为 1 个正整数 M,表示不相同的随机数的个数。第 2 行为 M 个用空格隔开的正…

“简单易懂的排序:深入了解直接选择排序“

文章目录 &#x1f50d; 选择排序的原理与过程&#x1f4c8; 选择排序的优缺点&#x1f449; 代码实现 &#x1f50d; 选择排序的原理与过程 本文我们直接说一个优化过的直接选择排序。其思路大同小异. 选择排序的思路很简单 每次从待排序的数据中选择一个最小和最大的元素&a…

fatal: unable to access ‘http://xxxx‘: Empty reply from server

当你遇到 “fatal: unable to access ‘http://xxxx’: Empty reply from server” 的错误信息时&#xff0c;通常表示 Git 客户端无法连接到指定的服务器或仓库。 以下是一些可能导致该错误的原因以及一些排除故障的步骤&#xff1a; 错误的 URL&#xff1a;确保你提供的 URL…

pdf转图片操作方法是什么?分享两个简单的方法!

PDF转图片是一个常见的需求&#xff0c;无论是为了方便编辑、共享&#xff0c;还是为了其他用途&#xff0c;我们需要简单而有效的方法来实现这个目标。本文将介绍两种简单的PDF转图片方法&#xff1a;记灵在线工具和截图方法。 记灵在线工具是一个强大而易于使用的在线工具&a…

UNIX网络编程卷一 学习笔记 第二十三章 高级SCTP套接字编程

SCTP是一个面向消息的协议&#xff0c;递送给用户的是部分的或完整的消息。只有当发送大消息时&#xff0c;在对端才会递送部分的消息。部分消息被递送给应用后&#xff0c;多个部分消息组合成单个完整消息不由SCTP负责。在SCTP应用进程看来&#xff0c;一个消息既可由单个输入…

Mars3d采用ellipsoid球实现模拟地球旋转效果

1.Mars3d采用ellipsoid球实现模拟地球旋转效果 2.开始自选装之后&#xff0c;模型一直闪烁 http://mars3d.cn/editor-vue.html?idgraphic/entity/ellipsoid 3.相关代码&#xff1a; import * as mars3d from "mars3d"export let map // mars3d.Map三维地图对象 …

WPS Office AI实战:智能表格化身智能助理

前面我们已经拿 WPS AI 对Word文字、PPT幻灯片、PDF 做了开箱体验&#xff0c;还没有看过的小伙伴&#xff0c;请翻看以前的文章&#xff0c;本文开始对【智能表格】进行AI开箱测验。 表格在日常的数据处理中占绝对地位&#xff0c;但表格处理并不是每一个人都擅长&#xff0c;…

第2讲 KMD ISP子系统缩略词及目录结构

QCOM Camera子系统缩略词介绍 CPAS(Camera Peripherals and Support)CDM(Camera Data Mover)TFE(Thin Front End)IFE(Image Front End)OPE(Offline Processing Engine)BPS(Bayer Processing Segment)SFE(Sensor Front End)LRME(Low Resolution Motion Estimation)CSID(Camera …

taro3 微信小程序 createIntersectionObserver 监听无效

项目&#xff1a; taro3 vue3 官方文档 版本&#xff1a;3.x Taro.createIntersectionObserver(component, options) 创建并返回一个 IntersectionObserver 对象实例。在自定义组件或包含自定义组件的页面中&#xff0c;应使用 this.createIntersectionObserver([options]) …

常见面试题之垃圾收回

1. 简述Java垃圾回收机制&#xff1f;&#xff08;GC是什么&#xff1f;为什么要GC&#xff1f;&#xff09; 为了让程序员更专注于代码的实现&#xff0c;而不用过多的考虑内存释放的问题&#xff0c;所以&#xff0c;在Java语言中&#xff0c;有了自动的垃圾回收机制&#x…

性能测试工具 jmeter 录制脚本,传递 cookie,循环执行接口

目录 前言&#xff1a; 代理录制脚本 循环重复添加接口 登录并传递 cookie 给新建产品接口 循环执行脚本 前言&#xff1a; 在使用JMeter进行性能测试时&#xff0c;录制脚本是一种常用的方法。录制脚本可以帮助你捕获和重放用户与应用程序之间的交互&#xff0c;以模拟真…