Spring Boot + 事务钩子函数,打造高效支付系统!

今天,我继续安利一个独门绝技:Spring 事务的钩子函数。

单纯的讲技术可能比较枯燥乏味。

接下来,我将以一个实际的案例来描述Spring事务钩子函数的正确使用姿势。

一、案例背景

拿支付系统相关的业务来举例。在支付系统中,我们需要记录每个账户的资金流水(记录用户A因为哪个操作扣了钱,因为哪个操作加了钱),这样我们才能对每个账户的账做到心中有数,对于支付系统而言,资金流水的数据可谓是最重要的。因此,为了防止支付系统的老大徇私舞弊,CTO提了一个流水存档的需求:

要求支付系统对每个账户的资金流水做一份存档,要求支付系统在写流水的时候,把流水相关的信息以消息的形式推送到kafka,由存档系统消费这个消息并落地到库里(这个库只有存档系统拥有写权限)。

整个需求的流程如下所示:

图片

整个需求的流程还是比较简单的,考虑到后续会有其他事业部也要进行数据存档操作,CTO建议支付系统团队内部开发一个二方库,这个二方库的主要功能就是发送消息到kafka中去。

二、确定方案

既然要求开发一个二方库,因此,我们需要考虑如下几件事情:

1、技术栈使用的springboot,因此,这里最好以starter的方式提供

2、二方库需要发送消息给kafka,最好是二方库内部基于kafka生产者的api创建生产者,不要使用Spring自带的kafkaTemplate,因为集成方有可能已经使用了kafkaTemplate。不能与集成方造成冲突。

3、减少对接方的集成难度、学习成本,最好是提供一个简单实用的api,业务侧能简单上手。

4、发送消息这个操作需要支持事务,尽量不影响主业务

在上述的几件事情中,最需要注意的应该就是第4点:发送消息这个操作需要支持事务,尽量不影响主业务。

这是什么意思呢?

首先,尽量不影响主业务,这个最简单的方式就是使用异步机制。

其次,需要支持事务是指:假设我们的api是在事务方法内部调用的,那么我们需要保证事务提交后再执行这个api。

那么,我们的流水落地api应该要有这样的功能:

图片

内部可以判断当前是否存在事务,如果存在事务,则需要等事务提交后再异步发送消息给kafka。

如果不存在事务则直接异步发送消息给kafka。

而且这样的判断逻辑得放在二方库内部才行。

那现在摆在我们面前的问题就是:我要如何判断当前是否存在事务,以及如何在事务提交后再触发我们自定义的逻辑呢?

三、TransactionSynchronizationManager显神威

这个类内部所有的变量、方法都是static修饰的,也就是说它其实是一个工具类。是一个事务同步器。

下述是流水落地API的伪代码,这段代码就解决了我们上述提到的疑问:

java复制代码private final ExecutorService executor = Executors.newSingleThreadExecutor();public void sendLog() {// 判断当前是否存在事务if (!TransactionSynchronizationManager.isSynchronizationActive()) {// 无事务,异步发送消息给kafkaexecutor.submit(() -> {// 发送消息给kafkatry {// 发送消息给kafka} catch (Exception e) {// 记录异常信息,发邮件或者进入待处理列表,让开发人员感知异常}});return;}// 有事务,则添加一个事务同步器,并重写afterCompletion方法(此方法在事务提交后会做回调)TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {@Overridepublic void afterCompletion(int status) {if (status == TransactionSynchronization.STATUS_COMMITTED) {// 事务提交后,再异步发送消息给kafkaexecutor.submit(() -> {try {// 发送消息给kafka} catch (Exception e) {// 记录异常信息,发邮件或者进入待处理列表,让开发人员感知异常}});}}});}

代码比较简单,其主要是TransactionSynchronizationManager的使用。

3.1、判断是否存在事务?TransactionSynchronizationManager.isSynchronizationActive() 方法显神威

我们先看下这个方法的源码:

java复制代码// TransactionSynchronizationManager.java类内部的部分代码private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =new NamedThreadLocal<>("Transaction synchronizations");public static boolean isSynchronizationActive() {return (synchronizations.get() != null);
}

很明显,synchronizations是一个线程变量(ThreadLocal)。

那它是在什么时候set进去的呢?这里的话,可以参考下这个方法:org.springframework.transaction.support.TransactionSynchronizationManager#initSynchronization,其源码如下所示:

java复制代码/*** Activate transaction synchronization for the current thread.* Called by a transaction manager on transaction begin.* @throws IllegalStateException if synchronization is already active*/
public static void initSynchronization() throws IllegalStateException {if (isSynchronizationActive()) {throw new IllegalStateException("Cannot activate transaction synchronization - already active");}logger.trace("Initializing transaction synchronization");synchronizations.set(new LinkedHashSet<>());
}

由源码中的注释也可以知道,它是在事务管理器开启事务时调用的。换句话说,只要我们的程序执行到带有事务特性的方法时,就会在线程变量中放入一个LinkedHashSet,用来标识当前存在事务。只要isSynchronizationActive返回true,则代表当前有事务。因此,结合这两个方法我们是指能解决我们最开始提出的疑问:**要如何判断当前是否存在事务**

3.2、如何在事务提交后触发自定义逻辑?TransactionSynchronizationManager.registerSynchronization()方法显神威

我们来看下这个方法的源代码:

java复制代码/*** Register a new transaction synchronization for the current thread.* Typically called by resource management code.* <p>Note that synchronizations can implement the* {@link org.springframework.core.Ordered} interface.* They will be executed in an order according to their order value (if any).* @param synchronization the synchronization object to register* @throws IllegalStateException if transaction synchronization is not active* @see org.springframework.core.Ordered*/
public static void registerSynchronization(TransactionSynchronization synchronization)throws IllegalStateException {Assert.notNull(synchronization, "TransactionSynchronization must not be null");if (!isSynchronizationActive()) {throw new IllegalStateException("Transaction synchronization is not active");}synchronizations.get().add(synchronization);
}

这里又使用到了synchronizations线程变量,我们在判断是否存在事务时,就是判断这个线程变量内部是否有值。

那我们现在想在事务提交后触发自定义逻辑和这个有什么关系呢?

我们在上面构建流水落地api的伪代码中有向synchronizations内部添加了一个TransactionSynchronizationAdapter,内部并重写了afterCompletion方法,其代码如下所示:

java复制代码TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {@Overridepublic void afterCompletion(int status) {if (status == TransactionSynchronization.STATUS_COMMITTED) {// 事务提交后,再异步发送消息给kafkaexecutor.submit(() -> {try {// 发送消息给kafka} catch (Exception e) {// 记录异常信息,发邮件或者进入待处理列表,让开发人员感知异常}});}}});

我们结合registerSynchronization的源码来看,其实这段代码主要就是向线程变量内部的LinkedHashSet添加了一个对象而已,但就是这么一个操作,让Spring在事务执行的过程中变得“有事情可做”

这是什么意思呢?是因为Spring在执行事务方法时,对于操作事务的每一个阶段都有一个回调操作,比如:trigger系列的回调

图片

invoke系列的回调

图片

而我们现在的需求就是在事务提交后触发自定义的函数,那就是在invokeAfterCommit和invokeAfterCompletion这两个方法来选了。

首先,这两个方法都会拿到所有TransactionSynchronization的集合(其中会包括我们上述添加的TransactionSynchronizationAdapter)。

但是要注意一点:invokeAfterCommit只能拿到集合,invokeAfterCompletion除了集合还有一个int类型的参数,而这个int类型的参数其实是当前事务的一种状态。也就是说,如果我们重写了invokeAfterCompletion方法,我们除了能拿到集合外,还能拿到当前事务的状态。

因此,此时我们可以根据这个状态来做不同的事情,比如:可以在事务提交时做自定义处理,也可以在事务回滚时做自定义处理等等。

四、总结

上面有说到,我们判断当前是否存在事务、添加钩子函数都是依赖线程变量的。因此,我们在使用过程中,一定要避免切换线程。否则会出现不生效的情况。

最后说一句(求关注!别白嫖!)

如果这篇文章对您有所帮助,或者有所启发的话,求一键三连:点赞、转发、在看。

关注公众号:woniuxgg,在公众号中回复:笔记  就可以获得蜗牛为你精心准备的java实战语雀笔记,回复面试、开发手册、有超赞的粉丝福利!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/630337.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

22长安杯电子取证复现(检材一,二)

检材一 先用VC容器挂载&#xff0c;拿到完整的检材 从检材一入手&#xff0c;火眼创建案件&#xff0c;打开检材一 1.检材1的SHA256值为 计算SHA256值&#xff0c;直接用火眼计算哈希计算 9E48BB2CAE5C1D93BAF572E3646D2ECD26080B70413DC7DC4131F88289F49E34 2.分析检材1&am…

网站添加PWA支持,仅需三步,无视框架的类型

总结起来&#xff0c;网站配置PWA简单步骤为&#xff1a; 编写 manifest.json&#xff1b;编写 serviceWorker.js&#xff1b;在 index.html 引入上述两个文件&#xff1b;把上述三个文件放在网站根目录(或者同一目录下)&#xff1b;网站需要部署在https环境才能触发&#xff…

2010-2022年各省新质生产力测算数据(含原始数据+计算代码+计算结果)

2010-2022年各省新质生产力测算数据&#xff08;含原始数据计算代码计算结果&#xff09; 1、时间&#xff1a;2010-2022年 2、来源&#xff1a;国家统计局、各省年鉴、能源年鉴、环境年鉴 3、范围&#xff1a;31省 4、指标&#xff1a; 省份、年份、分地区授权专利数&…

在Windows 10中切换用户的几种方法,总有一种适合你

序言 如果你不想关闭计算机&#xff0c;又想从计算机上的另一个用户帐户启动&#xff0c;如果在计算机中已创建了多个本地帐户&#xff0c;则可以快速切换到另一用户。在这里&#xff0c;我们将分享在Windows 10中切换用户帐户的有效方法。 在登录屏幕切换用户 对于学校或公…

永久免费次数ChatGPT国内镜像网站【强烈建议收藏】

gctohttps://chat.tomyres.com/#/pages/web/index?n0 觉得分享的网站好用的话&#xff0c;记得点赞收藏哦。

算法学习笔记:Bi-LSTM和Bi-GRU

这篇文章的作为前几篇RNN\LSTM\RNN的后续之作&#xff0c;主要就是补充一个这两个哥的变体&#xff0c;想详细了解RNN\LSTM\GRU的详细理论和公式推导以及代码的请前往下面链接&#xff1a; 算法学习笔记&#xff1a;循环神经网络&#xff08;Recurrent Neural Network)-CSDN博…

C语言进阶课程学习记录-函数指针的阅读

C语言进阶课程学习记录-函数指针的阅读 5个标识符含义解析技巧 本文学习自狄泰软件学院 唐佐林老师的 C语言进阶课程&#xff0c;图片全部来源于课程PPT&#xff0c;仅用于个人学习记录 5个标识符含义解析 int (*p1) (int* , int (*f) ( int* ) );定义了指针p1,指向函数&#…

2024年免费云服务器推荐,小编亲测好用!

随着云计算技术的飞速发展&#xff0c;云服务器以其弹性、高效、安全的特性&#xff0c;成为众多企业和个人用户的首选。尽管市面上有众多收费的云服务器产品&#xff0c;但免费的云服务器仍然吸引着大量用户&#xff0c;尤其是初学者和预算有限的用户。下面&#xff0c;我们就…

vue框架中的组件通信

vue框架中的组件通信 一.组件通信关系二.父子通信1.props 校验2.prop & data、单向数据流 二.非父子通信-event bus 事件总线三.非父子通信 (拓展) - provide & inject四.v-model简化父子通信代码五. .sync修饰符 一.组件通信关系 组件关系分类&#xff1a; 1.父子关系…

C++修炼之路之反向迭代器和非模板参数,模板特化,分离编译

目录 前言 一&#xff1a;反向迭代器 二&#xff1a;非类型模板参数 三&#xff1a;模板的特化 四&#xff1a;模板的分离编译 五&#xff1a;模板的优点与缺点 接下来的日子会顺顺利利&#xff0c;万事胜意&#xff0c;生活明朗-----------林辞忧 前言 在vector&am…

AIDE:自动驾驶目标检测的自动数据引擎

AIDE&#xff1a;自动驾驶目标检测的自动数据引擎 摘要IntroductionRelated WorksMethodData FeederModel Updater4 Experiments 摘要 自动驾驶车辆&#xff08;AV&#xff09;系统依赖于健壮的感知模型作为安全保证的基石。然而&#xff0c;道路上遇到的物体表现出长尾分布&a…

selenium 下载文件取消安全下载的方法

问题描述 我要从一个网站上下载文件&#xff0c;谷歌浏览器总是自动阻止下载&#xff0c;并询问我是否保留。 可是&#xff0c;我想要的是不要询问&#xff0c;默认下载即可。 运行环境 OS: macOSselenium: 4.19.0python: 3.10.11Chrome: 124.0.6367.62selenium chromedrive…