文章目录
- 一、引言
- 二、分布式锁的核心要求
- 三、基于Redis的分布式锁实现
- 3.1 Redis分布式锁的基本实现
- 3.2 Redis分布式锁的可重入实现
- 四、基于Zookeeper的分布式锁实现
- 4.1 Zookeeper分布式锁的基本原理
- 4.2 Zookeeper分布式锁的优化实现
- 五、基于数据库的分布式锁实现
- 总结
一、引言
在分布式系统中,多个应用实例对共享资源的并发访问需要一个跨JVM的锁机制来保证互斥。分布式锁通过分布式协调器来实现多个应用实例之间的同步,确保同一时间只有一个应用实例能够访问共享资源。
二、分布式锁的核心要求
分布式锁需要满足互斥性、防死锁、高可用和高性能等基本要求。互斥性确保任意时刻只有一个客户端持有锁,防死锁机制通过超时释放来避免死锁。
下面通过示例代码来说明分布式锁的基本应用:
public class DistributedSystemDemo {private static int inventory = 100; // 商品库存private final DistributedLock distributedLock;private final String lockKey = "inventory_lock";public DistributedSystemDemo(DistributedLock distributedLock) {this.distributedLock = distributedLock;}public boolean deductInventory() {try {// 尝试获取分布式锁if (distributedLock.tryLock(lockKey, 1000)) {try {if (inventory > 0) {inventory--;System.out.println("扣减库存成功,当前库存: " + inventory);return true;}return false;} finally {// 释放锁distributedLock.unlock(lockKey);}}return false;} catch (Exception e) {// 处理异常return false;}}
}
三、基于Redis的分布式锁实现
3.1 Redis分布式锁的基本实现
Redis分布式锁利用SET命令的原子性特性实现。通过SET NX命令尝试获取锁,并设置过期时间防止死锁。释放锁时需要确保只有锁的持有者才能释放锁,通常使用Lua脚本实现原子性释放。
public class RedisDistributedLock {private static final String LOCK_PREFIX = "distributed_lock:";private static final String LOCK_SUCCESS = "OK";private StringRedisTemplate redisTemplate;public RedisDistributedLock(StringRedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;}public boolean tryLock(String lockKey, String requestId, long expireTime) {String key = LOCK_PREFIX + lockKey;try {String result = redisTemplate.execute((RedisCallback<String>) connection -> {JedisCommands commands = (JedisCommands) connection.getNativeConnection();return commands.set(key, requestId, "NX", "PX", expireTime);});return LOCK_SUCCESS.equals(result);} catch (Exception e) {return false;}}public boolean releaseLock(String lockKey, String requestId) {String key = LOCK_PREFIX + lockKey;String script = "if redis.call('get', KEYS[1]) == ARGV[1] then "+ "return redis.call('del', KEYS[1]) "+ "else return 0 end";try {DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();redisScript.setScriptText(script);redisScript.setResultType(Long.class);Long result = redisTemplate.execute(redisScript, Collections.singletonList(key), requestId);return Long.valueOf(1).equals(result);} catch (Exception e) {return false;}}
}
3.2 Redis分布式锁的可重入实现
可重入锁允许同一个线程多次获取同一把锁,实现需要记录锁的持有线程和重入次数。使用ThreadLocal维护线程的锁计数,当计数归零时才真正释放Redis中的锁。
public class ReentrantRedisLock {private static final String LOCK_PREFIX = "reentrant_lock:";private StringRedisTemplate redisTemplate;private ThreadLocal<Map<String, LockInfo>> lockInfo = new ThreadLocal<>();public static class LockInfo {String requestId;int lockCount;public LockInfo(String requestId) {this.requestId = requestId;this.lockCount = 1;}}public ReentrantRedisLock(StringRedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;}public boolean tryLock(String lockKey, long expireTime) {String key = LOCK_PREFIX + lockKey;Map<String, LockInfo> locks = lockInfo.get();if (locks == null) {locks = new HashMap<>();lockInfo.set(locks);}LockInfo info = locks.get(key);if (info != null && info.lockCount > 0) {info.lockCount++;return true;}String requestId = UUID.randomUUID().toString();Boolean success = redisTemplate.opsForValue().setIfAbsent(key, requestId, expireTime, TimeUnit.MILLISECONDS);if (Boolean.TRUE.equals(success)) {locks.put(key, new LockInfo(requestId));return true;}return false;}public boolean releaseLock(String lockKey) {String key = LOCK_PREFIX + lockKey;Map<String, LockInfo> locks = lockInfo.get();if (locks == null) {return false;}LockInfo info = locks.get(key);if (info == null || info.lockCount <= 0) {return false;}info.lockCount--;if (info.lockCount == 0) {locks.remove(key);if (locks.isEmpty()) {lockInfo.remove();}String script = "if redis.call('get', KEYS[1]) == ARGV[1] then "+ "return redis.call('del', KEYS[1]) "+ "else return 0 end";try {DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();redisScript.setScriptText(script);redisScript.setResultType(Long.class);Long result = redisTemplate.execute(redisScript, Collections.singletonList(key), info.requestId);return Long.valueOf(1).equals(result);} catch (Exception e) {return false;}}return true;}
}
四、基于Zookeeper的分布式锁实现
4.1 Zookeeper分布式锁的基本原理
Zookeeper的分布式锁利用临时顺序节点特性,创建节点时会得到一个递增序号,序号最小的节点获得锁。其他节点通过监听前一个节点的删除事件来获取锁。
public class ZookeeperDistributedLock {private static final String LOCK_ROOT = "/distributed_locks";private final CuratorFramework client;private final String lockPath;private String currentLockPath;public ZookeeperDistributedLock(CuratorFramework client, String lockName) {this.client = client;this.lockPath = LOCK_ROOT + "/" + lockName;try {client.createContainers(LOCK_ROOT);} catch (Exception e) {throw new RuntimeException("Failed to create lock root", e);}}public boolean tryLock() throws Exception {currentLockPath = client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(lockPath + "/lock-");List<String> children = getSortedChildren();String currentNode = currentLockPath.substring(currentLockPath.lastIndexOf('/') + 1);if (currentNode.equals(children.get(0))) {return true;}String previousNode = children.get(children.indexOf(currentNode) - 1);final CountDownLatch latch = new CountDownLatch(1);client.getData().usingWatcher(new CuratorWatcher() {@Overridepublic void process(WatchedEvent event) {if (event.getType() == EventType.NodeDeleted) {latch.countDown();}}}).forPath(lockPath + "/" + previousNode);return latch.await(30, TimeUnit.SECONDS);}private List<String> getSortedChildren() throws Exception {List<String> children = client.getChildren().forPath(lockPath);Collections.sort(children);return children;}public void unlock() {try {if (currentLockPath != null) {client.delete().guaranteed().forPath(currentLockPath);}} catch (Exception e) {throw new RuntimeException("Failed to release lock", e);}}
}
4.2 Zookeeper分布式锁的优化实现
为了提高可靠性和性能,需要添加重试机制、超时机制和异常处理。重试使用指数退避策略,避免频繁无效请求,同时妥善处理网络闪断等异常情况。
public class OptimizedZookeeperLock {private static final String LOCK_ROOT = "/distributed_locks";private final CuratorFramework client;private final String lockPath;private String currentLockPath;private final int maxRetries;private final long timeoutMs;public OptimizedZookeeperLock(CuratorFramework client, String lockName, int maxRetries, long timeoutMs) {this.client = client;this.lockPath = LOCK_ROOT + "/" + lockName;this.maxRetries = maxRetries;this.timeoutMs = timeoutMs;try {client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(LOCK_ROOT);} catch (Exception e) {throw new RuntimeException("Failed to initialize lock", e);}}public boolean tryLock() throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, maxRetries);try {currentLockPath = client.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(lockPath + "/lock-");return waitForLock(timeoutMs);} catch (Exception e) {if (currentLockPath != null) {try {client.delete().guaranteed().forPath(currentLockPath);} catch (Exception ex) {// 处理删除失败的异常}}throw e;}}private boolean waitForLock(long millisToWait) throws Exception {long startTime = System.currentTimeMillis();while ((System.currentTimeMillis() - startTime) < millisToWait) {List<String> children = getSortedChildren();String currentNode = currentLockPath.substring(currentLockPath.lastIndexOf('/') + 1);if (currentNode.equals(children.get(0))) {return true;}String previousNode = children.get(children.indexOf(currentNode) - 1);final CountDownLatch latch = new CountDownLatch(1);Stat stat = client.checkExists().usingWatcher(new CuratorWatcher() {@Overridepublic void process(WatchedEvent event) {if (event.getType() == EventType.NodeDeleted) {latch.countDown();}}}).forPath(lockPath + "/" + previousNode);if (stat == null) {continue;}if (latch.await(millisToWait - (System.currentTimeMillis() - startTime), TimeUnit.MILLISECONDS)) {return true;}}return false;}private List<String> getSortedChildren() throws Exception {List<String> children = client.getChildren().forPath(lockPath);Collections.sort(children);return children;}public void unlock() {try {if (currentLockPath != null) {client.delete().guaranteed().forPath(currentLockPath);}} catch (Exception e) {throw new RuntimeException("Failed to release lock", e);}}
}
五、基于数据库的分布式锁实现
数据库分布式锁利用数据库的行锁机制实现,通过创建锁表并使用事务特性来保证互斥性。这种方式实现简单,但性能相对较低,适合并发量小的场景。
public class DatabaseDistributedLock {private final DataSource dataSource;public DatabaseDistributedLock(DataSource dataSource) {this.dataSource = dataSource;}public boolean tryLock(String lockKey, String owner, long expireTime) {Connection conn = null;PreparedStatement stmt = conn.prepareStatement(sql);stmt.setString(1, lockKey);stmt.setString(2, owner);stmt.setTimestamp(3, new Timestamp(System.currentTimeMillis() + expireTime));int affected = stmt.executeUpdate();conn.commit();return affected > 0;} catch (SQLException e) {if (conn != null) {try {conn.rollback();} catch (SQLException ex) {// 处理回滚异常}}return false;} finally {if (stmt != null) {try {stmt.close();} catch (SQLException e) {// 处理异常}}if (conn != null) {try {conn.close();} catch (SQLException e) {// 处理异常}}}}
}public class OptimisticLockImplementation {private final DataSource dataSource;public OptimisticLockImplementation(DataSource dataSource) {this.dataSource = dataSource;}public boolean tryOptimisticLock(String resourceId, String owner, int version) {Connection conn = null;PreparedStatement stmt = null;try {conn = dataSource.getConnection();String sql = "UPDATE resource_table SET version = ?, owner = ? "+ "WHERE resource_id = ? AND version = ?";stmt = conn.prepareStatement(sql);stmt.setInt(1, version + 1);stmt.setString(2, owner);stmt.setString(3, resourceId);stmt.setInt(4, version);int affected = stmt.executeUpdate();return affected > 0;} catch (SQLException e) {return false;} finally {// 关闭资源if (stmt != null) {try {stmt.close();} catch (SQLException e) {// 处理异常}}if (conn != null) {try {conn.close();} catch (SQLException e) {// 处理异常}}}}
}
总结
分布式锁是构建可靠分布式系统的重要组件。通过Redis、Zookeeper和数据库三种实现方案,可以满足不同场景的需求。在实际应用中,需要结合业务特点选择合适的实现方式,同时做好异常处理,确保分布式锁的可靠性。