您的位置:首页 > 健康 > 养生 > 【Zookeeper】两种基于原生zk客户端的分布式锁的实现

【Zookeeper】两种基于原生zk客户端的分布式锁的实现

2024/10/6 2:27:13 来源:https://blog.csdn.net/gengzhihao10/article/details/139994696  浏览:    关键词:【Zookeeper】两种基于原生zk客户端的分布式锁的实现

基于zk的分布式锁的实现主要依赖zk节点的原子性,可以基于原生zk来自己实现分布式锁,更多的是基于Curator这个框架来直接使用基于zk的分布式锁[1]。这里我们仅仅讨论基于原生zk客户端依赖自己实现的zk分布式锁。

原生zk客户端中的一些调用如getChildren方法,可以是同步返回,也可以通过实现AsyncCallback的内部接口来重写异步回调处理逻辑。这里我们举例同步和异步两种方式的实现。

同步实现[1],这篇文章中缺少了关于"Watcher关注的前面节点状态改变后CountDown"的逻辑,即缺少了Watcher的回调。这里我补上了回调并做了一些调整,代码如下:

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;public class SyncZkLock implements Watcher {// zookeeper server 列表private String connectString ="192.168.1.128:2181,192.168.1.129:2181,192.168.1.130:2181";// 超时时间private int sessionTimeout = 2000;private ZooKeeper zk;private String rootNode = "locks";private String subNode = "seq-";// 当前 client 等待的子节点private String waitPath;// ZooKeeper 连接等待private CountDownLatch connectLatch = new CountDownLatch(1);// ZooKeeper 节点等待private CountDownLatch waitLatch = new CountDownLatch(1);// 当前 client 创建的子节点private String currentNode;// 和 zk 服务建立连接,并创建根节点public SyncZkLock() throws IOException, InterruptedException, KeeperException {zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent event) {// 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程if (event.getState() == Event.KeeperState.SyncConnected) {connectLatch.countDown();}// 发生了 waitPath 的删除事件if (event.getType() == Event.EventType.NodeDeleted &&event.getPath().equals(waitPath)) {waitLatch.countDown();}}});// 等待连接建立connectLatch.await();//获取根节点状态Stat stat = zk.exists("/" + rootNode, false);//如果根节点不存在,则创建根节点,根节点类型为永久节点if (stat == null) {System.out.println("根节点不存在");zk.create("/" + rootNode, new byte[0],ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}}// 加锁方法public void zkLock() {try {//在根节点下创建临时顺序节点,返回值为创建的节点路径currentNode = zk.create("/" + rootNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);checkAndLockOrAwait(false);} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}// 解锁方法public void zkUnlock() {try {zk.delete(this.currentNode, -1);} catch (InterruptedException | KeeperException e) {e.printStackTrace();}}//watch被触发@Overridepublic void process(WatchedEvent event) {switch (event.getType()) {case None:break;case NodeCreated:break;case NodeDeleted:checkAndLockOrAwait(true);break;case NodeDataChanged:break;case NodeChildrenChanged:break;}}//查看当前节点状态,Lock结束或者添加Watcher并等待private void checkAndLockOrAwait(boolean flag) {try {// 注意, 没有必要监听"/locks"的子节点的变化情况List<String> childrenNodes = zk.getChildren("/" + rootNode, false);// 列表中只有一个子节点, 那肯定就是 currentNode , 说明client 获得锁if (childrenNodes.size() == 1) {return;} else {//对根节点下的所有临时顺序节点进行从小到大排序Collections.sort(childrenNodes);//当前节点名称String thisNode = currentNode.substring(("/" + rootNode + "/").length());//获取当前节点的位置int index = childrenNodes.indexOf(thisNode);if (index == -1) {System.out.println("数据异常");} else if (index == 0) {//刚创建时flag为false,不需要countDown。//watch触发时flag为true,需要countDown。if (flag){waitLatch.countDown();}// index == 0, 说明 thisNode 在列表中最小, 当前client 获得锁return;} else {// 获得排名比 currentNode 前 1 位的节点this.waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1);// 在 waitPath 上注册监听器, 当 waitPath 被删除时, zookeeper 会回调监听器的 process 方法zk.getData(waitPath, true, new Stat());//进入等待锁状态waitLatch.await();return;}}} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}}

异步的实现,代码如下:

public class AsyncZkLock implements Watcher, AsyncCallback.StringCallback ,AsyncCallback.Children2Callback ,AsyncCallback.StatCallback {private ZooKeeper zk ;private String threadName;private CountDownLatch cc = new CountDownLatch(1);private String pathName;private final String ctx = "zk_lock";public String getPathName() {return pathName;}public void setPathName(String pathName) {this.pathName = pathName;}public String getThreadName() {return threadName;}public void setThreadName(String threadName) {this.threadName = threadName;}public ZooKeeper getZk() {return zk;}public void setZk(ZooKeeper zk) {this.zk = zk;}public void tryLock(){try {System.out.println(threadName + "  create....");zk.create("/lock",threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this, ctx);cc.await();} catch (InterruptedException e) {e.printStackTrace();}}public void unLock(){try {zk.delete(pathName,-1);System.out.println(threadName + " over work....");} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}//给前一个节点加的Watcher被触发的回调@Overridepublic void process(WatchedEvent event) {switch (event.getType()) {case None:break;case NodeCreated:break;case NodeDeleted://这个getChildren是个异步方法,通过重写AsyncCallback.Children2Callback的processResult方法,处理回调zk.getChildren("/",false,this ,ctx);break;case NodeDataChanged:break;case NodeChildrenChanged:break;}}//string callback//zk.create方法的异步回调@Overridepublic void processResult(int rc, String path, Object ctx, String name) {if(name != null ){System.out.println(threadName  +"  create node : " +  name );pathName =  name ;zk.getChildren("/",false,this , ctx);}}//getChildren  call back//zk.getChildren方法的异步回调@Overridepublic void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {Collections.sort(children);int i = children.indexOf(pathName.substring(1));//是不是第一个if(i == 0){//yesSystem.out.println(threadName +" i am first....");try {zk.setData("/",threadName.getBytes(),-1);cc.countDown();} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}else{//no//监控前面节点,创建监控前面节点的Watcherzk.exists("/"+children.get(i-1),this,this, ctx);}}//statCallback//zk.exists的异步回调@Overridepublic void processResult(int rc, String path, Object ctx, Stat stat) {//这里默认添加watch成功,没有做失败的处理。//假设有依次A B C D E ,C取消了,D接受到回调,取到了children列表 A B D E,//但是此时B也取消了,而D此时给前面节点B添加watch,会出现问题,//因此这里如果添加失败,应该重新获取children列表,// 依靠getChildren的回调逻辑:如果是第一个就结束,不是第一个,就找到前一个节点并给前一个添加监控//来重新添加watch}
}

参考文章:
[1],Zookeeper + Curator实现分布式锁

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com