Java多线程之自旋锁与队列锁

phxv5212 9年前
   <p>编写高效的并发程序,需要对互斥问题重新研究,设计出适用于多线程的互斥协议。那么问题来了,如果不能获得锁,应该怎么做?</p>    <ul>     <li> <p>旋转:继续进行尝试,如自旋锁,延迟较短;</p> </li>     <li> <p>阻塞:挂起自己,请求调度器切换到另一个线程,代价较大。</p> </li>    </ul>    <p>综合来看,先旋转一小段时间再阻塞,是种不错的选择。</p>    <p>java.util.concurrent.locks.Lock 接口提供了 lock() 和 unlock() 两个重要的方法,用于解决实际互斥问题。</p>    <pre>  <code class="language-java">Lock mutex = new MyLock();  mutex.lock();  try {      do something  } finally {      mutex.unlock();  }</code></pre>    <h2>测试-设置锁</h2>    <p>测试-设置来源于 getAndSet() 操作,通过一个原子布尔型状态变量的值判断当前锁的状态。若为 true 表示锁忙,若为 false 表示锁空闲。</p>    <pre>  <code class="language-java">public class TASLock {      AtomicBoolean state = new AtomicBoolean(false);        public void lock() {          while (state.getAndSet(true)) {              ;          }      }        public void unlock() {          state.set(false);      }  }</code></pre>    <h2>测试-测试-设置锁</h2>    <p>TTASLock是升级版的TASLock算法,没有直接调用 getAndSet() 方法,而是在锁看起来空闲( state.get() 返回 false )时才调用。</p>    <pre>  <code class="language-java">public class TTASLock {      AtomicBoolean state = new AtomicBoolean(false);        public void lock() {          while (true) {              while (state.get()) {                  ;              }              if (! state.getAndSet(true)) {                  return;              }          }      }        public void unlock() {          state.set(false);      }  }</code></pre>    <p>这两个算法都能保证无死锁的互斥,但是TTASLock的性能会比TASLock高许多。</p>    <p>可以从计算机系统结构的高速缓存和局部性来解释这个问题,每个处理器都有一个cache,cache的访问速度比内存快好几个数量级。当cache命中时,会立即加载这个值;当cache缺失时,会在内存或两一个处理器的cache中寻找这个数据。寻找的过程比较漫长,处理器在总线上广播这个地址,其他处理器监听总线。若其他处理器在自己的cache中发现这个地址,则广播该地址及其值来做出响应。若所有处理器都没发现这个地址,则以内存地址及其所对应的值进行响应。</p>    <p>getAndSet() 的直接调用让TASLock性能损失许多:</p>    <ul>     <li> <p>getAndSet() 的调用实质是总线上的一个广播,这个调用将会延迟所有的线程,因为所有线程都要通过监听总线通信。</p> </li>     <li> <p>getAndSet() 的调用会更新 state 的值,即使值仍为 true ,但是其他处理器cache中的锁副本将会被丢弃,从而导致cache缺失。</p> </li>     <li> <p>当持有锁的线程试图释放锁时可能被延迟,因为总线被正在自旋的线程独占。</p> </li>    </ul>    <p>与此相反,对于TTASLock算法采用的是本地旋转(线程反复地重读被缓存的值而不是反复地使用总线),线程A持有锁时,线程B尝试获得锁,但线程B只会在第一次读锁是cache缺失,之后每次cache命中不产生总线流量。</p>    <p>那么缺点来了,TTASLock释放锁时,会使各自旋线程处理器中的cache副本立即失效,这些线程会重新读取这个值,造成总线流量风暴。</p>    <h2>指数后退锁</h2>    <p>对于TTASLock算法,当锁看似空闲( state.get() 返回 false )时,存在高争用(多个线程试图同时获取一个锁)的可能。高争用意味着获取锁的可能性小,并且会造成总线流量增加。线程在重试之前回退一段时间是种不错的选择。</p>    <p>这里实现的指数回退算法的回退准则是,不成功尝试的次数越多,发生争用的可能性就越高,线程需要后退的时间就应越长。</p>    <pre>  <code class="language-java">public class BackoffLock {      private AtomicBoolean state = new AtomicBoolean(false);      private static final int MIN_DELAY = 10;      private static final int MAX_DELAY = 100;        public void lock() {          Backoff backoff = new Backoff(MIN_DELAY, MAX_DELAY);          while (true) {              while (state.get()) {                  ;              }              if (! state.getAndSet(true)) {                  return;              } else {                  backoff.backoff();              }          }      }        public void unlock() {          state.set(false);      }  }    class Backoff {      private final int minDelay, maxDelay;      int limit;      final Random random;        public Backoff(int min, int max) {          minDelay = min;          maxDelay = max;          limit = minDelay;          random = new Random();      }        public void backoff() {          int delay = random.nextInt(limit);          limit = Math.min(maxDelay, 2 * limit);          try {              Thread.sleep(delay);          } catch (InterruptedException e) {              ;          }      }  }</code></pre>    <p>指数后退算法解决了TTASLock释放锁时的高争用问题,但是它的性能与 minDelay 和 maxDelay 的选取密切相关,并且很难找到一个通用兼容的值。</p>    <p>另外,BackoffLock算法还有两个问题:</p>    <ul>     <li> <p>cache一致性流量:所有线程都在同一个共享存储单元上旋转;</p> </li>     <li> <p>临界区利用率低:后退时间无法确定,线程延迟可能过长。</p> </li>    </ul>    <h2>基于数组的锁</h2>    <p>下面的这些是队列锁,名字看上去奇形怪状的,其实是发明者名字的首字母。队列锁就是将线程组织成一个队列,让每个线程在不同的存储单元上旋转,从而降低cache一致性流量。</p>    <p>基于循环数组实现队列锁ALock,每个线程检测自己的slot对应的 flag[] 域来判断是否轮到自己。</p>    <p>一个线程想获得锁,就要调用 lock() 方法,获得自增 tail 获得分配的slot号,然后等待这个slot空闲;当释放锁时,就要阻塞当前slot,然后让下一个slot可运行。</p>    <p>当 flag[i] 为 true 时,那么这个线程就有权获得锁。任意时刻的 flag[] 数组中,应该只有一个slot的值为 true 。</p>    <pre>  <code class="language-java">public class ALock {      ThreadLocal<Integer> mySlotIndex = new ThreadLocal<Integer>();      AtomicInteger tail;      volatile boolean [] flag;      int size;        public ALock() {          size = 100;          tail = new AtomicInteger(0);          flag = new boolean[size];          flag[0] = true;      }        public void lock() {          int slot = tail.getAndIncrement() % size;          mySlotIndex.set(slot);          while (! flag[slot]) {              ;          }      }        public void unlock() {          int slot = mySlotIndex.get();          flag[slot] = false;          flag[(slot + 1) % size] = true;      }  }</code></pre>    <ul>     <li> <p>mySlotIndex 是线程的局部变量,只能被一个线程访问,每个线程都有自己独立初始化的副本。不需要保存在共享存储器,不需要同步,不会产生一致性流量。使用 get() 和 set() 方法来访问局部变量的值。</p> </li>     <li> <p>tail 是常规变量,域被所有的线程共享,支持原子操作。</p> </li>     <li> <p>数组 flag[] 也是被多个线程共享的,但是每个线程都是在一个数组元素的本地cache副本上旋转。</p> </li>    </ul>    <p>ALock对BackoffLock的改进:在多个共享存储单元上旋转,将cache无效性降到最低;把一个线程释放锁和另一个线程获得该锁之间的时间间隔最小化;先来先服务的公平性。但是,数组的大小至少与最大的并发线程数相同,并不是空间有效的,当并发线程最大个数为n时,同步L个不同对象就需要O(Ln)大小的空间。</p>    <h2>CLH队列锁</h2>    <p>CLH队列锁表示为 QNode 对象的链表,每个线程通过一个线程局部变量 pred 指向其前驱。每个线程通过检测前驱结点的 locked 域来判断是否轮到自己。如果该域为 true ,则前驱线程要么已经获得锁要么正在等待锁;如果该域为 false ,则前驱进程已释放锁,轮到自己了。正常情况下,队列链中只有一个结点的 locked 域为 false 。</p>    <p>当一个线程调用 lock() 方法想获得锁时,将自己的 locked 域置为 true ,表示该线程不准备释放锁,然后并将自己的结点加入到队列链尾部。最后就是在前驱的 locked 域上旋转,等待前驱释放锁。当这个线程调用 unlock() 方法要释放锁时,线程要将自己的 locked 域置为 false ,表示已经释放锁,然后将前驱结点作为自己的新结点以便日后访问。</p>    <p>那么问题来了,为什么要在释放锁时做 myNode.set(myPred.get()) 这个处理呢?假设线程A释放锁,A的后继结点为B,如果不使用这种处理方式,A在释放锁后马上申请锁将自己的 locked 域置为 true ,整个动作在B检测到前驱A释放锁之前,那么将发生死锁。</p>    <pre>  <code class="language-java">public class CLHLock {      AtomicReference<QNode> tail;      ThreadLocal<QNode> myPred;      ThreadLocal<QNode> myNode;        public CLHLock() {          tail = new AtomicReference<QNode>(new QNode());          myPred = new ThreadLocal<QNode>() {              protected QNode initialValue() {                  return null;              }          };          myNode = new ThreadLocal<QNode>() {              protected QNode initialValue() {                  return new QNode();              }          };      }        public void lock() {          QNode qnode = myNode.get();          qnode.locked = true;          QNode pred = tail.getAndSet(qnode);          myPred.set(pred);          while (pred.locked) {              ;          }      }        public void unlock() {          QNode qnode = myNode.get();          qnode.locked = false;          myNode.set(myPred.get());      }        class QNode {          boolean locked = false;      }  }</code></pre>    <p>如果最大线程数为n,有L个不同对象,那么CLHLock只需要O(L+n)空间。比ALock所需空间少,并且不需要知道可能使用锁的最大线程数量。但是,在无cache的系统上性能较差,因为一次要访问两个结点,若这两个结点内存位置较远,性能损失会很大。</p>    <h2>MCS队列锁</h2>    <p>MCS队列锁通过检测自己所在结点的 locked 的值来判断是否轮到自己,等待这个域被前驱释放锁时改变。</p>    <p>线程若要获得锁,需把自己结点添加到链表的尾部。若队列链表原先为空,则获得锁。否则,将前驱结点的 next 域指向自己,在自己的 locked 域上自旋等待,直到前驱将该域置为 false 。线程若要释放锁,判断是否在队尾,如果是只需将 tail 置为 null ,如果不是需将后继的 locked 域置为 false 且将自己结点的 next 域置为默认的 null 。注意在队尾的情况,可能有个线程正在获得锁,要等一下变为后一种情况。</p>    <pre>  <code class="language-java">public class MCSLock {      AtomicReference<QNode> tail;      ThreadLocal<QNode> myNode;        public MCSLock() {          tail = new AtomicReference<QNode>(null);          myNode = new ThreadLocal<QNode>() {              protected QNode initialValue() {                  return new QNode();              }          };      }        public void lock() {          QNode qnode = myNode.get();          QNode pred = tail.getAndSet(qnode);          if (pred != null) {              qnode.locked = true;              pred.next = qnode;              while (qnode.locked) {                  ;              }          }      }        public void unlock() {          QNode qnode = myNode.get();          if (qnode.next == null) {              if (tail.compareAndSet(qnode, null)) {                  return;              }              while (qnode.next == null) {                  ;              }          }          qnode.next.locked = false;          qnode.next = null;      }        class QNode {          boolean locked = false;          QNode next = null;      }  }</code></pre>    <p>结点能被重复使用,该锁的空间复杂度为O(L+n)。MCSLock算法适合无cache的系统结构,因为每个线程只需控制自己自旋的存储单元。但是,释放锁也需要旋转,另外读写比较次数比CLHLock多。</p>    <h2>时限队列锁</h2>    <p>Lock接口中有个一个 tryLock() 方法,可以指定一个时限(获得锁可等待的最大时间),若获得锁超时则放弃尝试。最后返回一个布尔值说明锁是否申请成功。</p>    <p>时限队列锁TOLock是基于CLHLock类的,锁是一个结点的虚拟队列,每个结点在它的前驱结点上自旋等待锁释放。但是当这个线程超时时,不能简单的抛弃它的队列结点,而是将这个结点标记为已废弃,这样它的后继们(如果有)就会注意到这个结点已放弃,转而在放弃结点的前驱上自旋。为了保证队列链表的连续性,每次申请锁都会 new 一个 QNode 。</p>    <p>时限队列锁结点的域 pred 会特殊一点,它有3种类型的取值:</p>    <ul>     <li> <p>null :初始状态,未获得锁或已释放锁;</p> </li>     <li> <p>AVAILABLE :一个静态结点,表示对应结点已释放锁,申请锁成功;</p> </li>     <li> <p>QNode : pred 域为 null 的前驱结点,表示对应结点因超时放弃锁请求,在放弃请求时才会设置这个值。</p> </li>    </ul>    <p>申请锁时,创建一个 pred 域为 null 的新结点并加入到链表尾部,若原先链表为空或前驱结点已释放锁,则获得锁。否则,在尝试时间内,找到 pred 域为 null 的前驱结点,等待它释放锁。若在等待前驱结点释放锁的过程中超时,就尝试从链表中删除这个结点,要分这个结点是否有后继两种情况。</p>    <p>释放锁时,检查该结点是否有后继,若无后继可直接把 tail 设置为 null ,否则将该结点的 pred 域指向 AVAILABLE 。</p>    <pre>  <code class="language-java">public class TOLock {      static QNode AVAILABLE = new QNode();      AtomicReference<QNode> tail;      ThreadLocal<QNode> myNode;        public TOLock() {          tail = new AtomicReference<QNode>(null);          myNode = new ThreadLocal<QNode>();      }        public boolean tryLock(long time, TimeUnit unit) {          long startTime = System.currentTimeMillis();          long patience = TimeUnit.MILLISECONDS.convert(time, unit);          QNode qnode = new QNode();          myNode.set(qnode);          qnode.pred = null;          QNode myPred = tail.getAndSet(qnode);          if (myPred == null || myPred.pred == AVAILABLE) {              return true;          }          while (System.currentTimeMillis() - startTime < patience) {              QNode predPred = myPred.pred;              if (predPred == AVAILABLE) {                  return true;              } else {                  if (predPred != null) {                      myPred = predPred;                  }              }          }          if (! tail.compareAndSet(qnode, myPred)) {              qnode.pred = myPred;          }          return false;      }        public void unlock() {          QNode qnode = myNode.get();          if (! tail.compareAndSet(qnode, null)) {              qnode.pred = AVAILABLE;          }      }        static class QNode {          public QNode pred = null;      }  }</code></pre>    <p>优点:同CLHLock。缺点:每次申请锁都要分配一个新结点,在锁上旋转的线程可能要回溯一个超时结点链。</p>    <p>上面实现的这些锁算法不支持重入。我们可以使用银行转账的例子来测试一下锁的效果,任意账户间可以随意转账,锁生效时所有账户的总金额是不变的。完整的算法实现和测试代码在 <a href="/misc/goto?guid=4959670719143545615" rel="nofollow,noindex">这里</a> 。</p>