使用RabbitMQ死信队列关闭未支付的订单

一、什么是RabbitMQ死信队列

RabbitMQ死信队列(Dead-Letter Exchange,简称DLX)是一种特殊类型的交换机,用于处理在队列中无法被消费的消息。当消息无法被消费时,它会被转发到死信队列中,以便进一步处理。

在RabbitMQ中,死信队列通常用于处理以下情况:

  1. 消息无法被消费者处理:例如,如果消费者崩溃或消息的格式不正确,则无法处理消息。此时,消息将被发送到死信队列进行进一步处理。
  2. 消息的优先级较低:如果消息的优先级较低,则可能无法在队列中得到及时处理。在这种情况下,消息也会被发送到死信队列中,以确保它最终被处理。

要使用死信队列,需要创建一个普通的交换机和一个普通的队列,然后创建一个死信队列并将其绑定到普通队列上。当消息无法被消费时,它将被发送到死信队列中。

二、RabbitMQ关单逻辑

1. 流程图

在这里插入图片描述

  • 订单创建成功后, 发送消息给order-event-exchange交换机,采用路由键order.create.order
  • order-event-exchange交换机将消息转发给order.delay.queue队列,队列保存时间为30分钟,如果没有消费,则再将消息路由到order-event-exchange交换机,采用路由键order.release.order
  • order-event-exchange交换机再将消息转发到死信队列 order-realease-order.queue,采用路由键order.release.order
  • 监听死信队列 order-realease-order.queue,如果订单状态为“待付款”的,说明支付不成功,改为“取消”关闭订单

三、Springboot配置RabbitMQ

1. 添加依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 参数配置

spring:rabbitmq:host: 127.0.0.1port: 5672# 虚拟主机virtual-host: /# 开启发送端发送确认,无论是否到达broker都会触发回调【发送端确认机制+本地事务表】publisher-confirm-type: correlated# 开启发送端抵达队列确认,消息未被队列接收时触发回调【发送端确认机制+本地事务表】publisher-returns: true# 消息在没有被队列接收时是否强行退回template:mandatory: true# 消费者手动确认模式,关闭自动确认,否则会消息丢失listener:simple:acknowledge-mode: manual

3、RabbitMQ模板配置

@Configuration
@Slf4j
public class MyRabbitConfig {@Primary@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {// TODO 封装RabbitTemplateRabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(messageConverter());initRabbitTemplate(rabbitTemplate);return rabbitTemplate;}@Beanpublic MessageConverter messageConverter() {// 使用json序列化器来序列化消息,发送消息时,消息对象会被序列化成json格式return new Jackson2JsonMessageConverter();}/*** 定制RabbitTemplate* 1、服务收到消息就会回调* 1、spring.rabbitmq.publisher-confirms: true* 2、设置确认回调* 2、消息正确抵达队列就会进行回调* 1、spring.rabbitmq.publisher-returns: true*    spring.rabbitmq.template.mandatory: true* 2、设置确认回调ReturnCallback* <p>* 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)*///@PostConstruct   // (MyRabbitConfig对象创建完成以后,执行这个方法)public void initRabbitTemplate(RabbitTemplate rabbitTemplate) {/*** 发送消息触发confirmCallback回调* @param correlationData:当前消息的唯一关联数据(如果发送消息时未指定此值,则回调时返回null)* @param ack:消息是否成功收到(ack=true,消息抵达Broker)* @param cause:失败的原因*/rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {log.info("发送消息触发confirmCallback回调" +"\ncorrelationData ===> " + correlationData +"\nack ===> " + ack + "" +"\ncause ===> " + cause);log.info("=================================================");});/*** 消息未到达队列触发returnCallback回调* 只要消息没有投递给指定的队列,就触发这个失败回调* @param message:投递失败的消息详细信息* @param replyCode:回复的状态码* @param replyText:回复的文本内容* @param exchange:接收消息的交换机* @param routingKey:接收消息的路由键*/rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {log.info("消息未到达队列触发returnCallback回调" +"\nmessage ===> " + message +"\nreplyCode ===> " + replyCode +"\nreplyText ===> " + replyText +"\nexchange ===> " + exchange +"\nroutingKey ===> " + routingKey);// TODO 修改mq_message,设置消息状态为2-错误抵达【后期定时器重发消息】});}
}

4、启动RabbitMQ

@SpringBootApplication
@EnableRabbit //启用RabbitMQ自动配置
public class Application implements CommandLineRunner {   public static void main(String[] args) throws Exception {SpringApplication.run(Application.class, args);}
}

四、关单业务流程

1. 提交订单完成后,发送关单消息

1. 提交订单控制层

/*** 创建订单* 创建成功,跳转订单支付页* 创建失败,跳转结算页* 无需提交要购买的商品,提交订单时会实时查询最新的购物车商品选中数据提交*/@TokenVerify@PostMapping(value = "/submitOrder")public String submitOrder(OrderSubmitVO vo, Model model, RedirectAttributes attributes) {try {SubmitOrderResponseVO orderVO = orderService.submitOrder(vo);// 创建订单成功,跳转收银台model.addAttribute("submitOrderResp", orderVO);// 封装VO订单数据,供页面解析[订单号、应付金额]return "pay";} catch (Exception e) {// 下单失败回到订单结算页if (e instanceof VerifyPriceException) {String message = ((VerifyPriceException) e).getMessage();attributes.addFlashAttribute("msg", "下单失败" + message);} else if (e instanceof NoStockException) {String message = ((NoStockException) e).getMessage();attributes.addFlashAttribute("msg", "下单失败" + message);}return "redirect:http://order.kmall.com/toTrade";}}

2. 提交订单实现层

@Transactional(isolation = Isolation.DEFAULT)@Overridepublic SubmitOrderResponseVO submitOrder(OrderSubmitVO orderSubmitVO) throws Exception {SubmitOrderResponseVO result = new SubmitOrderResponseVO();// 返回值// 创建订单线程共享提交数据confirmVoThreadLocal.set(orderSubmitVO);// 1.生成订单实体对象(订单 + 订单项)OrderCreateTO order = this.createOrder();// 2.验价应付金额(允许0.01误差,前后端计算不一致)if (Math.abs(orderSubmitVO.getPayPrice().subtract(order.getPayPrice()).doubleValue()) >= 0.01) {// 验价不通过throw new VerifyPriceException();}// 验价成功// 3.保存订单saveOrder(order);// 4.库存锁定(wms_ware_sku)// 封装待锁定商品项TOWareSkuLockTO lockTO = new WareSkuLockTO();lockTO.setOrderSn(order.getOrder().getOrderSn());List<OrderItemVO> itemList = order.getOrderItems().stream().map((item) -> {OrderItemVO itemVO = new OrderItemVO();itemVO.setSkuId(item.getSkuId());itemVO.setCount(item.getSkuQuantity());itemVO.setTitle(item.getSkuName());return itemVO;}).collect(Collectors.toList());lockTO.setLocks(itemList);// 待锁定订单项R response = wmsFeignService.orderLockStock(lockTO);if (response.getCode() == 0) {// 锁定成功// TODO 5.远程扣减积分// 封装响应数据返回result.setOrder(order.getOrder());//System.out.println(10 / 0); // 模拟订单回滚,库存不会滚// 6.发送创建订单到延时队列rabbitTemplate.convertAndSend(MQConstant.order_event_exchange, MQConstant.order_create_routekey, order.getOrder());return result;} else {// 锁定失败throw new NoStockException("");}}

2. 在容器注入消息交换机、队列并进行绑定

@Configuration
public class MyRabbitMQConfig {/*** 延时队列*/@Beanpublic Queue orderDelayQueue() {/*** Queue(String name,  队列名字*       boolean durable,  是否持久化*       boolean exclusive,  是否排他*       boolean autoDelete, 是否自动删除*       Map<String, Object> arguments) 属性【TTL、死信路由、死信路由键】*/HashMap<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", MQConstant.order_event_exchange);// 死信路由arguments.put("x-dead-letter-routing-key", MQConstant.order_release_routekey);// 死信路由键arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟return new Queue(MQConstant.order_delay_queue, true, false, false, arguments);}/*** 交换机(死信路由)*/@Beanpublic Exchange orderEventExchange() {return new TopicExchange(MQConstant.order_event_exchange, true, false);}/*** 死信队列*/@Beanpublic Queue orderReleaseQueue() {return new Queue(MQConstant.order_release_queue, true, false, false);}/*** 绑定:交换机与订单解锁延迟队列*/@Beanpublic Binding orderCreateBinding() {/*** String destination, 目的地(队列名或者交换机名字)* DestinationType destinationType, 目的地类型(Queue、Exhcange)* String exchange,* String routingKey,* Map<String, Object> arguments**/return new Binding(MQConstant.order_delay_queue,Binding.DestinationType.QUEUE,MQConstant.order_event_exchange,MQConstant.order_create_routekey,null);}/*** 绑定:交换机与订单解锁死信队列*/@Beanpublic Binding orderReleaseBinding() {return new Binding(MQConstant.order_release_queue,Binding.DestinationType.QUEUE,MQConstant.order_event_exchange,MQConstant.order_release_routekey,null);}/*** 绑定:交换机与库存解锁*/@Beanpublic Binding orderReleaseOtherBinding() {return new Binding(MQConstant.stock_release_queue,Binding.DestinationType.QUEUE,MQConstant.order_event_exchange,"order.release.other.#",null);}
}

3. 监听死信队列,进行关单,确认消息

@Slf4j
@RabbitListener(queues = MQConstant.order_release_queue)
@Component
public class OrderCloseListener {@AutowiredOrderService orderService;/*** 定时关单,监听死信队列,如果死信队列  消息过期时间 1分钟 后没有消费,就该关单* @param order* @param message* @param channel* @throws IOException*/@RabbitHandlerpublic void handleOrderRelease(OrderEntity order, Message message, Channel channel) throws IOException {log.debug("订单解锁,订单号:" + order.getOrderSn());try {orderService.closeOrder(order);// 手动删除消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 解锁失败 将消息重新放回队列,让别人消费channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}}}

4. 关闭订单

只有是待付款状态才要关单

@Overridepublic void closeOrder(OrderEntity order) {OrderEntity _order = this.getById(order.getId());//只有是待付款状态才要关单if (OrderConstant.OrderStatusEnum.CREATE_NEW.getCode().equals(_order.getStatus())) {// 待付款状态允许关单OrderEntity temp = new OrderEntity();temp.setId(order.getId());temp.setStatus(OrderConstant.OrderStatusEnum.CANCLED.getCode());updateById(temp);try {// 发送消息给MQOrderTO orderTO = new OrderTO();BeanUtils.copyProperties(_order, orderTO);//TODO 持久化消息到mq_message表中,并设置消息状态为3-已抵达(保存日志记录)rabbitTemplate.convertAndSend(MQConstant.order_event_exchange, "order.release.other", orderTO);} catch (Exception e) {// TODO 消息为抵达Broker,修改mq_message消息状态为2-错误抵达}}}

五、源码下载

https://gitee.com/charlinchenlin/koo-erp

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/2491.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

云原生——Docker容器化实战

❄️作者介绍&#xff1a;奇妙的大歪❄️ &#x1f380;个人名言&#xff1a;但行前路&#xff0c;不负韶华&#xff01;&#x1f380; &#x1f43d;个人简介&#xff1a;云计算网络运维专业人员&#x1f43d; 前言 "Docker"一词指代了多个概念&#xff0c;包括开源…

云端安全由繁到简,亚马逊云科技护航业务创新新局面

数字化愿景与现实存在的差距困扰着诸多企业&#xff0c;但造成这种差距的一个重要因素却一直被很多管理者所忽视&#xff0c;那就是企业未能建立应有的数字安全与合规体系。应用迭代的速度加快、数据快速膨胀、企业云原生道路上遭遇的种种困境&#xff0c;与数字安全部门有限的…

大学智慧课堂系统整理

目录 一、题目类型选择器(非组件库) 1.1、效果展示 1.2、代码展示 二、题目类型选择器(Vant组件库) 2.1、效果展示 2.2、代码展示 一、题目类型选择器(非组件库) 使用vue2&#xff1a;在methods里区分单个点击和多个点击&#xff0c;在view视图区分判断题和选择题。 如下…

【微服务】springboot 通用限流方案设计与实现

目录 一、背景 二、限流概述 2.1 dubbo 服务治理模式 2.1.1 dubbo框架级限流 2.1.2 线程池设置 2.1.3 集成第三方组件 2.2 springcloud 服务治理模式 2.2.1 hystrix 2.2.2 sentinel 2.3 网关层限流 三、常用限流策略 3.1 限流常用的算法 3.1.1 令牌桶算法 3.1.2 …

chatgpt赋能python:同一行Python给两个变量赋值:如何提高编程效率?

同一行Python给两个变量赋值&#xff1a;如何提高编程效率&#xff1f; 作为Python编程方面经验丰富的工程师&#xff0c;我们都知道Python是一种非常易学易用的编程语言&#xff0c;其灵活性和高效性问题业已广为人知。然而&#xff0c;当我们在同时对多个变量进行赋值时&…

NLP学习笔记(三)

一&#xff1a;分类方法 &#xff08;一&#xff09;逻辑回归 最简单的方法就是将分类问题视为回归问题&#xff0c;采用逻辑回归计算分类的边界。 &#xff08;二&#xff09;softmax回归 softmax的前向传播过程可以分为以下三步&#xff1a; h W T x y ^ s o f t m a …

chatgpt赋能python:如何升级Python包

如何升级Python包 如果你是一名有着10年Python编程经验的工程师&#xff0c;那么你一定知道如何安装和使用Python包。但是&#xff0c;有时候你需要升级一些已经安装的包&#xff0c;以获得更好的性能和新功能。在本文中&#xff0c;我们将讨论如何升级Python包。 什么是Pyth…

[conda]tf_agents和tensorflow-gpu安装傻瓜式教程

1.打开终端或Anaconda Prompt&#xff08;Windows用户&#xff09;。 2.输入以下命令创建新的Python环境&#xff1a; conda create --name <env_name> python<version>其中&#xff0c;<env_name>是您想要创建的环境名称&#xff0c;<version>是您想…

HHU云计算期末复习(下)Hadoop、虚拟化技术、openstack

文章目录 第五章 Hadoop分布式文件系统HDFS分离元数据和数据&#xff1a;NameNode和DataNode流水线复制 第七章 虚拟化技术7.1 虚拟化技术简介7.2 虚拟机迁移7.3 网络虚拟化 第八章 openstack8.1 计算服务NovaRabbitMQ 8.2 Swift 第九章 云计算数据中心9.1 云数据中心特征9.2 网…

C# 多数元素

169 多数元素 给定一个大小为 n 的数组 nums &#xff0c;返回其中的多数元素。多数元素是指在数组中出现次数 大于 ⌊ n/2 ⌋ 的元素。 你可以假设数组是非空的&#xff0c;并且给定的数组总是存在多数元素。 示例 1&#xff1a; 输入&#xff1a;nums [3,2,3] 输出&…

Java面试Day12

1.意向锁是什么&#xff1f;有什么作用&#xff1f;它是表级锁还是行级锁&#xff1f; 意向锁是什么 在使用 InnoDB 引擎的表里时对某些记录加上「共享锁」之前&#xff0c;需要先在表级别加上一个「意向共享锁」 在使用 InnoDB 引擎的表里时对某些记录加上「独占锁」之前&…

升级HarmonyOS 3,通话一步切换更便捷

小伙伴们&#xff0c;今天和大家来聊聊HarmonyOS 3音频播控中心有哪些真香体验。不少朋友可能会脱口而出&#xff1a;一键切换音频App&#xff0c;一键实现音频跨设备流转&#xff0c;还有音频共享。这一次&#xff0c;音频播控中心又带来了新技能——一键切换通话音频。 相信大…