一、单体系统下,使用锁机制实现秒杀功能,并限制一人一单功能
1.流程图:
2.代码实现:
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Autowiredprivate ISeckillVoucherService seckillVoucherService;@Autowiredprivate RedisIDWorker redisIDWorker;/*** 下单秒杀券* @param voucherId* @return*/@Overridepublic Result seckillVoucher(Long voucherId) {//1.查询优惠券信息SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);//2.判断当前时间是否在优惠券的开始时间和结束时间内if(seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())){return Result.fail("优惠券秒杀尚未开始");}if(seckillVoucher.getEndTime().isBefore(LocalDateTime.now())){return Result.fail("优惠券秒杀已经结束");}//3.判断库存是否充足if (seckillVoucher.getStock() < 1){return Result.fail("库存不足");}//加上锁Long userId = UserHolder.getUser().getId();synchronized (userId.toString().intern()){//事务提交后才释放锁,避免事务未提交产生的线程安全问题//拿到代理对象-这样事务才会生效IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);}}@Transactionalpublic Result createVoucherOrder(Long voucherId) {//4.一人一单//4.1 查询数据库中是否有当前用户购买此秒杀券的记录Long userId = UserHolder.getUser().getId();long count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();if(count>0){return Result.fail("此用户已经购买过一次了,不能重复购买");}//5.扣减库存boolean isDiscount = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock", 0)//乐观锁:使用stock代替版本号- 减库存之前判断库存是否充足,防止超卖.update();if(!isDiscount){return Result.fail("库存不足");}//6.创建订单并保存VoucherOrder voucherOrder = new VoucherOrder();long orderId = redisIDWorker.nextId("order");voucherOrder.setVoucherId(voucherId);voucherOrder.setUserId(userId);voucherOrder.setId(orderId);save(voucherOrder);//7.返回订单IDreturn Result.ok(orderId);}
}
3.存在的并发安全问题
这里我们是使用用户ID作为锁对象的,但是在分布式系统下,有多台JVM,其内存空间是各自独立的,此时虽然用户ID的值是一样的,但是其userId.toString().intern()方法返回的对象只能保证在同一个JVM上是相同的,不同的JVM使用serId.toString().intern()方法返回的对象是不同的。
因此,对于不同JVM的线程,当前使用的方式并不能解决线程安全问题,也即,这种方法无法适配分布式系统。
4.解决方案-引入分布式锁
由于引发的问题是因为 :
在不同的JVM中获得到的锁对象是不同的,因此只要让它们获得的锁对象是同一个,那就可以解决这个问题。
4.1分布式锁
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
4.2分布式锁的实现
二、使用Redis实现分布式锁
1.原理:
实现分布式锁时需要实现的两个基本方法:
(1)获取锁:
互斥:确保只能有一个线程获取锁
非阻塞:尝试一次,成功返回true,失败返回false
(2)释放锁:
手动释放
超时释放:设置过期时间
2.流程图:
3.代码改造:
1.ILock接口:(后续实现的锁都要实现这个接口)
public interface ILock {/*** 尝试获取锁* @param timeOutSec* @return true代表获取锁成功,false代表获取锁失败*/boolean tryLock(Long timeOutSec);/*** 释放锁*/void unlock();
}
2.SimpleRedisLock:
public class SimpleRedisLock implements ILock{private StringRedisTemplate stringRedisTemplate;private String name;private static final String KEY_PREFIX = "lock:";public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean tryLock(Long timeOutSec) {//使用String 类型来实现锁,如果key存在,则无法设置新的值//value 为当前线程ID,为后面释放锁做准备String threadId = Thread.currentThread().getId() + "";Boolean isLock = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeOutSec, TimeUnit.SECONDS);return BooleanUtil.isTrue(isLock);}@Overridepublic void unlock() {stringRedisTemplate.delete(KEY_PREFIX+name);}
}
3.Service代码改造:
将 seckillVoucher 中的:
//加上锁Long userId = UserHolder.getUser().getId();synchronized (userId.toString().intern()){//事务提交后才释放锁,避免事务未提交产生的线程安全问题//拿到代理对象-这样事务才会生效IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);}
修改为:
//加上锁Long userId = UserHolder.getUser().getId();SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);boolean isLock = lock.tryLock(10L);if(!isLock){return Result.fail("请勿重复下单");}//拿到代理对象-这样事务才会生效try {IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);} finally {lock.unlock();}
这样就实现了redis分布式锁在秒杀业务中的应用。
4.当前存在的问题-误删问题:
如上图,线程1在成功获取到锁,执行业务的时候发生阻塞,可能会由于超时释放锁。
此时线程2成功获取到锁,也在执行自己的业务,在线程2执行业务的时候,线程1又开始运行,在线程1执行完成之后会去释放这个锁。
此时如果线程3去获取锁,也能获取成功。
.....
这就是当前代码存在的线程不安全问题。
5.解决方案-改进Redis的分布式锁
5.1 解决误删除-step1:删前判断
需求:修改之前的分布式锁实现,满足:
1.在获取锁时存入线程标示(可以用UUID表示)
2.在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致 如果一致则释放锁 如果不一致则不释放锁
5.1.1 流程图
5.1.2 改造代码:
修改SimpleRedisLock 如下:
public class SimpleRedisLock implements ILock{private StringRedisTemplate stringRedisTemplate;private String name;private static final String KEY_PREFIX = "lock:";private static final String ID_PREFIX = UUID.randomUUID().toString(true)+"-";//用于区分不同jvm的线程public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean tryLock(Long timeOutSec) {//使用String 类型来实现锁,如果key存在,则无法设置新的值//value 为当前线程ID,为后面释放锁做准备String threadId = ID_PREFIX + Thread.currentThread().getId() + "";//这样每个线程都有唯一key了Boolean isLock = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeOutSec, TimeUnit.SECONDS);return BooleanUtil.isTrue(isLock);}@Overridepublic void unlock() {//获取线程标识String threadId = ID_PREFIX + Thread.currentThread().getId() + "";//这样每个线程都有唯一key了//获取redis中的值String key = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);if(threadId.equals(key)){//一致的话,删除锁stringRedisTemplate.delete(KEY_PREFIX+name);}}
}
可以发现,在释放锁(unlock)代码中,判断一致和删除锁是非原子性的,那么也会引发线程安全问题:
可以看到,线程1在判断一致之后阻塞,在线程1阻塞期间由于超时,锁被释放。
线程2此时获取锁成功,在执行业务的时候,线程1阻塞结束,将锁删除,那还是会存在误删的情况。
这里引入lua脚本来解决原子性问题
5.2 解决误删除-step2:使用Lua脚本实现原子性
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法可以参考网站:https://www.runoob.com/lua/lua-tutorial.html
(1)unlock.lua:
-- 比较线程标识与锁中的标识是否一致
if(redis.call('get',KEYS[1])== ARGV[1])then-- 删除锁return redis.call('del',KEYS[1])
end
return 0
(2)修改 SimpleRedisLock如下:
public class SimpleRedisLock implements ILock{private StringRedisTemplate stringRedisTemplate;private String name;private static final String KEY_PREFIX = "lock:";private static final String ID_PREFIX = UUID.randomUUID().toString(true)+"-";//用于区分不同jvm的线程private static final DefaultRedisScript<Long>UNLOCK_SCRIPT;static {//类加载的时候初始化,不用重复初始化UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));//设置lua脚本位置UNLOCK_SCRIPT.setResultType(Long.class);//设置返回类型}public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean tryLock(Long timeOutSec) {//使用String 类型来实现锁,如果key存在,则无法设置新的值//value 为当前线程ID,为后面释放锁做准备String threadId = ID_PREFIX + Thread.currentThread().getId() + "";//这样每个线程都有唯一key了Boolean isLock = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeOutSec, TimeUnit.SECONDS);return BooleanUtil.isTrue(isLock);}/*** 使用Lua脚本保证原子性*/@Overridepublic void unlock() {String threadId = ID_PREFIX + Thread.currentThread().getId() + "";//这样每个线程都有唯一key了stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),threadId);}// @Override
// public void unlock() {
// //获取线程标识
// String threadId = ID_PREFIX + Thread.currentThread().getId() + "";//这样每个线程都有唯一key了
// //获取redis中的值
// String key = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// if(threadId.equals(key)){//一致的话,删除锁
// stringRedisTemplate.delete(KEY_PREFIX+name);
// }
//
// }
}
此时这个使用Redis实现的分布式锁就可以满足大部分业务需求了,不过还是存在一些问题:
1.不可重入
2.不可重试
3.超时释放存在的安全隐患
4.集群时,主从节点之间存在的不一致性
这些问题我们可以直接采用成熟的,已实现的框架来帮助我们解决问题,如Redisson,后续也会出一篇关于Redisson的文章。