RabbitMQ之延迟消息

文章目录

  • 前言
  • 一、死信交换机
  • 二、延迟消息
    • 死信交换机实现延迟消息
      • 图解流程
    • DelayExchange插件实现延迟消息
      • 安装插件
      • 声明延迟交换机
      • 发送延迟消息
  • 总结


前言

死信交换机、延迟消息


一、死信交换机

  • 当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

    • 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false
    • 消息是一个过期消息,超时无人消费
    • 要投递的队列消息满了,无法投递
  • 如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。

  • 死信交换机有什么作用呢?

    • 收集那些因处理失败而被拒绝的消息
    • 收集那些因队列满了而被拒绝的消息
    • 收集因TTL(有效期)到期的消息

二、延迟消息

上面讲诉的两种作用可以用来实现消费者重试的处理,即将处理失败、溢出的消息放在特定队列由人工处理,与消费者重试时讲的RepublishMessageRecoverer作用类似(在RabbitMQ之消费者的可靠性里讲过RepublishMessageRecoverer):

  • 收集那些因处理失败而被拒绝的消息
  • 收集那些因队列满了而被拒绝的消息

收集因TTL(有效期)到期的消息这个作用可以用来实现延迟消息

死信交换机实现延迟消息

  • 声明一个Fanout交换机ttl.fanout、一个队列ttl.queue、一个Direct交换机dragon.direct、一个队列directt.queue。发送的消息设置有效期RoutingKey是blue。
  • ttl.queue通过dead-letter-exchange属性绑定dragon.direct交换机,使dragon.direct成为死信交换机,这样ttl.queue队列中过期的消息成为死信就会自动到达dragon.direct中。
  • direct.queue队列通过RoutingKey与死信交换机dragon.direct绑定,且RoutingKey为blue,这样,过期的消息先到达死信交换机,因死信交换机与direct.queue通过RoutingKey绑定,过期的消息通过RoutingKey由死信交换机路由到direct.queue队列。
  • 此时若有消费者消费direct.queue队列,就实现了延迟消费,具体的延时时间就是设置的有效期时间。

图解流程

在这里插入图片描述

注意:

  • RabbitMQ的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。
  • 当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的TTL时间不一定准确。

DelayExchange插件实现延迟消息

安装插件

插件下载地址

在这里插入图片描述

将下载的文件放到了/mnt目录下,然后输入sudo docker ps命令查看自己的rabbitmq是否正在运行,如果不在运行则输入sudo docker start id这里填你自己的容器id,如果不知道自己id的,输入sudo docker pa -a查看。

在这里插入图片描述

当容器运行起来后,输入sudo docker cp /mnt/rabbitmq_delayed_message_exchange-3.12.0.ez rabbit:/plugins命令,将刚插件拷贝到容器内plugins目录下。

在这里插入图片描述

拷贝完成后,输入sudo docker exec -it rabbit /bin/bash命令,进入容器。
进入plugins文件夹

在这里插入图片描述

在容器内plugins目录下,查看插件是否上传成功ls -l|grep delay

在这里插入图片描述

然后启动插件,在当前目录下输入rabbitmq-plugins enable rabbitmq_delayed_message_exchange命令

在这里插入图片描述

到这里插件安装就完成了,接下来我们需要重启RabbitMQ容器。执行exit命令退出RabbitMQ容器内部,然后执行docker restart 容器名命令重启RabbitMQ容器

声明延迟交换机

两种方式,自行选择
基于注解:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"
))
public void listenDelayMessage(String msg){log.info("接收到delay.queue的延迟消息:{}", msg);
}

基于bean:

package com.itheima.consumer.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class DelayExchangeConfig {@Beanpublic DirectExchange delayExchange(){return ExchangeBuilder.directExchange("delay.direct") // 指定交换机类型和名称.delayed() // 设置delay的属性为true.durable(true) // 持久化.build();}@Beanpublic Queue delayedQueue(){return new Queue("delay.queue");}@Beanpublic Binding delayQueueBinding(){return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");}
}

发送延迟消息

@Test
void testPublisherDelayMessage() {// 1.创建消息String message = "hello, delayed message";// 2.发送消息,利用消息后置处理器添加消息头rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 添加延迟消息属性message.getMessageProperties().setDelay(5000);return message;}});
}

注意:
延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。因此,不建议设置延迟时间过长的延迟消息。


总结

以上就是延迟消息的详细讲解了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/225639.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CentOS 7 部署 MariaDB 的 2 种方法

有两种安装 MariaDB 服务器的方法。您可以安装 CentOS 7 存储库中可用的默认版本,也可以通过手动添加 MariaDB 存储库来安装最新版本。 如果安装过MariaDB或MySQL,使用以下命令彻底删除它们: yum remove mariadb* yum remove mysql* 方法一: 使用 Yum…

Python assert断言函数及用法与while循环详解

Python assert断言函数及用法 断言语句和 if 分支有点类似,它用于对一个 bool 表达式进行断言,如果该 bool 表达式为 True,该程序可以继续向下执行;否则程序会引发 AssertionError 错误。 例如如下程序: s_age inpu…

亚马逊,shein,temu如何避免爆品评分低被强制下架

近期,一些Temu卖家反映产品下架问题,无论是日出千单的爆品还是其他商品,都有可能面临下架的风险。这其中最主要的原因之一是产品质量问题,导致消费者差评较多,评分降至4.2分或4.0分以下时,平台可能会强制下…

EDA实验-----正弦信号发生器的设计(Quartus II )

目录 一、实验目的 二、实验仪器 三、实验原理 四、实验内容 五、实验步骤 六、注意事项 七、实验过程(操作过程) 1.定制LPM_ROM模块 2.定制LPM_ROM元件 3.计数器定制 4.创建锁相环 5.作出电路图 6.顶层设计仿真 一、实验目的 学习使用Ver…

Matlab R2022b 安装成功小记

Matlab R2022b 安装成功小记 前言一、 下载链接二、 安装过程小记 叮嘟!这里是小啊呜的学习课程资料整理。好记性不如烂笔头,今天也是努力进步的一天。一起加油进阶吧! 前言 windows 10系统之前安装过Matlab R2010b做基础研究,最…

每日一练 | 华为认证真题练习Day138

1、IPv6地址FE80::2EO:FCFF:FE6F:4F36属于哪一类? A. 组播地址 B. 任播地址 C. 链路本地地址 D. 全球单播地址 2、如果IPv6的主机希望发出的报文最多经过10台路由器转发,则应该修改IPv6报文头中的哪个参数? A. Next Header B. Version …

每日一题(LeetCode)----链表--链表中的下一个更大节点

每日一题(LeetCode)----链表–链表中的下一个更大节点 1.题目(1019. 链表中的下一个更大节点) 给定一个长度为 n 的链表 head 对于列表中的每个节点,查找下一个 更大节点 的值。也就是说,对于每个节点,找到它旁边的第…

一文带你读懂骨传导耳机危害性都有哪些!以及如何选择骨传导耳机!

如果说正常的使用骨传导耳机,是不会有危害的。 那么如何正确的使用骨传导耳机呢? 1、音量不要太大 骨传导耳机是通过震动人体骨骼来传递声音的,而在传递过程中,会出现漏音情况,而漏出的声音,便会通过耳道…

服务器中启动和停止项目

服务器中启动和停止项目 一、前言二、使用命令启动和关闭项目1、启动项目2、停止项目 三、使用可执行脚本启动和关闭项目1、启动项目2、停止项目 一、前言 在服务器上部署项目,一般就是将项目挂在后台,如果是微服务首选docker-compose,但如果…

Selenium 学习(0.15)——软件测试之测试用例设计方法——场景法

1、场景法的基本概念 场景法是黑盒测试中一种重要的测试用例设计方法。它通过场景描述业务流程,包括基本流和备选流设计测试用例遍历软件系统功能,从而验证其正确性。 通过运用场景对系统的功能点或业务流程进行描述,从而提…

Spring Security 6.x 系列(6)—— 显式设置和修改登录态信息

一、前言 此篇是对上篇 Spring Security 6.x 系列(5)—— Servlet 认证体系结构介绍 中4.9章节显式调用SecurityContextRepository#saveContext进行详解分析。 二、设置和修改登录态 2.1 登录态存储形式 使用Spring Security框架,认证成功…