支付模块-基于消息队列发送支付通知消息

消息队列发送支付通知消息

需求分析

订单服务作为通用服务,在订单支付成功后需要将支付结果异步通知给其他对接的微服务,微服务收到支付结果根据订单的类型去更新自己的业务数据

在这里插入图片描述

技术方案

使用消息队列进行异步通知需要保证消息的可靠性即生产端将消息成功通知到服务端: 消息发送到交换机 --> 由交换机发送到队列 --> 消费者监听队列,收到消息进行处理,参考文章02- 使用Docker安装RabbitMQ-CSDN博客

  • 生产者确认机制: 发送消息前使用数据库事务将消息保证到数据库表中,成功发送到交换机将消息从数据库中删除

  • 配置MQ持久化(交换机、队列、发送消息):MQ收到消息持久化,当MQ重启时即使消息没有消费完也不会丢失

  • 消费者确认机制: 消费者消费成功,自动发送ACK,负责重试消费

发布订阅模式: 订单服务接收支付成功结果通知后创建一条消息发送给Fanout广播类型的交换机,学习中心服务绑定队列到交换机接收消息,参考文章04- 基于SpringAMQP封装RabbitMQ,消息队列的Work模型和发布订阅模型-CSDN博客

环境搭建

第一步: 在订单服务和学习中心服务中添加消息队列依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

第二步:在Nacos的dev环境下添加RabbitMQ的配置信息rabbitmq-dev.yaml,设置group为xuecheng-plus-common

spring:rabbitmq:host: 192.168.101.128 # 主机port: 5672 # 端口名username: root # 用户名password: root # 密码virtual-host: / # 虚拟主机publisher-confirm-type: correlated # 异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallbackpublisher-returns: true # 开启publish-return功能,同样是基于callback机制调用回调函数ReturnCallbacktemplate:mandatory: true # 定义消息路由失败时的策略,true表示调用ReturnCallback;false表示直接丢弃消息listener:simple:# 每次只能获取一条消息,处理完成才能获取下一个消息prefetch: 1  # auto:出现异常时返回unack且消息回滚到mq,如果没有异常直接返回ack# manual:手动控制# none:丢弃消息不回滚到mqacknowledge-mode: auto retry:enabled: false # 开启消费者失败重试initial-interval: 5000ms # 初始的失败等待时长为几秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态;如果业务中包含事务需要改为false

第三步:在订单服务和学习中心服务的接口工程中引入rabbitmq-dev.yaml配置文件

- data-id: rabbitmq-${spring.profiles.active}.yamlgroup: xuecheng-plus-commonrefresh: true

第四步: 在订单服务的service工程编写MQ配置类PayNotifyConfig创建交换机和队列

@Slf4j
@Configuration
public class PayNotifyConfig implements ApplicationContextAware {// 交换机public static final String PAYNOTIFY_EXCHANGE_FANOUT = "paynotify_exchange_fanout";// 支付结果通知消息类型public static final String MESSAGE_TYPE = "payresult_notify";// 支付通知队列public static final String PAYNOTIFY_QUEUE = "paynotify_queue";// 声明交换机且持久化@Bean(PAYNOTIFY_EXCHANGE_FANOUT)public FanoutExchange paynotify_exchange_fanout() {// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除return new FanoutExchange(PAYNOTIFY_EXCHANGE_FANOUT, true, false);}//支付通知队列且持久化@Bean(PAYNOTIFY_QUEUE)public Queue course_publish_queue() {return QueueBuilder.durable(PAYNOTIFY_QUEUE).build();}// 交换机和支付通知队列绑定@Beanpublic Binding binding_course_publish_queue(@Qualifier(PAYNOTIFY_QUEUE) Queue queue, @Qualifier(PAYNOTIFY_EXCHANGE_FANOUT) FanoutExchange exchange) {return BindingBuilder.bind(queue).to(exchange);}// 交换机路由消息到队列的时候如果失败执行回调函数@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 消息处理serviceMqMessageService mqMessageService = applicationContext.getBean(MqMessageService.class);// 设置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 消息发送失败记录日志log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",replyCode, replyText, exchange, routingKey, message.toString());// 解析消息内容,将消息再添加到消息表MqMessage mqMessage = JSON.parseObject(message.toString(), MqMessage.class);mqMessageService.addMessage(mqMessage.getMessageType(), mqMessage.getBusinessKey1(), mqMessage.getBusinessKey2(), mqMessage.getBusinessKey3());});}
}

第五步: 在学习中心服务编写MQ配置类PayNotifyConfig创建交换机和队列,避免学习中心服务启动的时候监听的队列还没有创建,如果生产端已经创建就不再创建

@Slf4j
@Configuration
public class PayNotifyConfig implements ApplicationContextAware {// 声明交换机,支付通知队列,交换机和支付通知队列绑定关系// 不用设置回调函数,只有生产者才需要确认 
}

重启订单服务,登录rabbitmq查看交换机自动创建成功

在这里插入图片描述

生产者发送信息

在订单服务的OrderService中定义接口接收支付宝响应的通知消息结果并发送给学习中心服务

public interface OrderService {/*** 接收通知结果并发送给学习中心服务* @param mq	Message 消息*/void notifyPayResult(MqMessage mqMessage);
}
@Slf4j
@Service
public class OrderServiceImpl implements OrderService {@AutowiredMqMessageService mqMessageService;@AutowiredRabbitTemplate rabbitTemplate;@Overridepublic void notifyPayResult(MqMessage mqMessage) {// 1. 将消息体转为JsonString jsonMsg = JSON.toJSONString(mqMessage);// 2. 设置消息的持久化方式为PERSISTENT,即消息会被持久化到磁盘上,确保即使在RabbitMQ服务器重启后也能够恢复消息Message msgObj = MessageBuilder.withBody(jsonMsg.getBytes()).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();// 3. 封装CorrelationData,用于跟踪指定Id消息的相关信息CorrelationData correlationData = new CorrelationData(mqMessage.getId().toString());// 3.1 使用CorrelationData添加一个Callback对象指定回调方法,该对象用于在消息确认时处理消息的结果correlationData.getFuture().addCallback(result -> {if (result.isAck()) {// 3.2 消息成功发送到交换机,删除消息表中的记录log.debug("消息发送成功:{}", jsonMsg);mqMessageService.completed(mqMessage.getId());} else {// 3.3 消息发送失败log.error("消息发送失败,id:{},原因:{}", mqMessage.getId(), result.getReason());}}, ex -> {// 3.4 消息异常可能是网络问题log.error("消息发送异常,id:{},原因:{}", mqMessage.getId(), ex.getMessage());});// 4. 发送消息rabbitTemplate.convertAndSend(PayNotifyConfig.PAYNOTIFY_EXCHANGE_FANOUT, "", msgObj, correlationData);}
}

订单服务收到第三方平台的支付结果时,在saveAliPayStatus方法中除了保存支付宝响应的结果信息还需要向数据库消息表添加消息记录将消息封装好后发送给消费端

	/*** 保存支付结果信息,向数据库中的消息表添加消息并发送给消费端* @param payStatusDto 支付结果信息*/
@Transactional
@Override
public void saveAlipayStatus(PayStatusDto payStatusDto) {// 1. 获取支付流水号String payNo = payStatusDto.getOut_trade_no();// 2. 查询数据库订单状态XcPayRecord payRecord = getPayRecordByPayNo(payNo);if (payRecord == null) {XueChengPlusException.cast("未找到支付记录");}XcOrders order = xcOrdersMapper.selectById(payRecord.getOrderId());if (order == null) {XueChengPlusException.cast("找不到相关联的订单");}String statusFromDB = payRecord.getStatus();// 2.1 已支付,直接返回if ("600002".equals(statusFromDB)) {return;}// 3. 查询支付宝交易状态String tradeStatus = payStatusDto.getTrade_status();// 3.1 支付宝交易已成功,保存订单表和交易记录表,更新交易状态if ("TRADE_SUCCESS".equals(tradeStatus)) {// 更新支付交易表payRecord.setStatus("601002");payRecord.setOutPayNo(payStatusDto.getTrade_no());payRecord.setOutPayChannel("Alipay");payRecord.setPaySuccessTime(LocalDateTime.now());int updateRecord = xcPayRecordMapper.updateById(payRecord);if (updateRecord <= 0) {XueChengPlusException.cast("更新支付交易表失败");}// 更新订单表order.setStatus("600002");int updateOrder = xcOrdersMapper.updateById(order);if (updateOrder <= 0) {log.debug("更新订单表失败");XueChengPlusException.cast("更新订单表失败");}}// 4. 创建消息记录并保存到消息表中,参数1:支付结果类型通知;参数2:业务id;参数3:业务类型MqMessage mqMessage = mqMessageService.addMessage("payresult_notify", order.getOutBusinessId(), order.getOrderType(), null);// 5. 封装消息记录并发送给消费端notifyPayResult(mqMessage);
}

消费者接收消息

在学习中心服务定义impl/ReceivePayNotifyService

  • 监听消息队列接收支付结果, 当接收到消息后更新选课记录表的选课状态为选课成功,同时向我的课程表中插入一条课程记录
@Slf4j
@Service
public class ReceivePayNotifyService {@AutowiredMyCourseTablesService tablesService;@RabbitListener(queues = PayNotifyConfig.PAYNOTIFY_QUEUE)public void receive(Message message) {// 1. 获取消息MqMessage mqMessage = JSON.parseObject(message.getBody(), MqMessage.class);// 2. 根据消息内容,更新选课记录,向我的课程表插入记录// 2.1 消息类型,学习中心只处理支付结果的通知String messageType = mqMessage.getMessageType();// 2.2 选课idString chooseCourseId = mqMessage.getBusinessKey1();// 2.3 订单类型,60201表示购买课程String orderType = mqMessage.getBusinessKey2();// 3. 学习中心只负责处理支付结果的通知if (PayNotifyConfig.MESSAGE_TYPE.equals(messageType)){// 3.1 学习中心只负责购买课程类订单的结果if ("60201".equals(orderType)){// 3.2 保存选课记录boolean flag = tablesService.saveChooseCourseStatus(chooseCourseId);if (!flag){XueChengPlusException.cast("保存选课记录失败");}}}}
}

MyCourseTablesService接口中定义方法更新选课记录的选课状态,同时向我的课程表添加选课记录(之前添加免费课程的时候已经实现过了)

public interface MyCourseTablesService {/*** 保存选课成功状态* @param chooseCourseId* @return*/public boolean saveChooseCourseSuccess(String chooseCourseId);
}
@Slf4j
@Service
public class MyCourseTablesServiceImpl implements MyCourseTablesService {@Override@Transactionalpublic boolean saveChooseCourseStatus(String chooseCourseId) {// 1. 根据选课id,查询对应的选课记录XcChooseCourse chooseCourse = chooseCourseMapper.selectById(chooseCourseId);if (chooseCourse == null) {log.error("接收到购买课程的消息,根据选课id未查询到课程,选课id:{}", chooseCourseId);return false;}// 2. 选课状态为未支付时,更新选课状态为选课成功if ("701002".equals(chooseCourse.getStatus())) {chooseCourse.setStatus("701001");int update = chooseCourseMapper.updateById(chooseCourse);if (update <= 0) {log.error("更新选课记录失败:{}", chooseCourse);}}// 3. 向我的课程表添加记录addCourseTables(chooseCourse);return true;}
}
public XcCourseTables addCourseTabls(XcChooseCourse xcChooseCourse){//选课成功了才可以向我的课程表添加String status = xcChooseCourse.getStatus();if(!"701001".equals(status)){XueChengPlusException.cast("选课没有成功无法添加到课程表");}XcCourseTables xcCourseTables = getXcCourseTables(xcChooseCourse.getUserId(), xcChooseCourse.getCourseId());if(xcCourseTables!=null){return xcCourseTables;}xcCourseTables = new XcCourseTables();BeanUtils.copyProperties(xcChooseCourse,xcCourseTables);xcCourseTables.setChooseCourseId(xcChooseCourse.getId());//记录选课表的逐渐xcCourseTables.setCourseType(xcChooseCourse.getOrderType());//选课类型xcCourseTables.setUpdateDate(LocalDateTime.now());int insert = courseTablesMapper.insert(xcCourseTables);if(insert<=0){XueChengPlusException.cast("添加我的课程表失败");}return xcCourseTables;
}

通知支付结果测试

选择一门已发布的收费课程,如果在我的课程表存储则删除记录及其相关的选课记录及订单记录信息

  • 进入课程详细页面,点击马上学习生成二维码进行支付
  • 支付完成点击“支付完成”,观察订单服务控制台是否发送消息(使用内网穿透工具)
  • 观察学习中心服务控制台是否接收到消息
  • 观察数据库中的消息表的相应记录是否已删除,我的选课表中是否有对应的选课记录

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/535180.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【一】【设计模式】类关系UML图

1. 继承&#xff08;Generalization&#xff09; 继承是对象间的一种层次关系&#xff0c;允许子类继承并扩展父类的功能。 UML线&#xff1a;带有空心箭头的直线&#xff0c;箭头指向基类&#xff08;父类&#xff09;。 class Parent {public void parentMethod() {System.…

STM32第十课:串口发送

一、usart串口 1.1 USART串口协议 串口通讯(Serial Communication) 是一种设备间非常常用的串行通讯方式&#xff0c;因为它简单便捷&#xff0c;因此大部分电子设备都支持该通讯方式&#xff0c;电子工程师在调试设备时也经常使用该通讯方式输出调试信息。在计算机科学里&…

提速电商效果图云渲染:优势与策略

云渲染为电商效果图提供了快速、高效的解决方案&#xff0c;利用其庞大的计算能力和并行处理优势&#xff0c;即便是复杂场景也能迅速渲染完成&#xff0c;大幅提高工作效率和加快产品上市节奏。 一、电商效果图有什么用&#xff1f; 电商效果图在电商行业中扮演着至关重要的角…

Midjourney封禁Stability AI:恶意爬取数据,致服务器瘫痪24小时

这两家 AI 图像生成公司之间发生什么事了。虽然 AI 生图领域&#xff0c;看似百花齐放&#xff0c;但论资排辈&#xff0c;Midjourney、Stability AI 还是很受用户欢迎的。 Midjourney 把 Stability AI 拉入黑名单了&#xff0c;禁止后者所有员工使用其软件&#xff0c;直至另…

Could not use APOC procedures

报错内容&#xff1a; Traceback (most recent call last):File "/root/anaconda3/envs/sakura/lib/python3.9/site-packages/langchain_community/graphs/neo4j_graph.py", line 205, in __init__self.refresh_schema()File "/root/anaconda3/envs/sakura/lib…

linux系统docker网络介绍

网络介绍 docker网络网络情况docker不启动&#xff0c;默认网络情况docker启动&#xff0c;网络情况 常用基本命令all命令查看网络创建新的网络查看网络源数据删除网络 docker网络作用docker网络模式bridgehostnonecontainer自定义网络模式容器实例内默认网络ip生产规则 docker…

Uni-app跟学笔记(一):新建项目、运行、tabbar、全局配置

文章目录 1&#xff09;新建项目2&#xff09;项目运行3&#xff09;项目结构4&#xff09;开发规范5&#xff09;globalStyle全局外观配置6&#xff09;pages页面配置7&#xff09;tabbar8&#xff09;Condition 本博客为 uni-app 此门课的跟学笔记&#xff0c;目的是便于个人…

建立网站的费用大概需要多少钱?

网站建设是很多人都熟悉的事情&#xff0c;因为现在无论是大公司还是中小企业都有自己的网站。 当你有了网站之后&#xff0c;你可以将公司的信息上传到网站上&#xff0c;这样可以让很多客户在浏览的时候更多地了解公司。 虽然现在大多数企业都有自己的网站&#xff0c;但是很…

铭文:探索比特币世界的数字印记

铭文是什么&#xff1f; 铭文指的是在某种物品&#xff08;如石头、硬币、平板等&#xff09;上刻有文字。在比特币领域&#xff0c;铭文指的是刻在聪&#xff08;satoshi&#xff09;上的元数据。比特币的最小单位是聪&#xff0c;1比特币可分为1亿聪。每个聪都通过序数理论进…

专题二 -滑动窗口 - leetcode 209. 长度最小的子数组 | 中等难度

leetcode 209. 长度最小的子数组 leetcode 209. 长度最小的子数组 | 中等难度1. 题目详情1. 原题链接2. 基础框架 2. 解题思路1. 题目分析2. 算法原理3. 时间复杂度 3. 代码实现4. 知识与收获 leetcode 209. 长度最小的子数组 | 中等难度 1. 题目详情 给定一个含有 n 个正整数…

5款可以免费使用的 UI 设计软件

在我们分享五个有用的原型工具之前&#xff0c;完成原型并优化界面。这是 UI 设计师的任务。UI 设计软件对设计师来说非常重要。UI 设计工具的使用是否直接影响最终结果&#xff0c;然后有人会问&#xff1a;UI 界面设计用什么软件&#xff1f;一些 UI 设计师和对 UI 设计感兴趣…

《深入Linux内核架构》第2章 进程管理和调度 (1)

目录 前言 2.1 进程优先级 2.2 进程生命周期 2.3 进程表示 2.3.1 进程类型 2.3.2 命名空间 2.3.3 进程ID号 2.3.4 进程关系 2.4 进程管理相关的系统调用 2.4.1 进程复制 2.4.2 内核线程 2.4.3 启动新程序 2.4.4 退出进程 前言 本章内容太多&#xff0c;分为两篇博…