Go-zero中分布式事务的实现(DTM分布式事务管理器,在一个APi中如何调用两个不同服务的rpc层,并保证两个不同服务之间的业务逻辑同时成功)

涉及到的相关技术

     1.DTM分布式事务管理器,解决跨数据库、跨服务、跨语言栈更新数据的一致性问题。

      2.SAGA事务模式,SAGA事务模式是DTM中常用的一种模式,简单易上手.(当然还有其它更多的事务模式,这里采用的SAGA只不过是其中一种较为简单的方法)

      3.Go-zero框架,ETCD服务注册...

更多内容移步至:go-zero 缩短从需求到上线的距离 和 介绍 | DTM开源项目文档

业务场景

        如果是在单体架构的业务当中,是不需要用到分布式事务的.单体架构中,涉及到需要保证多个事务同时成功的场景,只需要创建一个全局的事务对象 如:tx := db.Begin(),然后统一用这一个tx去管理接下来的业务逻辑即可.

        不清楚在一个api中如何调用其它服务rpc的可以看看我的另一篇博客中的一种解决办法:

go-zero标准的项目结构,以及如何使用docker-compose部署道linux服务器上-CSDN博客

        但是在go-zero框架的这种微服务中,比如说:我在一个用户服务的api中调用了用户服务rpc中注册的业务,并且同时还调用了标签服务的rpc层中的选择标签的业务. 那么,此时我就需要保证用户的注册和标签的选择这两个在不同服务下执行的业务逻辑同时成功.(总不能用户账号密码插入到的表中,但是突然断网了,导致标签没有选择上去吧,这个是不符合我的业务的).

DTM 环境搭建(Windows本地搭建)

        !!!!!!!!!!!!!! 这个环境请注意,是需要在你本地去搭建的,至于为什么,我会在后面解释,最重要的先把环境搭建起来吧! 我采用的是docker-compose去搭建.(如果不了解windows电脑如何配置docker环境,可以移步:)

Windows11电脑是如何搭建docker环境的-CSDN博客

        废话不多说,首先从搭建环境讲起.(我这里采用的是docker-compose搭建我需要的环境)

上图就是项目的结构

在dtm和etcd的目录下面各自新建一个Dockerfile文件,Dockerfile都不需要过多的配置,只需要用到最基础的镜像即可.在dtm的目录下还需要新建一个config.yml文件.

DTM下Dokcerfile以及config.yml的编写

FROM yedf/dtm:latestLABEL maintainer="zyf021026 <shichuxin6@163.com>"
# 指定要存储trans状态的存储驱动
# Store:### 默认存储驱动
#   Driver: 'boltdb'### redis 存储驱动
#   Driver: 'redis'
#   Host: 'localhost'
#   User: ''
#   Password: ''
#   Port: 6379### mysql 存储驱动
#   Driver: 'mysql'
#   Host: 'mysql'
#   User: 'root'
#   Password: '123456'
#   Port: 3306### postgres 存储驱动
#   Driver: 'postgres'
#   Host: 'localhost'
#   User: 'postgres'
#   Password: 'mysecretpassword'
#   Port: '5432'### 以下配置仅适用于 postgres/mysql 驱动
#   MaxOpenConns: 500
#   MaxIdleConns: 500
#   ConnMaxLifeTime: 5
#   TransGlobalTable: 'dtm.trans_global'
#   TransBranchOpTable: 'dtm.trans_branch_op'### 以下配置仅适用于 redis/boltdb 驱动
#   DataExpire: 604800 # Trans 过期时间
#   RedisPrefix: '{}'  # Redis 存储前缀MicroService:Driver: 'dtm-driver-gozero'           # 要处理注册/发现的驱动程序的名称Target: 'etcd://your-ip:2379/dtmservice' # 注册 dtm 服务的 etcd 地址EndPoint: 'your-ip:36790'# 以下配置的单位为'秒'
# TransCronInterval: 3
# TimeoutToFail: 35
# RetryInterval: 10# 日志等级
# LogLevel: 'info'

ETCD的Dockerfile文件编写

FROM bitnami/etcd:latestLABEL maintainer="zyf021026 <shichuxin6@163.com>"

使用docker-compose 构建镜像,启动容器


version: '3'networks:backend:driver: bridge######## 项目依赖的环境,启动项目之前要先启动此环境 #######
services:etcd:build:context: etcdenvironment:- TZ=Asia/Shanghai- ALLOW_NONE_AUTHENTICATION=yesports: # 设置端口映射- "2379:2379"networks:- backendrestart: alwaysdtm:build:context: ./dtmenvironment:- TZ=Asia/Shanghaientrypoint:- "/app/dtm/dtm"- "-c=/app/dtm/configs/config.yaml"privileged: truevolumes:- ./dtm/config.yml:/app/dtm/configs/config.yaml # 将 dtm 配置文件挂载到容器里ports:- "36789:36789"- "36790:36790"networks:- backendrestart: alwaysdepends_on:- etcd

   在根目录下面执行docker-compose up -d 将需要的环境搭建起来

执行如下图中的命令:

        新建子事务屏障的数据库(库名和表名请不要修改) 可以作为独立的一个数据库使用,没有必要把自己的项目数据库名称改为dtm_barrier

/*Navicat Premium Data TransferSource Server         : LinkSource Server Type    : MySQLSource Server Version : 50743Source Host           : 39.101.77.206:3306Source Schema         : dtm_barrierTarget Server Type    : MySQLTarget Server Version : 50743File Encoding         : 65001Date: 03/03/2024 13:57:48
*/SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;-- ----------------------------
-- Table structure for barrier
-- ----------------------------
DROP TABLE IF EXISTS `barrier`;
CREATE TABLE `barrier`  (`id` bigint(22) NOT NULL AUTO_INCREMENT,`trans_type` varchar(45) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '',`gid` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '',`branch_id` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '',`op` varchar(45) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '',`barrier_id` varchar(45) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '',`reason` varchar(45) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '' COMMENT 'the branch type who insert this record',`create_time` datetime NULL DEFAULT CURRENT_TIMESTAMP,`update_time` datetime NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (`id`) USING BTREE,UNIQUE INDEX `gid`(`gid`, `branch_id`, `op`, `barrier_id`) USING BTREE,INDEX `create_time`(`create_time`) USING BTREE,INDEX `update_time`(`update_time`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1482 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;SET FOREIGN_KEY_CHECKS = 1;

SAGA事务模式的使用

        简单说明一下,SAGA分布式事务模式,是没有办法携带返回值的,因此尽量此处要避免需要有返回值的业务场景.

        直接用代码来展示SAGA事务模式的使用方法吧!

    用户注册服务PRC的编写以及事务失败补偿机制的编写

这里不再演示proto文件是如何编写的

用户注册服务的rpc

func (l *UserCreateLogic) UserCreate(in *user.UserCreateRequest) (pd *user.UserCreateResponse, endErr error) {// 获取 RawDB// 注册db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()// 获取子事务屏障对象barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)if err != nil {return nil, status.Error(500, err.Error())}// 开启子事务屏障err = barrier.CallWithDB(db, func(tx *sql.Tx) error {// 加密密码pwd, _ := bcrypt.GetPwd(in.Password)// 插入用户数据_, err = tx.Exec("INSERT INTO users (id , created_at, updated_at, username, password, avatar, phone) VALUES (?,?, ?, ?, ?, ?, ?)", in.Id, time.Now(), time.Now(), in.Username, pwd, in.Avatar, in.Phone)//返回子事务执行失败if err != nil {return err}return nil})if err != nil {return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) //如果失败,不再重试,直接回滚}return &user.UserCreateResponse{}, endErr
}

用户注册服务rpc的失败补偿 (如果注册服务的rpc失败,就会执行相应的补偿方法)

func (l *UserCreateRevertLoginLogic) UserCreateRevertLogin(in *user.UserCreateRequest) (pd *user.UserCreateResponse, err error) {fmt.Println("用户标签回滚开始--->")// 获取 RawDBdb, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()// 获取子事务屏障对象barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)if err != nil {return nil, status.Error(500, err.Error())}// 开启子事务屏障err = barrier.CallWithDB(db, func(tx *sql.Tx) error {fmt.Println("注册事务走入了补偿")//删除插入的标签数据 和 用户数据_, err = tx.Exec("DELETE FROM tb_user_tag where user_id = ?", in.Id)_, err = tx.Exec("DELETE FROM users where id = ?", in.Id)//返回子事务执行失败if err != nil {return err}return nil})if err != nil {fmt.Println("failed---->", err)return nil, err}fmt.Println("删除成功")fmt.Println("用户标签回滚结束--->")return &user.UserCreateResponse{}, nil
}

        标签服务Rpc的编写以及事务失败补偿机制的编写

标签服务的rpc

func (l *SignUserChooseTagLogic) SignUserChooseTag(in *tag.UserChooseTagRequest) (*tag.UserChooseTagRequest, error) {// 获取 RawDB// 注册账号时,选择标签db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()if err != nil {return nil, status.Error(500, err.Error())}// 获取子事务屏障对象barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)// 开启子事务屏障err = barrier.CallWithDB(db, func(tx *sql.Tx) (err error) {// 用户注册时选择标签var exists boolerr = tx.QueryRow("SELECT EXISTS(SELECT 1 FROM tb_user_tag WHERE tag_id = ? and user_id = ?)", in.TagId, in.UserId).Scan(&exists)if err != nil {return err}if exists {return fmt.Errorf("标签重复选择")}fmt.Println("开始插入标签")_, err = tx.Exec("INSERT INTO tb_user_tag (tb_user_tag.created_at , tb_user_tag.updated_at , tag_id, user_id) VALUES (?,?,?, ?)", time.Now(), time.Now(), in.TagId, in.UserId)if err != nil {return fmt.Errorf("标签选择失败")}return nil})if err != nil {return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) //事务失败不再重试,直接回滚}return &tag.UserChooseTagRequest{}, nil
}

标签选择失败补偿的rpc

func (l *SignUserChooseTagRevertLogic) SignUserChooseTagRevert(in *tag.UserChooseTagRequest) (*tag.UserChooseTagRequest, error) {fmt.Println("用户标签SignUserChooseTagRevert--->开始")// 获取 RawDBdb, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()if err != nil {return nil, status.Error(500, err.Error())}// 获取子事务屏障对象barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)// 开启子事务屏障err = barrier.CallWithDB(db, func(tx *sql.Tx) (err error) {fmt.Println("注册时选择标签进入了补偿")logc.Info(l.ctx)//删除记录_, err = tx.Exec("DELETE FROM tb_user_tag where tag_id = ? and user_id = ?", in.TagId, in.UserId)return err})if err != nil {return nil, err}fmt.Println("用户标签SignUserChooseTagRevert--->结束")return &tag.UserChooseTagRequest{}, nil
}

至此rpc层的业务逻辑全部编写完毕,但请一定要注意每一个rpc的返回值,一定要按照 如&tag.UserChooseTagRequest{}返回,不能简单的返回一个nil值.否则会导致事务一直无法提交

        API层的编写


func (l *SignUpLogic) SignUp(req *types.UserCreateRequest) (resp *types.UserCreateResponse, err error) {//首先判断用户是否存在_, err = l.svcCtx.UserRpc.UserIsExists(l.ctx, &user.UserCreateRequest{Phone: req.Phone,})if err != nil {return nil, err}// 获取UserRpc 的BuildTargetuserRpcBuildServer, err := l.svcCtx.Config.UserRpc.BuildTarget()if err != nil {return nil, status.Error(100, "用户注册异常")}// 获取TagRpc 的BuildTargettagRpcBuildServer, err := l.svcCtx.Config.TagRpc.BuildTarget()if err != nil {return nil, status.Error(100, "标签选择异常")}empty := user.Empty{}//dtm服务的etcd注册地址var dtmServer = l.svcCtx.Config.Dtm//dtmServer := "etcd://etcd:2379/dtmservice"fmt.Println(dtmServer)// 创建一个gidgid := dtmgrpc.MustGenGid(dtmServer)//创建一个自增idif _, err := l.svcCtx.UserRpc.AddUserId(l.ctx, &empty); err != nil {return nil, fmt.Errorf("CREATE user id error:%v", err)}userID, _ := l.svcCtx.UserRpc.NextUserID(l.ctx, &empty)saga := dtmgrpc.NewSagaGrpc(dtmServer, gid).Add(tagRpcBuildServer+"/tag.TagSign/SignUserChooseTag", tagRpcBuildServer+"/tag.TagSign/SignUserChooseTagRevert", &tag.UserChooseTagRequest{UserId: userID.NextUserId,TagId:  req.StartTagId,}).Add(userRpcBuildServer+"/user.UserService/UserCreate", userRpcBuildServer+"/user.UserService/UserCreateRevertLogin", &user.UserCreateRequest{Username: req.Username,Password: req.Password,Avatar:   req.Avatar,Phone:    req.Phone,Id:       userID.NextUserId,})//事务提交if err := saga.Submit(); err != nil {//自增主键减少1if _, err := l.svcCtx.UserRpc.DecUserID(l.ctx, &empty); err != nil {logx.Error(err)}logx.Error(err)return nil, fmt.Errorf("saga submit error:%v", err)}return &types.UserCreateResponse{}, nil
}

上面代码的逻辑,相信如果各位接触到微服务,一定是可以理解的,由于saga的事务模式没有返回值,所以我通过redis生成一个自增id来使用,而不再采用mysql的自增主键id.

上面代码中的地址可以在rpc生成的pb.go中找到

自己的理解

        经历了长度一周多对分布式事务的研究,写一点自己的简单理解吧!(比较浅显)

        saga事务模式需要自己写事务的补偿方法,子事务屏障内的事务执行失败之后,就会执行对应的事务补偿方法!即回滚事务.补偿方法内写的便是对这一次执行的插入,修改语句的相反操作.比如我增加某一条数据,补偿内就写上对删除的操作.

        感觉和MySQL的Undo log 回滚日志很相似啊!Undo log日志会记录更新前的数据到日志中,是在一个事务下执行过程中,在还没有提交之前,如果发生意外,就可以通过这个日志回滚到事务执行之前的数据了.

        

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/508509.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

14-RPC-自研微服务框架

RPC RPC 框架是分布式领域核心组件&#xff0c;也是微服务的基础。 RPC &#xff08;Remote Procedure Call&#xff09;全称是远程过程调用&#xff0c;相对于本地方法调用&#xff0c;在同一内存空间可以直接通过方法栈实现调用&#xff0c;远程调用则跨了不同的服务终端&a…

【大厂AI课学习笔记NO.64】机器学习开发框架

机器学习开发框架本质上是一种编程库或工具&#xff0c;目的是能够让开发人员更容易、更快速地构建机器学习模型。 机器学习开发框架封装了大量的可重用代码&#xff0c;可以直接调用&#xff0c;目的是避免“重复造轮子’大幅降低开发人员的开发难度&#xff0c;提高开发效率…

mongodb 图形界面工具 -- Studio 3T(下载、安装、连接mongodb数据库)

目录 mongodb 图形界面工具 -- Studio 3T下载安装第一次使用&#xff1a;注册添加一个连接&#xff08;连接 mongodb 数据库&#xff09;1、点击【添加新连接】&#xff0c;选择【手动配置我的连接设置】2、对 Server 设置连接数据3、连接的用户认证设置&#xff08;创建数据库…

某大型制造企业数字化转型规划方案(附下载)

目录 一、项目背景和目标 二、业务现状 1. 总体应用现状 2. 各模块业务问题 2.1 设计 2.2 仿真 2.3 制造 2.4 服务 2.5 管理 三、业务需求及预期效果 1. 总体业务需求 2. 各模块业务需求 2.1 设计 2.2 仿真 2.3 制造 2.4 服务 2.5 管理 四、…

代码随想录刷题笔记 DAY 35 | 无重叠区间 No.435 | 划分字母区间 No.763 | 合并区间 No.56

文章目录 Day 3501. 无重叠区间&#xff08;No. 435&#xff09;<1> 题目<2> 笔记<3> 代码 02. 划分字母区间&#xff08;No. 763&#xff09;<1> 题目<2> 笔记<3> 代码 03. 合并区间&#xff08;No. 56&#xff09;<1> 题目<2&g…

UE5中实现后处理深度描边

后处理深度描边可以通过取得边缘深度变化大的区域进行描边&#xff0c;一方面可以用来做角色的等距内描边&#xff0c;避免了菲尼尔边缘光不整齐的问题&#xff0c;另一方面可以结合场景扫描等特效使用&#xff0c;达到更丰富的效果&#xff1a; 后来解决了开启TAA十字线和锯齿…

华为配置攻击检测功能示例

配置攻击检测功能示例 组网图形 图1 配置攻击检测功能示例组网图 业务需求组网需求数据规划配置思路配置注意事项操作步骤配置文件 业务需求 企业用户通过WLAN接入网络&#xff0c;以满足移动办公的最基本需求。且在覆盖区域内移动发生漫游时&#xff0c;不影响用户的业务使用。…

Vue3:使用 Composition API 不需要 Pinia

在 Vue.js 开发的动态环境中&#xff0c;在单个组件中处理复杂的业务逻辑可能会导致笨重的文件和维护噩梦。虽然 Pinia 提供集中式状态管理&#xff0c;但仅依赖它来处理复杂的业务逻辑可能会导致代码混乱。本文探讨了使用 Composition API 的替代方法&#xff0c;说明开发人员…

DAP-Link DIY复刻指南

DAP-Link DIY复刻指南 文章目录 DAP-Link DIY复刻指南1. 概述2. 获取工程资源2.1 工具安装2.2 源码拉取2.3 硬件资源获取 3. 工程下载验证3.1 下载bootload3.2 下载 APP3.3 修改IO配置 4. 验证4.1 虚拟串口验证4.2 Keil 无法识别 DAPLink&#xff1f;4.3 keil 可以识别DAPLink但…

Vue2+ElementUI列表、表格组件的封装

Vue2ElementUI列表组件的封装&#xff1a;引言 在日常开发中&#xff0c;我们经常会遇到需要展示列表数据的场景。ElementUI 提供的 el-table 组件是一个功能强大的表格组件&#xff0c;可以满足大部分的需求。但是&#xff0c;在实际应用中&#xff0c;我们往往需要根据业务需…

Java基础 - 7 - 常用API(三)

API&#xff08;全称 Application Programming Interface&#xff1a;应用程序编程接口&#xff09; API就是Java帮我们已经写好的一些程序&#xff0c;如类、方法等&#xff0c;可以直接拿过来用 JDK8 API文档&#xff1a;Java Platform SE 8 一. JDK8之前传统的日期、时间 …

并行和并发的区别

并行和并发的区别是并行指的是多个任务在同一时间点上同时执行&#xff0c;而并发指的是多个任务在同一时间段内交替执行。并行需要多个处理器或者多核处理器&#xff0c;每个任务都有独立的资源&#xff0c;不会互相干扰。并发可以在单核或者多核处理器上实现&#xff0c;多个…