rabbit MQ的延迟队列处理模型示例(基于SpringBoot死信模式)

在这里插入图片描述

说明:
生产者P 往交换机X(type=direct)会发送两种消息:一、routingKey=XA的消息(消息存活周期10s),被队列QA队列绑定入列;一、routingKey=XB的消息(消息存活周期40s),被队列Q B队列绑定入列。QA、QB两个队列消息在失活(变成死信消息)以routingKey=YD发送到交换机Y(type=direct)。队列QD用routingKey绑定交换机Y消息入列。消费者监听处理QD的消息。
这个设计模型达到了消息从生产者到消费者延迟10s、40s不等的延迟队列处理。

这里用SpringBoot maven:<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

在封装工具类中 其中【交换机】【队列】【绑定器】 可直接使用工具类,这里对案例图所用到组件器声明注解出来。
在这里插入图片描述

框内的组件和关系 可以在SpringBoot配置类中做出如下的组件声明与关系绑定:

package com.esint.configs;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** TTL延迟队列配置文件类**/
@Configuration
public class TtlQueueConfig {////普通交换机的名称 Xpublic static final String X_EXCHANGE = "X";//死信交换机名称 Ypublic static final String Y_DEAD_LETTER_EXCHANGE = "Y";//普通队列QA QBpublic static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";//死信队列名称QDpublic static final String DEAD_LETTER_QUEUE = "QD";////声明X_EXCHANGE@Bean("xExchange")public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}//声明死信交换Y_DEAD_LETTER_EXCHANGE@Bean("yExchange")public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明队列 QA@Bean("queueA")public Queue queueA(){Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKey (死信后充当了消费者的发送路由)arguments.put("x-dead-letter-routing-key","YD");//消息过期时间arguments.put("x-message-ttl",10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}//声明队列 QB@Bean("queueB")public Queue queueB(){Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKey (死信后充当了消费者的发送路由)arguments.put("x-dead-letter-routing-key","YD");//消息过期时间arguments.put("x-message-ttl",40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}//声明死信队列QD@Bean("queueD")public Queue queueD(){return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();}//捆绑//绑定队列QA与交换机X_EXCHANGE@Beanpublic Binding queueABingXExchange(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//绑定队列QB与交换机X_EXCHANGE@Beanpublic Binding queueBBingXExchange(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueB).to(xExchange).with("XB");}//绑定队列QD与交换机Y_Exchange@Beanpublic Binding queueDBingYExchange(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange")DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");}
}
生产者与交换机X:这里方便测试 我们把生产者放在一个Controller逻辑里
package com.esint.controller;//发送延迟消息import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMesController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/senMsg/{message}")public void sendMes(@PathVariable String message){log.info("当前时间:{},发送一条消息给两个TTL队列:{}",new Date().toString(),message);rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列:"+message);rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列:"+message);}
}
消费者与死信队列创建一个监听者示例:
package com.esint.consumer;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;/*** 队列TTL消费者*/@Slf4j
@Component
public class DeadLetterQueueConsumer {//接受消息@RabbitListener(queues = "QD")public void receiveD(Message message, Channel channel) throws Exception{String msg = new String(message.getBody());log.info("当前时间:{},收到私信队列的消息:{}",new Date().toString(),msg);}
}

rabbitmq的配置文件:

spring:rabbitmq:host: *.*.*.*port: 5672username: guestpassword: guest
接下来可以启动SpringBoot: 启动后,配置方法类会把交换机/队列/绑定器初始化配置

队列:
在这里插入图片描述

交换机:
在这里插入图片描述
点开详细后,也能考到他们之间的绑定关系:

在这里插入图片描述

在这里插入图片描述

消息发布测试:

生产者发送消息:

浏览器:
http://127.0.0.1:19092/ttl/senMsg/nice

通过生产者发送:nice

当前时间:Tue Nov 21 14:50:05 CST 2023,发送一条消息给两个TTL队列:nice

消费者在10s后和40秒分别收到了消息:
在这里插入图片描述


拓展:是不是有一种可能,如果再队列中不设置过期时间,在生产者发送消息时设置过期时间 来实现过期时间自由设定,而延迟自由?

结论是不能:
rabbitMQ队列只会检查第一个消息是否过期。举例如果第一个消息的ttl为30s,第二个消息ttl为3s。第二个消息不会再3s后到达,而是会在第一个过期后,再第二个到达。

示例验证:

增加一个无过期时间约束的队列,以routing-key为XC绑定X交换机,过期后以routing-key为YD绑定Y交换机。
过期时间放生产者发送时设定。

在的rabbitMQ配置类中增加QC 绑定前(X routing-key=XC)后(Y routing-key=YD)交换机:

    // 优化新增队列 队列不设置TTL过期时间 把过期时间放到生产者发送消息时public static final  String QUEUE_C ="QC";//声明队列 QC 优化新增队列 队列不设置TTL过期时间 把过期时间放到生产者发送消息时@Bean("queueC")public Queue queueC(){Map<String,Object> arguments = new HashMap<>(2);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置routing-keyarguments.put("x-dead-letter-routing-key","YD");return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();}
//绑定队列QC与交换机X_EXCHANGE   优化新增队列 队列不设置TTL过期时间 把过期时间放到生产者发送消息时@Beanpublic Binding queueCBindXExchange(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange")DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with("XC");}

生产者:

@GetMapping("/sendttl/{message}/{ttlTime}")public void sendMes(@PathVariable String message,@PathVariable String ttlTime){
/*** 死信队列做延迟时的缺陷:* rabbitMQ只会检查第一个消息是否过期带来的问题就是,如果第一个消息的ttl为30s,第二个消息ttl为3s。第二个消息不会再3s后到达,而是会在第一个过期后,再第二个到达。*/log.info("当前时间:{},发送一条ttl为{}ms的消息给QC队列:{}",new Date().toString(),ttlTime,message);rabbitTemplate.convertAndSend("X","XC",message,mes->{mes.getMessageProperties().setExpiration(ttlTime);return mes;});}

消费者不变,启动服务!

生产者发送消息:第一条 3000ms
http://127.0.0.1:19092/ttl/sendttl/第一条30000ms消息/30000
http://127.0.0.1:19092/ttl/sendttl/第二条3000ms消息/3000

在这里插入图片描述
结论:第二条虽然早早过期,它依然需要等待第一条过期后,才能排到他。rabbitMQ的队列过期检查机制。

总结:
阻塞层在队列。
只能满足固定延迟时段的消息,如果延迟时间不一致,及时后来消息的延迟短,也会等待它的上一条出去后才能被检测到是否到期才被消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/211856.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

uboot中nfs和tftp方式获取文件

NFS文件系统挂载 服务器端配置如下 1.Server端需要安装NFS服务&#xff1a; sudo apt-get install nfs-kernel-server2.创建需要挂载的路径&#xff1a; mkdir -p /home/workspace/mercury/nfs_path3.创建共享目录&#xff1a; ①vim /etc/exports ②在文件中添加&#xff…

Vue3 封装组件库并发布到npm仓库

一、创建 Vue3 TS Vite 项目 输入项目名称&#xff0c;并依次选择需要安装的依赖项 npm create vuelatest 项目目录结构截图如下&#xff1a; 二、编写组件代码、配置项和本地打包测试组件 在项目根目录新建 package 文件夹用于存放组件 &#xff08;以customVideo为例&a…

在 CentOS 7 上安装 MySQL 8

在 CentOS 7 上安装 MySQL 8 步骤 1: 添加 MySQL Yum 存储库 首先&#xff0c;我们需要添加 MySQL Yum 存储库。打开终端并执行以下命令&#xff1a; sudo yum install -y https://repo.mysql.com/mysql80-community-release-el7-3.noarch.rpm步骤 2: 导入 MySQL GPG 公钥 …

Redis报错:JedisConnectionException: Could not get a resource from the pool

1、问题描述&#xff1a; redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool 2、简要分析&#xff1a; redis.clients.util.Pool.getResource会从JedisPool实例池中返回一个可用的redis连接。分析源码可知JedisPool 继承了 r…

爱奇艺的再进化:好内容强化溢出效应,会员长期价值凸显

作为长视频行业率先探索业绩良性成长路径的玩家&#xff0c;爱奇艺每一次的业绩质量&#xff0c;始终是观摩行业发展状态不可或缺的参照物。继历史最佳二季度之后&#xff0c;11月21日晚间&#xff0c;爱奇艺发布2023年第三季度业绩&#xff0c;再度强势打出原创内容高质量发展…

Arm64版本的centos编译muduo库遇到的问题的归纳

环境&#xff1a;Mac m2 pro下的VMware虚拟机中Arm64 centos ./build.sh 执行后提示如下 cmake -DCMAKE_BUILD_TYPErelease -DCMAKE_INSTALL_PREFIX…/release-install-cpp11 -DCMAKE_EXPORT_COMPILE_COMMANDSON /root/package/muduo-master – Boost version: 1.69.0 – Co…

机器学习/sklearn笔记:MeanShift

1 算法介绍 一种基于质心的算法通过更新候选质心使其成为给定区域内点的均值候选质心的位置是通过一种称为“爬山”技术迭代调整的&#xff0c;该技术找到估计的概率密度的局部最大值 1.1 基本形式 给定d维空间的n个数据点集X&#xff0c;那么对于空间中的任意点x的均值漂移…

稻谷飘香金融助力——建行江门市分行助力乡村振兴

7月的台山&#xff0c;稻谷飘香。在大耕户李胜业的农田里&#xff0c;金灿灿的稻谷翻起层层稻浪&#xff0c;收割机在稻浪里来回穿梭&#xff0c;割稻、脱粒、装车等工序一气呵成。空气中弥漫着丰收的喜悦。 夏粮迎丰收的背后&#xff0c;是中国建设银行江门市分行&#xff08…

卷积神经网络(AlexNet)鸟类识别

文章目录 一、前言二、前期工作1. 设置GPU&#xff08;如果使用的是CPU可以忽略这步&#xff09;2. 导入数据3. 查看数据 二、数据预处理1. 加载数据2. 可视化数据3. 再次检查数据4. 配置数据集 三、AlexNet (8层&#xff09;介绍四、构建AlexNet (8层&#xff09;网络模型五、…

Vue3-provide和inject

作用和场景&#xff1a;顶层组件向任意的底层组件传递数据和方法&#xff0c;实现跨层组件通信 跨层传递普通数据&#xff1a; 1.顶层组件通过provide函数提供数据 2.底层组件通过inject函数获取数据 既可以传递普通数据&#xff0c;也可以使用ref传递响应式数据&#xff08…

创新洞察|展望2030 – 企业数字化转型的10大趋势(阿里研究院)

企业是否一定要 数字化创新 转型&#xff1f;究竟如何数字化转型&#xff1f;难点和坑又是什么&#xff1f;阿里研究院副院长针对未来十年中国的数字化转型提出十个方面需要关注的趋势&#xff1a;1.大国优势 2. 重构的消费者决策体系 3. 下一代数字原生企业 4. 所有企业都会成…

消息中间件——RabbitMQ(五)快速入门生产者与消费者,SpringBoot整合RabbitMQ!

前言 本章我们来一次快速入门RabbitMQ——生产者与消费者。需要构建一个生产端与消费端的模型。什么意思呢&#xff1f;我们的生产者发送一条消息&#xff0c;投递到RabbitMQ集群也就是Broker。 我们的消费端进行监听RabbitMQ&#xff0c;当发现队列中有消息后&#xff0c;就进…