模板模式实现分布式锁实战

前言

分布式锁相信大家都有用过,常见的分布式锁实现方式例如redis、zookeeper、数据库都可以实现,而我们代码中强引用这些分布式锁的代码,那么当我们以后想替换分布式锁的实现方式时,需要修改代码的成本会很高,于是我们需要借鉴一些设计模式思想来设计,下面我介绍下这三个分布式锁的实现逻辑以及我们项目中是怎么实现

实现方式

数据库实现

首先我们设计一张这样的表

CREATE TABLE `lock` (`key` varchar(128) C NOT NULL,`created` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,`version` int(8) DEFAULT '1',PRIMARY KEY (`key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT = '分布式锁表';

首先方法开启事务,当我们需要对某块业务上锁时,确定加锁的颗粒度设置key的值,执行插入语句,这里我们可以设计的全面一点,对比其他的分布式锁实现方式,貌似还缺了点什么,比如这个锁不可重入,无法设置超时时间 其实这些我们也可以解决, 首先可重入方面,version字段记录重入的次数

insert into table (`key`) values (#key#) on duplicate key `version` total = `version` + 1

key是唯一索引,执行完这个语句后,会在这一行添加行锁。这时候后续有两种可能性,如果是当前线程A重复插入锁,那么会更新version字段;如果是其他线程B想要插入这个锁,那么由于A持有的行锁还未释放,所以B会阻塞

超时方面,我们可以设置数据库的innodb_lock_wait_timeout参数来设置超时时间,默认50s,等待时间超时这个时间则会报错

由于insert语句在RR隔离级别会生成间隙锁,在并发较高的情况下会产生死锁的情况,所以建议在RC情况下使用

Redis实现

如果我们自己设计redis实现加锁的话,我们第一个想到的就是setNx语法,它的作用就是当key不存在的情况下,将key 的值设置为 value,同时返回1,如果key已存在,则不设置value,并且返回0

如果还要加上超时时间,那么还需要执行expire语法

setnx lock true
expire lock 10

使用这两个语法,会产生一个问题,就是这两个语法不是原子性的,当执行一个语法后,系统报错了,那么超时时间就无法设置,这样子这个key就无法过期

基于这个问题,Redis官方将这两个指令组合在了一起,解决Redis分布式锁原子性操作的问题

SET key value [EX 过期时间] NX
将key 的值设置为 value,同时返回1,如果key已存在,则不设置value,并且返回0;同时加上超时时间,这是一个原子性操作

由此可见,我们自己实现需要踩很多坑,市面上有成熟的redis实现的分布式锁框架redission,我们直接用就可以了,它帮我们把坑都踩了一遍,不用重复造轮子了,简单讲解下它的原理

首先讲一下它的键值kv结构

  • key为锁的key
  • value为一个hash对象 {key:线程id,value:重入次数}

value为什么这么设计?我们要解决两个问题

1、删除锁时,A线程把B线程持有的锁给删除了?为了解决这个问题,我们需要在锁中记录线程id,删除时就可以判断锁是否是当前线程持有的,一致才可以删除

2、实现可重入的逻辑,所以需要在锁中记录重入次数,每次重入次数+1

不同于上面SET key value [EX 过期时间] NX方式,redission的加锁逻辑是通过一段lua脚本来实现的,redis的lua脚本可以实现原子性的操作,下图是lua脚本

image-20240105170729529

解锁逻辑也是一样的,由于需要判断当前线程才能执行删除,所以也需要通过lua脚本来实现删除的逻辑

当我们设置了过期时间后,如果我们的业务执行时间超过了设定的过期时间,那么锁会提前删除,就会出现各种各样的问题。所以redission实现了一个watch dog逻辑。它是一个后台线程,每10s检查一次,将锁的过期时间延长

redis我们通过都会设置高可用,最常见的方案就是主从或者哨兵,但是它保证了高可用的同时,无法保证高一致性。这样子当redis的主节点挂了,从节点还没有同步到主节点的数据,就变成了主节点,那么锁就会发生丢失

redission提供了RedLock算法,通过使用多个Redis实例,各个实例之间没有主从关系,相互独立,超过一半节点加锁成功才算获取到锁。不过这种算法也不是一个完美的算法,多个实例加锁效率低,同时也会衍生出一些其他问题

Zookeeper实现

Zookeeper有很多种节点种类,其中有一种节点种类叫做临时顺序节点,这个节点有两个特性,首先是当客户端向Zookeeper添加了这个节点后,如果之后客户端挂了,那么这个临时节点会被删除,不会一直存在,其次是这个节点是有序递增的。这些特性很适合用来做分布式锁

加锁就是在Zookeeper上添加临时顺序节点,判断是否是最小节点,如果是最小节点,则无需排队直接执行。如果不是最小的,则往后加一个顺序节点,并且向前一个节点添加一个watch监听,线程阻塞等待排队

当前一个节点删除时,当前节点监听到删除事件并唤醒线程。这样子第一个通知第二个,第二个通知第三个,这种击鼓传花的方式可以避免羊群效应

羊群效应就是前一个节点释放锁后,所有节点被唤醒,这样会给服务器带来巨大压力

哪种更好?

这个问题,我觉得得根据我们的现实情况做判定,当我们的系统只有数据库,又不想依赖其他的中间件,那我们使用数据库实现的方方式就可以了,但是性能会很差,容易出现瓶颈

Redis和Zookeeper性能都比数据库好,这两者相比较而言,Redis作为分布式锁大家使用的会多一些,主要原因我想应该是Zookeeper的cp特性导致单leader节点易出现瓶颈,而redis如果出现瓶颈后弹性伸缩(增加节点)会很方便,所以性能更高

踩坑点

之前我们在代码中加入分布式锁时,碰到过一个坑,就是明明加入了分布式锁,但是没有"锁住"代码。这里有几种原因,我就不一一展开了,我们之前踩过的一个坑就是我们的方法加了事务@Transactional注解,同时方法里面的逻辑加了redis分布式锁,类似下面的代码。

@Transactional
private void test() {// 加锁// 查询 id =1的记录// 更新 id=1的记录// 释放锁
}

这时候,当线程A进入了方法,首先进入事务,加锁,然后执行方法里面的逻辑,释放锁,但是事务还未释放。这时候线程B也进入了这个方法,锁因为已经释放了,就直接进入方法逻辑了,但是线程A的事务此时还没有提交,所以线程B查询的id=1的记录是不对的。

这个是踩坑了,解决方法有两种,一种是把事务的方法放在加锁的逻辑里面;另外一种就是释放锁的逻辑改成监听spring 事务提交的事件,实现事务完成后再释放锁,我们最后也是这么改的

@Transactional
private void test() {// 加锁// 查询 id =1的记录// 更新 id=1的记录// unlockAfterTransaction方法
}private void unlockAfterTransaction(LockResult lockResult) {//事物完成后释放锁TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {@Overridepublic void afterCompletion(int status) {super.afterCompletion(status);distLockSservice.unlock(lockResult);}});
}

实战

这里再重审下为什么使用模板模式,因为我们使用分布式锁的实现有多种,如果我们在业务代码中直接依赖某一种分布式锁的话,那么后续我们想替换分布式锁的实现会很麻烦,所有有依赖的类都得替换,所以我们使用模版模式,把分布式锁的实现以及加锁、释放锁的逻辑放到公共代码中,我们业务类只需要实现自己的业务逻辑即可,无需关心分布式锁的相关逻辑,调用方法即可

我们在1.8之前使用模版模式会比较繁琐,我们需要准备一个抽象类,定义一些公共的逻辑,然后子类继承这个抽象类来自定义不同的逻辑。下面我以redission分布式锁的实战为例

public abstract class AbstractLockService {RedissonClient redissonClient;public void lock(List<String> keyNames, Long timeout,Object... o) throws InterruptedException {// 加锁RLock[] locks = new RLock[keyNames.size()];for(int i = 0; i < keyNames.size(); ++i) {locks[i] = this.redissonClient.getLock(keyNames.get(i));}RLock lock = this.redissonClient.getMultiLock(locks);boolean success = lock.tryLock(timeout, TimeUnit.MILLISECONDS);// 加锁成功,走业务逻辑if (success) {this.doBusiness(o);}// 释放锁unlockAfterTransaction(lock);}// 业务逻辑abstract Object doBusiness(Object... o);private void unlockAfterTransaction(RLock lock) {//事物完成后释放锁TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {@Overridepublic void afterCompletion(int status) {super.afterCompletion(status);lock.unlock();}});}
}

当我们使用1.8以上的JDK时,针对模板模式,做了很多优化。我们可以将实现方法作为函数式方法传入模版中

public class LockFunctionService {RedissonClient redissonClient;public void lock(List<String> keyNames, Long timeout, ILockCallback lockCallback) throws InterruptedException {// 加锁RLock[] locks = new RLock[keyNames.size()];for(int i = 0; i < keyNames.size(); ++i) {locks[i] = this.redissonClient.getLock(keyNames.get(i));}RLock lock = this.redissonClient.getMultiLock(locks);boolean success = lock.tryLock(timeout, TimeUnit.MILLISECONDS);// 加锁成功,走业务逻辑if (success) {lockCallback.callback();}// 释放锁unlockAfterTransaction(lock);}private void unlockAfterTransaction(RLock lock) {//事物完成后释放锁TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {@Overridepublic void afterCompletion(int status) {super.afterCompletion(status);lock.unlock();}});}}interface ILockCallback<T> {T callback();
}

当我们调用时,比之前方便多了

public static void main(String[] args) throws InterruptedException {LockFunctionService lockFunctionService = new LockFunctionService();List<String> keys = Lists.newArrayList("lock_1");// 需要加锁执行释放lockFunctionService.lock(keys, 5L, () -> {System.out.println("执行业务逻辑");return null;});
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/327555.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Python实战】global关键字的应用和线程并发

【Python实战】global关键字的应用和线程并发 一、前言编译环境 二、gloabl全局变量关键字代码示例 三、程序运行时全局变量的变化代码示例 四、全局变量的线程安全问题五、总结 在很多场景和业务实践中&#xff0c;线程之间的变量共享和保持原子性非常的关键和重要。 这边主要…

HarmonyOS 开发基础(五)Button

HarmonyOS 开发基础&#xff08;五&#xff09;Button Entry Component struct Index {build() {Row() {Column() {// Button&#xff1a;ArkUI 的基础组件 按钮组件// label 参数&#xff1a;文字型按钮Button(我是按钮)// width&#xff1a;属性方法&#xff0c;设置组件的宽…

Python如何生成个性二维码

Python-生成个性二维码 一、问题描述 通过调用MyQR模块来实现生成个人所需二维码。 安装&#xff1a; pip install myqr 二、代码实现 1.普通二维码 from MyQR import myqr # 普通二维码 myqr.run(wordshttp://www.csdn.net/mayi0312,save_nameqrcode.png ) 效果图&#…

Vue2商品规格选择

Vue2Element-ui Vu2仿写拼多多商家后台规则选择&#xff0c;为什么用Vue2呢&#xff0c;因为公司用的Vue2... 样式不是很好看&#xff0c;自己调一下就行。 <template><div ref"inputContainer"><div>{{ combinationsResult }}</div><…

【开源】基于JAVA语言的智能教学资源库系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 数据中心模块2.2 课程档案模块2.3 课程资源模块2.4 课程作业模块2.5 课程评价模块 三、系统设计3.1 用例设计3.2 数据库设计3.2.1 课程档案表3.2.2 课程资源表3.2.3 课程作业表3.2.4 课程评价表 四、系统展示五、核心代…

【响应式编程-05】Lambda方法引用

一、简要描述 Lambda的方法引用也叫引用方法 方法引用初体验方法引用的底层实现方法引用的语法格式方法引用举例 静态方法引用构造方法引用普通方法引用super和this方法引用数组的方法引用 二、方法引用初体验 为什么出现方法引用&#xff1f; 引用已存在方法&#xff0c;避免重…

DBeaver配置类Navicat显示字段是否非空

在Navicat中设计表时可以很方便的看到字段是否【非空】&#xff0c;而在DBeaver中确实这样显示的,必须双击字段才能看到是否【非空】 解决方案 点击此处齿轮按钮,将【非空】以及其他需要的显示字段都勾上,重新打开即可

Python中的有序字典是什么

有序字典 一、简介 Python中的字典的特性&#xff1a;无序性。 有序字典和通常字典类似&#xff0c;只是它可以记录元素插入其中的顺序&#xff0c;而一般字典是会以任意的顺序迭代的。 二、普通字典 #! /usr/bin/env python3 # -*- coding:utf-8 -*- d1 {} d1[a] A d1[b…

【Java EE初阶七】多线程案例(生产者消费者模型)

1. 阻塞队列 队列是先进先出的一种数据结构&#xff1b; 阻塞队列&#xff0c;是基于队列&#xff0c;做了一些扩展&#xff0c;适用于多线程编程中&#xff1b; 阻塞队列特点如下&#xff1a; 1、是线程安全的 2、具有阻塞的特性 2.1、当队列满了时&#xff0c;就不能往队列里…

druid Communications link failure报错处理

现象 日志报错&#xff1a;com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure 原因 从数据库连接池拿到了已经关闭的连接&#xff0c;导致报错。druid有定时任务进行空闲连接的检测和回收&#xff0c;当连接时长超过mysql的连接超时时间…

YOLOv8改进 | 2023主干篇 | FasterNeT跑起来的主干网络( 提高FPS和检测效率)

一、本文介绍 本文给大家带来的改进机制是FasterNet网络,将其用来替换我们的特征提取网络,其旨在提高计算速度而不牺牲准确性,特别是在视觉任务中。它通过一种称为部分卷积(PConv)的新技术来减少冗余计算和内存访问。这种方法使得FasterNet在多种设备上运行速度比其他网络…

【Flink精讲】Flink数据延迟处理

面试题&#xff1a;Flink数据延迟怎么处理&#xff1f; 将迟到数据直接丢弃【默认方案】将迟到数据收集起来另外处理&#xff08;旁路输出&#xff09;重新激活已经关闭的窗口并重新计算以修正结果&#xff08;Lateness&#xff09; Flink数据延迟处理方案 用一个案例说明三…