ConcurrentHashmap 解析

jopen 9年前

ConcurrentHashmap(JDK1.7) 

总体描述:

concurrentHashmap是为了高并发而实现,内部采用分离锁的设计,有效地避开了热点访问。而对于每个分段,ConcurrentHashmap采用final和内存可见修饰符volatile关键字(内存立即可见:Java 的内存模型可以保证:某个写线程对 value 域的写入马上可以被后续的某个读线程“看”到。注:并不能保证对volatile变量状态有依赖的其他操作的原子性

借用某博客对concurrentHashmap对结构图:

ConcurrentHashmap 解析

ConcurrentHashmap 解析

不难看出,concurrenthashmap采用了二次hash的方式,第一次hash将key映射到对应的segment,而第二次hash则是映射到segment的不同桶中。

为什么要用二次hash,主要原因是为了构造分离锁,使得对于map的修改不会锁住整个容器,提高并发能力。当然,没有一种东西是绝对完美的,二次hash带来的问题是整个hash的过程比hashmap单次hash要长,所以,如果不是并发情形,不要使用concurrentHashmap。

代码实现:

 该数据结构中,最核心的部分是两个内部类,HashEntry和Segment

    concurrentHashmap维护一个segment数组,将元素分成若干段(第一次hash)

/**  * The segments, each of which is a specialized hash table.  */  final Segment<K,V>[] segments;

segments的每一个segment维护一个链表数组

代码:

再来看看构造方法

public ConcurrentHashMap(int initialCapacity,      float loadFactor, int concurrencyLevel) {      if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)      throw new IllegalArgumentException();      if (concurrencyLevel > MAX_SEGMENTS)      concurrencyLevel = MAX_SEGMENTS;      // Find power-of-two sizes best matching arguments      int sshift = 0;      int ssize = 1;      while (ssize < concurrencyLevel) {      ++sshift;      ssize <<= 1;      }      this.segmentShift = 32 - sshift;      this.segmentMask = ssize - 1;      if (initialCapacity > MAXIMUM_CAPACITY)      initialCapacity = MAXIMUM_CAPACITY;      int c = initialCapacity / ssize;      if (c * ssize < initialCapacity)      ++c;      int cap = MIN_SEGMENT_TABLE_CAPACITY;      while (cap < c)      cap <<= 1;      // create segments and segments[0]      Segment<K,V> s0 =      new Segment<K,V>(loadFactor, (int)(cap * loadFactor),      (HashEntry<K,V>[])new HashEntry[cap]);      Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];      UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]      this.segments = ss;  }

代码28行,一旦指定了concurrencyLevel(segments数组大小)便不能改变,这样,一旦threshold超标,rehash真不会影响segments数组,这样,在大并发的情况下,只会影响某一个segment的rehash而其他segment不会受到影响

(put方法都要上锁)

HashEntry

    与hashmap类似,concurrentHashmap也采用了链表作为每个hash桶中的元素,不过concurrentHashmap又有些不同

static final class HashEntry<K,V> {      final int hash;      final K key;      volatile V value;      volatile HashEntry<K,V> next;             HashEntry(int hash, K key, V value, HashEntry<K,V> next) {      this.hash = hash;      this.key = key;      this.value = value;      this.next = next;      }             /**      * Sets next field with volatile write semantics. (See above      * about use of putOrderedObject.)      */      final void setNext(HashEntry<K,V> n) {      UNSAFE.putOrderedObject(this, nextOffset, n);      }             // Unsafe mechanics      static final sun.misc.Unsafe UNSAFE;      static final long nextOffset;      static {      try {      UNSAFE = sun.misc.Unsafe.getUnsafe();      Class k = HashEntry.class;      nextOffset = UNSAFE.objectFieldOffset      (k.getDeclaredField("next"));      } catch (Exception e) {      throw new Error(e);      }      }  }

HashEntry的key,hash采用final,可以避免并发修改问题,HashEntry链的尾部是不能修改的,而next和value采用volatile,可以避免使用同步造成的并发性能灾难,新版(jdk1.7)的concurrentHashmap大量使用java Unsafe类提供的原子操作,直接调用底层操作系统,提高性能(这块我也不是特别清楚

get方法(1.6 vs 1.7)

1.6

V get(Object key, int hash) {       if (count != 0) { // read-volatile       HashEntry<K,V> e = getFirst(hash);       while (e != null) {       if (e.hash == hash && key.equals(e.key)) {       V v = e.value;       if (v != null)       return v;       return readValueUnderLock(e); // recheck       }       e = e.next;       }       }       return null;   }

1.6的jdk采用了乐观锁的方式处理了get方法,在get的时候put方法正在new对象,而此时value并未赋值,这时判断为空则加锁访问

1.7

public V get(Object key) {      Segment<K,V> s; // manually integrate access methods to reduce overhead      HashEntry<K,V>[] tab;      int h = hash(key);      long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;      if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&      (tab = s.table) != null) {      for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile      (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);      e != null; e = e.next) {      K k;      if ((k = e.key) == key || (e.hash == h && key.equals(k)))      return e.value;      }      }      return null;  }

1.7并没有判断value=null的情况,不知为何

跟同事沟通过,无论是1.6还是1.7的实现,实际上都是一种乐观的方式,而乐观的方式带来的是性能上的提升,但同时也带来数据的弱一致性,如果你的业务是强一致性的业务,可能就要考虑另外的解决办法(用Collections包装或者像jdk6中一样二次加锁获取)

http://ifeve.com/concurrenthashmap-weakly-consistent/

这篇文章可以很好地解释弱一致性问题

put方法

public V put(K key, V value) {          Segment<K,V> s;          if (value == null)              throw new NullPointerException();          int hash = hash(key);          int j = (hash >>> segmentShift) & segmentMask;          if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck               (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment              s = ensureSegment(j);          return s.put(key, hash, value, false);      }

对于put,concurrentHashmap采用自旋锁的方式,不同于1.6的直接获取锁

注:个人理解,这里采用自旋锁可能作者是觉得在分段锁的状态下,并发的可能本来就比较小,并且锁占用时间又并不是特别长,因此自旋锁可以减小线程唤醒和切换的开销

关于hash

private int hash(Object k) {          int h = hashSeed;          if ((0 != h) && (k instanceof String)) {              return sun.misc.Hashing.stringHash32((String) k);          }          h ^= k.hashCode();          // Spread bits to regularize both segment and index locations,          // using variant of single-word Wang/Jenkins hash.          h += (h <<  15) ^ 0xffffcd7d;          h ^= (h >>> 10);          h += (h <<   3);          h ^= (h >>>  6);          h += (h <<   2) + (h << 14);          return h ^ (h >>> 16);      }

concurrentHashMap采用本身hashcode的同时,采用Wang/Jenkins算法对每位都做了处理,使得发生hash冲突的可能性大大减小(否则效率会很差)


而对于concurrentHashMap,segments的大小在初始时确定,此后不变,而元素所在segments桶序列由hash的高位决定

public V put(K key, V value) {          Segment<K,V> s;          if (value == null)              throw new NullPointerException();          int hash = hash(key);          int j = (hash >>> segmentShift) & segmentMask;          if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck               (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment              s = ensureSegment(j);          return s.put(key, hash, value, false);      }

segmentShift为(32-segments大小的二进制长度)


总结

    concurrentHashmap主要是为并发设计,与Collections的包装不同,他不是采用全同步的方式,而是采用非锁get方式,通过数据的弱一致性带来性能上的大幅提升,同时采用分段锁的策略,提高并发能力

参考:

http://www.jb51.net/article/49699.htm

http://my.oschina.net/chihz/blog/58035

http://www.ibm.com/developerworks/cn/java/java-lo-concurrenthashmap/

http://www.ibm.com/developerworks/cn/java/j-jtp06197.html    

来自:http://my.oschina.net/zhenglingfei/blog/400515