zookeeper分布式锁

jopen 9年前

1、pom.xml中添加zookeeper依赖

<dependency>      <groupId>org.apache.zookeeper</groupId>      <artifactId>zookeeper</artifactId>      <version>3.4.6</version>  </dependency>

2、DistributedLock.java

package com.zk.dlm;    import org.apache.zookeeper.CreateMode;  import org.apache.zookeeper.KeeperException;  import org.apache.zookeeper.WatchedEvent;  import org.apache.zookeeper.Watcher;  import org.apache.zookeeper.ZooDefs;  import org.apache.zookeeper.ZooKeeper;  import org.apache.zookeeper.data.Stat;    import java.io.IOException;  import java.util.ArrayList;  import java.util.Collections;  import java.util.List;  import java.util.concurrent.CountDownLatch;  import java.util.concurrent.TimeUnit;  import java.util.concurrent.atomic.AtomicInteger;  import java.util.concurrent.locks.Condition;  import java.util.concurrent.locks.Lock;    /**   * Created by Administrator on 2015/11/8.   */  public class DistributedLock implements Lock, Watcher {      private ZooKeeper zk = null;        private String root = "/locks";//根      private String lockName;//竞争资源的标志      private String waitNode;//等待前一个锁      private String myZnode;//当前锁        private CountDownLatch latch;//计数器        private int sessionTimeout = 5000;        private boolean isGetLock = false;        static volatile AtomicInteger count = new AtomicInteger(0);        private DistributedLock(){        }        public static DistributedLock instanceLock(String lockName){          return new DistributedLock(lockName);      }        /**       * 创建分布式锁,使用前请确认config配置的zookeeper服务可用       * @param config 127.0.0.1:2181       * @param lockName 竞争资源标志,lockName中不能包含单词lock       */      private DistributedLock(String lockName){          this.lockName = lockName;          // 创建一个与服务器的连接          try {              zk = initZk();              Stat stat = zk.exists(root, false);              if(stat == null){                  // 创建根节点                  zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);              }          } catch (KeeperException e) {              throw new LockException(e);          } catch (InterruptedException e) {              throw new LockException(e);          }      }          /**       * zookeeper节点的监视器       */      public void process(WatchedEvent event) {          if(this.latch != null) {              this.latch.countDown();          }      }        public void lock() {          try {              if(this.tryLock()){                  //System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");                  return;              }              else{                  waitForLock(waitNode, sessionTimeout);//等待锁              }          } catch (KeeperException e) {              throw new LockException(e);          } catch (InterruptedException e) {              throw new LockException(e);          }      }        public boolean tryLock() {          try {              String splitStr = "_lock_";              if(lockName.contains(splitStr)){                  throw new LockException("lockName can not contains \\u000B");              }              //创建临时子节点              myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);              //System.out.println(myZnode + " is created ");                //取出所有子节点              List<String> subNodes = zk.getChildren(root, false);              //取出所有lockName的锁              List<String> lockObjNodes = new ArrayList<String>();              for (String node : subNodes) {                  String _node = node.split(splitStr)[0];                  if(_node.equals(lockName)){                      lockObjNodes.add(node);                  }              }              Collections.sort(lockObjNodes);              //System.out.println(myZnode + "==" + lockObjNodes.get(0));                if(myZnode.equals(root+"/"+lockObjNodes.get(0))){                  //如果是最小的节点,则表示取得锁                  return true;              }              //如果不是最小的节点,找到比自己小1的节点              String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);              waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);          } catch (KeeperException e) {              throw new LockException(e);          } catch (InterruptedException e) {              throw new LockException(e);          }          return false;      }        @SuppressWarnings("finally")      public boolean tryLock(long time, TimeUnit unit) {          try {              if(this.tryLock()){                  return true;              }              return waitForLock(waitNode,time);          } catch (Exception e) {              throw new LockException(e);          }finally{              return false;          }      }        private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {          Stat stat = zk.exists(root + "/" + lower,true);          //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听          if(stat != null){              //System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);              this.latch = new CountDownLatch(1);              isGetLock = this.latch.await(waitTime, TimeUnit.MILLISECONDS);              this.latch = null;          }          return true;      }        public void unlock() {          try {              //System.out.println("unlock " + myZnode);              zk.delete(myZnode,-1);              myZnode = null;              //zk.close();          } catch (InterruptedException e) {              throw new LockException(e);          } catch (KeeperException e) {              throw new LockException(e);          }      }        public synchronized ZooKeeper initZk() {          try {              if(zk==null){                  zk = new ZooKeeper("127.0.0.1:2181", sessionTimeout,this);              }            } catch (IOException e) {              throw new LockException("zk init connect fail" + e.getMessage());              //System.err.println("zk init connect fail" + e.getMessage());          }          return zk;      }        public void lockInterruptibly() throws InterruptedException {          this.lock();      }        public Condition newCondition() {          return null;      }          public boolean isGetLock() {          return isGetLock;      }        class LockException extends RuntimeException {          private static final long serialVersionUID = 1L;          public LockException(String e){              super(e);          }          public LockException(Exception e){              super(e);          }      }        public static void main(String[] args) throws Exception {          final long starttime = System.currentTimeMillis();            for(int i=0;i<30;i++){              new Thread(new Runnable() {                  public void run() {                      DistributedLock lock = DistributedLock.instanceLock("mylock");;                      while(true){                          try {                              lock.lock();                                count.incrementAndGet();                              System.err.println(System.currentTimeMillis()+"|"+Thread.currentThread().getId() + " | lock value: " + count.get());                            } catch (Exception e) {                              e.printStackTrace();                          }finally{                              lock.unlock();                              long endtime = System.currentTimeMillis();                              System.err.println(count.get()/((endtime-starttime)/1000)+"/s");                          }                        }                    }              }).start();            }            //Thread.sleep(10000);      }  }

参考

caurtor 实现的zk分布式锁

redisson 实现的redis分布式锁

如果规模很大推荐caurtor  如果不是特别大用redisson 就可以

http://www.jiacheo.org/blog/620 

http://www.jiacheo.org/blog/122 

http://blog.csdn.net/zhu_tianwei/article/details/44927331 

https://github.com/mrniko/redisson 

http://www.pandablog.cn/41.html 

http://songwie.com/ 


http://blog.csdn.net/zhu_tianwei/article/details/44927331   jedis实现

http://newliferen.github.io/2015/07/27/ZooKeeper%E5%BA%94%E7%94%A8%E4%B9%8B%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81/ 

https://github.com/sfines/menagerie 

https://github.com/xing4git/blog/blob/master/zookeeper/ZooKeeper%E7%A4%BA%E4%BE%8B%20%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81.md 

http://www.111cn.net/jsp/Java/95461.htm 

http://itfish.net/article/23060.html 

http://www.qkeye.com/blog-37-456727.html