概述
-
定义
:分布式系统或集群模式下,多进程或多节点之间 “可见” 并且 “互斥” 的锁机制 -
功能
:确保同一时刻只有一个进程或节点能够获取某项资源的访问权 -
特点
- 互斥
- 高可用
- 多进程可见
- 高并发 (高性能)
- 安全性 (避免死锁问题)
-
常见的分布式锁
MySQL Redis Zookeeper 互斥 MySQL 本身的互斥锁机制 利用 setnx 互斥命令 利用节点的唯一性和有序性实现互斥 高可用 好 好 好 高性能 一般 好 一般 (强调一致性) 安全性 断开连接,自动释放锁 利用锁超时时间,到期释放 临时节点,断开连接自动释放
Redis 分布式锁
一、核心思路
运行逻辑
- 线程进入时,利用 redis 的setNx 方法添加 key 作为逻辑锁
- 返回结果 == 1 → 当前线程抢到了锁,继续执行业务逻辑后删除 key,逻辑上释放锁
- 返回结果 == 0 → 等待一定时间后重新尝试获取锁
实现途径
:setNx 方法(添加代表锁的 key,如果没有当前 key 则添加当前 key-value 并返回 1,已有当前 key 则返回 0)最佳实践
- 在 Util 包下创建一个分布式锁的类,专门供业务模块(xxxService)获取锁和释放锁
- 在 ServiceImpl 的代码中调用 Util 中的锁工具并手动 ”上锁 / 释放锁”
二、获取锁
功能
:如果 Redis 中不存在当前锁,则在 Redis 中写入锁名 / 线程名 / 超时释放时间- ⭐
Redis命令
:SET myLockName myThreadName NX EX myExpireTime - ⭐
核心代码
:stringRedisTemplate.opsForValue().setIfAbsent( KEY_PREFIX + keyName, threadId, time, TimeUnit.SECONDS )
三、释放锁
功能
:逻辑上释放分布式锁,让该服务不再占用资源实现流程
- 判断 Redis 中存在对此业务的锁
- 存在锁 → 判断这个锁是否是当前线程获取
- 锁属于当前线程 → 释放锁(删除 redis 中的 key-value)
- 锁不属于当前线程 → 拒绝释放锁
- 不存在锁 → 释放锁失败
- 存在锁 → 判断这个锁是否是当前线程获取
- 判断 Redis 中存在对此业务的锁
Redis命令
:DEL myLockName
四、常见问题
误删问题
问题引入
- 持有锁的线程 1 出现阻塞,导致锁超时自动释放
- 线程2 请求获取锁,成功获得锁并开始执行业务逻辑
- 线程 1 唤醒,继续执行业务逻辑并删除锁 (误删其他线程的锁)
核心思路
:key 作为锁,value 用来标识是属于哪个进程的锁解决方案
- 存入锁时,value 中放入当前线程的标识(声明锁的主人)
- 删除锁时,判断当前锁的 value 是否包含当前线程的标识(检查是否由主人自己解锁)
- 锁属于当前线程 → 删除
- 锁不属于当前线程 → 拒绝删除
原子性问题
-
问题引入
- 线程 1 持有锁并执行业务逻辑后,已经判断锁属于自己
- 线程1准备删除锁,但是此时锁过期
- 线程2请求获取锁,获取锁成功
- 线程1继续执行删除锁逻辑 (误删其他线程的锁)
-
核心思路
:通过 Lua 脚本保证 “判断锁” 与 “删除锁” 的原子性 -
Redis 的调用函数
:redis.call('命令名称', 'key', '其它参数', ...) -
代码实现
- 目标:保证 unlock 操作的判断锁和删除锁是原子性操作 ( Lua 脚本 )
-- 定义 Lua 脚本,用于安全释放分布式锁 local key = KEYS[1] -- 锁的键名 local threadId = ARGV[1] -- 当前线程标识-- 判断锁是否属于当前线程 if (redis.call('GET', key) == threadId) thenreturn redis.call('DEL', key) -- 锁属于当前线程 -> 删除锁 end return 0 -- 锁不属于当前线程 -> 返回失败
可重入问题
-
目标
:对于当前获取了锁的进程,可以再次获取自己的锁 -
原理
-
借助底层的一个 voaltile 的一个 state 变量来记录重入的状态
-
没有进程持有当前锁(state=0) ⇒ 线程请求当前锁(state=1)
-
持有这把锁的线程再次请求当前锁 ⇒ state++
-
持有这把锁的线程释放当前锁 ⇒ state--
-
state ==0 ⇒ 释放当前锁
-
-
参数
- KEYS[1]:锁名称
- ARGV[1]:锁失效时间
- ARGV[2]:id + “:” + threadId ,表示持有锁的进程 (field)
过期问题
-
问题引入:如果任务执行时间超过锁的过期时间,锁可能会被误删
-
解决方案:看门狗机制,即在锁的过期时间内,通过后台线程周期性地延长锁的有效期
-
代码实现
// 全局调度器,负责分配任务 private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10); // 使用 ThreadLocal 保存当前线程的续约任务 private final ThreadLocal<ScheduledFuture<?>> renewalTask = new ThreadLocal<>();public void lockWithWatchdog() {lock();// 启动定时任务ScheduledFuture<?> task = scheduler.scheduleAtFixedRate(() -> {stringRedisTemplate.expire(KEY_PREFIX + name, RELEASE_TIME_IN_SECONDS, TimeUnit.SECONDS);}, // 续约操作RELEASE_TIME_IN_SECONDS / 2, // initialDelay: 延迟多长时间后首次执行任务RELEASE_TIME_IN_SECONDS / 2, // period: 两次任务之间的时间间隔TimeUnit.SECONDS); // 时间单位:这里为秒renewalTask.set(task); // 将当前线程的续约任务保存到 ThreadLocal 中 }public void unlockWithWatchdog() {// 解锁unlock();// 从 ThreadLocal 中获取当前线程的续约任务ScheduledFuture<?> task = renewalTask.get();if (task != null) {task.cancel(false); // 取消当前续约任务renewalTask.remove(); // 清理 ThreadLocal,防止内存泄漏} }
解决方案
-
"获取锁" 脚本(lock.lua)
local key = KEYS[1]; -- key:锁的id local threadId = ARGV[1]; -- threadId:线程的id local releaseTime = tonumber(ARGV[2]); -- releaseTime:锁的自动释放时间(失效时间)-- 锁(key)不存在 if( redis.call('exists', key) == 0 ) thenredis.call('hset', key, threadId, 1); -- 成功获取锁redis.call('pexpire', key, releaseTime); -- 设置有效期return 1; -- 返回成功 end;-- 锁(key)已存在 && 锁(threadId)属于当前线程 if( redis.call('hexists', key, threadId ) == 1 ) thenredis.call( 'hincrby', key, threadId, 1 ); -- 当前线程的锁 +1redis.call( 'pexpire', key, releaseTime ); -- 更新有效期return 1; -- 返回成功 end;-- 锁存在 && 锁不属于当前线程,返回失败(当前锁已被占用) return 0;
-
”释放锁“ 脚本(unlock.lua)
local key = KEYS[1]; -- key:锁的id local threadId = ARGV[1]; -- threadId:线程的id local releaseTime = tonumber(ARGV[2]); -- releaseTime:锁的自动释放时间(失效时间)-- 当前锁不属于当前进程 if( redis.call('HEXISTS', key, threadId ) == 0 ) thenreturn 0; end;-- 当前锁属于当前进程, 重入次数 -1 local count = redis.call('HINCRBY', key, threadId, -1 );-- 重入次数已经为0,释放锁 if( count == 0 ) thenredis.call('DEL', key);return 1; end;-- 重入次数还不为0,更新有效期 redis.call('PEXPIRE', key, releaseTime); return 0;
-
Java 代码
public class RedisLock {private final StringRedisTemplate stringRedisTemplate;private static final String KEY_PREFIX = "lock:";private final int RELEASE_TIME_IN_SECONDS; // 锁的过期时间// 创建两个Lua脚本,用静态代码块加载 lock 和 unlock 脚本private static final DefaultRedisScript<Long> LOCK_SCRIPT;private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static {LOCK_SCRIPT = new DefaultRedisScript<>();LOCK_SCRIPT.setLocation(new ClassPathResource("/path/to/lock.lua"));LOCK_SCRIPT.setResultType(Long.class);UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("/path/to/unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}// 重写锁操作,调用Lua脚本实现 “判断锁” 与 “删除锁” 的原子性操作,并实现可重入和避免误删public void lock(String key) {stringRedisTemplate.execute( // redis用execute函数调用lua脚本LOCK_SCRIPT, // 声明调用的脚本对象Collections.singletonList(KEY_PREFIX + key), // 传入key(锁)ID_PREFIX + Thread.currentThread().getId()); // 传入value(线程标识)String.valueOf(RELEASE_TIME_IN_MILLIS)); // 添加释放时间}public void unlock(String key) {stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + key),ID_PREFIX + Thread.currentThread().getId());String.valueOf(RELEASE_TIME_IN_MILLIS));}
Redisson 组件
一、概述
定义
:Redis 基础上实现的分布式工具集合,一个 Java 驻内存数据网格(In-Memory Data Grid)功能
- 常用对象:提供分布式的 Java 常用对象
- 分布式锁:提供分布式服务 (包括各种分布式锁的实现)
- 并发安全:解决集群模式下的并发安全问题
特点
- 可重入:线程可以二次获取自己加的锁,防止线程调用自己产生的死锁
- 可重试:线程请求锁失败后,应该能再次请求锁
- 超时续期:超时释放存在安全隐患,lua表达式只能保证不误删,但是超时释放后实际上有两个线程在锁的逻辑内
- 主从一致:主从同步之前,如果主机宕机就会出现死锁问题
二、使用入门
-
引入依赖
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version> </dependency>
-
配置 Redission 客户端 (配置信息)
@Configuration public class RedissonConfig {@Beanpublic RedissonClient redissonClient(){Config config = new Config();config.useSingleServer() // 使用单节点模式连接 Redis.setAddress("redis://192.168.7.7:6379").setPassword("123456");return Redisson.create(config); // 创建RedissonClient对象} }
-
在 ServiceImpl 中注入 RedissonClient 依赖
@AutoWired RedissonClient redissonClient;
四、锁重试 & WatchDog
锁重试机制
- 获取锁失败时可以设置重试等待时间
- 在等待时间内会周期性尝试获取锁
- 示例:
lock.tryLock(100, 10, TimeUnit.SECONDS)
- 等待100秒,每10秒重试一次
WatchDog 机制
- 默认锁有效期为 30 秒
- 获取锁成功后,会启动一个后台线程,定期延长锁的有效期
- 释放锁时,WatchDog 线程自动停止
五、multiLock 原理
-
概念
- MultiLock 可以将多个 RLock 对象关联为一个整体
- 只有所有的锁都获取成功,才算获取成功
-
使用场景
- Redis 主从架构下保证锁的可靠性
- 多个资源需要同时加锁的场景
-
示例代码
RLock lock1 = redissonClient1.getLock("lock1"); RLock lock2 = redissonClient2.getLock("lock2"); RLock multiLock = redissonClient.getMultiLock(lock1, lock2); try {// 同时获取多把锁multiLock.lock();// 业务逻辑 } finally {// 释放所有锁multiLock.unlock(); }
业务逻辑
创建资源
抢占资源 (Lua 脚本)
一、优惠券下单逻辑
二、代码实现 (Lua脚本)
--1. 参数列表
--1.1. 优惠券id
local voucherId = ARGV[1]
--1.2. 用户id
local userId = ARGV[2]
--1.3. 订单id
local orderId = ARGV[3]--2. 数据key
--2.1. 库存key
local stockKey = 'seckill:stock:' .. voucherId
--2.2. 订单key
local orderKey = 'seckill:order' .. voucherId--3. 脚本业务
--3.1. 判断库存是否充足 get stockKey
if( tonumber( redis.call('get', stockKey) ) <= 0 ) thenreturn 1
end
--3.2. 判断用户是否下单 SISMEMBER orderKey userId
if( redis.call( 'sismember', orderKey, userId ) == 1 ) thenreturn 2
end
--3.4 扣库存: stockKey 的库存 -1
redis.call( 'incrby', stockKey, -1 )
--3.5 下单(保存用户): orderKey 集合中添加 userId
redis.call( 'sadd', orderKey, userId )
-- 3.6. 发送消息到队列中
redis.call( 'xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId )
三、加载 Lua 脚本
RedisScript 接口
:用于绑定一个具体的 Lua 脚本DefaultRedisScript 实现类
-
定义:RedisScript 接口的实现类
-
功能:提前加载 Lua 脚本
-
示例
// 创建Lua脚本对象 private static final DefaultRedisScript<Long> SECKILL_SCRIPT;// Lua脚本初始化 (通过静态代码块) static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("/path/to/lua_script.lua"));SECKILL_SCRIPT.setResultType(Long.class); }
-
四、执行 Lua 脚本
调用Lua脚本 API
:StringRedisTemplate.execute( RedisScript<T> script, List<K> keys, Object… args )示例
-
执行 ”下单脚本” (此时不需要 key,因为下单时只需要用 userId 和 voucherId 查询是否有锁)
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, // 要执行的脚本Collections.emptyList(), // KEYvoucherId.toString(), userId.toString(), String.valueOf(orderId) // VALUES );
-
执行 “unlock脚本”
-
用户下单
-
代码逻辑
-
VoucherOrderServiceImpl 代码
@Override public Result seckillVoucher( Long voucherId ) {// 1. 获取用户id/订单idLong userId = UserHolder.getUser().getId();long orderId = redisIdWorker.nextId("order");// 2. 执行Lua脚本Long luaResult = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString(), String.valueOf(orderId));// 3. 反向判断lua脚本是否成功下单 (下单失败提前退出)if( luaResult != 0L ) {return Result.fail( r == 1 ? "库存不足":"同一用户不能重复下单");}// 4. 下单成功后的操作// 4.1. 获取订单idlong orderId = redisIdWorker.nextId("order");// 4.2. 保存阻塞队列return Result.ok(orderId); }