SpringBoot 整合RabbitMQ 之延迟队列实验

系列文章目录

第一章 Java线程池技术应用
第二章 CountDownLatch和Semaphone的应用
第三章 Spring Cloud 简介
第四章 Spring Cloud Netflix 之 Eureka
第五章 Spring Cloud Netflix 之 Ribbon
第六章 Spring Cloud 之 OpenFeign
第七章 Spring Cloud 之 GateWay
第八章 Spring Cloud Netflix 之 Hystrix
第九章 代码管理gitlab 使用
第十章 SpringCloud Alibaba 之 Nacos discovery
第十一章 SpringCloud Alibaba 之 Nacos Config
第十二章 Spring Cloud Alibaba 之 Sentinel
第十三章 JWT
第十四章 RabbitMQ应用
第十五章 RabbitMQ 延迟队列

在这里插入图片描述


文章目录

  • 系列文章目录
    • @[TOC](文章目录)
  • 前言
  • 1、RabbitMQ延迟队列
    • 1.1、方式1:RabbitMQ通过死信机制来实现延迟队列的功能
    • 1.2、方式二:安装延迟队列插件
      • 1.2.1、安装延迟队列插件:
  • 2、消息确认机制
    • 2.1、生产确认
    • 2.2、消费确认

前言

实际业务中,例如秒杀系统,秒杀商品成功会有截止时间,这时需要用到RabbitMQ延迟服务。

1、RabbitMQ延迟队列

1.1、方式1:RabbitMQ通过死信机制来实现延迟队列的功能

  • TTL ,即 Time-To-Live,存活时间,消息和队列都可以设置存活时间
  • Dead Letter,即死信,若给消息设置了存活时间,当超过存活时间后消息还没有被消费,则该消息变成了死信
  • Dead Letter Exchanges(DLX),即死信交换机
  • Dead Letter Routing Key (DLK),死信路由键
/***********************延迟队列*************************/
//创建立即消费队列
@Bean
public Queue immediateQueue(){return new Queue("immediateQueue");
}
//创建立即消费交换机
@Bean
public DirectExchange immediateExchange(){return new DirectExchange("immediateExchange");
}
@Bean
public Binding bindingImmediate(@Qualifier("immediateQueue") Queue queue,@Qualifier("immediateExchange") DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with("immediateRoutingKey");
}//创建延迟队列
@Bean
public Queue delayQueue(){Map<String,Object> params = new HashMap<>();//死信队列转发的死信转发到立即处理信息的交换机params.put("x-dead-letter-exchange","immediateExchange");//死信转化携带的routing-keyparams.put("x-dead-letter-routing-key","immediateRoutingKey");//设置消息过期时间,单位:毫秒params.put("x-message-ttl",60 * 1000);return new Queue("delayQueue",true,false,false,params);
}@Bean
public DirectExchange delayExchange(){return new DirectExchange("delayExchange");
}@Bean
public Binding bindingDelay(@Qualifier("delayQueue") Queue queue,@Qualifier("delayExchange") DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with("delayRoutingKey");
}
@Test
public void sendDelay(){this.rabbitTemplate.convertAndSend("delayExchange","delayRoutingKey","Hello world topic");
}

1.2、方式二:安装延迟队列插件

1.2.1、安装延迟队列插件:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez
下载解压,到plugins目录,执行以下的命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

/**************延迟队列一个单一queue*******************/
@Bean
public Queue delayNewQueue(){return new Queue("delayNewQueue");
}
@Bean
public CustomExchange delayNewExchange(){Map<String, Object> args = new HashMap<>();// 设置类型,可以为fanout、direct、topicargs.put("x-delayed-type", "direct");return new CustomExchange("delayNewExchange","x-delayed-message", true,false,args);
}
@Bean
public Binding bindingNewDelay(@Qualifier("delayNewQueue") Queue queue,@Qualifier("delayNewExchange") CustomExchange customExchange){return BindingBuilder.bind(queue).to(customExchange).with("delayNewRoutingKey").noargs();
}
@Test
public void sendDelay() {//生产端写完了UserInfo userInfo = new UserInfo();userInfo.setPassword("13432432");userInfo.setUserAccount("kelvin");this.rabbitTemplate.convertAndSend("delayNewExchange", "delayNewRoutingKey", userInfo, a -> {//单位毫秒a.getMessageProperties().setDelay(30000);return a;});
}

2、消息确认机制

消息确认分为两部分: 生产确认 和 消费确认。

生产确认: 生产者生产消息后,将消息发送到交换机,触发确认回调;交换机将消息转发到绑定队列,若失败则触发返回回调。
消费确认: 默认情况下消息被消费者从队列中获取后即发送确认,不管消费者处理消息时是否失败,不需要额外代码,但是不能保证消息被正确消费。我们增加手动确认,则需要代码中明确进行消息确认。

2.1、生产确认

@Bean
public RabbitTemplate getTemplate(ConnectionFactory connectionFactory){RabbitTemplate template = new RabbitTemplate(connectionFactory);//消息发送到交换器Exchange后触发回调template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {//  可以进行消息入库操作log.info("消息唯一标识 correlationData = {}", correlationData);log.info("确认结果 ack = {}", ack);log.info("失败原因 cause = {}", cause);}});// 配置这个,下面的ReturnCallback 才会起作用template.setMandatory(true);// 如果消息从交换器发送到对应队列失败时触发(比如 根据发送消息时指定的routingKey找不到队列时会触发)template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {//  可以进行消息入库操作log.info("消息主体 message = {}", returnedMessage.getMessage());log.info("回复码 replyCode = {}", returnedMessage.getReplyCode());log.info("回复描述 replyText = {}", returnedMessage.getReplyText());log.info("交换机名字 exchange = {}", returnedMessage.getExchange());log.info("路由键 routingKey = {}", returnedMessage.getRoutingKey());}});return template;
}
spring:cloud:nacos:discovery:server-addr: localhost:8848application:name: drp-user-service  #微服务名称datasource:username: rootpassword: rooturl: jdbc:mysql://127.0.0.1:3306/drpdriver-class-name: com.mysql.cj.jdbc.Driverrabbitmq:host: 127.0.0.1port: 5672username: rootpassword: rootvirtual-host: root_vh# 确认消息已发送到交换机(Exchange)publisher-confirm-type: correlated# 确认消息已发送到队列publisher-returns: truelistener:simple:acknowledge-mode: manual # 开启消息消费手动确认retry:enabled: true

2.2、消费确认

@RabbitHandler
public void process(UserInfo data, Message message, Channel channel){log.info("收到directQueue队列信息:" + data);long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//成功消费确认channel.basicAck(deliveryTag,true);log.info("消费成功确认完毕。。。。。");} catch (IOException e) {log.error("确认消息时抛出异常 ", e);        // 重新确认,成功确认消息try {Thread.sleep(50);channel.basicAck(deliveryTag, true);} catch (IOException | InterruptedException e1) {log.error("确认消息时抛出异常 ", e);// 可以考虑入库}}catch (Exception e){log.error("业务处理失败", e);try {// 失败确认channel.basicNack(deliveryTag, false, false);} catch (IOException e1) {log.error("消息失败确认失败", e1);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/164216.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

01-基于IDEA,Spring官网,阿里云官网,手动四种方式创建SpringBoot工程

快速上手SpringBoot SpringBoot技术由Pivotal团队研发制作&#xff0c;功能的话简单概括就是加速Spring程序初始搭建过程和Spring程序的开发过程的开发 最基本的Spring程序至少有一个配置文件或配置类用来描述Spring的配置信息现在企业级开发使用Spring大部分情况下是做web开…

计算机毕业设计java+springboot+vue的旅游攻略平台

项目介绍 本系统结合计算机系统的结构、概念、模型、原理、方法&#xff0c;在计算机各种优势的情况下&#xff0c;采用JAVA语言&#xff0c;结合SpringBoot框架与Vue框架以及MYSQL数据库设计并实现的。员工管理系统主要包括个人中心、用户管理、攻略管理、审核信息管理、积分…

蓝桥杯每日一题2023.11.5

题目描述 方格分割 - 蓝桥云课 (lanqiao.cn) 题目分析 对于每个图我们可以从中间开始搜索&#xff0c;如果到达边界点就说明找到了一种对称的方法&#xff0c;我们可以直接对此进行答案记录每次进行回溯就会找到不同的图像&#xff0c;如果是一样的图像则算一种情况&#xff…

Mac VsCode g++编译报错:不支持C++11语法解决

编译运行时报错&#xff1a; [Running] cd “/Users/yiran/Documents/vs_projects/c/” && g 1116.cpp -o 1116 && "/Users/yiran/Documents/vs_projects/c/"1116 1116.cpp:28:22: warning: range-based for loop is a C11 extension [-Wc11-extensi…

python如何使用gspread读取google在线excel数据?

一、背景 公司使用google在线excel管理测试用例&#xff0c;为了方便把手工测试用到的测试数据用来做自动化用例测试数据&#xff0c;所以就想使用python读取在线excel数据&#xff0c;通过数据驱动方式&#xff0c;完成自动化回归测试&#xff0c;提升手动复制&#xff0c;粘…

vue2 集成 - 超图 - SuperMap iClient3D for WebGL 及常用方法

1:下载SuperMap iClient3D for WebGL SuperMap iClient3D for WebGL产品包 打开资源目录如下 2:格式化项目中所用的依赖包 开发指南 从超图官网下载SuperMap iClient3D 11i (2023) SP1 for WebGL_CN.zip解压后,将Build目录下的SuperMap3D复制到项目中 \public\static…

【T+】畅捷通T+账套恢复时提示:Wrong Local header signature。

【问题描述】 畅捷通T软件使用账套维护工具恢复账套的时候&#xff0c; 提示&#xff1a;错误的本地标头签名 Wrong Local header signature: 0xEA12AEAE。 【问题原因】 是用户T服务是使用的Nginx。 Nginx下载压缩包有bug&#xff0c;导致压缩包有问题。 【解决方法】 1、打…

笔记50:正则表达式入门宝典

引自&#xff1a;正则表达式是什么? - 知乎 中“龙吟九野”所写的一个回答&#xff0c;个人感觉看完之后如同醍醐灌顶&#xff0c;查了很多资料都没有这篇文章写的基础和通透&#xff0c;感觉是正则表达式扫盲好文&#xff0c;所以搬运一下&#xff0c;侵权删&#xff0c;感谢…

1. Collection,List, Map, Queue

1. java集合框架体系结构图 2. Collection派生的子接口 其中最重要的子接口是&#xff1a; 1&#xff09;List 表示有序可重复列表&#xff0c;重要的实现类有&#xff1a;ArrayList, LinkedList ArrayList特点&#xff1a;底层数组实现&#xff0c;随机查找快&#xff0c;增删…

[云原生案例2.1 ] Kubernetes的部署安装 【单master集群架构 ---- (二进制安装部署)】

文章目录 1. 常见的K8S安装部署方式1.1 Minikube1.2 Kubeadm1.3 二进制安装部署 2. Kubernetes单master集群架构 ---- &#xff08;二进制安装部署&#xff09;2.1 前置准备2.2 操作系统初始化2.3 部署 docker引擎 ---- &#xff08;所有 node 节点&#xff09;2.4 部署 etcd 集…

python对Windows如何进行关机/重启?

用CMD命令进行关机/重启步骤&#xff1a; 1.winR&#xff0c;换出输入框 2.在输入框输入命令&#xff0c;如关机&#xff1a;shutdown -s -t 20&#xff0c;该命令是20秒后关机。 命令说明 -s 关机 -r 重启 -t 时间&#xff0c;后面是数字是你要设置的秒数 -a 取消命令&…

在 “219.**** 找不到用于监控项 key“agent.hostname“ 的主机接口.

细节 无法添加主机 在 "219.151" 找不到用于监控项 key"agent.hostname" 的主机接口.z 这个时候要改一下 方式&#xff1a;