目录
1 基本介绍
1.1API介绍
1.2 示例
2 源码解析
2.1 Semaphore设置许可数量(trySetPermits(int permits))
2.2 尝试获取许可(boolean tryAcquire())
3 Lua脚本
3.1 加锁lua脚本
3.2 解锁lua脚本
1 基本介绍
Semaphore通常叫信号量,可以用来同时控制访问特定资源的线程数量,通过协调各个线程,保证合理的使用资源。
1.1API介绍
public interface RSemaphore extends RExpirable, RSemaphoreAsync {// 获得一个permitvoid acquire() throws InterruptedException; //获得var1个permitvoid acquire(int var1) throws InterruptedException; //尝试获得permitboolean tryAcquire(); //尝试获得var1个permitboolean tryAcquire(int var1); //尝试获得permit, 等待时间var1boolean tryAcquire(long var1, TimeUnit var3) throws InterruptedException;//尝试获得var1个permit, 等待时间var2boolean tryAcquire(int var1, long var2, TimeUnit var4) throws InterruptedException;//释放1个permitvoid release();//释放var1个permitvoid release(int var1);//信号量的permits数int availablePermits();//清空permitsint drainPermits();//设置permits数boolean trySetPermits(int var1);//添加permits数void addPermits(int var1);
}
1.2 示例
@RestController
@RequestMapping("/rsemaphone")
public class TestRsemaphoreController {@Resourceprivate RedissonClient redissonClient;private ExecutorService executorService= Executors.newFixedThreadPool(5);/*** redission信号量*/@RequestMapping("/rseTrySetPermits")public void rseTrySetPermits() throws InterruptedException {RSemaphore semaphore = redissonClient.getSemaphore("123");semaphore.trySetPermits(5);for (int i = 0; i < 10; i++) {executorService.submit(()->{try {semaphore.acquire();System.out.println("线程:"+Thread.currentThread().getName()+"获得permit");Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}finally {System.out.println("线程:"+Thread.currentThread().getName()+"释放permit");semaphore.release();}});}}}
运行结果:
2 源码解析
2.1 Semaphore设置许可数量(trySetPermits(int permits))
public boolean trySetPermits(int permits) {return (Boolean)this.get(this.trySetPermitsAsync(permits));}public RFuture<Boolean> trySetPermitsAsync(int permits) {return this.commandExecutor.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//判断分布式信号量是否存在,如果不存在才设置
"local value = redis.call('get', KEYS[1]); if (value == false or value == 0)
//使用string数据结构设置信号量许可数
then redis.call('set', KEYS[1], ARGV[1]);
//发布一条消息到redisson_sc:{semaphore}通道
redis.call('publish', KEYS[2], ARGV[1]);
//设置成功返回1
return 1;end;
//否则返回0
return 0;", Arrays.asList(this.getRawName(), this.getChannelName()), new Object[]{permits});}
可以发现只有设置许可其实就是利用lua将值设置到redis中
RSemaphore semaphore = redissonClient.getSemaphore("semaphore");
semaphore.trySetPermits(5);
2.2 尝试获取许可(boolean tryAcquire())
尝试获取许可:可以看到获取许可的底层还是通过lua来实现的,如果能够成功获取返回true,否则返回false。
public boolean tryAcquire(int permits) {return (Boolean)this.get(this.tryAcquireAsync(permits));}public RFuture<Boolean> tryAcquireAsync() {return this.tryAcquireAsync(1);}public RFuture<Boolean> tryAcquireAsync(int permits) {if (permits < 0) {throw new IllegalArgumentException("Permits amount can't be negative");} else {return permits == 0 ? RedissonPromise.newSucceededFuture(true) : this.commandExecutor.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//获取当前剩余的许可数量
"local value = redis.call('get', KEYS[1]);
//许可数量不为空 并且当前许可数量大于等于剩余的许可数量
if (value ~= false and tonumber(value) >= tonumber(ARGV[1]))
//通过decrby减少剩余可用许可
then local val = redis.call('decrby', KEYS[1], ARGV[1]);
//返回1return 1; end;
//其他情况返回0
return 0;", Collections.singletonList(this.getRawName()), new Object[]{permits});}}
从源码中可以看出获取许可就是通过操作redis中的数据,首先获取到剩余的许可数量,当只有剩余的许可数量大于想要获取的许可数量时返回1否则返回0.
3 Lua脚本
3.1 加锁lua脚本
参数 | 示例值 | 含义 |
KEY个数 | 1 | KEY个数 |
KEYS[1] | my_first_lock_name | 锁名 |
ARGV[1] | 60000 | 持有锁的有效时间:毫秒 |
ARGV[2] | 58c62432-bb74-4d14-8a00-9908cc8b828f:1 | 唯一标识:获取锁时set的唯一值,实现上为redisson客户端ID(UUID)+线程ID |
- 脚本内容
-- 若锁不存在:则新增锁,并设置锁重入计数为1、设置锁过期时间
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 若锁存在,且唯一标识也匹配:则表明当前加锁请求为锁重入请求,故锁重入计数+1,并再次设置锁过期时间
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 若锁存在,但唯一标识不匹配:表明锁是被其他线程占用,当前线程无权解他人的锁,直接返回锁剩余过期时间
return redis.call('pttl', KEYS[1]);
- 脚本解读
问:返回nil、返回剩余过期时间有什么目的?
答:当且仅当返回nil,才表示加锁成功;客户端需要感知加锁是否成功的结果
3.2 解锁lua脚本
- 脚本入参
参数 | 示例值 | 含义 |
KEY个数 | 2 | KEY个数 |
KEYS[1] | my_first_lock_name | 锁名 |
KEYS[2] | redisson_lock__channel:{my_first_lock_name} | 解锁消息PubSub频道 |
ARGV[1] | 0 | redisson定义0表示解锁消息 |
ARGV[2] | 300000 | 设置锁的过期时间;默认值30秒 |
ARGV[3] | 58c62432-bb74-4d14-8a00-9908cc8b828f:1 | 唯一标识;同加锁流程 |
- 脚本内容
-- 若锁不存在:则直接广播解锁消息,并返回1
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
-- 若锁存在,但唯一标识不匹配:则表明锁被其他线程占用,当前线程不允许解锁其他线程持有的锁
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
-- 若锁存在,且唯一标识匹配:则先将锁重入计数减1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
-- 锁重入计数减1后还大于0:表明当前线程持有的锁还有重入,不能进行锁删除操作,但可以友好地帮忙设置下过期时期
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
-- 锁重入计数已为0:间接表明锁已释放了。直接删除掉锁,并广播解锁消息,去唤醒那些争抢过锁但还处于阻塞中的线程
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;
- 脚本解读
问:广播解锁消息有什么用?
答:是为了通知其他争抢锁阻塞住的线程,从阻塞中解除,并再次去争抢锁。