深入剖析 Java7 中的 HashMap 和 ConcurrentHashMap

本文將深入剖析 Java7 中的 HashMap 和 ConcurrentHashMap 的源碼,解析 HashMap 線程不安全的原理以及解決方案,最後以測試用例加以驗證。

1 Java7 HashMap

HashMap 的數據結構:

深入剖析 Java7 中的 HashMap 和 ConcurrentHashMap

從上圖中可以看出,HashMap 底層就是一個數組結構,數組中的每一項又是一個鏈表。

通過查看 JDK 中的 HashMap 源碼,可以看到其構造函數有一行代碼:

public HashMap(int initialCapacity, float loadFactor) {
...
table = new Entry[capacity];
...
}

即創建了一個大小為 capacity 的 Entry 數組,而 Entry 的結構如下:

static class Entry implements Map.Entry {
final K key;
V value;
Entry next;
final int hash;
……
}

可以看到,Entry 是一個 static class,其中包含了 key 和 value ,也就是鍵值對,另外還包含了一個 next 的 Entry 指針。

  • capacity:當前數組容量,始終保持 2^n,可以擴容,擴容後數組大小為當前的 2 倍。默認初始容量為 16。
  • loadFactor:負載因子,默認為 0.75。
  • threshold:擴容的閾值,等於 capacity * loadFactor

1.1 put過程分析

public V put(K key, V value) {
// 當插入第一個元素的時候,需要先初始化數組大小
if (table == EMPTY_TABLE) {
inflateTable(threshold);
}
// 如果 key 為 null,則這個 entry 放到 table[0] 中
if (key == null)
return putForNullKey(value);
// key 的 hash 值
int hash = hash(key);
// 找到對應的數組下標
int i = indexFor(hash, table.length);
// 遍歷一下對應下標處的鏈表,看是否有重複的 key 已經存在,
// 如果有,直接覆蓋,put 方法返回舊值就結束了
for (Entry e = table[i]; e != null; e = e.next) {
Object k;
if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
V oldValue = e.value;
e.value = value;
e.recordAccess(this);
return oldValue;
}
}

modCount++;
// 不存在重複的 key,將此 entry 添加到鏈表中
addEntry(hash, key, value, i);
return null;
}

這裡對一些方法做深入解析。

  • 數組初始化
private void inflateTable(int toSize) {
// 保證數組大小一定是 2^n
int capacity = roundUpToPowerOf2(toSize);
// 計算擴容閾值
threshold = (int) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1);
// 初始化數組
table = new Entry[capacity];
initHashSeedAsNeeded(capacity);
}
  • 找到對應的數組下標
static int indexFor(int hash, int length) {
// 作用等價於取模運算,但這種方式效率更高
return hash & (length-1);
}

因為HashMap的底層數組長度總是 2^n,當 length 為 2 的 n 次方時,hash & (length-1) 就相當於對length取模,而且速度比直接取模要快的多。

  • 添加節點到鏈表中
void addEntry(int hash, K key, V value, int bucketIndex) {
// 如果當前 HashMap 大小已經達到了閾值,並且新值要插入的數組位置已經有元素了,那麼要擴容

if ((size >= threshold) && (null != table[bucketIndex])) {
// 擴容
resize(2 * table.length);
// 重新計算 hash 值
hash = (null != key) ? hash(key) : 0;
// 計算擴容後的新的下標
bucketIndex = indexFor(hash, table.length);
}
createEntry(hash, key, value, bucketIndex);
}
// 永遠都是在鏈表的表頭添加新元素
void createEntry(int hash, K key, V value, int bucketIndex) {
// 獲取指定 bucketIndex 索引處的 Entry
Entry e = table[bucketIndex];
// 將新創建的 Entry 放入 bucketIndex 索引處,並讓新的 Entry 指向原來的 Entry
table[bucketIndex] = new Entry<>(hash, key, value, e);
size++;
}

當系統決定存儲 HashMap 中的 key-value 對時,完全沒有考慮 Entry 中的 value,僅僅只是根據 key 來計算並決定每個 Entry 的存儲位置。我們完全可以把 Map 集合中的 value 當成 key 的附屬,當系統決定了 key 的存儲位置之後,value 隨之保存在那裡即可。

  • 數組擴容

隨著 HashMap 中元素的數量越來越多,發生碰撞的概率將越來越大,所產生的子鏈長度就會越來越長,這樣勢必會影響 HashMap 的存取速度。為了保證 HashMap 的效率,系統必須要在某個臨界點進行擴容處理,該臨界點 threshold。而在 HashMap 數組擴容之後,最消耗性能的點就出現了:原數組中的數據必須重新計算其在新數組中的位置,並放進去,這就是 resize。

void resize(int newCapacity) {
Entry[] oldTable = table;
int oldCapacity = oldTable.length;
// 若 oldCapacity 已達到最大值,直接將 threshold 設為 Integer.MAX_VALUE
if (oldCapacity == MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return; // 直接返回
}
// 否則,創建一個更大的數組
Entry[] newTable = new Entry[newCapacity];
//將每條Entry重新哈希到新的數組中
transfer(newTable, initHashSeedAsNeeded(newCapacity));
table = newTable;
// 重新設定 threshold
threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
}

1.2 get過程分析

public V get(Object key) {
// key 為 null 的話,會被放到 table[0],所以只要遍歷下 table[0] 處的鏈表就可以了
if (key == null)
return getForNullKey();
// key 非 null 的情況,詳見下文
Entry entry = getEntry(key);

return null == entry ? null : entry.getValue();
}
final Entry getEntry(Object key) {
// The number of key-value mappings contained in this map.
if (size == 0) {
return null;
}

// 根據該 key 的 hashCode 值計算它的 hash 碼
int hash = (key == null) ? 0 : hash(key);
// 確定數組下標,然後從頭開始遍歷鏈表,直到找到為止

for (Entry e = table[indexFor(hash, table.length)];
e != null;
e = e.next) {
Object k;
//若搜索的key與查找的key相同,則返回相對應的value
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
}
return null;
}

2 Java7 ConcurrentHashMap

ConcurrentHashMap 的成員變量中,包含了一個 Segment 數組 final Segment[] segments;,而 Segment 是ConcurrentHashMap 的內部類。

然後在 Segment 這個類中,包含了一個 HashEntry 的數組transient volatile HashEntry[] table,而 HashEntry 也是 ConcurrentHashMap 的內部類。

HashEntry 中,包含了 key 和 value 以及 next 指針(類似於 HashMap 中的 Entry),所以 HashEntry 可以構成一個鏈表。

2.1 成員變量及構造函數

public class ConcurrentHashMap extends AbstractMap
implements ConcurrentMap, Serializable {
...
//初始的容量
static final int DEFAULT_INITIAL_CAPACITY = 16;
//初始的加載因子

static final float DEFAULT_LOAD_FACTOR = 0.75f;
//初始的併發等級,表示當前更新線程的估計數
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;
//最小的segment數量
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
//最大的segment數量
static final int MAX_SEGMENTS = 1 << 16;
//
static final int RETRIES_BEFORE_LOCK = 2;
// segments 的掩碼值, key 的散列碼的高位用來選擇具體的 segment
final int segmentMask;
// 偏移量
final int segmentShift;
final Segment[] segments;
...
// 創建一個帶有指定初始容量、加載因子和併發級別的新的空映射
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// 尋找最佳匹配參數(不小於給定參數的最接近的 2^n)
int sshift = 0; // 用來記錄向左按位移動的次數
int ssize = 1; // 用來記錄Segment數組的大小
// 計算並行級別 ssize,因為要保持並行級別是 2^n
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// 若為默認值,concurrencyLevel 為 16,sshift 為 4
// 那麼計算出 segmentShift 為 28,segmentMask 為 15

this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// 記錄每個 Segment 上要放置多少個元素
int c = initialCapacity / ssize;
// 假如有餘數,則Segment數量加1
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment s0 =
new Segment(loadFactor, (int)(cap * loadFactor),
(HashEntry[])new HashEntry[cap]);
Segment[] ss = (Segment[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}

當用 new ConcurrentHashMap() 無參構造函數進行初始化的,那麼初始化完成後:

  • Segment 數組長度為 16,不可以擴容
  • Segment[i] 的默認大小為 2,負載因子是 0.75,得出初始閾值為 1.5,也就是以後插入第一個元素不會觸發擴容,插入第二個會進行第一次擴容
  • 這裡初始化了 segment[0],其他位置還是 null,至於為什麼要初始化 segment[0],後面的代碼會介紹
  • 當前 segmentShift 的值為 32 – 4 = 28,segmentMask 為 16 – 1 = 15,姑且把它們簡單翻譯為移位數和掩碼,這兩個值馬上就會用到

2.2 put過程分析

根據 hash 值很快就能找到相應的 Segment,之後就是 Segment 內部的 put 操作。

public V put(K key, V value) {
Segment s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
// 根據 hash 值找到 Segment 數組中的位置 j
// hash 是 32 位,無符號右移 segmentShift(28) 位,剩下低 4 位,
// 然後和 segmentMask(15) 做一次與操作,也就是說 j 是 hash 值的最後 4 位,也就是槽的數組下標
int j = (hash >>> segmentShift) & segmentMask;
// 剛剛說了,初始化的時候初始化了 segment[0],但是其他位置還是 null,
// ensureSegment(j) 對 segment[j] 進行初始化
if ((s = (Segment)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
// 插入新值到 槽 s 中
return s.put(key, hash, value, false);
}

Segment 內部是由 數組+鏈表 組成的。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 先獲取該 segment 的獨佔鎖
// 每一個Segment進行put時,都會加鎖
HashEntry node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
// segment 內部的數組
HashEntry[] tab = table;
// 利用 hash 值,求應該放置的數組下標
int index = (tab.length - 1) & hash;
// 數組該位置處的鏈表的表頭
HashEntry first = entryAt(tab, index);
for (HashEntry e = first;;) {
// 如果鏈頭不為 null
if (e != null) {
K k;
//如果在該鏈中找到相同的key,則用新值替換舊值,並退出循環
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
//如果沒有和key相同的,一直遍歷到鏈尾,鏈尾的next為null,進入到else
e = e.next;
}
else {
// node 到底是不是 null,這個要看獲取鎖的過程,不過和這裡都沒有關係。

// 如果不為 null,那就直接將它設置為鏈表表頭;如果是null,初始化並設置為鏈表表頭。
if (node != null)
node.setNext(first);
else
node = new HashEntry(hash, key, value, first);
int c = count + 1;
// 如果超過了該 segment 的閾值,這個 segment 需要擴容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
// 沒有達到閾值,將 node 放到數組 tab 的 index 位置,
// 其實就是將新的節點設置成原鏈表的表頭
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
// 解鎖
unlock();
}
return oldValue;
}

2.3 初始化Segment

ConcurrentHashMap 初始化的時候會初始化第一個槽 segment[0],對於其他槽來說,在插入第一個值的時候進行初始化。

這裡需要考慮併發,因為很可能會有多個線程同時進來初始化同一個槽 segment[k],不過只要有一個成功了就可以。

private Segment ensureSegment(int k) {
final Segment[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment seg;
if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) {
// 這裡看到為什麼之前要初始化 segment[0] 了,
// 使用當前 segment[0] 處的數組長度和負載因子來初始化 segment[k]
// 為什麼要用“當前”,因為 segment[0] 可能早就擴容過了
Segment proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
// 初始化 segment[k] 內部的數組
HashEntry[] tab = (HashEntry[])new HashEntry[cap];
if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck Segment[k] 是否被其它線程初始化了
Segment s = new Segment(lf, threshold, tab);
// 使用 while 循環,內部用 CAS,當前線程成功設值或其他線程成功設值後,退出
while ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}

2.4 get過程分析

比較簡單,先找到 Segment 數組的位置,然後找到 HashEntry 數組的位置,最後順著鏈表查找即可。

public V get(Object key) {
Segment s; // manually integrate access methods to reduce overhead
HashEntry[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry e = (HashEntry) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}

3 線程不安全

3.1 哈希碰撞

多個線程同時使用 put() 方法添加元素,若存在兩個或多個 put() 的 key 發生了碰撞,那麼有可能其中一個線程的數據被覆蓋。

3.2 擴容

當數據要插入 HashMap 時,都會檢查容量有沒有超過設定的 thredhold,如果超過,則需要擴容。而多線程會導致擴容後的鏈表形成環形數據結構,一旦形成環形數據結構,Entry 的 next 的節點永遠不為 null,就會在獲取 Entry 時產生死循環。

例子可見文章《HashMap多線程死循環問題》。

不過要注意,其使用的 Java 版本既不是 7,也不是 8。在 Java7 中方法 addEntry() 添加節點到鏈表中是先擴容後再添加,而例子中的源碼是:

void addEntry(int hash, K key, V value, int bucketIndex) {
Entry e = table[bucketIndex];
// 先添加節點
table[bucketIndex] = new Entry(hash, key, value, e);

// 然後擴容
if (size++ >= threshold)
resize(2 * table.length);
}

所以在 Java7 中此例子無效。而在 Java8 中,通過確保建新鏈與舊鏈的順序是相同的,即可避免產生死循環。

4 HashMap遍歷方式

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
public class HashMapTest {
private final static Map map = new HashMap(10000);
private static final Object PRESENT = new Object();
public static void main(String args[]) {
long startTime;
long endTime;
long totalTime;
for (int i = 0; i < 7500; i++) {
map.put(i, PRESENT);
}
// 方法一
startTime = System.nanoTime();
Iterator iter1 = map.entrySet().iterator();
while (iter1.hasNext()) {
Map.Entry entry = (Map.Entry) iter1.next();
Integer key = entry.getKey();
Object val = entry.getValue();
}
endTime = System.nanoTime();
totalTime = endTime - startTime;
System.out.println("methor1 pays " + totalTime + " ms");

// 方法二
startTime = System.nanoTime();
Iterator iter2 = map.keySet().iterator();
while (iter2.hasNext()) {
Object key = iter2.next();
Object val = map.get(key);
}
endTime = System.nanoTime();
totalTime = endTime - startTime;
System.out.println("methor2 pays " + totalTime + " ms");
}
}

運行結果:

深入剖析 Java7 中的 HashMap 和 ConcurrentHashMap

5 性能對比

線程安全的使用 HashMap 有三種方式,分別為 Hashtable、SynchronizedMap()、ConcurrentHashMap。

Hashtable

使用 synchronized 來保證線程安全,幾乎所有的 public 的方法都是 synchronized 的,而有些方法也是在內部通過 synchronized 代碼塊來實現。

synchronizedMap()

通過創建一個線程安全的 Map 對象,並把它作為一個封裝的對象來返回。

ConcurrentHashMap

支持多線程對 Map 做讀操作,並且不需要任何的 blocking 。這得益於 CHM 將 Map 分割成了不同的部分,在執行更新操作時只鎖住一部分。根據默認的併發級別, Map 被分割成 16 個部分,並且由不同的鎖控制。這意味著,同時最多可以有 16個 寫線程操作 Map 。試想一下,由只能一個線程進入變成同時可由 16 個寫線程同時進入(讀線程幾乎不受限制),性能的提升是顯而易見的。但由於一些更新操作,如 put(), remove(), putAll(), clear()只鎖住操作的部分,所以在檢索操作不能保證返回的是最新的結果。

在迭代遍歷 CHM 時, keySet 返回的 iterator 是弱一致和 fail-safe 的,可能不會返回某些最近的改變,並且在遍歷過程中,如果已經遍歷的數組上的內容變化了,不會拋出 ConcurrentModificationExceptoin 的異常。

什麼時候使用 ConcurrentHashMap ?

CHM 適用於讀者數量超過寫者時,當寫者數量大於等於讀者時,CHM 的性能是低於 Hashtable 和 synchronizedMap 的。這是因為當鎖住了整個 Map 時,讀操作要等待對同一部分執行寫操作的線程結束。

CHM 適用於做 cache ,在程序啟動時初始化,之後可以被多個請求線程訪問。

CHM 是Hashtable一個很好的替代,但要記住, CHM 的比 Hashrable 的同步性稍弱。

6 拓展:Java8 HashMap & ConcurrentHashMap

Java8 對 HashMap 和 ConcurrentHashMap 做了一些修改:

  • 二者均利用了紅黑樹,所以其數據結構由 數組 + 鏈表 + 紅黑樹 組成。我們知道,鏈表上的數據需要一個一個比較下去才能找到我們需要的,時間複雜度取決於鏈表的長度,為 O(n)。為了降低這一部分的開銷,在 Java8 中,當鏈表中的元素超過了 8 個以後,會將鏈表轉換為紅黑樹,這個時候時間複雜度就降為了 O(logN)
  • Java8 中 ConcurrentHashMap 摒棄 Java7 中的 Segment 的概念,使用了另一種方式實現保證線程安全。


分享到:


相關文章: