SpringBoot整合RabbitMQ实现消息延迟队列(含源码)

环境依赖

SpringBoot 3.1.0

JDK 17

前期准备

安装MQ:  liunx+docker+rabbitmq安装延迟队列插件

实例

实现延迟队列的一种方式是在 RabbitMQ 中使用消息延迟插件,这个插件可以让你在消息发送时设置一个延迟时间,超过这个时间后消息才会被消费者接收到。下面是 SpringBoot 整合 RabbitMQ 实现延迟队列的简单步骤:

1.添加 RabbitMQ 的 Maven 依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.配置 RabbitMQ

在 application.properties 配置文件中添加 RabbitMQ 的连接信息:

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=test
spring.rabbitmq.virtual-host=/
# 手动应答
#spring.rabbitmq.listener.simple.acknowledge-mode=manual
#每次从队列中取一个,轮询分发,默认是公平分发
spring.rabbitmq.listener.simple.prefetch=1
# 开启重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=5

3.配置文件

@Configuration
public class RabbitMQOrderConfig {/*** 订单交换机*/public static final String ORDER_EXCHANGE = "order_exchange";/*** 订单队列*/public static final String ORDER_QUEUE = "order_queue";/*** 订单路由key*/public static final String ORDER_QUEUE_ROUTING_KEY = "order.#";/*** 死信交换机*/public static final String ORDER_DEAD_LETTER_EXCHANGE = "order_dead_letter_exchange";/*** 死信队列 routingKey*/public static final String ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY = "order_dead_letter_queue_routing_key";/*** 死信队列*/public static final String ORDER_DEAD_LETTER_QUEUE = "order_dead_letter_queue";/*** 延迟时间 (单位:ms(毫秒))*/public  static final Integer DELAY_TIME = 10000;/*** 创建死信交换机*/@Bean("orderDeadLetterExchange")public Exchange orderDeadLetterExchange() {return new TopicExchange(ORDER_DEAD_LETTER_EXCHANGE, true, false);}/*** 创建死信队列*/@Bean("orderDeadLetterQueue")public Queue orderDeadLetterQueue() {return QueueBuilder.durable(ORDER_DEAD_LETTER_QUEUE).build();}/*** 绑定死信交换机和死信队列*/@Bean("orderDeadLetterBinding")public Binding orderDeadLetterBinding(@Qualifier("orderDeadLetterQueue") Queue queue, @Qualifier("orderDeadLetterExchange")Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY).noargs();}/*** 创建订单交换机*/@Bean("orderExchange")public Exchange orderExchange() {return new TopicExchange(ORDER_EXCHANGE, true, false);}/*** 创建订单队列*/@Bean("orderQueue")public Queue orderQueue() {Map<String, Object> args = new HashMap<>(3);//消息过期后,进入到死信交换机args.put("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE);//消息过期后,进入到死信交换机的路由keyargs.put("x-dead-letter-routing-key", ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY);//过期时间,单位毫秒args.put("x-message-ttl", DELAY_TIME);return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build();}/*** 绑定订单交换机和队列*/@Bean("orderBinding")public Binding orderBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderExchange")Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(ORDER_QUEUE_ROUTING_KEY).noargs();}
}

4.定义消息实体类

定义一个消息体类,用来存储需要发送的消息:

@Slf4j
@Data
@Builder
public class OrderMessage implements Serializable {/*** 商户订单号*/private String orderId;/*** 支付宝订单号*/private String tradeNo;
}

5.定义消息发送者

定义一个 RabbitMQ 消息发送者类,用来发送消息到 RabbitMQ:

@Slf4j
@Component
public class MessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendOrderMessage(OrderMessage message) {//为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定rabbitTemplate.setMandatory(true);//开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息rabbitTemplate.setReturnsCallback(returned -> {int code = returned.getReplyCode();System.out.println("code=" + code);System.out.println("returned=" + returned);});rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "order", message);log.info("===============延时队列生产消息====================");log.info("发送时间:{},发送内容:{}, {}ms后执行", LocalDateTime.now(), message, RabbitMQConfig.DELAY_TIME);}
}

6.定义消息消费者

定义一个 RabbitMQ 消息消费者类,用来接收并处理消息:

@Component
@Slf4j
@RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE)
public class OrderMQListener {@RabbitHandlerpublic void consumer(OrderMessage orderMessage, Message message, Channel channel) throws IOException {log.info("收到消息:{}",new Date());log.info("msgTag:{}", message.getMessageProperties().getDeliveryTag());log.info("message:{}", message);log.info("content:{}", orderMessage);}
}

这里使用了 @RabbitListener 注解来将一个方法标记为一个 RabbitMQ 消息监听器,通过设置 queues 属性来指定监听的队列名称。

7.定义一个controller

@Slf4j
@Api(tags = "延迟消息接口")
@RestController
@RequestMapping("/rabbitmq_order_delay_message")
public class RabbitMQDelayMessageController {@Autowiredprivate MessageSender sender;/*** 发送消息* @return*/@RequestMapping(value = "/sendMsg", method = RequestMethod.GET)@ResponseBodypublic void sendMsg() {OrderMessage orderMessage = OrderMessage.builder().orderId(UUID.randomUUID().toString()).tradeNo(UUID.randomUUID().toString()).build();sender.sendOrderMessage(orderMessage);}
}

启动项目,请求运行结果:

image-20230704171525938

总的xml:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId>
</dependency><dependency><groupId>com.xiaoleilu</groupId><artifactId>hutool-all</artifactId><version>3.0.7</version>
</dependency><dependency><groupId>io.swagger</groupId><artifactId>swagger-annotations</artifactId><version>${swagger-annotations.version}</version>
</dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope>
</dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.73</version><scope>compile</scope>
</dependency>

问题总结

1.Invalid argument, ‘x-delayed-type’ must be an existing exchange type

需要创建一个交换机

image-20230703161834850

2.Connection refused: no further information

请检查配置 application.xml配置的rabbimq不生效,可以将配置放到application.properties

3.Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)

这种情况:

1.消费者内部重复签收导致签收异常

​ 解决方案:增加配置手动处理应答

  1. 配置新增
spring.rabbitmq.listener.simple.acknowledge-mode=manual #手动签收
  1. 代码里: 增加channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    public void consumer(String body, Message message, Channel channel) throws IOException {long msgTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("收到消息:" + new Date());System.out.println("msgTag=" + msgTag);System.out.println("message=" + message);System.out.println("body=" + body);channel.basicAck(msgTag, false);}catch (Exception e) {log.error("【订单延迟关闭处理异常】 接收到消息为:" + msgTag + " ,消息异常消费 : ", e);} finally {// 处理完之后手动签收(这里再次签收)channel.basicAck(msgTag, false);}}

2.已经是自动处理了,然后代码里还有手动处理channel.basicAck(msgTag, false)

​ 解决方案:去除channel.basicAck(msgTag, false)

4.Failed to convert message

消息发送和接收的方式不对 比如发送的是对象,则接收的也必须是对象,发送的是string ,接收的也必须是string

image-20230704174011064

image-20230704174057321

如果需要完整源码请关注公众号"架构殿堂" ,回复 "SpringBoot+RabbitMQ实现消息延迟队列"获得

写在最后

如果大家对相关文章感兴趣,可以关注公众号"架构殿堂",会持续更新AIGC,java基础面试题, netty, spring boot,spring cloud等系列文章,一系列干货随时送达!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/7421.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Squid 代理服务器应用】

目录 一、Squid 代理服务器1、代理的工作机制2、代理服务器的概念及其作用3、Squid 代理的类型 二、安装 Squid 服务1&#xff0e;编译安装 Squid2&#xff0e;修改 Squid 的配置文件3&#xff0e;Squid 的运行控制1、检查配置文件语法是否正确2、启动 Squid&#xff0c;第一次…

Spring Boot中的Elasticsearch自动配置:原理与使用

Spring Boot中的Elasticsearch自动配置&#xff1a;原理与使用 简介 在Spring Boot中&#xff0c;Elasticsearch是非常流行的搜索引擎。为了方便开发人员使用Elasticsearch&#xff0c;Spring Boot提供了Elasticsearch自动配置功能。本文将介绍Elasticsearch自动配置的原理与…

mysql ——基本约束以及语法 以及 Dbeaver基本使用

1. 规约 说到约束&#xff0c;就不得不想到命名规范&#xff0c;跟java一样&#xff0c;mysql也有一套自己的命名要求 库名尽量与业务名称一致&#xff0c;比如这是一个办公系统&#xff0c;你可以命名 将数据库命名为office, 多个单词组成全小写 例如&#xff1a;officeoa 表…

Python_装饰器

目录 简单装饰器 语法糖 *args、**kwargs处理有参数的函数 带参数的装饰器 类装饰器 不带参数的类装饰器 带参数的类装饰器 装饰器执行顺序 functools.wraps 讲 Python 装饰器前&#xff0c;我想先举个例子&#xff0c;虽有点污&#xff0c;但跟装饰器这个话题很贴切。…

【javascript】二维码

javascript二维码的生成可以用第三方库qrcode.js。 下载地址&#xff1a;https://gitcode.net/mirrors/davidshimjs/qrcodejs 解压后打开index.html文件输入百度地址回车&#xff0c;就可以看到指定页面的二维码了。 html代码&#xff1a; <!DOCTYPE html PUBLIC "-/…

4.1ORB-SLAM3之处理缓存队列中的关键帧

0.简介 该函数主要包括以下几个部分&#xff1a; 计算该关键帧特征点的Bow信息更新当前关键帧新增地图点的属性更新共视图中关键帧间的连接关系将该关键帧插入到地图中 1.计算该关键帧特征点的Bow信息ComputeBoW() vector<cv::Mat> vCurrentDesc Converter::toDescr…

【MySQL数据库】MMM高可用架构

目录 一 、MMM简介1.1MMM&#xff08;Master-Master replication manager for MvSQL&#xff0c;MySQL主主复制管理器&#xff09;1.2关于 MMM 高可用架构的说明如下 二、搭建mysql MMM架构2.1实验环境2.2搭建多主多从2.3安装配置 MySQL-MMM 一 、MMM简介 1.1MMM&#xff08;M…

四、Docker镜像详情

学习参考&#xff1a;尚硅谷Docker实战教程、Docker官网、其他优秀博客(参考过的在文章最后列出) 目录 前言一、Docker镜像1.1 概念1.2 UnionFS&#xff08;联合文件系统&#xff09;1.3 Docker镜像加载原理1.4 重点理解 二、docker commit 命令2.1 是什么&#xff1f;2.2 命令…

pytorch快速入门中文——01

PyTorch 深度学习&#xff1a;60分钟快速入门 原文&#xff1a;https://pytorch.org/tutorials/beginner/deep_learning_60min_blitz.html 作者&#xff1a; Soumith Chintala https://www.youtube.com/embed/u7x8RXwLKcA 什么是 PyTorch&#xff1f; PyTorch 是基于以下两个…

无限极 × 盖雅工场|劳动力管理系统项目正式启动,为多工厂管理保驾护航

6月12日&#xff0c;无限极盖雅工场劳动力管理系统启动大会在广东江门举行。无限极IT供应链系统负责人毛松和、智能制造总监胡波、新会生产中心负责人胡流云、营口生产中心负责人源博恩和人才资源共享服务负责人林岳&#xff0c;以及盖雅工场华南总经理潘磊等出席了启动大会。 …

uniapp打包白屏问题

【bug】&#xff1a;浏览器运行正常&#xff0c;模拟器、真机运行只有tab栏显示&#xff0c;或者完全白屏。打包也是白屏。 【控制台报错信息】&#xff1a; 注意&#xff1a;app不支持dom操作 【解决办法】&#xff1a;在main.js里修改 render函数是vue通过js渲染dom结构的…

html5学习精选5篇案例

html5学习心得1 一&#xff1a;了解HTML5前端开发技术 HTML 指的是超文本标记语言 (Hyper Text Markup Language)&#xff0c;标记语言是一套标记标签 (markup tag)&#xff0c;HTML 使用标记标签来描述网页。HTML5区别于HTML的标准&#xff0c;基于全新的规则手册&#xff0…