SpringBoot中使用RocketMQ实现事务消息来保证分布式事务的一致性(有代码)

前言

分布式事务是分布式系统中非常常见的问题。是非常必要钱常见的。实现的方式也是多种多样。今天这个视频主要来分享一下RocketMQ实现事务消息来保证分布式事务的一致性。不知道大家使用过这种方式没有。这种分布式事务的原理其实和本地消息表一样。

本地消息表实现分布式事务的基本原理

本地消息表实现分布式事务的基本原理是通过两个阶段的事务处理来保证分布式环境中的数据一致性。以下是其基本步骤:
大致就是将本地消息表和要执行的第一个业务逻辑放在一个事务中,这样就可以一起成功一起失败。当第一阶段成功后。根据本地消息表中的记录去让下游的业务执行成功。扫描本地消息表中的消息然后执行下游业务。执行成功后在删除本地消息表中消息。不成功则重试。

1.本地事务:

在开始分布式事务时,首先执行本地操作。例如,更新某个服务的数据。
如果本地操作成功,事务进入下一步;如果失败,则回滚本地事务,并结束流程。
消息记录:
创建一条消息记录,通常称为“本地消息”,将需要在后续阶段执行的远程操作信息保存在本地数据库的一个消息表中。这个消息记录包含了执行远程操作所需的所有数据。
消息发送:

将本地消息发送到消息队列,如RocketMQ或其他消息中间件。此时,消息队列并不保证消息已经被消费,只是简单地将消息放入队列。
消息消费:

消息队列的消费者监听并处理消息。消费者通常是另一个服务,它接收消息并执行相应的远程操作,比如更新另一个服务的数据。
确认与补偿:

如果远程操作成功,消费者会发送一个确认信号(ACK),通知生产者操作已完成。这时,生产者可以删除本地消息表中的记录。
如果远程操作失败,消费者可能会尝试重新消费消息,或者根据策略回滚本地事务,然后通知生产者消息处理失败。
最终一致性:

尽管可能有短暂的延迟,但最终所有服务的数据状态会达到一致,因为本地操作和远程操作都会成功完成,或者在失败时都会回滚。
异常处理:

为了处理异常情况,系统通常会有超时和重试机制。如果消费者长时间没有确认,生产者可能会重新发送消息,或者在一定时间后回滚本地事务。
本地消息表方案的优点在于它避免了分布式事务的复杂性,实现了最终一致性,而不是强一致性。但是,它也有一些缺点,比如增加了系统的复杂性,需要维护额外的消息表,以及可能出现消息丢失或重复消费的问题。因此,它更适合对实时性要求不高,但对最终一致性有要求的场景。

本地消息表是一种最终一致性方案。并不是强一致性方案。

rocketmq事务消息

今天重点来说一下rocketmq事务消息是怎么做的。先理解一下Rocketmq事务消息
在这里插入图片描述

这种类似的图片挺多的。简单的来看一下 然后一会结合代码看一下。生产者先送消息到MQserve。然后mq去执行本地事务。通过回查的方式来保证第一阶段消息执行的成功。然后下游消费者来消费这个消息。

代码

我们需要实现分布式事务的两个服务分别是用户中心的服务以及im业务服务。功能是注册的功能。用户的注册信息基本信息存储在用户中心表。然后其他信息存储在im_user表里面。这个听起来有点奇怪。因为我这套代码是计划用户中心存储多个app的用户信息。通义提供鉴权服务什么的。然后基本信息存储在自己的业务用户表里面。大概是这样的设计思路。可以看代码。

	/*** 使用rocketmq实现事务* @param dto* @return* @throws Exception*/@ApiOperation("使用邮箱和密码注册")@PostMapping("/sys/registByWeb")public GenericResponse registByWebTX(@RequestBody SysRegisterForm dto) throws Exception {String uuid = UUID.randomUUID().toString() + new Random().nextInt();SysUserEntity sysUserEntity = new SysUserEntity();sysUserEntity.setPassword(dto.getPassword());sysUserEntity.setUsername(dto.getUsername());sysUserEntity.setOpenid(uuid);//注册需要的实体类RegisterFeign registerFeign = new RegisterFeign();registerFeign.setOpenid(uuid);registerFeign.setUsername(dto.getUsername());registerFeign.setEmail(dto.getEmail());TransactionSendResult sendResult= rocketMqHelper.transactionSend(Topic.REGISTER,MessageBuilder.withPayload(sysUserEntity).build(),registerFeign);String sendStatus = sendResult.getSendStatus().name();String localTXState = sendResult.getLocalTransactionState().name();logger.info("sendStatus---" + sendStatus);logger.info("localTXState---"+localTXState);// 注意:这里不能立即返回成功,因为事务还未完成,实际应用中可能需要设计异步回调通知客户端事务结果// 以下仅为示例逻辑,实际应用中需根据业务需求调整return GenericResponse.response(ServiceError.NORMAL);}

这里实现注册功能。然后
TransactionSendResult sendResult= rocketMqHelper.transactionSend(Topic.REGISTER,
MessageBuilder.withPayload(sysUserEntity).build(),registerFeign);
这行代码用来发送事务消息;
需要给rocketmq配置一个生产者端的消息监听器

@Slf4j
@RocketMQTransactionListener
public class UserRegistrationTransactionListener implements RocketMQLocalTransactionListener {@Autowiredprivate SysUserService sysUserService;@AutowiredSysUserDao sysUserDao;@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {log.info(">>>> TX message listener execute local transaction, message={},args={} <<<<",msg,arg);// 执行本地事务RocketMQLocalTransactionState result = RocketMQLocalTransactionState.COMMIT;try {String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
//            OrderEntity order = GSON.fromJson(jsonString, OrderEntity.class);SysUserEntity  sysUserEntity = JSON.parseObject(jsonString, SysUserEntity.class);sysUserService.saveUser(sysUserEntity);} catch (Exception e) {log.error(">>>> exception message={} <<<<",e.getMessage());result = RocketMQLocalTransactionState.UNKNOWN;}
//        return  RocketMQLocalTransactionState.UNKNOWN;return result;}/*** 步骤四* 描述:mq回调检查本地事务执行情况* @param msg* @return*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {log.info(">>>> TX message listener check local transaction, message={} <<<<",msg.getPayload());// 检查本地事务RocketMQLocalTransactionState result = RocketMQLocalTransactionState.COMMIT;try {String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);SysUserEntity  sysUserEntity = JSON.parseObject(jsonString, SysUserEntity.class);
//            OrderEntity order = GSON.fromJson(jsonString, OrderEntity.class);
//            List<OrderEntity> list = orderService.selectOrder(order);List<Map> list = sysUserDao.queryUserByOpenid(sysUserEntity.getOpenid(),sysUserEntity.getUsername());if(list.size()<=0){result = RocketMQLocalTransactionState.UNKNOWN;}} catch (Exception e) {// 异常就回滚log.error(">>>> exception message={} <<<<",e.getMessage());result = RocketMQLocalTransactionState.ROLLBACK;}return result;}}

@RocketMQTransactionListener注意这个注解不能落下。
然后可以配置一下下游消费者。

@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "consumeRegister", topic = "TX_REGISTER_ADD",consumeMode = ConsumeMode.ORDERLY)
public class RegisterListener implements RocketMQListener<RegisterFeign> {@Autowiredprivate WeChatService weChatService;/**** @param dto*/@Overridepublic void onMessage(RegisterFeign dto) {log.info("接收到消息,开始消费..dto" + dto);weChatService.registByOpenid(dto);}}

我们在这个地方来接受一下消息。然后调用这个服务的保存。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/682178.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ARM(2)ARMv8基础知识

目录 一、异常 1.1异常等级的定义 1.2异常的种类 1.2.1同步异常和异步异常 1.3改变异常等级 1.4异常后的处理 1.4.1异常处理相关寄存器 1.4.2系统调用 1.4.3对EL2/EL3的系统调用 1.4.4异常返回 1.4.5异常处理流程 二、安全状态 三、执行状态 本文介绍以下内容&…

javaWeb快速部署到tomcat阿里云服务器

目录 准备 关闭防火墙 配置阿里云安全组 点击控制台 点击导航栏按钮 点击云服务器ECS 点击安全组 点击管理规则 点击手动添加 设置完成 配置web服务 使用yum安装heepd服务 启动httpd服务 查看信息 部署java通过Maven打包好的war包项目 Maven打包项目 上传项目 …

Redis(无中心化集群搭建)

文章目录 1.无中心化集群1.基本介绍2.集群说明 2.基本环境搭建1.部署规划&#xff08;6台服务器&#xff09;2.首先删除上次的rdb和aof文件&#xff08;对之前的三台服务器都操作&#xff09;1.首先分别登录命令行&#xff0c;关闭redis2.清除/root/下的rdb和aof文件3.把上次的…

分布式锁讲解

概括 分布式锁是一种用于在分布式系统中实现同步机制的锁。在单机系统中&#xff0c;我们可以使用如Java中的synchronized关键字或者 ReentrantLock来实现线程间的同步&#xff0c;但在分布式系统中&#xff0c;由于多个节点&#xff08;服务器&#xff09;之间的并发操作&am…

Redis持久化策略——Java全栈知识(17)

Redis持久化 1、Redis 持久化的三种方式 1、RDB&#xff1a; 以快照的方式将此刻 Redis 中的数据以二进制的文件形式保存在磁盘中。 RDB 的优点是&#xff1a;快照文件小、恢复速度快&#xff0c;适合做备份和灾难恢复。 RDB 的缺点是&#xff1a;定期更新可能会丢数据&#…

【stomp 实战】spring websocket 接收消息源码分析

后台消息的发送过程&#xff0c;我们通过spring websocket用户消息发送源码分析已经了解了。我们再来分析一下后端接收消息的过程。这个过程和后端发送消息过程有点类似。 前端发送消息 前端发送消息给服务端的示例如下&#xff1a; 发送给目的/app/echo一个消息。 //主动发…

mysql数据库调优篇章1

目录 1.认识数据库中日志的作用2.增加mysql数据库中my.ini 基本配置3.增加my.ini中参数配置4.查看已经执行过的sql语句过去执行时间5.找出慢查询的sql6. SHOW VARIABLES LIKE ‘innodb_read_io_threads’; SHOW VARIABLES LIKE ‘innodb_write_io_threads’; SHOW VARIABLES LI…

Python从0到POC编写--实用小脚本02

爆破脚本&#xff1a; 爆破脚本也是我们经常使用的东西 这里就简单讲讲后台爆破脚本的编写吧 在编写之前&#xff0c;我们先通过访问网站去看看情况 首先我们可以先登录看看 输入账号 admin &#xff0c;密码 12345 后 登录失败&#xff0c;提示 用户名或密码错误 在输入…

振动分析的一些概念

一.时域分析 振动测试领域中&#xff0c;通常使用标准是ISO 10816系列标准&#xff0c;其要去使用有效值&#xff08;RMS&#xff09;来表示震动信号的能量大小&#xff0c;并提供一组限制值&#xff0c;以帮助用户评估机器的振动水平是否正常。 1.位移&#xff1a; 峰峰&…

【备战软考(嵌入式系统设计师)】10 - 软件工程基础

这一部分的内容是概念比较多&#xff0c;不要理解&#xff0c;去感受。 涉及的知识点是嵌入式系统开发和维护的部分&#xff0c;也就是和管理相关的&#xff0c;而不是具体如何进行嵌入式系统开发的细节。 系统开发生命周期 按照顺序有下面几个阶段&#xff0c;我们主要要记…

Android MediaCodec 简明教程(七):使用 MediaCodec 解码到 OES 纹理上

系列文章目录 Android MediaCodec 简明教程&#xff08;一&#xff09;&#xff1a;使用 MediaCodecList 查询 Codec 信息&#xff0c;并创建 MediaCodec 编解码器Android MediaCodec 简明教程&#xff08;二&#xff09;&#xff1a;使用 MediaCodecInfo.CodecCapabilities 查…

书生浦语训练营第2期-第7节作业

一、基础作业 二、进阶作业