zookeeper不可重入锁的实现代码
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;public class DistributedLockExample {private static final String ZOOKEEPER_CONNECTION_STRING = "localhost:2181";private static final int SESSION_TIMEOUT = 5000;private static final String LOCK_PATH = "/distributed_lock";private ZooKeeper zooKeeper;//当前最新的一个节点private String currentZnodeName;public DistributedLockExample() throws IOException {this.zooKeeper = new ZooKeeper(ZOOKEEPER_CONNECTION_STRING, SESSION_TIMEOUT, new Watcher() {@Overridepublic void process(WatchedEvent event) {// Watcher实现}});}public void acquireLock() throws KeeperException, InterruptedException {while (true) {createLockNode();//获取锁节点下的所有子节点List<String> children = zooKeeper.getChildren(LOCK_PATH, false);//排序Collections.sort(children);//获取小的节点,也就是最先创建的那个节点String smallestNode = children.get(0);//如果当前最新节点等于最小节点,那么就代表获取到了锁if (currentZnodeName.equals(LOCK_PATH + "/" + smallestNode)) {return;} else {//这里等待获取锁,利用CountDownLatch, 是 Java 中的一个同步辅助类//CountDownLatch的countDown()可以把计数器减一, await()会让线程一直等待执行//直到计数器为0的时候
//children.get(Collections.binarySearch(children, currentZnodeName.substring(LOCK_PATH.length() + 1)) - 1)这段代码是获取当前最新节点的前一个节点,然后只监听前一个节点可以避免惊群效应。waitForLock(children.get(Collections.binarySearch(children, currentZnodeName.substring(LOCK_PATH.length() + 1)) - 1));}}}private void createLockNode() throws KeeperException, InterruptedException {Stat stat = zooKeeper.exists(LOCK_PATH, false);if (stat == null) {//如果stat锁的节点不存在,则创建一个持久的锁节点zooKeeper.create(LOCK_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}//在指定的锁节点下创建一个临时的顺序节点,并且赋值给currentZnodeName(当前最新的一个节点)currentZnodeName = zooKeeper.create(LOCK_PATH + "/", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);}private void waitForLock(String prevZnodeName) throws KeeperException, InterruptedException {CountDownLatch latch = new CountDownLatch(1);Stat stat = zooKeeper.exists(LOCK_PATH + "/" + prevZnodeName, new Watcher() {@Overridepublic void process(WatchedEvent event) {//如果监听的节点被删除,那么就会触发这个if,if (event.getType() == Event.EventType.NodeDeleted) {//减一,等于零,那么下面latch.await()就会让线程不在等待,继续执行,然后就会重新进入acquireLock()的while (true)循环,来获取锁latch.countDown();}}});if (stat != null) {latch.await();}}public void releaseLock() throws KeeperException, InterruptedException {zooKeeper.delete(currentZnodeName, -1);zooKeeper.close();}public static void main(String[] args) throws IOException, KeeperException, InterruptedException {DistributedLockExample lockExample = new DistributedLockExample();lockExample.acquireLock();// 执行需要加锁的业务逻辑lockExample.releaseLock();}
}
可重入锁的实现原理就是维护一个线程id和一个计数器,当同一个线程获取到最小节点时,可以拿到锁,当同一个线程的A方法调用B方法,A方法和B方法需要同一把锁时,通过判断线程id获取到锁,并且给锁的计数器加1,当B方法执行完时,给计数器减一,计数器达到0时,释放锁(删除节点)
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.Collections;
import java.util.List;public class ZookeeperReentrantLock {private ZooKeeper zk;private final String lockPath = "/reentrantLock";//利用ThreadLocal特性提供线程局部变量的存取。每个线程可以访问自己内部的 ThreadLocal 变量副本,互不影响,使用ThreadLocal要小心内存泄漏问题,所以用完后要手动删除ThreadLocalprivate ThreadLocal<String> lockNode = new ThreadLocal<>();private final int sessionTimeout = 3000; // 会话超时时间public ZookeeperReentrantLock(String hosts) throws Exception {zk = new ZooKeeper(hosts, sessionTimeout, (event) -> {if (event.getType() == Watcher.Event.EventType.NodeDeleted) {// 如果当前线程的锁节点被删除,可能是会话超时,需要重新创建acquireLock();}});ensureLockPath();}public void acquireLock() throws Exception {// 如果当前线程已经持有锁,则增加重入次数并返回String current = lockNode.get();if (current != null) {((LockData) new Stat()).incrementAndGet();return;}// 创建临时顺序节点String createdPath = zk.create(lockPath + "/", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);List<String> children = zk.getChildren(lockPath, false);Collections.sort(children);// 尝试获取锁for (String child : children) {if (createdPath.equals(lockPath + "/" + child)) {// 当前线程创建的节点序号最小,获取锁lockNode.set(createdPath);break;} else {// 等待前一个节点释放锁zk.exists(lockPath + "/" + child, true);}}}public void releaseLock() throws Exception {String current = lockNode.get();if (current != null) {// 减少重入次数,如果为0,则释放锁LockData lockData = (LockData) zk.exists(current, false);if (lockData == null || lockData.decrementAndGet() == 0) {zk.delete(current, -1);lockNode.remove();}}}private void ensureLockPath() throws KeeperException, InterruptedException {Stat stat = zk.exists(lockPath, false);if (stat == null) {zk.create(lockPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}}private static class LockData extends Stat {private int count = 1;public void incrementAndGet() {count++;}public int decrementAndGet() {return --count;}}public static void main(String[] args) {try {ZookeeperReentrantLock lock = new ZookeeperReentrantLock("localhost:2181");lock.acquireLock();// 执行业务逻辑lock.releaseLock();} catch (Exception e) {e.printStackTrace();}}
}