0、前言
本文所有代码可见 => 【gitee code demo】
本文涉及的主题:
- 为什么使用分布式锁:单机锁在集群中会失效
- 分布式锁的特征 & 手写redis分布式锁
- redisson
1、单机锁在集群失效问题演示
使用nginx 搭建集群服务
直接启动,获取初始文件
docker run -p 80:80 –name nginx -d nginx:1.10
复制要挂载的文件到宿主机
docker cp cf15f5516cc0:/etc/nginx/ /usr/local/nginx/conf/
docker cp cf15f5516cc0:/var/log/nginx /usr/local/nginx/logs/
docker cp cf15f5516cc0:/usr/share/nginx/html /usr/local/nginx/html/
路径结构如下
修改配置文件
/usr/local/nginx/conf/nginx.conf
/usr/local/nginx/conf/conf.d/default.conf
upstream 命名不支持
_
,改为任意合法名称即可。例如:nginx-redis
配置负载均衡到 两个业务端口
删除原容器,启动新容器
docker rm $(docker stop nginx)docker run -p 80:80 --name nginx \-v /usr/local/nginx/html:/usr/share/nginx/html \-v /usr/local/nginx/logs:/var/log/nginx \-v /usr/local/nginx/conf/:/etc/nginx \-d nginx:1.10
jmeter 测试,出现了重复消费问题
redis库存: set inventory001 100
测试计划:lock锁集群.jmx
集群服务:端口 7777、8888
解释
synchronized 和 lock 只对同一个进程下的多线程有效,多进程不管用(多进程是多个锁)
2、分布式锁
独占性 setnx
public String sale() {String key = "RedisLock";String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();// setnx <key, value>while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue)) {//暂停20毫秒,类似CAS自旋try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}}try {// 业务代码} finally {stringRedisTemplate.delete(key);}
}
超时释放 expire
public String sale() {String key = "RedisLock";String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();// setnx <key, value>// 原子性设置过期时间,防止宕机后出现死锁while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue, 30L, TimeUnit.SECONDS)) {//暂停20毫秒,类似CAS自旋try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}}try {// 业务代码} finally {stringRedisTemplate.delete(key);}
}
安全性
唯一标识 requestId = uuid:threadId
public String sale() {String key = "RedisLock";String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();// setnx <key, value>// 原子性设置过期时间,防止宕机后出现死锁while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue, 30L, TimeUnit.SECONDS)) {//暂停20毫秒,类似CAS自旋try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}}try {// 业务代码} finally { // 添加校验逻辑 防止误删if(stringRedisTemplate.opsForValue().get(key).equalsIgnoreCase(uuidValue)) {stringRedisTemplate.delete(key);}}
}
自动续期:业务没执行完之前,自动续期,防止提前删除,别的线程
private void renewExpire() {String script ="if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 1 then " +"return redis.call('expire',KEYS[1],ARGV[2]) " +"else " +"return 0 " +"end";new Timer().schedule(new TimerTask() {@Overridepublic void run() {if (stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime))) {renewExpire();}}}, expireTime/3);
}
原子性 lua脚本保证原子性
public String sale() {String key = "RedisLock";String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();// setnx <key, value>// 原子性设置过期时间,防止宕机后出现死锁while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue, 30L, TimeUnit.SECONDS)) {//暂停20毫秒,类似CAS自旋try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}}try {// 业务代码} finally {// lua 脚本保证原子性String delLuaScript = "if (redis.call('get', keys[1]) == args[1]) then return redis.call('del', key[1]) else return 0 end";stringRedisTemplate.execute(RedisScript.of(delLuaScript, Boolean.class), Collections.singletonList(key), uuidValue);}
}
可重入性 hash(增加一个加锁次数的信号量)
redis.call(‘hexists’,KEYS[1],ARGV[1]) == 1 : 线程已经获取了锁
public String sale() {String lockScript ="if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 " +"then " +" redis.call('hincrby',KEYS[1],ARGV[1],1) " +" redis.call('expire',KEYS[1],ARGV[2]) " +" return 1 " +"else " +" return 0 " +"end";while (!stringRedisTemplate.execute(RedisScript.of(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime))) {try {TimeUnit.MILLISECONDS.sleep(50);} catch (InterruptedException e) {throw new RuntimeException(e);}}this.renewExpire();try {// 业务代码} catch (Exception e) {e.printStackTrace();} finally {String unlockScript ="if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then " +" return nil " +"elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then " +" return redis.call('del',KEYS[1]) " +"else " +" return 0 " +"end";stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));}
}
redisson实现
高可用 Redisson
redis分布式锁失效的情况
redis-master宕机,由于redis复制是异步的,锁信息没同步到新的master,这里新线程获取锁成功,就导致redis分布式锁失效
官网描述
Redlock算法
使用 N 个独立的master,N = 2x + 1,尝试在全部N个master上获取锁,只有能在其中至少 (N/2+1)个master上获取到锁,才算加锁成功,否则解锁全部实例
官网描述
watchdog
高性能 读写锁
RReadWriteLock readWriteLock = redisson.getReadWriteLock("readWriteLock");
RLock writeLock = readWriteLock.writeLock();
RLock readLock = readWriteLock.readLock();
try {writeLock.lock();//业务操作
} catch (InterruptedException e) {log.error(e);
} finally {rLock.unlock();
}
通过将锁分为读锁与写锁,最大的提升之后就在与大大的提高系统的读性能,因为读锁与读锁之间是没有冲突的,不存在互斥,然后又因为业务系统中的读操作是远远多与写操作的,所以我们在提升了读锁的性能的同时,系统整体锁的性能都得到了提升
读写锁特点
- 读锁与读锁不互斥,可共享
- 读锁与写锁互斥
- 写锁与写锁互斥
3、总结
所以如果我们的业务场景,更需要数据的一致性,我们可以使用 CP
的分布式锁,例子 zookeeper
如果我们更需要的是保证数据的可用性,那么我们可以使用 AP
的分布式锁,例如 Redis