1.是神魔
在高并发的环境下,多个线程去竞争同一个资源, 比较常见的有高铁抢票系统,商品秒杀系统等,我们需要保证数据正确,同时系统的吞吐也要尽可能高。
2.解决方案
1. 一般多线程同步我们就会想到加锁,用synchornized关键字给并发代码块加锁,
2. 但是在我们的业务场景中,比如高铁抢票,有很多张不同的票,但是synchornized锁住 了秒杀那个代码块,
3. 所有的票全都上了这一把锁,这么看锁的力度还是太大了,其实我们只需要分别锁住每张票,比如根据票id来上锁
4. 这样锁刚刚好。我们可以通过Redis来实现分布式锁,下面分别介绍悲观锁和乐观锁。
3.reids 乐观锁
乐观锁顾名思义,就是很乐观,我看到了一件商品库存是1,我觉得这件商品很少人抢,我尝试去下单,当我提交订单的时候,如果商品的库存还是1,那就说明在我之前没人下单,我就可以成功下单,否则的话就下单失败。
在写操作较少的情况下,乐观锁是优于悲观锁的,吞吐量会大大提高,但是写操作比较频繁的话就会导致一直重试,如果重试代价较高则不推荐用乐观锁。
Redis的乐观锁主要是通过watch()来实现的,watch()的作用是监视键值对,首先是用multi()开启事务,exec()提交事务,提交事务的时候如果发现键值对的值发生变化则会取消事务,下面是一个测试例子:
/*** 初始化商品*/
private static void initProduct() {// 商品个数int prdNum = 3;String key = "prdNum";String customers = "customers";Jedis jedis = RedisUtil.getInstance().getJedis();if(jedis.exists(key)){jedis.del(key);}if(jedis.exists(customers) != null){jedis.del(customers);}jedis.set(key, String.valueOf(prdNum));RedisUtil.returnResource(jedis);
}/*** 顾客抢购商品(秒杀操作)*/
private static void initClient() throws InterruptedException {ExecutorService cachedThreadPool = Executors.newCachedThreadPool();int num = 10;for(int i=1; i<=num; i++){cachedThreadPool.execute(new Customer(i)); //启动与顾客数目相等的线程}cachedThreadPool.shutdown();while(true){if(cachedThreadPool.isTerminated()){System.out.println("所有的消费组线程均结束了!");break;}Thread.sleep(100);}}/*** 打印结果*/
private static void printResult() {Jedis jedis = RedisUtil.getInstance().getJedis();Set<String> customers = jedis.smembers("customers");int i = 1;for (String customer : customers) {System.out.println("第"+(i++)+"个抢到商品的顾客:"+customer);}RedisUtil.returnResource(jedis);
}static class Customer implements Runnable{private String name;private Jedis jedis = null;private String key = "prdNum";private String cust = "customers";public Customer(int name) {this.name = "编号=" + name;}/*** multi:开启redis事务,置客户端为事务态* exec:提交事务,执行从multi到此命令之前的命令队列,置客户端为非事务态* discard:取消事务,置客户端为非事务态* watch:监视键值对,exec提交事务时发现监视的键值发生变化则取消事务*/@Overridepublic void run() {// 随机睡眠一下try {Thread.sleep((int)Math.random()*5000);} catch (InterruptedException e) {e.printStackTrace();}while(true){System.out.println("顾客:" + name + "开始抢购商品!");jedis = RedisUtil.getInstance().getJedis();try{// 监视键值对jedis.watch(key);int prdNum = Integer.parseInt(jedis.get(key));if(prdNum > 0){// 开启事务Transaction transaction = jedis.multi();transaction.set(key, String.valueOf(prdNum - 1));// 提交事务List<Object> result = transaction.exec();if(result == null || result.isEmpty()){System.out.println("很抱歉,顾客" + name + "没有抢购到商品!");}else{// 抢到商品的话记录一下jedis.sadd(cust, name);System.out.println("恭喜顾客" + name + "抢到商品!");break;}}else{System.out.println("很抱歉,库存为0,顾客:" + name + "没有抢到商品");break;}}catch (Exception e){}finally {jedis.unwatch();RedisUtil.returnResource(jedis);}}}
}public static void main(String[] args) throws InterruptedException {long startTime = System.currentTimeMillis();initProduct();initClient();printResult();long endTime = System.currentTimeMillis();long time = endTime - startTime;System.out.println("程序运行时间:" + (int )time + "ms");
}
Redis悲观锁
所谓悲观锁,就是比如有很多人抢同一件商品,这件商品的id是101,我就把商品id作为key加一把锁,锁住对商品库存进行写操作的代码块,如果有人来抢,首先需要尝试获取这把锁,才能对商品库存进行写的操作。
当商品库存减少成功之后,再把锁释放,这样下一个人就再进来扣除这件商品的库存了。
当然如果有人一直持有锁不放怎么办呢,就是所谓的“站着茅坑不拉shi”,我们可以设定锁的过期时间。所以悲观锁中的锁就是一个唯一标识的锁和该锁的过期时间,Redis实现悲观锁是用setNX操作实现的阻塞式分布式锁。
setNX
Redis的setNX就是设置一个key的value时,如果该key已经有value,则返回1,否则返回0。
redis> EXISTS job # job 不存在 (integer) 0
redis> SETNX job “programmer” # job 设置成功 (integer) 1
redis> SETNX job “code-farmer” # 尝试覆盖 job ,失败 (integer) 0
redis> GET job # 没有被覆盖 “programmer”
我们可以用它来实现redis分布式锁,把锁过期时间作为value:
SETNX lock.foo <current Unix time + lock timeout + 1>
当有多个线程来set“lock.foo”的值时,如果返回1说明获取到了锁,键值是锁的过期时间(当前时间+锁的有效时间),返回0则可以设定一个循环不断setNX来获取锁,而释放锁则是直接把这个key删除即可。在一个进程持有锁期间,其他进程还会不断调用“GET lock.foo”命令来检测锁是否超时。
下面是Redis悲观锁实现示例:
AbstractLock
public abstract class AbstractLock implements Lock {
/*** <pre>* 这里需不需要保证可见性值得讨论, 因为是分布式的锁,* 1.同一个jvm的多个线程使用不同的锁对象其实也是可以的, 这种情况下不需要保证可见性* 2.同一个jvm的多个线程使用同一个锁对象, 那可见性就必须要保证了.* </pre>*/
protected volatile boolean locked;/*** 当前jvm内持有该锁的线程(if have one)*/
private Thread exclusiveOwnerThread;public void lock() {try {lock(false, 0, null, false);} catch (InterruptedException e) {// TODO ignore}
}public void lockInterruptibly() throws InterruptedException {lock(false, 0, null, true);
}@Override
public boolean tryLock(long time, TimeUnit unit) {try {System.out.println("ghggggggggggggg");return lock(true, time, unit, false);} catch (InterruptedException e) {e.printStackTrace();System.out.println("" + e);}return false;
}public boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException {return lock(true, time, unit, true);
}public void unlock() {// TODO 检查当前线程是否持有锁if (Thread.currentThread() != getExclusiveOwnerThread()) {throw new IllegalMonitorStateException("current thread does not hold the lock");}unlock0();setExclusiveOwnerThread(null);
}protected void setExclusiveOwnerThread(Thread thread) {exclusiveOwnerThread = thread;
}protected final Thread getExclusiveOwnerThread() {return exclusiveOwnerThread;
}protected abstract void unlock0();/*** 阻塞式获取锁的实现** @param useTimeout 是否超时* @param time 获取锁的等待时间* @param unit 时间单位* @param interrupt* 是否响应中断* @return* @throws InterruptedException*/
protected abstract boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt)throws InterruptedException;}
RedisBasedDistributedLock
public class RedisBasedDistributedLock extends AbstractLock {
private Jedis jedis;
protected String lockKey;//锁的名字
protected long lockExpire;//锁的有效时长(毫秒)public RedisBasedDistributedLock(Jedis jedis,String lockKey,long lockExpire){this.jedis = jedis;this.lockKey = lockKey;this.lockExpire = lockExpire;
}@Override
public boolean tryLock() {long lockExpireTime = System.currentTimeMillis() + lockExpire + 1;//锁超时时间String stringOfLockExpireTime = String.valueOf(lockExpireTime);if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) {//获取到锁//设置相关标识locked = true;setExclusiveOwnerThread(Thread.currentThread());return true;}String value = jedis.get(lockKey);if (value != null && isTimeExpired(value)) {//锁是过期的//假设多个线程(非单jvm)同时走到这里String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime);//原子操作// 但是走到这里时每个线程拿到的oldValue肯定不可能一样(因为getset是原子性的)// 假如拿到的oldValue依然是expired的,那么就说明拿到锁了if (oldValue != null && isTimeExpired(oldValue)) {//拿到锁//设置相关标识locked = true;setExclusiveOwnerThread(Thread.currentThread());return true;}}return false;
}@Override
public Condition newCondition() {// TODO Auto-generated method stubreturn null;
}/*** 锁未过期,释放锁*/
@Override
protected void unlock0() {// 判断锁是否过期String value = jedis.get(lockKey);if (!isTimeExpired(value)) {doUnlock();}
}/*** 释放锁* @date 2017-10-18*/
public void doUnlock() {jedis.del(lockKey);
}/*** 查询当前的锁是否被别的线程占有* @return 被占有true,未被占有false* @date 2017-10-18*/
public boolean isLocked(){if (locked) {return true;}else {String value = jedis.get(lockKey);// TODO 这里其实是有问题的, 想:当get方法返回value后, 假设这个value已经是过期的了,// 而就在这瞬间, 另一个节点set了value, 这时锁是被别的线程(节点持有), 而接下来的判断// 是检测不出这种情况的.不过这个问题应该不会导致其它的问题出现, 因为这个方法的目的本来就// 不是同步控制, 它只是一种锁状态的报告.return !isTimeExpired(value);}
}/*** 检测时间是否过期* @param value* @return* @date 2017-10-18*/
public boolean isTimeExpired(String value) {return Long.parseLong(value) < System.currentTimeMillis();
}/*** 阻塞式获取锁的实现* @param useTimeout 是否超时* @param time 获取锁的等待时间* @param unit 时间单位* @param interrupt* 是否响应中断*/
protected boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException {if (interrupt) {checkInterruption();}long start = System.currentTimeMillis();long timeout = unit.toMillis(time);//while (useTimeout ? isTimeout(start, timeout) : true) {if (interrupt) {checkInterruption();}long lockExpireTime = System.currentTimeMillis() + lockExpire + 1;//锁的超时时间String stringLockExpireTime = String.valueOf(lockExpireTime);if (jedis.setnx(lockKey, stringLockExpireTime) == 1) {//获取到锁//成功获取到锁,设置相关标识locked = true;setExclusiveOwnerThread(Thread.currentThread());return true;}}return false;
}/** 判断是否超时(开始时间+锁等待超时时间是否大于系统当前时间)*/
public boolean isTimeout(long start, long timeout) {return start + timeout > System.currentTimeMillis();
}/** 检查当前线程是否阻塞*/
public void checkInterruption() throws InterruptedException {if (Thread.currentThread().isInterrupted()) {throw new InterruptedException();}
}
}
Test
static class PessCustomer implements Runnable{private String name;private Jedis jedis = null;private String key = "prdNum";private String cust = "customers";private RedisBasedDistributedLock redisBasedDistributedLock;public PessCustomer(int num) {this.name = "编号=" + num;init();}private void init() {jedis = RedisUtil.getInstance().getJedis();redisBasedDistributedLock = new RedisBasedDistributedLock(jedis, "lock.lock", 5*1000);}@Overridepublic void run() {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}while(true){if(Integer.parseInt(jedis.get(key)) < 0){break;}System.out.println("顾客:" + name + "开始抢购商品!");if(redisBasedDistributedLock.tryLock()){int prdNum = Integer.parseInt(jedis.get(key));if(prdNum > 0){jedis.decr(key); // 商品数减一jedis.sadd(cust, name);System.out.println("恭喜顾客:" + name + "抢购到商品!");}else{System.out.println("抱歉,库存为0,顾客:" + name + "没有抢到商品!");}redisBasedDistributedLock.unlock0(); // 释放锁break;}}// 释放资源redisBasedDistributedLock = null;RedisUtil.returnResource(jedis);}
}