Java併發系列:透過源碼徹底理解ThreadLocal

Java併發系列:透過源碼徹底理解ThreadLocal

ThreadLocal

經常會在資料上看到對ThreadLocal的描述:

  • "是一個變量的本地副本,為每一個線程提供一個變量副本,互相不影響"
  • "避免了共享變量的衝突"
  • "解決多線程的併發訪問的一種方式"

個人覺得這樣的描述很具有迷惑性,"副本"那主本是什麼?它根本就不是解決"併發訪問"問題的好吧,ThreadLocal中的變量根本就沒有共享,哪來的"併發訪問"?

更確切的定義:

ThreadLocal是線程執行時的上下文,用於存放線程局部變量。

ThreadLocal 涉及的三個類:

  • ThreadLocalMap 是存放局部變量的容器
  • Thread中通過變量ThreadLocal.ThreadLocalMap threadLocals來持有ThreadLocalMap的實例
  • ThreadLocal則是ThreadLocalMap的manager,控制著ThreadLocalMap的創建、存取、刪除等工作。

ThreadLocalMap

ThreadLocalMap和Map接口沒有關係,它是使用數組來存儲變量的:private Entry[] table,table的初始容量是16,當table的實際長度大於容量時進行成倍擴容,所以table的容量始終是2的冪。

Entry

Entry使用ThreadLocal對象作為鍵,注意不是使用線程(Thread)對象作為鍵。

WeakReference表示一個對象的弱引用,java將對象的引用按照強弱等級分為四種:

  • 強引用:"Person p = new Person()"代表一個強引用,只要p存在,GC不會回收Person對象。
  • 軟引用:SoftReference代表一個軟引用,在內存不足將要發生內存溢出時,GC會回收軟引用對象。
  • 弱引用:WeakReference代表一個弱引用,其生命週期在下次垃圾回收之前,不管內存足不足,都會被GC回收。
  • 虛引用:PhantomReference代表一個虛引用,無法通過虛引用獲取引用對象的值,也被稱為"幽靈引用",它的意義就是檢查對象是否被回收。

關於弱引用的一個小栗子:

import java.lang.ref.WeakReference;
public class WeakReferenceTest {
public static void main(String[] args) {
Object o = new Object();
WeakReference<object> wof = new WeakReference<object>(new Object());
System.out.println(wof.get()==null);//false
System.out.println(o==null);//false
System.gc();// 通知系統GC
System.out.println(wof.get()==null);//true
System.out.println(o==null);//false
}
}
/<object>/<object>

Entry定義成弱引用的目的是確保沒有了ThreadLocal對象的強引用時,能釋放ThreadLocalMap中的變量內存。

// 定義ThreadLocal
public static final ThreadLocal<session> sessions = new ThreadLocal<session>();
在某個時刻:
sessions = null;
說明已不使用sessions了,應該釋放ThreadLocalMap中的變量內存。
/<session>/<session>

因為ThreadLocalMap是隱藏在內部的,程序員不可見,所以必須要有一個機制能釋放ThreadLocalMap對象中的變量內存。

Java併發系列:透過源碼徹底理解ThreadLocal

存入

邏輯:

  • s1:從當前線程中拿出ThreadLocalMap,如果為空進行創建create,不為空進行值存入轉入s2
  • s2:使用ThreadLocal實例作為key和value調用ThreadLocalMap的set方法
  • s3:使用ThreadLocal實例的threadLocalHashCode與table的容量取模,計算值要放入table中的座標
  • s4:使用這個座標線性向後探測table,如果發現相同的key則更新值,返回。如果發現實現的key(ThreadLocal實例被GC回收,因為它是WeakReference,所以key為空),轉入s5,未發現相同的key和實現的key,轉入s6
  • s5:調用replaceStaleEntry方法清理talbe中key失效的entry,在清理過程中發現相同的key進行值更新,否則新建Entry插入table(座標為staleSlot),返回。
  • s6:新建一個Entry插入talbe(座標為i),使用i和自增後的長度sz調用cleanSomeSlots做table的連續段清理,轉入s7
  • s7:清理之後發現table長度大於等於擴容閾值threshold,進行table擴容

源碼:

public void set(T value) {
Thread t = Thread.currentThread();
// getMap(t):t.threadLocals,s1 從當前線程中拿出ThreadLocalMap

ThreadLocalMap map = getMap(t);
if (map != null)// map 不為空set
map.set(this, value); // s2 ThreadLocal的實例(this)作為key
else // map為空create
createMap(t, value);
}
private void set(ThreadLocal key, Object value) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);// s3 hash計算出table座標
// 線性探測
for (Entry e = tab[i];e != null;e = tab[i = nextIndex(i, len)]) {
ThreadLocal k = e.get();
// 找到對應的entry
if (k == key) {
e.value = value;// 更新值
return;
}
// 替換失效的entry
if (k == null) {
replaceStaleEntry(key, value, i);//清理
return;
}
}
tab[i] = new Entry(key, value);// 插入新值
int sz = ++size; // 長度增加
// table連續清理,並判斷是否擴容
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();// 擴容table,並重新hash
}

擴容

邏輯:

  • s1:進行一次全量的清理(清理對應ThreadLocal已經被GC回收的entry)
  • s2:因為進行了一次清理,所以talbe的長度會變小,改變擴容的閾值,由原來的2/3改為1/2,如果table長度大於等於閾值,擴容轉入s3
  • s3:新建一個數組,容量是table容量的2倍,數組拷貝,首先使用hash算法生成entry在新數組中的座標,如果發生碰撞,使用線性探測重新確定座標

代碼:

private void rehash() {
expungeStaleEntries(); // s1 做一次全量清理
// s2 size很可能會變小調低閾值來判斷是否需要擴容
if (size >= threshold - threshold / 4)
resize();// 擴容
}
private void resize() { // s3
Entry[] oldTab = table;
int oldLen = oldTab.length;// 原來的容量
int newLen = oldLen * 2; // 擴容兩倍
Entry[] newTab = new Entry[newLen];// 新數組
int count = 0;
for (int j = 0; j < oldLen; ++j) {// 拷貝
Entry e = oldTab[j];
if (e != null) {
ThreadLocal k = e.get();
if (k == null) { // key失效,被回收
e.value = null; // 幫助GC
} else {
// Hash獲取元素的座標
int h = k.threadLocalHashCode & (newLen - 1);
while (newTab[h] != null)
h = nextIndex(h, newLen); // 線性探測 獲取座標
newTab[h] = e;
count++;
}
}
}
setThreshold(newLen);// 設置新的擴容閾值
size = count;
table = newTab;

}

魔數

HASH_INCREMENT = 0x61c88647這個數字和斐波那契散列有關(數學問題感興趣可以深入研究),通過這個數字可以得到均勻的散列碼。

一個小栗子:

public class Hash {
private static AtomicInteger nextHashCode = new AtomicInteger();
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
public static void main(String[] args) {
int length = 32;
for(int i=0;i System.out.println(nextHashCode()&(length-1));
}
}
}

會發現生成的散列碼非常均勻,如果把length改為31就會發現得到的散列碼不那麼均勻了。

length-1的二進制表示就是低位連續的N個1,nextHashCode()&(length-1)的值就是nextHashCode()的低N位, 這樣就能均勻的產生均勻的分佈,這是為什麼ThreadLocalMap中talbe的容量必須為2的冪的原因。

取值

邏輯:

  • s1:從當前線程中拿出ThreadLocalMap,如果為空進行初始化設置setInitialValue,不為空,使用ThreadLocal實例作為key從ThreadLocalMap中取值,轉入s2
  • s2:使用hash算法生成key對應entry在table中的座標i,如果table[i]對應的entry不為空且key未失效,說明命中直接返回,否則轉入s3
  • s3:線性探測table,如果發現相同的key,返回。如果發現失效的key,調用expungeStaleEntry清理talbe,探測完畢返回null

源碼:

public T get() {
Thread t = Thread.currentThread();// 當前線程
ThreadLocalMap map = getMap(t);// 拿出ThreadLocalMap
if (map != null) { // s1 ThreadLocalMap不為空
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null)
return (T)e.value;
}
return setInitialValue();// ThreadLocalMap為空
}
private Entry getEntry(ThreadLocal key) {
int i = key.threadLocalHashCode & (table.length - 1); // hash座標
Entry e = table[i];
if (e != null && e.get() == key) // s2 key有效,命中返回
return e;
else
return getEntryAfterMiss(key, i, e); // 線性探測,繼續查找
}
private Entry getEntryAfterMiss(ThreadLocal key, int i, Entry e) { // s3
Entry[] tab = table;
int len = tab.length;
while (e != null) {
ThreadLocal k = e.get();
if (k == key) // 找到目標
return e;
if (k == null) // entry對應的ThreadLocal已經被回收,清理無效entry
expungeStaleEntry(i);
else
i = nextIndex(i, len); // 往後走
e = tab[i];

}
return null;
}

刪除

源碼:

public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());// 從當前線程拿出ThreadLocalMap
if (m != null)
m.remove(this);// 刪除,key為ThreadLocal實例
}
private void remove(ThreadLocal key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);// hash定位
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
e.clear();// 斷開弱引用
expungeStaleEntry(i);// 從i開始,進行段清理
return;
}
}
}

內存洩露

通過上文可以看到ThreadLocal為應對內存洩露做的工作:

  • 將Entry定義成弱引用,如果ThreadLocal實例不存在強引用了那麼Entry的key就會失效
  • get()、set()方法都進行失效key的清理

即便是這樣也不能保證萬無一失:

  • 通常情況下為了使線程可以共用ThreadLocal,會這樣定義:static final ThreadLocal threadLocal,這樣static變量的生命週期是隨class一起的,所以它永遠不會被GC回收,這個強引用在key就不會失效。
  • 不使用static定義threadLocal,由於有一條強引用鏈:Thread Ref -> Thread -> ThreaLocalMap -> Entry -> value的存在,在長生命週期的線程中(比如線程池)也有內存洩露的風險。短生命週期的線程則無所謂,因為隨著線程生命週期的結束,一切都煙消雲散。

當然這並不可怕,只要在使用完threadLocal後調用下remove()方法,清除數據,就可以了。

小結

1:ThreadLocal是線程執行時的上下文,用於存放線程局部變量。它不能解決併發情況下數據共享的問題

2:ThreadLocal是以ThreadLocal對象本身作為key的,不是線程(Thread)對象

3:ThreadLocal存在內存洩露的風險,要養成用完即刪的習慣

4:ThreadLocal使用散列定位數據存儲座標,如果發生碰撞,使用線性探測重新定位,這在高併發場景下會影響一點性能。改善方法如netty的FastThreadLocal,使用固定座標,以空間換時間,後面會分析FastThreadLocal實現。

Java併發系列:透過源碼徹底理解ThreadLocal


分享到:


相關文章: