面试切入点
锁的分类
- 单机版同一个JVM虚拟机内,synchronized或者Lock接口
- 分布式多个不同JVM虚拟机,单机的线程锁不再起作用,资源类在不同的服务器之间共享了
一个靠谱分布式锁需要具备的条件与刚需
- 独占性:onlyOne,任何时刻只能有且仅有一个线程持有
- 高可用:若redis集群环境下,不能因为某一个节点挂了而出现获取锁和释放锁失败的情况。高并发请求下,依旧性能OK好使
- 防死锁:杜绝死锁,必须有超时控制机制或者撤销机制操作,有个兜底终止跳出方案。
- 不乱抢:防止张冠李戴,不能私下unlock别人的锁,只能自己加锁,自己释放锁,自己的锁含着泪也要自己去解
- 重入性:同一个节点的同一个线程如果获得锁后,它也可以再次获取这个锁。
分布式锁
setnx key value
set key value [EX seconds] [PX milliseconds] [NX|XX]
案例演示扣减库存
V1版本:JVM可重入锁的版本
private ReentrantLock lock = new ReentrantLock();//V1.0 基础版本public String saleV1(){String retMessage="";lock.lock();try {//查询库存信息String result = stringRedisTemplate.opsForValue().get("inventory001");//判断库存是否足够Integer inventory =result==null?0: Integer.valueOf(result);//扣减库存if(inventory>0){stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventory));retMessage="成功卖出一个商品,库存剩余:"+inventory;System.out.println(retMessage+"\t"+"服务端口号"+port);}else{retMessage="商品卖完了";}}finally {lock.unlock();}return retMessage+"\t"+"服务端口号"+port;}
swagger结果
V2版本:分布式部署,将V1版本copy一份,端口为8888,同时用nginx路由转发
docker部署nginx
本地做好nginx.conf与宿主机的映射
nginx.conf的默认配置
#
#user nobody;
worker_processes 1;#error_log logs/error.log;
#error_log logs/error.log notice;
#error_log logs/error.log info;#pid logs/nginx.pid;events {worker_connections 1024;
}http {include mime.types;default_type application/octet-stream;#log_format main '$remote_addr - $remote_user [$time_local] "$request" '# '$status $body_bytes_sent "$http_referer" '# '"$http_user_agent" "$http_x_forwarded_for"';#access_log logs/access.log main;sendfile on;#tcp_nopush on;#keepalive_timeout 0;keepalive_timeout 65;#gzip on;server {listen 80;server_name localhost;#charset koi8-r;#access_log logs/host.access.log main;location / {root html;index index.html index.htm;}#error_page 404 /404.html;# redirect server error pages to the static page /50x.html#error_page 500 502 503 504 /50x.html;location = /50x.html {root html;}# proxy the PHP scripts to Apache listening on 127.0.0.1:80##location ~ \.php$ {# proxy_pass http://127.0.0.1;#}# pass the PHP scripts to FastCGI server listening on 127.0.0.1:9000##location ~ \.php$ {# root html;# fastcgi_pass 127.0.0.1:9000;# fastcgi_index index.php;# fastcgi_param SCRIPT_FILENAME /scripts$fastcgi_script_name;# include fastcgi_params;#}# deny access to .htaccess files, if Apache's document root# concurs with nginx's one##location ~ /\.ht {# deny all;#}}# another virtual host using mix of IP-, name-, and port-based configuration##server {# listen 8000;# listen somename:8080;# server_name somename alias another.alias;# location / {# root html;# index index.html index.htm;# }#}# HTTPS server##server {# listen 443 ssl;# server_name localhost;# ssl_certificate cert.pem;# ssl_certificate_key cert.key;# ssl_session_cache shared:SSL:1m;# ssl_session_timeout 5m;# ssl_ciphers HIGH:!aNULL:!MD5;# ssl_prefer_server_ciphers on;# location / {# root html;# index index.html index.htm;# }#}
}
docker 启动命令
docker run -d --name nginx -p 80:80 -v /home/run/nginx/conf/nginx.conf:/etc/nginx/nginx.conf -v /home/run/nginx/html:/etc/nginx/html docker.1ms.run/library/nginx
nginx验证
修改nginx的配置加上负载均衡+反方向代理
负载均衡的效果
手工点击是OK的,模拟高并发100个请求
redis中还有多少数据
重复下单数据,出现了超卖现象
为什么加了 synchronized 或者 Lock 还是没有控制住?
分布式锁的出现
- 跨进程+跨服务
- 解决超卖
- 防止缓存击穿
redis分布式锁V1版本
//V2版本:public String saleV2(){String retMessage="";String key="redisLock";String uuidValue = IdUtil.simpleUUID()+":"+Thread.currentThread().getId();//分布式锁的设置Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue);//抢不到的线程继续重试if(!flag){//暂停20毫秒,递归重试try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}saleV2();}else{//抢锁成功的线程继续进行正常的业务逻辑操作 扣减库存try {//查询库存信息String result = stringRedisTemplate.opsForValue().get("inventory001");//判断库存是否足够Integer inventory =result==null?0: Integer.valueOf(result);//扣减库存if(inventory>0){stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventory));retMessage="成功卖出一个商品,库存剩余:"+inventory;System.out.println(retMessage+"\t"+"服务端口号"+port);}else{retMessage="商品卖完了";}} finally {// 释放分布式锁stringRedisTemplate.delete(key);}}return retMessage+"\t"+"服务端口号"+port;}
结果
扣减库存为0
存在的问题
测试手工OK,测试Jmeter压测5000OK
递归是一种思想没错,但是容易导致StackOverflowError,不太推荐,进一步完善
多线程判断想想JUC里面说过的虚假唤醒,用while替代if
用自旋替代递归重试
redis分布式锁V2版本:用while替换if,用自旋替换递归
//V3版本:public String saleV3(){String retMessage="";String key="redisLock";String uuidValue = IdUtil.simpleUUID()+":"+Thread.currentThread().getId();//分布式锁的设置Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue);// 用自旋替代递归、用while替换ifwhile (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue)){//暂停20毫秒try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}}//抢锁成功的线程继续进行正常的业务逻辑操作 扣减库存try {//查询库存信息String result = stringRedisTemplate.opsForValue().get("inventory001");//判断库存是否足够Integer inventory =result==null?0: Integer.valueOf(result);//扣减库存if(inventory>0){stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventory));retMessage="成功卖出一个商品,库存剩余:"+inventory;System.out.println(retMessage+"\t"+"服务端口号"+port);}else{retMessage="商品卖完了";}} finally {// 释放分布式锁stringRedisTemplate.delete(key);}return retMessage+"\t"+"服务端口号"+port;}
上面版本存在的问题:
部署了微服务的Java程序机器挂了,代码层面根本没有走到finally这块,
没办法保证解锁(无过期时间该key一直存在),这个key没有被删除,需要加入一个过期时间限定key
redis分布式锁版本3.0:宕机与过期+防止死锁
//V4版本:public String saleV4(){String retMessage="";String key="redisLock";String uuidValue = IdUtil.simpleUUID()+":"+Thread.currentThread().getId();// 用自旋替代递归、用while替换ifwhile (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue)){//暂停20毫秒try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}}//添加过期时间stringRedisTemplate.expire(key,30L,TimeUnit.SECONDS);//抢锁成功的线程继续进行正常的业务逻辑操作 扣减库存try {//查询库存信息String result = stringRedisTemplate.opsForValue().get("inventory001");//判断库存是否足够Integer inventory =result==null?0: Integer.valueOf(result);//扣减库存if(inventory>0){stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventory));retMessage="成功卖出一个商品,库存剩余:"+inventory;System.out.println(retMessage+"\t"+"服务端口号"+port);}else{retMessage="商品卖完了";}} finally {// 释放分布式锁stringRedisTemplate.delete(key);}return retMessage+"\t"+"服务端口号"+port;}
存在的问题
设置key+过期时间分开了,必须要合并成一行具备原子性
redis 分布式锁版本3.1
public String saleV4(){String retMessage="";String key="redisLock";String uuidValue = IdUtil.simpleUUID()+":"+Thread.currentThread().getId();// 用自旋替代递归、用while替换ifwhile (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue,30L,TimeUnit.SECONDS)){//暂停20毫秒try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}}//添加过期时间//stringRedisTemplate.expire(key,30L,TimeUnit.SECONDS);//抢锁成功的线程继续进行正常的业务逻辑操作 扣减库存try {//查询库存信息String result = stringRedisTemplate.opsForValue().get("inventory001");//判断库存是否足够Integer inventory =result==null?0: Integer.valueOf(result);//扣减库存if(inventory>0){stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventory));retMessage="成功卖出一个商品,库存剩余:"+inventory;System.out.println(retMessage+"\t"+"服务端口号"+port);}else{retMessage="商品卖完了";}} finally {// 释放分布式锁stringRedisTemplate.delete(key);}return retMessage+"\t"+"服务端口号"+port;}
结论:加锁与过期时间必须同一行,保证原子性
redis分布式锁版本4:防止key误删的问题
实际业务处理时间如果超过了默认设置key的过期时间??尴尬 ̄□ ̄||
张冠李戴,删除了别人的锁
解决: 只能自己删除自己的,不许动别人的
//V5版本:public String saleV5(){String retMessage="";String key="redisLock";String uuidValue = IdUtil.simpleUUID()+":"+Thread.currentThread().getId();// 用自旋替代递归、用while替换ifwhile (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue,30L,TimeUnit.SECONDS)){//暂停20毫秒try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}}//添加过期时间//stringRedisTemplate.expire(key,30L,TimeUnit.SECONDS);//抢锁成功的线程继续进行正常的业务逻辑操作 扣减库存try {//查询库存信息String result = stringRedisTemplate.opsForValue().get("inventory001");//判断库存是否足够Integer inventory =result==null?0: Integer.valueOf(result);//扣减库存if(inventory>0){stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventory));retMessage="成功卖出一个商品,库存剩余:"+inventory;System.out.println(retMessage+"\t"+"服务端口号"+port);}else{retMessage="商品卖完了";}} finally {// 改进点:只能删除属于自己的key,不能删除别人的if(stringRedisTemplate.opsForValue().get(key).equals(uuidValue)){stringRedisTemplate.delete(key);}}return retMessage+"\t"+"服务端口号"+port;}
redis分布式锁5.0版本:Lua脚本保证原子性
上个版本,finally块的判断+del删除操作不是原子性的
Lua脚本
官方脚本
Redis调用Lua脚本通过eval命令保证代码执行的原子性,直接用return返回脚本执行后的结果。
eval "redis.call('set', 'k1', 'v1') redis.call('expire', 'k1', '30') return redis.call('get', 'k1')" 0
eval "return redis.call('mset',KEYS[1],ARGV[1],KEYS[2],ARGV[2])" 2 k1 k2 lua1 lua2
redis get+del命令Lua脚本的原子操作
eval "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end" 1 redisLock 12345
Lua脚本条件判断分支
redis分布式锁V5版本
//V6版本:public String saleV6(){String retMessage="";String key="redisLock";String uuidValue = IdUtil.simpleUUID()+":"+Thread.currentThread().getId();// 用自旋替代递归、用while替换ifwhile (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue,30L,TimeUnit.SECONDS)){//暂停20毫秒try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}}//添加过期时间//stringRedisTemplate.expire(key,30L,TimeUnit.SECONDS);//抢锁成功的线程继续进行正常的业务逻辑操作 扣减库存try {//查询库存信息String result = stringRedisTemplate.opsForValue().get("inventory001");//判断库存是否足够Integer inventory =result==null?0: Integer.valueOf(result);//扣减库存if(inventory>0){stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventory));retMessage="成功卖出一个商品,库存剩余:"+inventory;System.out.println(retMessage+"\t"+"服务端口号"+port);}else{retMessage="商品卖完了";}} finally {//改进点,修改为Lua脚本的redis分布式锁调用,必须保证原子性,参考官网脚本案例String luaScript ="if redis.call('get',KEYS[1]) == ARGV[1] then " +"return redis.call('del',KEYS[1]) " +"else " +"return 0 " +"end";stringRedisTemplate.execute(new DefaultRedisScript(luaScript,Boolean.class), Arrays.asList(key),uuidValue);}return retMessage+"\t"+"服务端口号"+port;}
redis分布式锁V6:可重入锁+设计模式
上一个版本中while判断并自旋重试获取锁+setnx含自然过期+Lua脚本官网删除锁的命令。
存在的问题:如何兼顾锁的可重入性问题
写好一个锁的条件与规约
可重入锁(又名递归锁)
可重入锁又名递归锁。是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提,锁对象得是同一个对象),不会因为之前已经获取过还没释放而阻塞。
如果是1个有 synchronized 修饰的递归调用方法,程序第2次进入被自己阻塞了岂不是天大的笑话,出现了作茧自缚。所以Java中ReentrantLock和synchronized都是可重入锁,可重入锁的一个优点是可一定程度避免死锁。
”可重入锁“这四个字分开来解释:
可(可以)重(再次)入(进入)锁(同步锁)
进入什么?进入同步域(即同步代码块/方法或显式锁锁定的代码)
一句话:一个线程中的多个流程可以获取同一把锁,持有这把同步锁可以再次进入。自己可以获取自己的内部锁。
可重入锁的分类(隐式锁与显式锁)
隐式锁:也就是synchronized关键字使用的锁,默认是可重入锁。指的是可重复可递归调用的锁,在外层使用锁之后,在内层仍然可以使用,并且不发生死锁,这样的锁就叫做可重入锁。
简单的来说就是:在一个synchronized修饰的方法或代码块的内部调用本类的其他synchronized修饰的方法或代码块时,是永远可以得到锁的与可重入锁相反,不可重入锁不可递归调用,递归调用就发生死锁
上面的三种情况出现的锁中锁,如果没有可重入性(持有同一把锁)就会产生死锁了
同步块
public class ReEntryLockDemo
{public static void main(String[] args){final Object objectLockA = new Object();new Thread(() -> {synchronized (objectLockA){System.out.println("-----外层调用");synchronized (objectLockA){System.out.println("-----中层调用");synchronized (objectLockA){System.out.println("-----内层调用");}}}},"a").start();}
}
同步方法
/*** 在一个Synchronized修饰的方法或代码块的内部调用本类的其他Synchronized修饰的方法或代码块时,是永远可以得到锁的*/
public class ReEntryLockDemo
{public synchronized void m1(){System.out.println("-----m1");m2();}public synchronized void m2(){System.out.println("-----m2");m3();}public synchronized void m3(){System.out.println("-----m3");}public static void main(String[] args){ReEntryLockDemo reEntryLockDemo = new ReEntryLockDemo();reEntryLockDemo.m1();}
}
Synchronized锁重入的实现原理
显式锁(即Lock) 也有ReentrantLock这样的可重入锁
/*** 在一个Synchronized修饰的方法或代码块的内部调用本类的其他Synchronized修饰的方法或代码块时,是永远可以得到锁的*/
public class ReEntryLockDemo
{static Lock lock = new ReentrantLock();public static void main(String[] args){new Thread(() -> {lock.lock();try{System.out.println("----外层调用lock");lock.lock();try{System.out.println("----内层调用lock");}finally {// 这里故意注释,实现加锁次数和释放次数不一样// 由于加锁次数和释放次数不一样,第二个线程始终无法获取到锁,导致一直在等待。lock.unlock(); // 正常情况,加锁几次就要解锁几次}}finally {lock.unlock();}},"a").start();new Thread(() -> {lock.lock();try{System.out.println("b thread----外层调用lock");}finally {lock.unlock();}},"b").start();}
}
切记:一般而言,lock了几次就要unlock几次
思考:上面可重入锁的计数问题,redis中的哪个数据类型可以代替?
K,K,V
Map<String,Map<Object,Object>>
案例
hincrby 加了几次1 最后再减去几次1,直到为0。也就是可重入性的lock几次再unlock几次。
小总结:
setnx只能解决无的问题,够用但不够完美。hset,不但可以解决有无,还可以解决可重入性的问题。
设计重点(两条线)
目前有2条支线,目的是保证同一个时候只能有一个线程持有锁进去redis做扣减库存的动作。
2个分分支
-
保证加锁、解锁(lock\unlock)
-
扣减库存redis命令的原子性
Lua脚本实现Lock与Unlock的操作
加锁lua脚本lock
先判断redis分布式锁这个key是否存在。
Exists Key 返回0 说明不存在,hset新建当前线程属于自己的锁BY UUID:ThreadId.
Exists Key返回1 说明已有锁,需要进一步判断是不是当前线程自己的。HEXISTS key uuid:ThreadID 返回0 说明不是自己的,返回1说明是自己的锁,自增1次表示重入。
V1版本的Lua脚本
if redis.call('exists','key') == 0 thenredis.call('hset','key','uuid:threadid',1)redis.call('expire','key',30)return 1elseif redis.call('hexists','key','uuid:threadid') == 1 thenredis.call('hincrby','key','uuid:threadid',1)redis.call('expire','key',30)return 1elsereturn 0end
相同部分是否可以替换处理???
hincrby命令可否替代为hset命令??
V2版本
if redis.call('exists','key') == 0 or redis.call('hexists','key','uuid:threadid') == 1 thenredis.call('hincrby','key','uuid:threadid',1)redis.call('expire','key',30)return 1
elsereturn 0
end
KEYS[1]与ARGV[1]的参数化提取与处理
if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then redis.call('hincrby',KEYS[1],ARGV[1],1) redis.call('expire',KEYS[1],ARGV[2]) return 1 elsereturn 0
end
测试
EVAL "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then redis.call('hincrby',KEYS[1],ARGV[1],1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end" 1 redisLock 0c90d37cb6ec42268861b3d739f8b3a8:1 30
解锁lua脚本unlock
设计思路:有锁且还是自己的锁
Hexists key uuid:ThreadId 返回0,说明根本没有锁,程序块返回nil。不是0,说明有锁且是自己的锁,直接调用HINCRBY 负1,表示每次减个1,解锁1次。直到它变为0表示可以删除该锁key,del 锁key
V1版本
if redis.call('HEXISTS',lock,uuid:threadID) == 0 thenreturn nil
elseif redis.call('HINCRBY',lock,uuid:threadID,-1) == 0 thenreturn redis.call('del',lock)
else return 0
end
V2版本参数化处理
if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 thenreturn nilelseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 thenreturn redis.call('del',KEYS[1])elsereturn 0end
测试
eval "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then return nil elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then return redis.call('del',KEYS[1]) else return 0 end" 1 redisLock 2f586ae740a94736894ab9d51880ed9d:1
整合到微服务代码中
RedisLock实现Lock接口
package com.atguigu.redislock.mylock;import cn.hutool.core.util.IdUtil;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;import java.util.Arrays;
import java.util.PrimitiveIterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;/*** 自研分布式锁,实现Lock接口*/
public class RedisLock implements Lock {private StringRedisTemplate stringRedisTemplate;private String lockName;private String uuidValue;private long expireTime;public RedisLock(StringRedisTemplate stringRedisTemplate, String lockName) {this.stringRedisTemplate = stringRedisTemplate;this.lockName = lockName;this.uuidValue= IdUtil.simpleUUID()+":"+Thread.currentThread().getId();this.expireTime = 50L;}@Overridepublic void lock() {tryLock();}@Overridepublic void unlock() {String script="if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then" +" return nil " +"elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then " +"return redis.call('del',KEYS[1])" +" else " +"return 0" +" end";// nil ==false 1==true 0==falseLong executeFlag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));if(null==executeFlag){throw new RuntimeException("this lock doesnt exists");}}@Overridepublic boolean tryLock() {try {tryLock(-1L,TimeUnit.SECONDS);} catch (InterruptedException e) {throw new RuntimeException(e);}return false;}@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {if(time==-1L){String script="if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then " +" redis.call('hincrby',KEYS[1],ARGV[1],1) " +"redis.call('expire',KEYS[1],ARGV[2])" +" return 1 " +"else" +" return 0 end";Boolean executeFlag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));while (!executeFlag){//60 ms后再重试TimeUnit.MILLISECONDS.sleep(60);executeFlag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));}return true;}return false;}@Overridepublic Condition newCondition() {return null;}@Overridepublic void lockInterruptibly() throws InterruptedException {}
}
调用redisLock的lock与unlock方法
private Lock redisLock = new RedisLock(stringRedisTemplate,"redisLock");//V7版本public String saleV7(){String retMessage="";redisLock.lock();try {//查询库存信息String result = stringRedisTemplate.opsForValue().get("inventory001");//判断库存是否足够Integer inventory =result==null?0: Integer.valueOf(result);//扣减库存if(inventory>0){stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventory));retMessage="成功卖出一个商品,库存剩余:"+inventory;System.out.println(retMessage+"\t"+"服务端口号"+port);}else{retMessage="商品卖完了";}}finally {redisLock.unlock();}return retMessage+"\t"+"服务端口号"+port;}
利用工厂模式进行优化
package com.atguigu.redislock.mylock;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import java.util.concurrent.locks.Lock;@Component
public class DistributeLockFactory {@Autowiredprivate StringRedisTemplate stringRedisTemplate;private String lockName;public Lock getDistributeLock(String lockType) {if(lockType==null){return null;}if(lockType.equalsIgnoreCase("REDIS")){this.lockName = "redisLock";return new RedisLock(stringRedisTemplate,lockName);}else if (lockType.equalsIgnoreCase("zookeeper")){this.lockName = "zookeeperLock";//TODOreturn null;}else if (lockType.equalsIgnoreCase("mysql")){this.lockName = "mysqlLock";//TODOreturn null;}return null;}
}
@Autowiredprivate DistributeLockFactory distributeLockFactory;//V7版本public String saleV7(){String retMessage="";Lock redisLock= distributeLockFactory.getDistributeLock("redis");redisLock.lock();try {//查询库存信息String result = stringRedisTemplate.opsForValue().get("inventory001");//判断库存是否足够Integer inventory =result==null?0: Integer.valueOf(result);//扣减库存if(inventory>0){stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventory));retMessage="成功卖出一个商品,库存剩余:"+inventory;System.out.println(retMessage+"\t"+"服务端口号"+port);}else{retMessage="商品卖完了";}}finally {redisLock.unlock();}return retMessage+"\t"+"服务端口号"+port;}
压测+验证
redis中的库存
后台日志
可重入性验证代码
//V7版本public String saleV7(){String retMessage="";Lock redisLock= distributeLockFactory.getDistributeLock("redis");redisLock.lock();try {//查询库存信息String result = stringRedisTemplate.opsForValue().get("inventory001");//判断库存是否足够Integer inventory =result==null?0: Integer.valueOf(result);//扣减库存if(inventory>0){stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventory));retMessage="成功卖出一个商品,库存剩余:"+inventory;System.out.println(retMessage+"\t"+"服务端口号"+port);testReentry();}else{retMessage="商品卖完了";}}finally {redisLock.unlock();}return retMessage+"\t"+"服务端口号"+port;}private void testReentry() {Lock redisLock= distributeLockFactory.getDistributeLock("redis");redisLock.lock();try {System.out.println("==========测试可重入锁==================");}finally {redisLock.unlock();}}
出现的问题
线程Id一致,但是uuid不一致
问题修复
package com.atguigu.redislock.mylock;import cn.hutool.core.util.IdUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import java.util.concurrent.locks.Lock;@Component
public class DistributeLockFactory {@Autowiredprivate StringRedisTemplate stringRedisTemplate;private String lockName;private String uuid;public DistributeLockFactory(){this.uuid = IdUtil.simpleUUID();}public Lock getDistributeLock(String lockType) {if(lockType==null){return null;}if(lockType.equalsIgnoreCase("REDIS")){this.lockName = "redisLock";return new RedisLock(stringRedisTemplate,lockName,uuid);}else if (lockType.equalsIgnoreCase("zookeeper")){this.lockName = "zookeeperLock";//TODOreturn null;}else if (lockType.equalsIgnoreCase("mysql")){this.lockName = "mysqlLock";//TODOreturn null;}return null;}
}
package com.atguigu.redislock.mylock;import cn.hutool.core.util.IdUtil;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;import java.util.Arrays;
import java.util.PrimitiveIterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;/*** 自研分布式锁,实现Lock接口*/
public class RedisLock implements Lock {private StringRedisTemplate stringRedisTemplate;private String lockName;private String uuidValue;private long expireTime;public RedisLock(StringRedisTemplate stringRedisTemplate, String lockName) {this.stringRedisTemplate = stringRedisTemplate;this.lockName = lockName;this.uuidValue= IdUtil.simpleUUID()+":"+Thread.currentThread().getId();this.expireTime = 25L;}public RedisLock(StringRedisTemplate stringRedisTemplate, String lockName, String uuid) {this.stringRedisTemplate = stringRedisTemplate;this.lockName = lockName;this.uuidValue= uuid;this.expireTime = 25L;}@Overridepublic void lock() {tryLock();}@Overridepublic void unlock() {String script="if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then" +" return nil " +"elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then " +"return redis.call('del',KEYS[1])" +" else " +"return 0" +" end";// nil ==false 1==true 0==falseSystem.out.println("unlock lockName:"+lockName+"\t"+"uuidValue:"+uuidValue+"\t");Long executeFlag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));if(null==executeFlag){throw new RuntimeException("this lock doesnt exists");}}@Overridepublic boolean tryLock() {try {tryLock(-1L,TimeUnit.SECONDS);} catch (InterruptedException e) {throw new RuntimeException(e);}return false;}@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {if(time==-1L){String script="if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then " +" redis.call('hincrby',KEYS[1],ARGV[1],1) " +"redis.call('expire',KEYS[1],ARGV[2])" +" return 1 " +"else" +" return 0 end";System.out.println("lockName:"+lockName+"\t"+"uuidValue:"+uuidValue+"\t");Boolean executeFlag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));while (!executeFlag){//60 ms后再重试TimeUnit.MILLISECONDS.sleep(60);executeFlag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));}return true;}return false;}@Overridepublic Condition newCondition() {return null;}@Overridepublic void lockInterruptibly() throws InterruptedException {}
}
测试验证
自动续期
确保redisLock的过期时间大于业务执行时间的问题。redis分布式锁如何续期??
CAP
-
redis集群是AP
redis异步复制造成的锁丢失
比如:主节点没来的及把刚刚set进来这条数据给从节点,master就挂了,从机上位但从机上无该数据 -
zookeeper集群是CP
故障
-
Eureka集群是AP
-
Nacos集群是AP
加个钟 Lua脚本
redis分布式锁加锁成功后续期
package com.atguigu.redislock.mylock;import cn.hutool.core.util.IdUtil;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;import java.util.Arrays;
import java.util.PrimitiveIterator;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;/*** 自研分布式锁,实现Lock接口*/
public class RedisLock implements Lock {private StringRedisTemplate stringRedisTemplate;private String lockName;private String uuidValue;private long expireTime;public RedisLock(StringRedisTemplate stringRedisTemplate, String lockName) {this.stringRedisTemplate = stringRedisTemplate;this.lockName = lockName;this.uuidValue= IdUtil.simpleUUID()+":"+Thread.currentThread().getId();this.expireTime = 25L;}public RedisLock(StringRedisTemplate stringRedisTemplate, String lockName, String uuid) {this.stringRedisTemplate = stringRedisTemplate;this.lockName = lockName;this.uuidValue= uuid;this.expireTime = 30L;}@Overridepublic void lock() {tryLock();}@Overridepublic void unlock() {String script="if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then" +" return nil " +"elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then " +"return redis.call('del',KEYS[1])" +" else " +"return 0" +" end";// nil ==false 1==true 0==falseSystem.out.println("unlock lockName:"+lockName+"\t"+"uuidValue:"+uuidValue+"\t");Long executeFlag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));if(null==executeFlag){throw new RuntimeException("this lock doesnt exists");}}@Overridepublic boolean tryLock() {try {tryLock(-1L,TimeUnit.SECONDS);} catch (InterruptedException e) {throw new RuntimeException(e);}return false;}@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {if(time==-1L){String script="if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then " +" redis.call('hincrby',KEYS[1],ARGV[1],1) " +"redis.call('expire',KEYS[1],ARGV[2])" +" return 1 " +"else" +" return 0 end";System.out.println("lockName:"+lockName+"\t"+"uuidValue:"+uuidValue+"\t");Boolean executeFlag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));while (!executeFlag){//60 ms后再重试TimeUnit.MILLISECONDS.sleep(60);executeFlag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));}// 新建一个扫描程序,监控对应key的ttl是否到规定的1/3 进行续期resetExpireTime();return true;}return false;}private void resetExpireTime() {String script ="if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 1 then " +"return redis.call('expire',KEYS[1],ARGV[2]) " +"else " +"return 0 " +"end";// time调度方法new Timer().schedule(new TimerTask(){@Overridepublic void run(){if (stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime))) {resetExpireTime();}}},(this.expireTime * 1000)/3);}@Overridepublic Condition newCondition() {return null;}@Overridepublic void lockInterruptibly() throws InterruptedException {}
}
//V8版本public String saleV8(){String retMessage="";Lock redisLock= distributeLockFactory.getDistributeLock("redis");redisLock.lock();try {//查询库存信息String result = stringRedisTemplate.opsForValue().get("inventory001");//判断库存是否足够Integer inventory =result==null?0: Integer.valueOf(result);//扣减库存if(inventory>0){stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventory));retMessage="成功卖出一个商品,库存剩余:"+inventory;System.out.println(retMessage+"\t"+"服务端口号"+port);try {TimeUnit.SECONDS.sleep(120);} catch (InterruptedException e) {throw new RuntimeException(e);}}else{retMessage="商品卖完了";}}finally {redisLock.unlock();}return retMessage+"\t"+"服务端口号"+port;}
验证续期成功
redis分布式锁总结
视频链接
Redis分布式锁