RabbitMQ 是如何做延迟消息的 ?——Java全栈知识(15)

RabbitMQ 是如何做延迟消息的 ?

1、什么是死信?

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用 basic.rejectbasic.nack 声明消费失败,并且消息的 requeue 参数设置为 false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递
    如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。
    死信交换机有什么作用呢?
  1. 收集那些因处理失败而被拒绝的消息
  2. 收集那些因队列满了而被拒绝的消息
  3. 收集因 TTL(有效期)到期的消息

2、死信队列

架构:
image.png
由于第一个队列没有消费者,所以可以在第一个队列中设置 TTL,当消息过期的时候,这个消息就变成了死信,被丢掉私信交换机中,以此实现延迟任务功能。

3、延迟消息

前面两种作用场景可以看做是把死信交换机当做一种消息处理的最终兜底方案,与消费者重试时讲的RepublishMessageRecoverer作用类似。

而最后一种场景,大家设想一下这样的场景: 如图,有一组绑定的交换机(ttl.fanout)和队列(ttl.queue)。但是ttl.queue没有消费者监听,而是设定了死信交换机hmall.direct,而队列direct.queue1则与死信交换机绑定,RoutingKey是blue:
假如我们现在发送一条消息到ttl.fanout,RoutingKey为blue,并设置消息的有效期为5000毫秒: image.png注意:尽管这里的ttl.fanout不需要RoutingKey,但是当消息变为死信并投递到死信交换机时,会沿用之前的RoutingKey,这样hmall.direct才能正确路由消息。
消息肯定会被投递到 ttl.queue 之后,由于没有消费者,因此消息无人消费。5秒之后,消息的有效期到期,成为死信: image.png 死信被再次投递到死信交换机 hmall.direct,并沿用之前的 RoutingKey,也就是 blueimage.png 由于 direct.queue1hmall.direct 绑定的 key 是 blue,因此最终消息被成功路由到 direct.queue1,如果此时有消费者与 direct.queue1 绑定,也就能成功消费消息了。但此时已经是5秒钟以后了: image.png 也就是说,publisher 发送了一条消息,但最终 consumer 在5秒后才收到消息。我们成功实现了延迟消息

[!info]
而且,RabbitMQ 中的这个 TTL 是可以设置任意时长的,这相比于 RocketMQ 只支持一些固定的时长而显得更加灵活一些。

死信队列消息堆积问题

[!danger] 死信队列消息堆积问题
但是,死信队列的实现方式存在一个问题,那就是可能造成队头阻塞。RabbitMQ 会定期扫描队列的头部检查队首的消息是否过期。如果队首消息过期了,它会被放到死信队列中。然而,RabbitMQ 不会逐个检查队列中的所有消息是否过期,而是仅检查队首消息。这样,如果队列的队头消息未过期,而它后面的消息已过期,这些后续消息将无法被单独移除,直到队头的消息被消费或过期。
因为队列是先进先出的,在普通队列中的消息,每次只会判断邢队头的消息是否过期,那么,如果队头的消息时间很长,一直都不过期,那么就会阻塞整个队列,这时候即使排在他后面的消息过期了,那么也会被一直阻塞。

基于 RabbitMQ 的死信队列,可以实现延迟消息,非常灵活的实现定时关单,并且借助 RabbitMQ 的集群扩展性,可以实现高可用,以及处理大并发量。他的缺点第一是可能存在消息阻塞的问题,还有就是方案比较复杂,不仅要依赖 RabbitMQ, 而目还需要声明很多队列出来,增加系统的复杂度

3、DelayExchange 插件

前面我们提到的基于死信队列的方式,是消息先会投递到一个正常队列,在 TTL 过期后进入死信队列。但是基于插件的这种方式,消息并不会立即进入队列,而是先把他们保存在一个基于 Erlang 开发的 Mnesia 数据库中,然后通过一个定时器去查询需要被投递的消息,再把他们投递到 x-delayed-message 交换机中。
基于 RabbitMQ 插件的方式可以实现延迟消息,并且不存在消息阻塞的问题,但是因为是基于插件的,而这个插件支持的最大延长时间是 (2^32)-1 毫秒,大约 49 天,超过这个时间就会被立即消费。

插件下载地址: GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ
由于我们安装的 MQ 是 3.8 版本,因此这里下载 3.8.17 版本:
image.png|600px
附件:![[rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez]]

4.2.2. 安装

因为我们是基于 Docker 安装,所以需要先查看 RabbitMQ 的插件目录对应的数据卷。

docker volume inspect mq-plugins

结果如下:

[  {  "CreatedAt": "2024-06-19T09:22:59+08:00",  "Driver": "local",  "Labels": null,  "Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",  "Name": "mq-plugins",  "Options": null,  "Scope": "local"  }  
]  

插件目录被挂载到了 /var/lib/docker/volumes/mq-plugins/_data 这个目录,我们上传插件到该目录下

注意上传插件

接下来执行命令,安装插件:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

运行结果如下:
image.png

4.2.3. 声明延迟交换机

image.png

根据

1、创建交换机:
image.png
2、创建队列
image.png
3、根据 bandingKey 绑定队列:
image.png|500

基于注解方式:

@RabbitListener(bindings = @QueueBinding(  value = @Queue(name = "delay.queue", durable = "true"),  exchange = @Exchange(name = "delay.direct", delayed = "true"),  key = "delay"  
))  
public void listenDelayMessage(String msg){  log.info("接收到delay.queue的延迟消息:{}", msg);  
}

基于 @Bean 的方式:

package com.itheima.consumer.config;import lombok.extern.slf4j.Slf4j;  
import org.springframework.amqp.core.*;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;@Slf4j  
@Configuration  
public class DelayExchangeConfig {@Bean  public DirectExchange delayExchange(){  return ExchangeBuilder  .directExchange("delay.direct") // 指定交换机类型和名称  .delayed() // 设置delay的属性为true  .durable(true) // 持久化  .build();  }@Bean  public Queue delayedQueue(){  return new Queue("delay.queue");  }  @Bean  public Binding delayQueueBinding(){  return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");  }  
}  

4.2.4. 发送延迟消息

发送消息时,必须通过 x-delay 属性设定延迟时间:

@Test  
void testPublisherDelayMessage() {  // 1.创建消息  String message = "hello, delayed message";  // 2.发送消息,利用消息后置处理器添加消息头  rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {  @Override  public Message postProcessMessage(Message message) throws AmqpException {  // 添加延迟消息属性  message.getMessageProperties().setDelay(5000);  return message;  }  });  
}

warning 注意: 延迟消息插件内部会维护一个本地数据库表,同时使用 Elang Timers 功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的 CPU 开销,同时延迟消息的时间会存在误差。因此,不建议设置延迟时间过长的延迟消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/671246.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C++】 认识多态 + 多态的构成条件详细讲解

前言 C 目录 1. 多态的概念2 多态的定义及实现2 .1 虚函数:2 .2 虚函数的重写:2 .2.1 虚函数重写的两个例外: 2 .3 多态的两个条件(重点)2 .4 析构函数为啥写成虚函数 3 新增的两个关键字3.1 final的使用:3…

Java面试题:多线程2

如何停止正在运行的线程 1,使用退出标志,使线程正常退出(run方法中循环对退出标志进行判断) 2,使用stop()方法强行终止(不推荐) 3,调用interrupt()方法中断线程 打断阻塞线程(sleep,wait,join),线程会抛出InterruptedException异常 打断正常的线程,可以根据打断状态来标记…

LLVM的ThinLTO编译优化技术在Postgresql中的应用

部分内容引用:https://blog.llvm.org/2016/06/thinlto-scalable-and-incremental-lto.html LTO是什么? 链接时优化(Link-time optimization,简称LTO)是编译器在链接时对程序进行的一种优化。它适用于以文件为单位编译…

Llama3-Tutorial之XTuner微调Llama3个人小助手

Llama3-Tutorial之XTuner微调Llama3个人小助手 使用XTuner微调llama3模型。 参考: https://github.com/SmartFlowAI/Llama3-Tutorial 1. web demo部署 参考上一节内容已经完成web demo部署,进行对话测试, 当前回答基于llama3官方发布的模型进行推理生成&…

项目|保障房房产管理系统,政务房产解决方案

一、系统概况 保障房管理系统是是为了落实中央关于住房保障的相关政策,实现对低收入家庭住房状况的调查管理、保障计划及落实管理、保障申请及审核管理、保障户和保障房源档案管理等。 针对政府保障房产管理的一站式解决方案,专注于为解决复杂、繁琐的…

为何美国多IP服务器搭建蜘蛛池SEO更经济?

为何美国多IP服务器搭建蜘蛛池SEO更经济? 随着网络时代的不断演进,搜索引擎优化(SEO)已经成为企业和个人提升网站曝光度的必经之路。在这个过程中,蜘蛛池(Spider Pool)服务被广泛应用。但是有趣的是,美国多IP服务器搭建蜘蛛池SEO服务却相对…

【学习AI-相关路程-工具使用-自我学习-cudavisco-开发工具尝试-基础样例 (2)】

【学习AI-相关路程-工具使用-自我学习-cuda&visco-开发工具尝试-基础样例 (2)】 1、前言2、环境说明3、总结说明4、工具安装0、验证cuda1、软件下载2、插件安装 5、软件设置与编程练习1、创建目录2、编译软件进入目录&创建两个文件3、编写配置文…

代码随想录算法训练营第三天 | 链表理论基础,203.移除链表元素,707.设计链表,206.反转链表

对于链表完全陌生,但是看题目又觉得和数组一样的 链表理论基础 Q:什么是链表? A:链表是由一系列结点组成的。每一个结点由两部分组成:数据和指针。 203.移除链表元素 题目: 给你一个链表的头节点 head 和…

【电影】【指环王】【中土世界】影碟播放记录

一、写在前面 笔者于5月5日(昨天)在新加坡淘到了一套《指环王 The Lord of the Rings》DVD光碟,今天却听闻噩耗,Rohan国王Theoden的扮演者,英国演员Bernard Hill去世(享年79岁),发文…

从键入网址到网页显示,期间发生了什么?

从键入网址到网页显示,期间发生了什么? 孤单小弟【HTTP】真实地址查询【DNS】指南帮手【协议栈】可靠传输【TCP】远程定位【IP】两点传输【MAC】出口【网卡】送别者【交换机】出境大门【路由器】互相扒皮【服务器与客户端】相关问答 不少小伙伴在面试过程…

一竞技MSI:淘汰赛抽签结果出炉 BLG和T1同半区,TES首轮交手TL!

北京时间5月6日,MSI在今天进入短暂的休赛,在昨天结束的入围赛之后,PSG战队作为外卡赛区唯一的队伍进入到正赛,另外欧洲赛区的FNC战队也是击败GAM战队拿到正赛的资格。在比赛结束之后,也是进行了淘汰赛的胜败分组赛的抽…

GateWay检查接口耗时

添加gateway依赖 <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-gateway</artifactId> </dependency>创建一个LogTimeGateWayFilterFactory类&#xff0c;可以不是这个名字但是后面必须是x…