ConcurrentHashMap实现

2017-02-24 15:37:27

本文简单描述jdk7下ConcurrentHashMap的实现。

文章主要关注以下问题
1.同步
ConcurrentHashMap是线程安全的,那么如何解决线程冲突问题
2.扩容
如果当前容器已经接收新元素,将如何扩容
3.hash冲突
当出现hash冲突(不同hash值映射到同一位置)时,如何解决

构造

public ConcurrentHashMap() {
    this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}


public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    ...

    // 计算一个大于concurrencyLevel的2的幂指数作为实际并发度
    int sshift = 0;
    int ssize = 1;
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // segmentShift,segmentMask用于计算Segment位置
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
    // 根据计算的并发度,计算每一个Segment的容量
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        cap <<= 1;

    // 创建Segments数组和Segment[0]
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

UNSAFE是jdk提供的,用于直接操作内存,是不安全的操作。
关于UNSAFE,可参考: java中的Unsafe

Segment是ConcurrentHashMap的内部静态类

static final class Segment<K,V> extends ReentrantLock implements Serializable {
    transient volatile HashEntry<K,V>[] table;
    transient int count;

}

HashEntry数组是真正的存储元素的数组,
Segment继承自ReentrantLock,所以带有锁的功能。

HashEntry也是ConcurrentHashMap的内部静态类

static final class HashEntry<K,V> {
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K,V> next;
}

HashEntry中有next属性,指向下一个HashEntry元素,实际上HashEntry为一个链表,当hash冲突时,可以存储不同的值。(解决hash冲突的问题)

put

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    // 计算hash值
    int hash = hash(key);
    // 计算Segment下标
    int j = (hash >>> segmentShift) & segmentMask;
    // 查找Segment
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    // put操作
    return s.put(key, hash, value, false);
}

计算hash值

private int hash(Object k) {
    int h = hashSeed;

    if ((0 != h) && (k instanceof String)) {
        return sun.misc.Hashing.stringHash32((String) k);
    }

    h ^= k.hashCode();


    h += (h <<  15) ^ 0xffffcd7d;
    h ^= (h >>> 10);
    h += (h <<   3);
    h ^= (h >>>  6);
    h += (h <<   2) + (h << 14);
    return h ^ (h >>> 16);
}

这时将对hash值使用Wang/Jenkins hash算法。

计算Segment下标

int j = (hash >>> segmentShift) & segmentMask;

运行时通过将key的高n位(n = 32 – segmentShift)和并发度减1(segmentMask)做位与运算定位到所在的Segment。
为什么是32呢,因为int只有32位(其中一位表示符号)。

创建Segment
由于支持并发操作,当segments[j]为空时,必须重复检查操作,

private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    // 重复检查
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        // 使用ss[0]的属性构造新的Segment
        Segment<K,V> proto = ss[0]; // use segment 0 as prototype
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        // 再次检查
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
            == null) { // recheck
            // 构造新的Segment
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            // cas操作
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}

这时进行了两次重复检查,并使用cas操作添加新的Segment

cas操作即比较交换操作,只有当ss[u]为null时,才进行赋值操作,否则将直接返回,而且操作是原子性的。这样就可以保证在并发的情况下,构造Segment的是安全的。

Segment.put

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 加锁
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        // 计算元素所在数组下标
        int index = (tab.length - 1) & hash;
        // 查询对应的HashEntry
        HashEntry<K,V> first = entryAt(tab, index);

        for (HashEntry<K,V> e = first;;) {
            // 如果当前HashEntry不空,则遍历链表,如果hash相同的,则使用新值替换
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {    // 如果HashEntry为空,则创建新的HashEntry
                // 新HashEntry的next指向first
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                // 如果put元素后容量将超过临界值,则扩容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 释放锁
        unlock();
    }
    return oldValue;
}

ConcurrentHashMap中的put操作,在Segment范围内进行了加锁操作,而ConcurrentHashMap中将根据并发度创建多个Segment(默认为16),put操作时如果put到不同的Segment,就不需要竞争锁了,可以提高操作速度。

扩容

private void rehash(HashEntry<K,V> node) {
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    int newCapacity = oldCapacity << 1;    // 容量翻倍
    threshold = (int)(newCapacity * loadFactor);    // 新扩容临界值
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];    // 创建新数组
    int sizeMask = newCapacity - 1;
    // 遍历原数组,放到新数组中
    for (int i = 0; i < oldCapacity ; i++) {    
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            int idx = e.hash & sizeMask;
            // 单一元素,直接移动
            if (next == null)   
                newTable[idx] = e;
            else {
                // 
                HashEntry<K,V> lastRun = e;
                int lastIdx = idx;
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                newTable[lastIdx] = lastRun;
                // Clone remaining nodes
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    int nodeIndex = node.hash & sizeMask; // 添加新元素
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}

由于HashEntry长度都是2的倍数,而且扩展都是翻倍扩展,所以在长度oldLen的数组第i位的元素,扩展后的位置只可以为i或oldLen+i,所以这时做了一个简单的优化,以免创建过多的元素。当一个HashEntry为数组时,找到最后一段下标不变的数组,直接放到新的位置,然后遍历前面的元素,创建新的元素并放到适当位置

size

调用size方法时,需要遍历所有的Segment,并且需要并发情况下保持Segment长度不变,下面看看实现:

  1. 连续两次求所有Segment的modcount和, 如果相等,则过程中没有发生其他线程修改ConcurrentHashMap的情况,返回获得的值。

  2. 如果两次和不相等,则需要对所有的Segment依次进行加锁,获取统一的值后再依次解锁。

    public int size() {
     // Try a few times to get accurate count. On failure due to
     // continuous async changes in table, resort to locking.
     final Segment<K,V>[] segments = this.segments;
     int size;
     boolean overflow; // true if size overflows 32 bits
     long sum;         // sum of modCounts
     long last = 0L;   // previous sum
     int retries = -1; // first iteration isn't retry
     try {
         for (;;) {
             if (retries++ == RETRIES_BEFORE_LOCK) {
                 for (int j = 0; j < segments.length; ++j)
                     ensureSegment(j).lock(); // force creation
             }
             sum = 0L;
             size = 0;
             overflow = false;
             for (int j = 0; j < segments.length; ++j) {
                 Segment<K,V> seg = segmentAt(segments, j);
                 if (seg != null) {
                     sum += seg.modCount;
                     int c = seg.count;
                     if (c < 0 || (size += c) < 0)
                         overflow = true;
                 }
             }
             if (sum == last)
                 break;
             last = sum;
         }
     } finally {
         if (retries > RETRIES_BEFORE_LOCK) {
             for (int j = 0; j < segments.length; ++j)
                 segmentAt(segments, j).unlock();
         }
     }
     return overflow ? Integer.MAX_VALUE : size;
    }
    

错误之处,还望指出

参考:
Concurrency of ConcurrentHashMap
谈谈ConcurrentHashMap1.7和1.8的不同实现
Java 8系列之重新认识HashMap