什么是Redisson
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
为什么要用Redisson
基于SETNX实现的分布式锁存在诸多问题
- 不可重入问题:是指获取锁的线程,不可以再次获取相同的锁。例:方法A在获取锁A后调用了方法B,方法B也需要获取A,但是由于不可重入锁的原因,方法B无法获取锁A,此情况会导致死锁的发生,由此暴露出了不可重入锁的弊端。因此可重入锁的意义在于防止死锁(synchronized和Lock锁都是可重入的)
- 超时释放:在加锁的时候增加了TTL,可以防止死锁,但是如果线程发生卡顿或阻塞的时间太长,会导致锁超时释放,释放后其他线程可以获取锁从而引发安全问题。
- 主从一致性:在Redis是主从集群时,在向集群写数据时,主节点需要异步将数据同步给从节点,特殊情况下,在数据同步之前主节点发生了宕机(主从同步存在时间延迟),未把数据成功同步到从节点,从而出现主从一致性的问题
- 不可重试:通过SETNX命令获取分布式锁只能尝试一次,失败后返回false,不能进行锁重试机制
Redisson提供了不同的分布式锁:
- 可重入锁(Reentrant Lock)
- 公平锁(Fair Lock)
- 联锁(MultiLock)
- 红锁(RedLock)
- 读写锁(ReadWriteLock)
- 信号量(Semaphore)
- 可过期性信号量(PermitExpirableSemaphore)
- 闭锁(CountDownLatch)
Redisson入门
1.引入依赖:
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version>
</dependency>
2.配置Redisson客户端
@Configuration
public class RedisConfig {@Beanpublic RedissonClient redissonClient(){Config config = new Config();config.useSingleServer.setAddress("redis://192.168.0.1:6379").setPassword("123456"); return Redisson.create(config);}
}
3.使用Redisson的分布式锁
@Resource
private RedissonClient redissonClient;private RLock rLock;@Test
void testRedisson throws Exception {lock = redissonClient.getLock("lock");// 获取锁,并指定锁名称boolean isLock = lock.tryLock();// 尝试获取锁,此处方法根据业务类型传参if (isLock) {// 如果为true,表示获取成功获取锁try {// 执行业务...} finally {// 释放锁lock.unlock();}}
}
Redisson之重入锁——tryLock
// 尝试获取锁,不可重入锁
// 如果失败则直接返回false,如果成功,则自动更新锁的过期时间(watchDog机制)
// leaseTime 默认是-1,当leaseTime为-1时,执行watchDog机制
boolean lock = lock.tryLock();// 尝试获取锁,可重入锁,并自动更新锁的过期时间(watchDog机制)
// waitTime 最大等待时常(在获取锁失败后的一段时间内不断尝试获取锁)
// leaseTime 此处不传leaseTime默认值是-1,成功获取锁后,通过watchDog机制自动更新锁的过期时间
// unit 时间单位
boolean lock = lock.tryLock(waitTime, unit);// 尝试获取锁,可重入锁
// waitTime 最大等待时常(在获取锁失败后的一段时间内不断尝试获取锁)
// leaseTime 锁的有效期(不会自动更新锁的过期时间,当leaseTime为-1时,会执行watchDog机制自动更新时间)
// unit 时间单位
boolean lock = lock.tryLock(waitTime, leaseTime, unit);
Java中的Lock重入锁,是借助于一个voaltile的一个state变量来记录重入的状态。
- 如果当前没有人持有这把锁,那么
state = 0
- 如果有人持有这把锁,那么
state = 1
- 如果持有者把锁的人再次持有这把锁,那么
state
会+1
Java中的
synchronize
重入锁
- 对于
synchronize
而言,在c语言代码中会有一个count- 原理与
state
类似,也是重入一次就+1
,释放一次就-1
,直至减到0,表示这把锁没有被人持有
在Redisson中,采用了Redis中的hash结构来实现可重入锁。通过hash结构来实现重入锁加1减1的操作
逻辑图:
Lua入门:https://www.runoob.com/lua/lua-tutorial.html
为了保证原子性,Redisson内部采用了Lua脚本的方式执行相关的命令
Lua脚本实现重入锁:
if (redis.call('exists', KEYS[1]) == 0) // 判断锁是否存在,等于0代表不存在,否则存在
then redis.call('hset', KEYS[1], ARGV[2], 1);
// 不存在,则创建锁,重入次数加1redis.call('pexpire', KEYS[1], ARGV[1]); // 设置锁的过期时间return nil;
end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) // 判断锁是否是自己,等于1则是自己
then redis.call('hincrby', KEYS[1], ARGV[2], 1);
// 自增1redis.call('pexpire', KEYS[1], ARGV[1]); // 设置锁的过期时间return nil;
end;
return redis.call('pttl', KEYS[1]);// 获取锁失败,并返回旧锁的剩余时间
注:该Lua脚本在 RedissonLock 类的 tryLockInnerAsync 方法
Lua脚本释放重入锁:
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) // 判断锁是否存在
then return nil;// 不存在,结束
end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); // 获取锁,重入次数减1
if (counter > 0)
then redis.call('pexpire', KEYS[1], ARGV[2]); // 重置锁的有效期return 0;
else redis.call('del', KEYS[1]); // 删除锁redis.call('publish', KEYS[2], ARGV[1]); // 通知其他线程,锁已释放return 1;
end;
return nil;
注:该Lua脚本在 RedissonLock 类的 unlockInnerAsync 方法
Redisson重入锁的执行原理
Redisson的WatchDog机制
源码:
图一:通过判断 leaseTime 判断是否等于-1,如果等于-1,则调用 tryLockInnerAsync 方法尝试获取锁,如果锁成功获取了,则调用 scheduleExpirationRenewal 方法
图二:该 scheduleExpirationRenewal 方法分为两步,判断是否该锁已存在,如果存在则进行重入次数加1(如图三),如果不存在,在重入次数加1后调用 renewExpiration 方法
图四:renewExpiration 方法的本质是一个定时任务,通过递归的方式定时自动刷新锁的过期时间,刷新时间的方法是 renewExpirationAsync
Redisson的锁重试机制
锁重试,必然是由于当前线程没有成功获取到锁,Redisson内部通过不断判断是否有剩余重试时间来操作重试机制,先使用 subscribe 方法订阅其他锁是否存在已经释放的信号,如果在剩余重试时间内没有等到其他线程释放锁的信号,则取消订阅,并返回false获取锁失败。如果在剩余时间内等到了其他线程释放锁的信号,那么会先计算是否还有剩余时间,如果有时间则通过dowhile循环尝试获取锁。
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();long threadId = Thread.currentThread().getId();// 尝试获取锁Long ttl = this.tryAcquire(leaseTime, unit, threadId);// 如果ttl为空,代表成功获取锁if (ttl == null) {return true;} else {// 计算重试的剩余时间time -= System.currentTimeMillis() - current;if (time <= 0L) {this.acquireFailed(threadId);// 没时间了 返回falsereturn false;} else {current = System.currentTimeMillis();// 订阅其他线程是否有释放锁的信号RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);// 在time这个时间内等待释放锁的信号if (!this.await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {subscribeFuture.onComplete((res, e) -> {if (e == null) {// 取消订阅this.unsubscribe(subscribeFuture, threadId);}});}// 在time时间内没有等到信号,返回false,获取锁失败this.acquireFailed(threadId);return false;} else {// 等到了缩放锁的信号try {// 计算是否还有重试时间time -= System.currentTimeMillis() - current;if (time <= 0L) {this.acquireFailed(threadId);boolean var20 = false;return var20;} else {// 有时间,则通过循环尝试获取锁boolean var16;do {long currentTime = System.currentTimeMillis();ttl = this.tryAcquire(leaseTime, unit, threadId);if (ttl == null) {var16 = true;return var16;}time -= System.currentTimeMillis() - currentTime;if (time <= 0L) {this.acquireFailed(threadId);var16 = false;return var16;}currentTime = System.currentTimeMillis();if (ttl >= 0L && ttl < time) {this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {this.getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= System.currentTimeMillis() - currentTime;} while(time > 0L);this.acquireFailed(threadId);var16 = false;return var16;}} finally {// 取消订阅this.unsubscribe(subscribeFuture, threadId);}}}}}
Redisson的MutiLock原理
在Redis的主从模式和哨兵模式中,都需要把主节点的数据发送到从节点,在这个过程中,如果主节点发生了故障宕机,就会导致锁失效从而引发线程安全问题。
由此,Redisson提出来了MutiLock锁,MutiLock锁的思想是每次加锁时,对多个节点同时加锁,把这多个节点都认为是主节点,只有把锁成功的加到每个节点上,才认为是加锁成功。如果其中有一个主节点发生故障宕机,也不会影响其他主机点,锁的信息在其他主节点也存在。保证了加锁的可靠性。
先定义多个节点
@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient() {Config config = new Config();config.useSingleServer().setAddress("redis://192.168.0.1:6379").setPassword("root");return Redisson.create(config);}@Beanpublic RedissonClient redissonClient2() {Config config = new Config();config.useSingleServer().setAddress("redis://92.168.0.1:6379").setPassword("root");return Redisson.create(config);}@Beanpublic RedissonClient redissonClient3() {Config config = new Config();config.useSingleServer().setAddress("redis://92.168.0.1:6379").setPassword("root");return Redisson.create(config);}
}
使用MutiLock锁
@Resource
private RedissonClient redissonClient;
@Resource
private RedissonClient redissonClient2;
@Resource
private RedissonClient redissonClient3;private RLock lock;@BeforeEach
void setUp() {RLock lock1 = redissonClient.getLock("lock");RLock lock2 = redissonClient2.getLock("lock");RLock lock3 = redissonClient3.getLock("lock");lock = redissonClient.getMultiLock(lock1, lock2, lock3);
}@Test
void method1() {boolean success = lock.tryLock();if (!success) {log.error("获取锁失败,1");return;}try {log.info("获取锁成功");method2();} finally {log.info("释放锁,1");lock.unlock();}
}void method2() {RLock lock = redissonClient.getLock("lock");boolean success = lock.tryLock();if (!success) {log.error("获取锁失败,2");return;}try {log.info("获取锁成功,2");} finally {log.info("释放锁,2");lock.unlock();}
}
MutiLock锁的tryLock方法源码
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long newLeaseTime = -1L;//如果传入了释放时间if (leaseTime != -1L) {//再判断一下是否有等待时间if (waitTime == -1L) {//如果没传等待时间,不重试,则只获得一次newLeaseTime = unit.toMillis(leaseTime);} else {//想要重试,耗时较久,万一释放时间小于等待时间,则会有问题,所以这里将等待时间乘以二newLeaseTime = unit.toMillis(waitTime) * 2L;}}//获取当前时间long time = System.currentTimeMillis();//剩余等待时间long remainTime = -1L;if (waitTime != -1L) {remainTime = unit.toMillis(waitTime);}//锁等待时间,与剩余等待时间一样 long lockWaitTime = this.calcLockWaitTime(remainTime);//锁失败的限制,源码返回是的0int failedLocksLimit = this.failedLocksLimit();//已经获取成功的锁List<RLock> acquiredLocks = new ArrayList(this.locks.size());//迭代器,用于遍历ListIterator<RLock> iterator = this.locks.listIterator();while(iterator.hasNext()) {RLock lock = (RLock)iterator.next();boolean lockAcquired;try {//没有等待时间和释放时间,调用空参的tryLockif (waitTime == -1L && leaseTime == -1L) {lockAcquired = lock.tryLock();} else {//否则调用带参的tryLocklong awaitTime = Math.min(lockWaitTime, remainTime);lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);}} catch (RedisResponseTimeoutException var21) {this.unlockInner(Arrays.asList(lock));lockAcquired = false;} catch (Exception var22) {lockAcquired = false;}//判断获取锁是否成功if (lockAcquired) {//成功则将锁放入成功锁的集合acquiredLocks.add(lock);} else {//如果获取锁失败//判断当前锁的数量,减去成功获取锁的数量,如果为0,则所有锁都成功获取,跳出循环if (this.locks.size() - acquiredLocks.size() == this.failedLocksLimit()) {break;}//否则将拿到的锁都释放掉if (failedLocksLimit == 0) {this.unlockInner(acquiredLocks);//如果等待时间为-1,则不想重试,直接返回falseif (waitTime == -1L) {return false;}failedLocksLimit = this.failedLocksLimit();//将已经拿到的锁都清空acquiredLocks.clear();//将迭代器往前迭代,相当于重置指针,放到第一个然后重试获取锁while(iterator.hasPrevious()) {iterator.previous();}} else {--failedLocksLimit;}}//如果剩余时间不为-1,很充足if (remainTime != -1L) {//计算现在剩余时间remainTime -= System.currentTimeMillis() - time;time = System.currentTimeMillis();//如果剩余时间为负数,则获取锁超时了if (remainTime <= 0L) {//将之前已经获取到的锁释放掉,并返回falsethis.unlockInner(acquiredLocks);//联锁成功的条件是:每一把锁都必须成功获取,一把锁失败,则都失败return false;}}}//如果设置了锁的有效期if (leaseTime != -1L) {List<RFuture<Boolean>> futures = new ArrayList(acquiredLocks.size());//迭代器用于遍历已经获取成功的锁Iterator var24 = acquiredLocks.iterator();while(var24.hasNext()) {RLock rLock = (RLock)var24.next();//设置每一把锁的有效期RFuture<Boolean> future = ((RedissonLock)rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);futures.add(future);}var24 = futures.iterator();while(var24.hasNext()) {RFuture<Boolean> rFuture = (RFuture)var24.next();rFuture.syncUninterruptibly();}}//但如果没设置有效期,则会触发WatchDog机制,自动帮我们设置有效期,所以大多数情况下,我们不需要自己设置有效期return true;
}
总结:
利用Hash结构,记录线程标识和重入次数,解决了锁的可重入
利用WatchDog机制,通过内部定时器延续锁的时间,解决了锁的超时释放
利用信号量和重试时间控制锁重试,解决了不可重试问题
利用multiLock机制,解决主从一致性问题