Spring RabbitMQ那些事(2-两种方式实现延时消息订阅)

目录

  • 一、序言
  • 二、死信交换机和消息TTL实现延迟消息
    • 1、死信队列介绍
    • 2、代码示例
      • (1) 死信交换机配置
      • (2) 消息生产者
      • (3) 消息消费者
    • 3、测试用例
  • 三、延迟消息交换机实现延迟消息
    • 1、安装延时消息插件
    • 2、代码示例
      • (1) 延时消息交换机配置
      • (2) 消息生产者
      • (3) 消息消费者
    • 3、测试用例
  • 四、两种实现方式优缺点
    • 1、延时消息插件
    • 2、TLL&死信交换机

一、序言

业务开发中有很多延时操作的场景,比如最常见的超时订单自动关闭延时异步处理,我们常用的实现方式有:

  • 定时任务轮询(有延时)。
  • 借助Redission的延时队列
  • Redis的key过期事件通知机制(需开启key过期事件通知,对Redis有性能损耗)。
  • RocketMQ中定时消息推送(支持的时间间隔固定,不支持自定义)。
  • RabbitMQ中的死信队列和延迟消息交换机

其中用的最多的也是借助Redisson实现的数据结构延迟队列RabbitMQ中的死信队列来实现,今天我们通过RabbitMQ死信队列和延迟消息交换机(新特性)来实现延时消息推送。


二、死信交换机和消息TTL实现延迟消息

1、死信队列介绍

这种方式主要通过结合消息过期和私信交换机来实现延迟消息推送,首先先了解下哪些消息会进入死信队列:

  • 被消费者nack(negatively acknowleged)的消息。
  • TTL过期后未被消费的消息。
  • 超过队列长度限制后被丢弃的消息。

备注:更多信息请参考RabbitMQ中的 Dead Letter Exchange。

2、代码示例

(1) 死信交换机配置

@Configuration
protected static class DeadLetterExchangeConfig {@Beanpublic Queue deadLetterQueue(){return QueueBuilder.durable("dead-letter-queue").build();}@Beanpublic DirectExchange deadLetterExchange() {return ExchangeBuilder.directExchange("dead-letter-exchange").build();}@Beanpublic Binding bindQueueToDeadLetterExchange(Queue deadLetterQueue, DirectExchange deadLetterExchange) {return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dead-letter-routing-key");}@Beanpublic Queue normalQueue() {return QueueBuilder.durable("normal-queue").deadLetterExchange("dead-letter-exchange").deadLetterRoutingKey("dead-letter-routing-key").build();}}

(2) 消息生产者

@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqProducer {private final RabbitTemplate rabbitTemplate;public void sendMsgToDeadLetterExchange(String body, int timeoutInMillSeconds) {log.info("开始发送消息到dead letter exchange 消息体:{}, 消息延迟:{}ms, 当前时间:{}", body, timeoutInMillSeconds, LocalDateTime.now());MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setExpiration(String.valueOf(timeoutInMillSeconds)).build();Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();rabbitTemplate.send("normal-queue", message);}}

(3) 消息消费者

@Slf4j
@Component
public class RabbitMqConsumer {@RabbitListener(queues = "dead-letter-queue")public void handleMsgFromDeadLetterQueue(String msg) {log.info("Message received from dead-letter-queue, message body: {}, current time:{}", msg, LocalDateTime.now());}
}

3、测试用例

@RestController
@RequiredArgsConstructor
public class RabbitMsgController {private final RabbitMqProducer rabbitMqProducer;@RequestMapping("/exchange/dead-letter")public ResponseEntity<String> sendMsgToDeadLetterExchange(String body, int timeout) {rabbitMqProducer.sendMsgToDeadLetterExchange(body, timeout);return ResponseEntity.ok("消息发送到死信交换机成功");}}

浏览器访问http://localhost:8080/exchange/dead-letter?body=hello&timeout=5000,可以看到消息被延迟5s处理。

2023-11-26 11:50:33.041  INFO 19152 --- [nio-8080-exec-7] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到dead letter exchange 消息体:hello, 消息延迟:5000ms, 当前时间:2023-11-26T11:50:33.041
2023-11-26 11:50:38.054  INFO 19152 --- [ntContainer#4-4] c.u.r.i.consumer.RabbitMqConsumer        : Message received from dead-letter-queue, message body: hello, current time:2023-11-26T11:50:38.054

三、延迟消息交换机实现延迟消息

上面通过消息TTL和死信交换机实现延迟消息的解决方案是由一个叫James Carr的人提出来的,后来RabbitMQ提供了一个开箱即用的解决方案,通过延时消息插件来实现。

该插件以前被当做是试验性产品,但是现在已经可以投产使用了。(PS:2015年4月16号就已经有该插件文档)

Spring AMQP中,同样提供了对该延时消息插件的支持,并且在RabbitMQ 3.6.0版本就已经测试通过。

1、安装延时消息插件

该延时消息插件为社区插件,因此需要自己手动下载安装的RabbMQ版本对应的插件,下载地址:RabbitMQ延时消息插件releases。

我安装的RabbitMQ版本为3.9.9,3.9.0版本的插件对所有3.9.x版本的RabbitMQ都支持。

在这里插入图片描述
下载完后把.ez结尾的插件复制RabbitMQ的插件目录下,插件目录为/usr/lib/rabbitmq/plugins

在这里插入图片描述
通过命令rabbitmq-plugins enable rabbitmq_delayed_message_exchange安装该插件,通过命令rabbitmq-plugins list查看插件列表,可以看到该延时消息插件已经成功安装。

在这里插入图片描述

2、代码示例

(1) 延时消息交换机配置

@Configuration
protected static class DelayedMsgExchangePluginConfig {@Beanpublic Queue delayedQueue() {return QueueBuilder.durable("delayed-queue").build();}@Beanpublic DirectExchange delayedExchange() {return ExchangeBuilder.directExchange("delayed-exchange").delayed().build();}@Beanpublic Binding bindDelayedQueueToDelayedChange(Queue delayedQueue, DirectExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with("delayed-routing-key");}
}

备注:延时交换机的类型可以为DirectExchage、TopicExcahgeFanoutExchange,这些都支持。

(2) 消息生产者

@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqProducer {private final RabbitTemplate rabbitTemplate;public void sendDelayedMsg(String body, int timeoutInMillSeconds) {log.info("开始发送消息到delayed-exchange 消息体:{}, 消息延迟:{}ms, 当前时间:{}", body, timeoutInMillSeconds, LocalDateTime.now());MessageProperties messageProperties = new MessageProperties();messageProperties.setDelay(timeoutInMillSeconds);Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();rabbitTemplate.send("delayed-exchange", "delayed-routing-key", message);}
}

(3) 消息消费者

@Slf4j
@Component
public class RabbitMqConsumer {@RabbitListener(queues = "delayed-queue")public void handleMsgFromDelayedQueue(String msg) {log.info("Message received from delayed-queue, message body: {}, current time:{}", msg, LocalDateTime.now());}
}

3、测试用例

@RestController
@RequiredArgsConstructor
public class RabbitMsgController {private final RabbitMqProducer rabbitMqProducer;@RequestMapping("/exchange/delayed")public ResponseEntity<String> sendMsgToHeadersExchange(String body, int timeout) {rabbitMqProducer.sendDelayedMsg(body, timeout);return ResponseEntity.ok("消息发送到延迟交换机成功");}}

浏览器访问http://localhost:8080/exchange/dead-letter?body=hello&timeout=5000,可以看到消息被延迟5s处理。

2023-11-26 13:02:07.816  INFO 26524 --- [nio-8080-exec-3] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到delayed-exchange 消息体:Hello, 消息延迟:5000ms, 当前时间:2023-11-26T13:02:07.816
2023-11-26 13:02:12.830  INFO 26524 --- [ntContainer#5-5] c.u.r.i.consumer.RabbitMqConsumer        : Message received from delayed-queue, message body: Hello, current time:2023-11-26T13:02:12.829

四、两种实现方式优缺点

1、延时消息插件

  • 优点:配置更加简单,少配置1个过期消息接收队列,且语义更明确,容易定位消息出入口。
  • 缺点:延时消息插件对RabbitMQ版本有要求,只有RabbitMQ 3.8.x及以上版本支持。

2、TLL&死信交换机

  • 优点:基本适用于所有RabbitMQ版本。
  • 缺点:配置相对来说复杂一些,还有就是我们最开始提到的,不只是TTL过期的消息才会进入死信队列,还有超出队列限制nack的消息也会进入死信队列,触发的条件没那么纯粹。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/235275.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C++ Primer Plus学习记录】嵌套循环和二维数组

for循环是一种处理数组的工具。下面进一步讨论如何使用嵌套for循环中来处理二维数组。 C没有提供二维数组类型&#xff0c;但是用户可以创建每个元素本身都是数组的数组。例如&#xff0c;假设要存储5个城市在4年间的最高温度&#xff0c;可以这样声明数组&#xff1a; int m…

Intellij IDEA 的安装和使用以及配置

IDE有很多种&#xff0c;常见的Eclipse、MyEclipse、Intellij IDEA、JBuilder、NetBeans等。但是这些IDE中目前比较火的是Intellij IDEA&#xff08;以下简称IDEA&#xff09;&#xff0c;被众多Java程序员视为最好用的Java集成开发环境&#xff0c;今天的主题就是IDEA为开发工…

数据库基础教程之数据库的创建(二)

双击打开Navicat,点击:文件-》新建连接-》PostgreSQL 在下图新建连接中输入各参数,然后点击:连接测试,连接成功后再点击确定。 创建数据表   3.1 方法1   3.1.1.双击你的数据库-》双击public-》双击选中表-》右键-》新建表-》常规 3.1.2.设置字段信息   双击选中创建…

Python之Appium 2自动化测试(Android篇)

一、环境搭建及准备工作 1、Appium 2 环境搭建 请参考另一篇文章: Windows系统搭建Appium 2 和 Appium Inspector 环境 2、安装 Appium-Python-Client&#xff0c;版本要求3.0及以上 pip install Appium-Python-ClientVersion: 3.1.03、手机连接电脑&#xff0c;并在dos窗口…

2021年8月16日 Go生态洞察:Go 1.17版本的发布及其影响

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

园区智能配电系统(电力智能监控系统)

园区智能配电系统是一种针对园区电力配送和管理的智能化系统。它的主要功能是实时监控设备运行情况&#xff0c;进行电能质量分析&#xff0c;监控电能损耗&#xff0c;以及分时段用电统计等。 具体来说&#xff0c;园区智能配电系统可以利用现代技术如RS-485总线通信、数据库管…

XXL-Job详解(一):组件架构

目录 XXL-Job特性系统组成架构图调度模块剖析任务 “运行模式” 剖析执行器 XXL-Job XXL-JOB是一个分布式任务调度平台&#xff0c;其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线&#xff0c;开箱即用。 特性 1、简单&#…

JAVEE初阶 多线程基础(四)

线程安全 一.线程安全存在的问题二.锁三.关于锁的理解四.关于锁操作混淆的理解4.1两个线程是否对同一对象加锁 一.线程安全存在的问题 为什么这里的count不是一百万呢?这就是线程所存在的不安全的问题,由于线程是抢占式执行,同时执行count,操作本质是三个指令 1.load 读取内存…

开关电源工作时,如何抑制纹波和减小高频噪声?

开关电源的纹波和噪声是一个本质问题&#xff0c;换而言之无论纹波和噪声多么小&#xff0c;也无法从根本上去除&#xff0c;再绝对的讲开关电源无论成本怎么提高&#xff0c;也无法完全达到线性电源的性能和特点。那么&#xff0c;通常抑制或减少它的做法有五种&#xff1a; …

【驱动】串口驱动分析(二)-tty core

前言 tty这个名称源于电传打字节的简称&#xff0c;在linux表示各种终端&#xff0c;终端通常都跟硬件相对应。比如对应于输入设备键盘鼠标&#xff0c;输出设备显示器的控制终端和串口终端。也有对应于不存在设备的pty驱动。在如此众多的终端模型之中&#xff0c;linux是怎么…

如何创建曼达洛人风格的照片效果

如何把一个普通的头盔变成一个以曼达洛人为灵感的头盔&#xff1b;如何使用一个场景创建戏剧性的天空效果 1. 如何在 Photoshop 中创建戏剧性的天空 步骤 1 我们将从拼凑我们的天空开始&#xff0c;专注于创造日落和繁星点点的夜空的完美融合。我将使用这张照片作为基地。 步…

自定义链 SNAT / DNAT 实验举例

参考原理图 实验前的环境搭建 1. 准备三台虚拟机&#xff0c;定义为内网&#xff0c;外网以及网卡服务器 2. 给网卡服务器添加网卡 3. 将三台虚拟机的防火墙和安全终端全部关掉 systemctl stop firewalld && setenforce 0 4. 给内网虚拟机和外网虚拟机 yum安装 httpd…