zookeeper 分布式锁的实现
zookeeper 分布式锁的实现
临时顺序节点,这种类型的节点有几下几个特性:
-
节点的生命周期和客户端会话绑定,即创建节点的客户端会话一旦失效,那么这个节点也会被清除。
-
每个父节点都会负责维护其子节点创建的先后顺序,并且如果创建的是顺序节点(SEQUENTIAL)的话,父节点会自动为这个节点分配一个整形数值,以后缀的形式自动追加到节点名中,作为这个节点最终的节点名。
利用上面这两个特性,我们来看下获取实现分布式锁的基本逻辑:
-
客户端调用create()方法创建名为“_locknode_/guid-lock-”的节点,需要注意的是,这里节点的创建类型需要设置为EPHEMERAL_SEQUENTIAL。
-
客户端调用getChildren(“_locknode_”)方法来获取所有已经创建的子节点,同时在这个节点上注册上子节点变更通知的Watcher。
-
客户端获取到所有子节点path之后,如果发现自己在步骤1中创建的节点是所有节点中序号最小的,那么就认为这个客户端获得了锁。
-
如果在步骤3中发现自己并非是所有子节点中最小的,说明自己还没有获取到锁,就开始等待,直到下次子节点变更通知的时候,再进行子节点的获取,判断是否获取锁。
释放锁的过程相对比较简单,就是删除自己创建的那个子节点即可。
以上信息来自:http://jm-blog.aliapp.com/?p=2554
根据这个思路,来实现基于zookeeper的分布式锁。
直接贴代码,如下,如果有不合适的或需要改进的地方,请指教。
package com.usfot; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.ThreadLocalRandom; /** * Shared意味着锁是全局可见的,客户端都可以请求锁。 * DistributedSharedLock 应该是线程安全的。有待验证 * Created by liyanxin on 2015/3/18. */ public class DistributedSharedLock implements Watcher { private static final String ADDR = "127.0.0.1:2181"; private static final String LOCK_NODE = "guid-lock-"; private String rootLockNode; //锁目录 private ZooKeeper zk = null; private Integer mutex; private Integer currentLock; /** * 构造函数实现 * 连接zk服务器 * 创建zk锁目录 * * @param rootLockNode */ public DistributedSharedLock(String rootLockNode) { this.rootLockNode = rootLockNode; try { //连接zk服务器 zk = new ZooKeeper(ADDR, 10 * 10000, this); } catch (IOException e) { e.printStackTrace(); } mutex = new Integer(-1); // Create ZK node name if (zk != null) { try { //建立根目录节点 Stat s = zk.exists(rootLockNode, false); if (s == null) { zk.create(rootLockNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { System.out.println("Keeper exception when instantiating queue: " + e.toString()); } catch (InterruptedException e) { System.out.println("Interrupted exception"); } } } /** * 请求zk服务器,获得锁 * * @throws KeeperException * @throws InterruptedException */ public void acquire() throws KeeperException, InterruptedException { ByteBuffer b = ByteBuffer.allocate(4); byte[] value; // Add child with value i b.putInt(ThreadLocalRandom.current().nextInt(10)); value = b.array(); // 创建锁节点 String lockName = zk.create(rootLockNode + "/" + LOCK_NODE, value, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); synchronized (mutex) { while (true) { // 获得当前锁节点的number,和所有的锁节点比较 Integer acquireLock = new Integer(lockName.substring(lockName.lastIndexOf('-') + 1)); List<String> childLockNode = zk.getChildren(rootLockNode, true); SortedSet<Integer> sortedLock = new TreeSet<Integer>(); for (String temp : childLockNode) { Integer tempLockNumber = new Integer(temp.substring(temp.lastIndexOf('-') + 1)); sortedLock.add(tempLockNumber); } currentLock = sortedLock.first(); //如果当前创建的锁的序号是最小的那么认为这个客户端获得了锁 if (currentLock >= acquireLock) { System.err.println("thread_name=" + Thread.currentThread().getName() + "|attend lcok|lock_num=" + currentLock); return; } else { //没有获得锁则等待下次事件的发生 System.err.println("thread_name=" + Thread.currentThread().getName() + "|wait lcok|lock_num=" + currentLock); mutex.wait(); } } } } /** * 释放锁 * * @throws KeeperException * @throws InterruptedException */ public void release() throws KeeperException, InterruptedException { String lockName = String.format("%010d", currentLock); zk.delete(rootLockNode + "/" + LOCK_NODE + lockName, -1); System.err.println("thread_name=" + Thread.currentThread().getName() + "|release lcok|lock_num=" + currentLock); } @Override public void process(WatchedEvent event) { synchronized (mutex) { mutex.notify(); } } }
测试代码如下,
package com.usfot; import org.apache.zookeeper.KeeperException; /** * 启动10个线程,都获得一个锁对象,每个线程都有一个锁对象。 * 锁对象请求zk服务器获得锁。如果不能获得锁,则等待。 * 当一个线程获得锁时,其他线程将等待锁的释放。 * Created by liyanxin on 2015/3/18. */ public class DistributedSharedLockTest { public static void main(String args[]) { for (int i = 0; i < 10; i++) { Thread t = new Thread(new Runnable() { @Override public void run() { DistributedSharedLock lock = new DistributedSharedLock("/_locknode_"); try { lock.acquire(); Thread.sleep(1000); //获得锁之后可以进行相应的处理 System.out.println("======获得锁后进行相应的操作======"); lock.release(); System.err.println("============================="); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); t.start(); } } }
看一下测试情况,
thread_name=Thread-5|wait lcok|lock_num=300 thread_name=Thread-1|attend lcok|lock_num=300 thread_name=Thread-9|wait lcok|lock_num=300 thread_name=Thread-3|wait lcok|lock_num=300 thread_name=Thread-7|wait lcok|lock_num=300 thread_name=Thread-6|wait lcok|lock_num=300 thread_name=Thread-2|wait lcok|lock_num=300 thread_name=Thread-0|wait lcok|lock_num=300 thread_name=Thread-8|wait lcok|lock_num=300 thread_name=Thread-4|wait lcok|lock_num=300 ======获得锁后进行相应的操作====== thread_name=Thread-2|wait lcok|lock_num=301 thread_name=Thread-6|wait lcok|lock_num=301 thread_name=Thread-1|release lcok|lock_num=300 ============================= .................................
这是一部分的打印日志。
在换一种方式测试一下锁对象的线程安全性。如下测试代码,
package com.usfot; import org.apache.zookeeper.KeeperException; /** * Created by liyanxin on 2015/3/18. */ public class DistributedSharedLockTest2 { public static void main(String args[]) { final DistributedSharedLock lock = new DistributedSharedLock("/_locknode_"); /** * 所有的线程都共享一个锁对象,验证锁对象的线程安全性 * 锁是阻塞的 */ for (int i = 0; i < 10; i++) { Thread t = new Thread(new Runnable() { @Override public void run() { try { lock.acquire(); Thread.sleep(1000); //获得锁之后可以进行相应的处理 System.out.println("======获得锁后进行相应的操作======"); lock.release(); System.err.println("============================="); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); t.start(); } } }
经过测试,发现了死锁deadlock的问题,这个问题如何导致的呢?是因为多个线程都会在mutex对象的内置锁上发生竞争。当线程A获得mutex对象的内置锁时,会进入到同步代码块,进行获取zk服务器的分布式锁的操作,当获得分布式锁后,退出同步代码块,mutex的内置锁也就被线程A释放。大量的线程都在竞争mutex对象的内置锁。这时,线程B获得mutex的内置锁,进入同步代码块,由于没有获得分布式锁,线程B等待。然后这时线程A释放分布式锁,删除zk服务器锁节点,此时触发watcher事件,唤醒mutex对象内置锁上等待的线程,
注意使用notify唤醒。notify大家应该知道,只能唤醒所有等待线程的其中一个,或许刚好此时唤醒的不是线程B,那么deadlock就来了。
怎么解决?我把notify换成notifyAll试了下,程序能顺利执行,没有死锁的现象。
重新运行代码,如下日志,
thread_name=Thread-9|wait lcok|lock_num=360 thread_name=Thread-0|wait lcok|lock_num=360 thread_name=Thread-1|wait lcok|lock_num=360 thread_name=Thread-5|wait lcok|lock_num=360 thread_name=Thread-7|wait lcok|lock_num=360 thread_name=Thread-3|wait lcok|lock_num=360 thread_name=Thread-6|wait lcok|lock_num=360 thread_name=Thread-2|attend lcok|lock_num=360 thread_name=Thread-4|wait lcok|lock_num=360 thread_name=Thread-8|wait lcok|lock_num=360 ======获得锁后进行相应的操作====== thread_name=Thread-2|release lcok|lock_num=360 ============================= thread_name=Thread-8|attend lcok|lock_num=361 thread_name=Thread-4|wait lcok|lock_num=361 thread_name=Thread-6|wait lcok|lock_num=361 ..............
一部分的打印日志
参考:http://blog.csdn.net/java2000_wl/article/details/8694270
=====================================================================
改进后的分布式锁实现
下面是改进后的分布式锁实现,和之前的实现方式唯一不同之处在于,这里设计成每个锁竞争者,只需要关注”_locknode_”节点下序号比自己小的那个节点是否存在即可。实现如下:
-
客户端调用create()方法创建名为“_locknode_/guid-lock-”的节点,需要注意的是,这里节点的创建类型需要设置为EPHEMERAL_SEQUENTIAL。
-
客户端调用getChildren(“_locknode_”)方法来获取所有已经创建的子节点,注意,这里不注册任何Watcher。
-
客户端获取到所有子节点path之后,如果发现自己在步骤1中创建的节点序号最小,那么就认为这个客户端获得了锁。
-
如果在步骤3中发现自己并非所有子节点中最小的,说明自己还没有获取到锁。此时客户端需要找到比自己小的那个节点,然后对其调用exist()方法,同时注册事件监听。
-
之后当这个被关注的节点被移除了,客户端会收到相应的通知。这个时候客户端需要再次调用getChildren(“_locknode_”)方法来获取所有已经创建的子节点,确保自己确实是最小的节点了,然后进入步骤3。
=================================END=================================