延迟队列的理解与使用

目录

一、场景引入

二、延迟队列的三种场景

 1、死信队列+TTL对队列进行延迟

 2、创建通用延时消息+死信队列 对消息延迟

 3、使用rabbitmq的延时队列插件

x-delayed-message使用

父pom文件

pom文件

配置文件

config

生产者

消费者

结果


一、场景引入

我们知道可以通过TTL来对队列进行设置过期时间;通过后置处理器MessagePostProcessor对消息进行设置过期时间;

 那么根据TTL及MessagePostProcessor机制可以处理关于延迟方面的问题。

比如:秒杀之后,给30分钟时间进行支付,如果30分钟后,没有支付,订单取消。

比如:餐厅订座,A用户早上8点预定的某家餐厅下午6点的座位,B用户中午12点预定的下午5点的座位;根据场景我们需要的时先让B用户进行消费,然后A用户再消费;这时TTL和MessagePostProcessor延迟就已经不能满足订餐的场景了;

因为TTL是对队列进行延迟,MessagePostProcessor是对消息进行延迟,但是MessagePostProcessor对消息延迟是不能根据订座的时间去排序消费的;

/*** 比如当我们发送第一个消息时延迟的时间时50s,而发送的第二个消息延迟时间是30s,虽然延迟30s的消息比延迟50s发送的晚* 但按照我们设想的情况,延迟30s的消息应该先消费;可是实际情况却不是这样,而是延迟50s的消息到达时间后 30s的才能消费!(队列先进先出)* 那这样此方式的不足就出现了!* 场景:*      A用户和B用户预定餐厅,A用户先开始预定的,预定的是下午6点。B用户比A用户预定操作晚一些,但是B用户预定的时间是下午5点。通过此场景*      我们希望的是B用户先进行用餐(因为他预定的吃饭时间比A早一些,需要先安排吃饭。不能说A用户没到6点B用户预定5点的吃不了。),根据此*      场景 之前的队列延迟还是消息延迟都不能满足场景需求了,这样就需要另一种延迟方式进行解决了! ->使用rabbitmq的延时队列插件*//*** 注意:*      TTL是对队列进行延迟,只要是在此队列中的消息都会按照TTL设置时间进行延迟;*      MessagePostProcessor是对消息进行延迟;**      如果我们不仅使用了消息延迟,而且还使用了队列延迟,那么延迟的时间就会以小的时间为准!*   理解:*      如果a消息设置的消息延迟是30s,b消息设置的延迟时间是90s,队列设置的延迟是60s。那么a消息最终的延迟是30s(a的消息延迟与队列延迟*      比对以延迟时间小的为准!),b消息最终延迟的时间是60s(b的消息延迟与队列延迟比对以延迟的时间小的为准!)*/

 二、延迟队列的三种场景

1、死信队列+TTL对队列进行延迟

给队列设置TTL过期时间,此队列可不用绑定消费者,时间到后把消息投递到死信队列中,由死信队列的消费者进行消费,这样就能达到延迟消费的作用

   @Beanpublic Queue directQueueLong(){return   QueueBuilder.durable("业务队列名称").deadLetterExchange("死信交换机名称").deadLetterRoutingKey("死信队列 RoutingKey").ttl(20000) // 消息停留时间//.maxLength(500).build();}

监听死信队列,即可处理超时的消息队列

缺点:

上述实现方式中,ttl延时队列中所有的消息超时时间都是一样的,如果不同消息想设置不一样的超时时间,就需要建立多个不同超时时间的消息队列,比较麻烦,且不利于维护。

 2、创建通用延时消息+死信队列 对消息延迟

这样的方式可对每一个消息设置指定的过期时间,不用像TTL那样给队列设置过期时间(若队列设置的过期时间达到后,其队列中的消息均会被删除或别的处理),但是由于队列是先进先出,若先投递的消息设置的过期时间是40s,后投递的消息过期时间是30s,那么设置过期时间为30s的并不会到30s时就投递到死信队列中,而是等40s到期后才会一起被投递。

rabbitTemplate.convertAndSend("交换机名称", "RoutingKey","对象",
message => {message.getMessageProperties().setExpiration(String.valueOf(5000))// 设置消息的持久化属性//这样发送的消息就会被持久化到 RabbitMQ 中,即使 RabbitMQ 重启,消息也不会丢失。                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;});

缺点:

该种方式可以创建一个承载不同超时时间消息的消息队列,但是这种方式有一个问题,如果消息队列中排在前面的消息没有到超时时间,即使后面的消息到了超时时间,先到超时时间的消息也不会进入死信队列,而是先检查排在最前面的消息队列是否到了超时时间,如果到了超时时间才会继续检查后面的消息。

 3、使用rabbitmq的延时队列插件

使用rabbitmq的延时队列插件,实现同一个队列中有多个不同超时时间的消息,并按时间超时顺序出队

1、下载延迟插件

在 RabbitMQ 的 3.5.7 版本之后,提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列 ,同时需保证 Erlang/OPT 版本为 18.0 之后。

下载地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

2、安装插件并启用

下载完成后直接把插件放在 /home/rabbitmq 目录,然后拷贝到容器内plugins目录下(my-rabbit是容器的name,也可以使用容器id)

docker cp /home/rabbitmq/rabbitmq_delayed_message_exchange-3.12.0.ez rabbitmq:/plugins

 进入 Docker 容器

docker exec -it rabbit /bin/bash

 在plugins内启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

 退出容器

exit

重启 RabbitMQ

docker restart my-rabbit

 貌似不重启也能生效!

 结果:

 就多了一个x-delayed-message交换机

x-delayed-message使用

父pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.1</version>
<!--        <version>2.2.5.RELEASE</version>--><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.chensir</groupId><artifactId>spring-boot-rabbitmq</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-boot-rabbitmq</name><properties><java.version>8</java.version><hutool.version>5.8.3</hutool.version><lombok.version>1.18.24</lombok.version></properties><description>spring-boot-rabbitmq</description><packaging>pom</packaging><modules><module>direct-exchange</module><module>fanout-exchange</module><module>topic-exchange</module><module>game-exchange</module><module>dead-letter-queue</module><module>delay-queue</module><module>delay-queue2</module></modules><dependencyManagement><dependencies><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>${hutool.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version></dependency></dependencies></dependencyManagement></project>

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>com.chensir</groupId><artifactId>spring-boot-rabbitmq</artifactId><version>0.0.1-SNAPSHOT</version><relativePath>../pom.xml </relativePath></parent><artifactId>delay-queue2</artifactId><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

配置文件

logging.level.com.chensir = debug
server.port=8086
#host
spring.rabbitmq.host=121.40.100.66
#默认5672
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=guest
#密码
spring.rabbitmq.password=guest
#连接到代理时用的虚拟主机
spring.rabbitmq.virtual-host=/
#每个消费者每次可最大处理的nack消息数量
spring.rabbitmq.listener.simple.prefetch=1
#表示消息确认方式,其有三种配置方式,分别是none、manual(手动)和auto(自动);默认auto
spring.rabbitmq.listener.simple.acknowledge-mode=auto
#监听重试是否可用
spring.rabbitmq.listener.simple.retry.enabled=true
#最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=5
#最大重试时间间隔
spring.rabbitmq.listener.simple.retry.max-interval=3000ms
#第一次和第二次尝试传递消息的时间间隔
spring.rabbitmq.listener.simple.retry.initial-interval=1000ms
#应用于上一重试间隔的乘数
spring.rabbitmq.listener.simple.retry.multiplier=2
#决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
spring.rabbitmq.listener.simple.default-requeue-rejected=false

config

package com.chensir.config;import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {@Beanpublic MessageConverter messageConverter(){return  new Jackson2JsonMessageConverter();}@Beanpublic CustomExchange customExchange(){Map<String,Object> args = new HashMap<>();//延迟时以direct直连方式绑定args.put("x-delayed-type","direct");// name:交换机名称 type:类型 固定值x-delayed-messagereturn new CustomExchange("chen-x-delayedExchange","x-delayed-message",true,false,args);}@Beanpublic Queue directQueueLong(){return   QueueBuilder.durable("chen-DirectQueue").build();}@Beanpublic Binding binding(){return  BindingBuilder.bind(directQueueLong()).to(customExchange()).with("direct123").noargs();}
}

生产者

package com.chensir.provider;import com.chensir.model.OrderIngOk;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
@Slf4j
public class DirectProvider {@Resourceprivate RabbitTemplate rabbitTemplate;public  void  send(){OrderIngOk orderIngOk = new OrderIngOk();orderIngOk.setOrderNo("202207149687-1");orderIngOk.setId(1);orderIngOk.setUserName("倪海杉前-延迟40秒");rabbitTemplate.convertAndSend("chen-x-delayedExchange", "direct123",orderIngOk,m->{m.getMessageProperties().setDelay(40*1000); //设置延迟时间,对延迟交换机有效// m.getMessageProperties().setExpiration(String.valueOf(30*1000)); 设置过期时间,对队列有效return m;});OrderIngOk orderIngOk2 = new OrderIngOk();orderIngOk2.setOrderNo("202207149687-2");orderIngOk2.setId(2);orderIngOk2.setUserName("倪海杉后-延迟30秒");rabbitTemplate.convertAndSend("chen-x-delayedExchange", "direct123",orderIngOk2,m->{m.getMessageProperties().setDelay(30*1000);return m;});log.debug("消息生产完成");}}

消费者

package com.chensir.consumer;import cn.hutool.json.JSONUtil;
import com.chensir.model.OrderIngOk;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
@Slf4j
public class DirectConsumer {@RabbitHandler@RabbitListener(queues = "chen-DirectQueue" )public void  process(OrderIngOk orderIngOk) throws IOException {try {// 处理业务开始log.debug("接受到消息,并正常处理结束,{}", JSONUtil.toJsonStr(orderIngOk));// 处理业务结束} catch (Exception ex){throw ex;}}
}

结果

2023-08-29 18:27:45.983 DEBUG 15568 --- [           main] com.chensir.provider.DirectProvider      : 消息生产完成
2023-08-29 18:28:16.143 DEBUG 15568 --- [ntContainer#0-1] com.chensir.consumer.DirectConsumer      : 接受到消息,并正常处理结束,{"id":2,"OrderNo":"202207149687-2","userName":"倪海杉后-延迟30秒"}
2023-08-29 18:28:26.057 DEBUG 15568 --- [ntContainer#0-1] com.chensir.consumer.DirectConsumer      : 接受到消息,并正常处理结束,{"id":1,"OrderNo":"202207149687-1","userName":"倪海杉前-延迟40秒"}

可见 延迟40s的是先发送的,但是最终结果是先消费延迟30s的。这样就能达到我们订座的场景需求了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/95178.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

什么是跨域问题 ?Spring MVC 如何解决跨域问题 ?Spring Boot 如何解决跨域问题 ?

目录 1. 什么是跨域问题 &#xff1f; 2. Spring MVC 如何解决跨域问题 &#xff1f; 3. Spring Boot 如何解决跨域问题 &#xff1f; 1. 什么是跨域问题 &#xff1f; 跨域问题指的是不同站点之间&#xff0c;使用 ajax 无法相互调用的问题。 跨域问题的 3 种情况&#x…

神经网络--感知机

感知机 单层感知机原理 单层感知机:解决二分类问题&#xff0c;激活函数一般使用sign函数,基于误分类点到超平面的距离总和来构造损失函数,由损失函数推导出模型中损失函数对参数 w w w和 b b b的梯度&#xff0c;利用梯度下降法从而进行参数更新。让1代表A类&#xff0c;0代…

Qt +VTK+Cmake 编译和环境配置(第三篇,高级篇, 已解决)

上篇说了&#xff0c;Cmake 虽然可以成功的build&#xff0c;但是大部分人都选择的是VS编译&#xff0c;没有人选择Qt自带的编译器编译。 在build文件夹 shift右键 进入cmd串口&#xff0c;执行mingw32-make mingw32-make 报错&#xff01;&#xff01;&#xff01;&#x…

Qt +VTK+Cmake 编译和环境配置(第一篇 采坑)

VTK下载地址&#xff1a;https://vtk.org/download/ cmake下载地址&#xff1a;https://cmake.org/download/ 版本对应方面&#xff0c;如果你的项目对版本没有要求&#xff0c;就不用在意。我就是自己随机搭建的&#xff0c;VTK选择最新版本吧&#xff0c;如果后面其他的库不…

燃气管网监测系统,提升城市燃气安全防控能力

燃气是我们日常生活中不可或缺的能源&#xff0c;但其具有易燃易爆特性&#xff0c;燃气安全使用、泄漏监测尤为重要。当前全国燃气安全事故仍呈现多发频发态势&#xff0c;从公共安全的视角来看&#xff0c;燃气已成为城市安全的重大隐忧&#xff01;因此&#xff0c;建立一个…

Docker(三) 创建Docker镜像

一、在Docker中拉取最基本的Ubuntu系统镜像 搜索Ubuntu镜像 Explore Dockers Container Image Repository | Docker Hub 下载镜像 docker pull ubuntu:22.04 二、在镜像中添加自己的内容 使用ubuntu镜像创建容器 docker run -it ubuntu:20.04 /bin/bash 在容器中创建了一个文…

RISC-V(2)——特权级及特权指令集

目录 1. 特权级 2. 控制和状态寄存器&#xff08;CSR&#xff09; 2.1 分类 2.2 分析 1. 特权级 一个 RISC-V 硬件线程&#xff08;hart&#xff09;是运行在某个特权级上的&#xff0c;这个特权级被编码到一个或者多个 CSR&#xff08;control and status register&a…

不同写法的性能差异

“ 达到相同目的,可以有多种写法,每种写法有性能、可读性方面的区别,本文旨在探讨不同写法之间的性能差异 len(str) vs str "" 本部分参考自: [问个 Go 问题&#xff0c;字符串 len 0 和 字符串 "" &#xff0c;有啥区别&#xff1f;](https://segmentf…

经管博士科研基础【16】一元二次函数的解的公式

1. 一元二次函数的形式 2. 一元二次函数的图形与性质 一元二次函数的图像是一条抛物线&#xff0c;图像定点公式为(-b/2a,4ac-b*b/4a)&#xff0c;对称轴位直线x-b/2a。 3. 求根公式 形如ax*xb*xc0的一元二次方程&#xff0c;其求根公式为&#xff1a; 4. 韦达定理 如果x1和…

IP网络广播系统有哪些优点

IP网络广播系统有哪些优点 IP网络广播系统有哪些优点&#xff1f; IP网络广播系统是基于 TCP/IP 协议的公共广播系统&#xff0c;采用 IP 局域网或 广域网作为数据传输平台&#xff0c;扩展了公共广播系统的应用范围。随着局域网络和 网络的发展 , 使网络广播的普及变为可能 …

Sentinel 流量控制框架

1. Sentinel 是什么&#xff1f; Sentinel是由阿里中间件团队开源的&#xff0c;面向分布式服务架构的轻量级高可用流量控制组件。 2. 主要优势和特性 轻量级&#xff0c;核心库无多余依赖&#xff0c;性能损耗小。 方便接入&#xff0c;开源生态广泛。 丰富的流量控制场景。 …

手写RPC——数据序列化工具protobuf

手写RPC——数据序列化工具protobuf Protocol Buffers&#xff08;protobuf&#xff09;是一种用于结构化数据序列化的开源库和协议。下面是 protobuf 的一些优点和缺点&#xff1a; 优点&#xff1a; 高效的序列化和反序列化&#xff1a;protobuf 使用二进制编码&#xff0c…