您的位置:首页 > 新闻 > 会展 > 大数据技术之Zookeeper实现分布式锁(5)

大数据技术之Zookeeper实现分布式锁(5)

2025/3/5 4:43:00 来源:https://blog.csdn.net/qq_45115959/article/details/141473375  浏览:    关键词:大数据技术之Zookeeper实现分布式锁(5)

目录

分布式锁案例

1. 创建锁节点

2. 获取锁

3. 释放锁

4. 锁自动释放

示例代码

分布式锁测试

测试环境准备

测试步骤

示例代码

Client1.java

Client2.java

运行测试

Curator 框架实现分布式锁案例 

步骤 1: 添加依赖

步骤 2: 创建 CuratorFramework 实例

步骤 3: 实现分布式锁

说明

运行示例

注意事项


分布式锁案例

Zookeeper 是一个分布式的协调服务,它能够提供一系列的基础服务,比如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理等。其中,分布式锁是 Zookeeper 应用场景中的一个重要功能,可以用来解决分布式系统中多进程之间的互斥访问问题。

下面是一个简单的使用原生 Zookeeper 实现分布式锁的步骤概述:

1. 创建锁节点

首先,在 Zookeeper 中创建一个持久化的父节点(例如/distributed-lock),用于保存所有的锁节点。

2. 获取锁

当一个客户端想要获取锁时,需要执行以下操作:

  • /distributed-lock下创建一个临时有序节点,例如/distributed-lock/lock-0000000001
  • 列出父节点的所有子节点,并根据序号进行排序。
  • 检查创建的子节点是否是最小的一个,如果是,则获取锁成功;如果不是,则找到序号比当前节点小的最近的那个节点,并对该节点添加监听器。

3. 释放锁

当客户端不再需要锁时,只需要删除之前创建的临时节点即可释放锁。

4. 锁自动释放

由于锁节点是临时节点,所以当客户端与 Zookeeper 服务器的会话结束时,锁也会自动释放。

示例代码

这里给出一个简化版的 Java 代码示例,假设已经有一个 ZooKeeper 客户端实例 zooKeeper

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;public class DistributedLockExample {private static final String LOCK_PATH = "/distributed-lock";private static final String CLIENT_ID = "Client1";public static void main(String[] args) throws Exception {ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, new Watcher() {@Overridepublic void process(WatchedEvent event) {System.out.println("Received event: " + event);if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(lockNodePath)) {latch.countDown();}}});// 创建锁路径Stat stat = zooKeeper.exists(LOCK_PATH, false);if (stat == null) {zooKeeper.create(LOCK_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}// 尝试获取锁String lockNodePath = zooKeeper.create(LOCK_PATH + "/lock-", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);List<String> children = zooKeeper.getChildren(LOCK_PATH, true);Collections.sort(children);boolean isLocked = false;for (String child : children) {if (lockNodePath.endsWith(child)) {if (children.indexOf(child) == 0) {isLocked = true;break;} else {String prevChild = children.get(children.indexOf(child) - 1);String prevNodePath = LOCK_PATH + "/" + prevChild;CountDownLatch latch = new CountDownLatch(1);zooKeeper.exists(prevNodePath, event -> {if (event.getType() == Event.EventType.NodeDeleted) {latch.countDown();}});latch.await();isLocked = true;break;}}}if (isLocked) {System.out.println(CLIENT_ID + " acquired the lock");// 执行需要锁定的操作Thread.sleep(5000); // 模拟长时间运行的任务} else {System.out.println(CLIENT_ID + " failed to acquire the lock");}// 释放锁zooKeeper.delete(lockNodePath, -1);zooKeeper.close();}

这个示例中,我们使用了 ZooKeeper 的客户端 API 来创建锁节点,并通过监听前一个节点的状态来等待锁的释放。

分布式锁测试

为了测试分布式锁的有效性,你需要设置多个客户端同时尝试获取锁,并观察它们的行为是否符合预期。下面是一个简单的测试方案:

测试环境准备

  1. 启动 Zookeeper 服务:确保有一个可用的 Zookeeper 服务器。
  2. 编写客户端程序:根据上一个回答中提供的示例代码,你可以编写多个客户端程序,每个程序都试图获取相同的锁。

测试步骤

  1. 启动多个客户端:同时启动多个客户端程序,每个客户端都尝试获取同一个锁。
  2. 检查锁的获取情况:观察哪些客户端获得了锁,哪些客户端没有获得锁,并且等待其他客户端释放锁。
  3. 检查锁的释放情况:当持有锁的客户端完成任务后,它应该释放锁,然后观察下一个等待的客户端是否能够成功获取锁。
  4. 重复上述过程:多次执行上述步骤,以确保分布式锁机制的稳定性和可靠性。

示例代码

基于上一个回答中的示例代码,你可以创建两个或更多的客户端来模拟并发获取锁的情况。这里是一个简单的示例,展示如何编写两个客户端程序来测试分布式锁。

Client1.java
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;public class Client1 implements Runnable {private final ZooKeeper zooKeeper;private final String lockPath;public Client1(ZooKeeper zooKeeper, String lockPath) throws InterruptedException, KeeperException {this.zooKeeper = zooKeeper;this.lockPath = lockPath;}@Overridepublic void run() {try {// 创建锁路径Stat stat = zooKeeper.exists(lockPath, false);if (stat == null) {zooKeeper.create(lockPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}// 尝试获取锁String lockNodePath = zooKeeper.create(lockPath + "/lock-", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);List<String> children = zooKeeper.getChildren(lockPath, true);Collections.sort(children);boolean isLocked = false;for (String child : children) {if (lockNodePath.endsWith(child)) {if (children.indexOf(child) == 0) {isLocked = true;break;} else {String prevChild = children.get(children.indexOf(child) - 1);String prevNodePath = lockPath + "/" + prevChild;CountDownLatch latch = new CountDownLatch(1);zooKeeper.exists(prevNodePath, event -> {if (event.getType() == Event.EventType.NodeDeleted) {latch.countDown();}});latch.await();isLocked = true;break;}}}if (isLocked) {System.out.println("Client1 acquired the lock");// 执行需要锁定的操作Thread.sleep(5000); // 模拟长时间运行的任务} else {System.out.println("Client1 failed to acquire the lock");}// 释放锁zooKeeper.delete(lockNodePath, -1);zooKeeper.close();} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) throws Exception {ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, event -> {System.out.println("Received event: " + event);});Client1 client = new Client1(zooKeeper, "/distributed-lock");Thread thread = new Thread(client);thread.start();}
}
Client2.java
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;public class Client2 implements Runnable {private final ZooKeeper zooKeeper;private final String lockPath;public Client2(ZooKeeper zooKeeper, String lockPath) throws InterruptedException, KeeperException {this.zooKeeper = zooKeeper;this.lockPath = lockPath;}@Overridepublic void run() {try {// 创建锁路径Stat stat = zooKeeper.exists(lockPath, false);if (stat == null) {zooKeeper.create(lockPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}// 尝试获取锁String lockNodePath = zooKeeper.create(lockPath + "/lock-", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);List<String> children = zooKeeper.getChildren(lockPath, true);Collections.sort(children);boolean isLocked = false;for (String child : children) {if (lockNodePath.endsWith(child)) {if (children.indexOf(child) == 0) {isLocked = true;break;} else {String prevChild = children.get(children.indexOf(child) - 1);String prevNodePath = lockPath + "/" + prevChild;CountDownLatch latch = new CountDownLatch(1);zooKeeper.exists(prevNodePath, event -> {if (event.getType() == Event.EventType.NodeDeleted) {latch.countDown();}});latch.await();isLocked = true;break;}}}if (isLocked) {System.out.println("Client2 acquired the lock");// 执行需要锁定的操作Thread.sleep(5000); // 模拟长时间运行的任务} else {System.out.println("Client2 failed to acquire the lock");}// 释放锁zooKeeper.delete(lockNodePath, -1);zooKeeper.close();} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) throws Exception {ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, event -> {System.out.println("Received event: " + event);});Client2 client = new Client2(zooKeeper, "/distributed-lock");Thread thread = new Thread(client);thread.start();}
}

运行测试

  1. 启动 Zookeeper 服务器。
  2. 分别运行 Client1 和 Client2 程序。
  3. 观察输出结果,确认只有一个客户端能够成功获取锁,并且在执行完任务后正确释放锁。

Curator 框架实现分布式锁案例 

Curator 是一个 Apache ZooKeeper 的高级 Java 客户端库,它提供了许多简化 ZooKeeper 使用的工具类和框架。Curator 提供了一个非常方便的方式来实现分布式锁,其中包括了 InterProcessMutex 类,这是一个高级的分布式锁实现。

下面是一个使用 Curator 框架实现的分布式锁示例:

步骤 1: 添加依赖

首先,你需要在你的项目中添加 Curator 的依赖。如果你使用 Maven,可以在 pom.xml 文件中添加如下依赖:

<dependencies><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.2.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.2.0</version></dependency>
</dependencies

步骤 2: 创建 CuratorFramework 实例

接下来,我们需要创建一个 CuratorFramework 实例,这是 Curator 的主要入口点。

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;public class CuratorDistributedLockExample {private static final String CONNECT_STRING = "localhost:2181";private static final int SESSION_TIMEOUT_MS = 5000;private static final int CONNECTION_TIMEOUT_MS = 3000;private CuratorFramework client;public CuratorDistributedLockExample() {ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);client = CuratorFrameworkFactory.builder().connectString(CONNECT_STRING).sessionTimeoutMs(SESSION_TIMEOUT_MS).connectionTimeoutMs(CONNECTION_TIMEOUT_MS).retryPolicy(retryPolicy).namespace("locks") // 可选,指定命名空间.build();client.start();}public void close() throws Exception {if (client != null) {client.close();}}
}

步骤 3: 实现分布式锁

现在我们可以使用 Curator 提供的 InterProcessMutex 类来实现分布式锁。

import org.apache.curator.framework.recipes.locks.InterProcessMutex;public class CuratorDistributedLockExample extends CuratorDistributedLockExample {private InterProcessMutex lock;private final String lockPath = "/distributed-lock";public CuratorDistributedLockExample() {super();lock = new InterProcessMutex(client, lockPath);}public void acquireLock() throws Exception {System.out.println("Client is trying to acquire the lock...");lock.acquire();System.out.println("Client acquired the lock.");}public void releaseLock() throws Exception {lock.release();System.out.println("Client released the lock.");}public void doWorkWithLock() throws Exception {acquireLock();try {System.out.println("Doing work with the lock...");Thread.sleep(5000); // 模拟执行一些耗时操作} finally {releaseLock();}}public static void main(String[] args) throws Exception {CuratorDistributedLockExample example = new CuratorDistributedLockExample();example.doWorkWithLock();example.close();}
}

说明

  • InterProcessMutex:这是 Curator 提供的一个分布式锁实现。
  • acquireLock 方法:尝试获取锁。
  • releaseLock 方法:释放锁。
  • doWorkWithLock 方法:在这个方法中,我们尝试获取锁,执行一些操作,然后释放锁。

运行示例

你可以运行上面的 main 方法来测试分布式锁的功能。你也可以创建多个 CuratorDistributedLockExample 实例来模拟并发获取锁的情况。

注意事项

  • 确保你的 ZooKeeper 服务器正在运行。
  • 如果你使用的是不同的 ZooKeeper 配置(如连接字符串),请相应地修改 CONNECT_STRING 和其他配置参数。
  • 如果你想测试多个客户端同时尝试获取锁的情况,可以在不同的 JVM 或线程中启动多个 CuratorDistributedLockExample 实例。

 

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com