※Redis的事务、乐观锁和悲观锁

1.是神魔

 在高并发的环境下,多个线程去竞争同一个资源, 比较常见的有高铁抢票系统,商品秒杀系统等,我们需要保证数据正确,同时系统的吞吐也要尽可能高。

2.解决方案

1.  一般多线程同步我们就会想到加锁,用synchornized关键字给并发代码块加锁,
2. 但是在我们的业务场景中,比如高铁抢票,有很多张不同的票,但是synchornized锁住     了秒杀那个代码块,
3. 所有的票全都上了这一把锁,这么看锁的力度还是太大了,其实我们只需要分别锁住每张票,比如根据票id来上锁
4. 这样锁刚刚好。我们可以通过Redis来实现分布式锁,下面分别介绍悲观锁和乐观锁。

3.reids 乐观锁

乐观锁顾名思义,就是很乐观,我看到了一件商品库存是1,我觉得这件商品很少人抢,我尝试去下单,当我提交订单的时候,如果商品的库存还是1,那就说明在我之前没人下单,我就可以成功下单,否则的话就下单失败。

在写操作较少的情况下,乐观锁是优于悲观锁的,吞吐量会大大提高,但是写操作比较频繁的话就会导致一直重试,如果重试代价较高则不推荐用乐观锁。

Redis的乐观锁主要是通过watch()来实现的,watch()的作用是监视键值对,首先是用multi()开启事务,exec()提交事务,提交事务的时候如果发现键值对的值发生变化则会取消事务,下面是一个测试例子:

/*** 初始化商品*/
private static void initProduct() {// 商品个数int prdNum = 3;String key = "prdNum";String customers = "customers";Jedis jedis = RedisUtil.getInstance().getJedis();if(jedis.exists(key)){jedis.del(key);}if(jedis.exists(customers) != null){jedis.del(customers);}jedis.set(key, String.valueOf(prdNum));RedisUtil.returnResource(jedis);
}/*** 顾客抢购商品(秒杀操作)*/
private static void initClient() throws InterruptedException {ExecutorService cachedThreadPool = Executors.newCachedThreadPool();int num = 10;for(int i=1; i<=num; i++){cachedThreadPool.execute(new Customer(i)); //启动与顾客数目相等的线程}cachedThreadPool.shutdown();while(true){if(cachedThreadPool.isTerminated()){System.out.println("所有的消费组线程均结束了!");break;}Thread.sleep(100);}}/*** 打印结果*/
private static void printResult() {Jedis jedis = RedisUtil.getInstance().getJedis();Set<String> customers = jedis.smembers("customers");int i = 1;for (String customer : customers) {System.out.println("第"+(i++)+"个抢到商品的顾客:"+customer);}RedisUtil.returnResource(jedis);
}static class Customer implements Runnable{private String name;private Jedis jedis = null;private String key = "prdNum";private String cust = "customers";public Customer(int name) {this.name = "编号=" + name;}/*** multi:开启redis事务,置客户端为事务态* exec:提交事务,执行从multi到此命令之前的命令队列,置客户端为非事务态* discard:取消事务,置客户端为非事务态* watch:监视键值对,exec提交事务时发现监视的键值发生变化则取消事务*/@Overridepublic void run() {// 随机睡眠一下try {Thread.sleep((int)Math.random()*5000);} catch (InterruptedException e) {e.printStackTrace();}while(true){System.out.println("顾客:" + name + "开始抢购商品!");jedis = RedisUtil.getInstance().getJedis();try{// 监视键值对jedis.watch(key);int prdNum = Integer.parseInt(jedis.get(key));if(prdNum > 0){// 开启事务Transaction transaction = jedis.multi();transaction.set(key, String.valueOf(prdNum - 1));// 提交事务List<Object> result = transaction.exec();if(result == null || result.isEmpty()){System.out.println("很抱歉,顾客" + name + "没有抢购到商品!");}else{// 抢到商品的话记录一下jedis.sadd(cust, name);System.out.println("恭喜顾客" + name + "抢到商品!");break;}}else{System.out.println("很抱歉,库存为0,顾客:" + name + "没有抢到商品");break;}}catch (Exception e){}finally {jedis.unwatch();RedisUtil.returnResource(jedis);}}}
}public static void main(String[] args) throws InterruptedException {long startTime = System.currentTimeMillis();initProduct();initClient();printResult();long endTime = System.currentTimeMillis();long time = endTime - startTime;System.out.println("程序运行时间:" + (int )time + "ms");
}

在这里插入图片描述
在这里插入图片描述

Redis悲观锁

所谓悲观锁,就是比如有很多人抢同一件商品,这件商品的id是101,我就把商品id作为key加一把锁,锁住对商品库存进行写操作的代码块,如果有人来抢,首先需要尝试获取这把锁,才能对商品库存进行写的操作。

当商品库存减少成功之后,再把锁释放,这样下一个人就再进来扣除这件商品的库存了。

当然如果有人一直持有锁不放怎么办呢,就是所谓的“站着茅坑不拉shi”,我们可以设定锁的过期时间。所以悲观锁中的锁就是一个唯一标识的锁和该锁的过期时间,Redis实现悲观锁是用setNX操作实现的阻塞式分布式锁。

setNX

Redis的setNX就是设置一个key的value时,如果该key已经有value,则返回1,否则返回0。

redis> EXISTS job # job 不存在 (integer) 0

redis> SETNX job “programmer” # job 设置成功 (integer) 1

redis> SETNX job “code-farmer” # 尝试覆盖 job ,失败 (integer) 0

redis> GET job # 没有被覆盖 “programmer”

我们可以用它来实现redis分布式锁,把锁过期时间作为value:

SETNX lock.foo <current Unix time + lock timeout + 1>

当有多个线程来set“lock.foo”的值时,如果返回1说明获取到了锁,键值是锁的过期时间(当前时间+锁的有效时间),返回0则可以设定一个循环不断setNX来获取锁,而释放锁则是直接把这个key删除即可。在一个进程持有锁期间,其他进程还会不断调用“GET lock.foo”命令来检测锁是否超时。

下面是Redis悲观锁实现示例:

AbstractLock

public abstract class AbstractLock implements Lock {
/*** <pre>* 这里需不需要保证可见性值得讨论, 因为是分布式的锁,* 1.同一个jvm的多个线程使用不同的锁对象其实也是可以的, 这种情况下不需要保证可见性* 2.同一个jvm的多个线程使用同一个锁对象, 那可见性就必须要保证了.* </pre>*/
protected volatile boolean locked;/*** 当前jvm内持有该锁的线程(if have one)*/
private Thread exclusiveOwnerThread;public void lock() {try {lock(false, 0, null, false);} catch (InterruptedException e) {// TODO ignore}
}public void lockInterruptibly() throws InterruptedException {lock(false, 0, null, true);
}@Override
public boolean tryLock(long time, TimeUnit unit) {try {System.out.println("ghggggggggggggg");return lock(true, time, unit, false);} catch (InterruptedException e) {e.printStackTrace();System.out.println("" + e);}return false;
}public boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException {return lock(true, time, unit, true);
}public void unlock() {// TODO 检查当前线程是否持有锁if (Thread.currentThread() != getExclusiveOwnerThread()) {throw new IllegalMonitorStateException("current thread does not hold the lock");}unlock0();setExclusiveOwnerThread(null);
}protected void setExclusiveOwnerThread(Thread thread) {exclusiveOwnerThread = thread;
}protected final Thread getExclusiveOwnerThread() {return exclusiveOwnerThread;
}protected abstract void unlock0();/*** 阻塞式获取锁的实现** @param useTimeout 是否超时* @param time 获取锁的等待时间* @param unit 时间单位* @param interrupt*            是否响应中断* @return* @throws InterruptedException*/
protected abstract boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt)throws InterruptedException;}

RedisBasedDistributedLock

public class RedisBasedDistributedLock extends AbstractLock {
private Jedis jedis;
protected String lockKey;//锁的名字
protected long lockExpire;//锁的有效时长(毫秒)public RedisBasedDistributedLock(Jedis jedis,String lockKey,long lockExpire){this.jedis = jedis;this.lockKey = lockKey;this.lockExpire = lockExpire;
}@Override
public boolean tryLock() {long lockExpireTime = System.currentTimeMillis() + lockExpire + 1;//锁超时时间String stringOfLockExpireTime = String.valueOf(lockExpireTime);if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) {//获取到锁//设置相关标识locked = true;setExclusiveOwnerThread(Thread.currentThread());return true;}String value = jedis.get(lockKey);if (value != null && isTimeExpired(value)) {//锁是过期的//假设多个线程(非单jvm)同时走到这里String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime);//原子操作// 但是走到这里时每个线程拿到的oldValue肯定不可能一样(因为getset是原子性的)// 假如拿到的oldValue依然是expired的,那么就说明拿到锁了if (oldValue != null && isTimeExpired(oldValue)) {//拿到锁//设置相关标识locked = true;setExclusiveOwnerThread(Thread.currentThread());return true;}}return false;
}@Override
public Condition newCondition() {// TODO Auto-generated method stubreturn null;
}/*** 锁未过期,释放锁*/
@Override
protected void unlock0() {// 判断锁是否过期String value = jedis.get(lockKey);if (!isTimeExpired(value)) {doUnlock();}
}/*** 释放锁* @date 2017-10-18*/
public void doUnlock() {jedis.del(lockKey);
}/*** 查询当前的锁是否被别的线程占有* @return 被占有true,未被占有false* @date 2017-10-18*/
public boolean isLocked(){if (locked) {return true;}else {String value = jedis.get(lockKey);// TODO 这里其实是有问题的, 想:当get方法返回value后, 假设这个value已经是过期的了,// 而就在这瞬间, 另一个节点set了value, 这时锁是被别的线程(节点持有), 而接下来的判断// 是检测不出这种情况的.不过这个问题应该不会导致其它的问题出现, 因为这个方法的目的本来就// 不是同步控制, 它只是一种锁状态的报告.return !isTimeExpired(value);}
}/*** 检测时间是否过期* @param value* @return* @date 2017-10-18*/
public boolean isTimeExpired(String value) {return Long.parseLong(value) < System.currentTimeMillis();
}/*** 阻塞式获取锁的实现* @param useTimeout 是否超时* @param time 获取锁的等待时间* @param unit 时间单位* @param interrupt*            是否响应中断*/
protected boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException {if (interrupt) {checkInterruption();}long start = System.currentTimeMillis();long timeout = unit.toMillis(time);//while (useTimeout ? isTimeout(start, timeout) : true) {if (interrupt) {checkInterruption();}long lockExpireTime = System.currentTimeMillis() + lockExpire + 1;//锁的超时时间String stringLockExpireTime = String.valueOf(lockExpireTime);if (jedis.setnx(lockKey, stringLockExpireTime) == 1) {//获取到锁//成功获取到锁,设置相关标识locked = true;setExclusiveOwnerThread(Thread.currentThread());return true;}}return false;
}/** 判断是否超时(开始时间+锁等待超时时间是否大于系统当前时间)*/
public boolean isTimeout(long start, long timeout) {return start + timeout > System.currentTimeMillis();
}/** 检查当前线程是否阻塞*/
public void checkInterruption() throws InterruptedException {if (Thread.currentThread().isInterrupted()) {throw new InterruptedException();}
}
}

Test

static class PessCustomer implements Runnable{private String name;private Jedis jedis = null;private String key = "prdNum";private String cust = "customers";private RedisBasedDistributedLock redisBasedDistributedLock;public PessCustomer(int num) {this.name = "编号=" + num;init();}private void init() {jedis = RedisUtil.getInstance().getJedis();redisBasedDistributedLock = new RedisBasedDistributedLock(jedis, "lock.lock", 5*1000);}@Overridepublic void run() {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}while(true){if(Integer.parseInt(jedis.get(key)) < 0){break;}System.out.println("顾客:" + name + "开始抢购商品!");if(redisBasedDistributedLock.tryLock()){int prdNum = Integer.parseInt(jedis.get(key));if(prdNum > 0){jedis.decr(key); // 商品数减一jedis.sadd(cust, name);System.out.println("恭喜顾客:" + name + "抢购到商品!");}else{System.out.println("抱歉,库存为0,顾客:" + name + "没有抢到商品!");}redisBasedDistributedLock.unlock0(); // 释放锁break;}}// 释放资源redisBasedDistributedLock = null;RedisUtil.returnResource(jedis);}
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/20664.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据库作业——select查询操作

数据库作业 创建数据库 mysql> create table worker( -> 部门号 int(11) not null, -> 职工号 int(11) primary key not null,-> 工作时间 date not null,-> 工资…

MiniGPT4 在RTX-3090 Ubuntu服务器部署步骤详解

主要参考知乎帖子&#xff1a; MiniGPT-4 本地部署 RTX 3090 - 知乎 MiniGPT-4部署比麻烦&#xff0c;首先需要获取LLaMA权重&#xff0c;并结合Vicuna的bitwise XOR增量文件完成Vicuna模型权重生成&#xff0c;最后准备好预训练的MiniGPT-4进行模型部署。为了便于理解&#…

这3个方法教你录音转文字怎么导出来

在日常生活中&#xff0c;我们有时候可能需要将音频转换为文本&#xff0c;因为这样可以节省我们听的时间&#xff0c;还能让记录更加清楚明了。那么&#xff0c;你是否知道录音转文字怎么导出来&#xff1f;如果你不懂&#xff0c;请接着看我下面介绍的三种方法吧&#xff01;…

pytorch安装问题【超级简单版】

pytorch安装问题 当前遇到的问题&#xff1a; python3.9无法安装读取coco数据集的 pycocotools-windows,那么需要切换版本到3.6/7/8&#xff0c;但是切换到python 3.6之后&#xff0c;无法安装torchvision和pytorch【在python就叫torch】&#xff0c;显示没有这个版本 pip i…

[RocketMQ] Broker 消息重放服务源码解析 (十三)

构建消息文件ConsumeQueue和IndexFile。 ConsumeQueue: 看作是CommitLog的消息偏移量索引文件, 存储了它所属Topic的消息在Commit Log中的偏移量。消费者拉取消息的时候, 可以从Consume Queue中快速的根据偏移量定位消息在Commit Log中的位置。IndexFile索引文件: 看作是Commi…

UE4 像素流的一些使用技巧

一、测试像素流的三种方法&#xff0c;前提是熟悉官网像素流送那套流程&#xff0c;这里只是讲如何不用打包就能测试的方法 1.第一种方法是vs安装unrealvs扩展&#xff0c;因为安装这个拓展后加可以加命令行参数启动项目https://docs.unrealengine.com/4.26/zh-CN/ProductionP…

怎么用PDF24 Tools工具在线进行PDF文件合并

PDF文件是经常会被用到&#xff0c;它在我们的日常生活和工作中扮演着重要的角色。PDF文件合并是将多个PDF文件合并为单个文件&#xff0c;这个过程通常是为了方便管理多个PDF文件&#xff0c;或者将多个PDF文件合并为一个整体以便于共享或打印。既然如此&#xff0c;如何快速合…

Spring5学习笔记--详细一文通

Spring5学习笔记--详细一文通 1 Spring 框架概述1.1 Spring 5 简述1.2 Spring5入门案例1.2.1 Spring5下载1.1.2 打开 idea 工具&#xff0c;创建普通 Java 工程1.2.3 导入 Spring5 相关 jar 包1.2.4 创建普通类&#xff0c;在这个类创建普通方法1.2.5 创建 Spring 配置文件&…

迅捷录屏软件使用中的注意事项

取消勾选时就不会出现右侧的悬浮框滑动的窗口了。 取消鼠标高亮后&#xff0c;录制的视频就不会出现一个空心的小圆圈。

实战解决百度旋转验证码

1、效果演示 2、如何识别 2.1准备数据集 首先需要使用爬虫&#xff0c;对验证码图片进行采集&#xff0c;尽量每一种类型都要采集到。 2.2图像矫正 接下来对采集的数据进行人工校正 2.3数据清洗 &#xff08;1&#xff09;对数据进行进行旋转&#xff0c;达到增加数据量的目…

ubuntu RPM should not be used directly install RPM packages, use Alien instead!

ubuntu RPM should not be used directly install RPM packages, use Alien instead! 所以我们最好下载deb版本的安装包 安装 参考文章

多线程设计模式【线程安全、 Future 设计模式、Master-Worker 设计模式 】(一)-全面详解(学习总结---从入门到深化)

目录 Single Thread Execution 设计模式 线程安全 Future 设计模式 Master-Worker 设计模式 生产者消费者设计模式 定义不可变对象的策略 Single Thread Execution 设计模式 机场过安检 Single Thread Execution 模式是指在同一时刻只能有一个线程去访问共享资源&#xff…