源碼角度分析-newFixedThreadPool線程池導致的內存飆升問題

原文鏈接:https://juejin.im/post/5d6a57eee51d4561e721df30#heading-0

前言

使用無界隊列的線程池會導致內存飆升嗎?面試官經常會問這個問題,本文將基於源碼,去分析newFixedThreadPool線程池導致的內存飆升問題,希望能加深大家的理解。

(想自學習編程的小夥伴請搜索圈T社區,更多行業相關資訊更有行業相關免費視頻教程。完全免費哦!)

內存飆升問題復現

實例代碼

ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < Integer.MAX_VALUE; i++) {
executor.execute(() -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
//do nothing
}
});
}

配置Jvm參數

IDE指定JVM參數:-Xmx8m -Xms8m :

源碼角度分析-newFixedThreadPool線程池導致的內存飆升問題

執行結果

run以上代碼,會拋出OOM:

源碼角度分析-newFixedThreadPool線程池導致的內存飆升問題

JVM OOM問題一般是創建太多對象,同時GC 垃圾來不及回收導致的,那麼什麼原因導致線程池的OOM呢?帶著發現新大陸的心情,我們從源碼角度分析這個問題,去找找實例代碼中哪裡創了太多對象。

線程池源碼分析

以上的實例代碼,就一個newFixedThreadPool和一個execute方法。首先,我們先來看一下newFixedThreadPool方法的源碼

newFixedThreadPool源碼

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<runnable>());
}
/<runnable>

該段源碼以及結合線程池特點,我們可以知道newFixedThreadPool

  • 核心線程數coreSize和最大線程數maximumPoolSize大小一樣,都是nThreads。
  • 空閒時間為0,即keepAliveTime為0
  • 阻塞隊列為無參構造的LinkedBlockingQueue

線程池特點了解不是很清楚的朋友,可以看我這篇文章,面試必備:Java線程池解析

接下來,我們再來看看線程池執行方法execute的源碼。

線程池執行方法execute的源碼

execute的源碼以及相關解釋如下:

 public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //步驟一:判斷當前正在工作的線程是否比核心線程數量小
if (addWorker(command, true)) // 以核心線程的身份,添加到工作集合
return;
c = ctl.get();
}
//步驟二:不滿足步驟一,線程池還在RUNNING狀態,阻塞隊列也沒滿的情況下,把執行任務添加到阻塞隊列workQueue。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//來個double check ,檢查線程池是否突然被關閉
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//步驟三:如果阻塞隊列也滿了,執行任務以非核心線程的身份,添加到工作集合

else if (!addWorker(command, false))
reject(command);
}

縱觀以上代碼,我們可以發現就addWorker 以及workQueue.offer(command) 可能在創建對象。那我們先分析addWorker方法。

addWorker源碼分析

addWorker源碼以及相關解釋如下

private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
//獲取當前線程池的狀態
int rs = runStateOf(c);
//如果線程池狀態是STOP,TIDYING,TERMINATED狀態的話,則會返回false。
// 如果現在狀態是SHUTDOWN,但是firstTask不為空或者workQueue為空的話,那麼直接返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//自旋
for (;;) {
//獲取當前工作線程的數量
int wc = workerCountOf(c);
//判斷線程數量是否符合要求,如果要創建的是核心工作線程,判斷當前工作線程數量是否已經超過coreSize,
// 如果要創建的是非核心線程,判斷當前工作線程數量是否超過maximumPoolSize,是的話就返回false

if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//如果線程數量符合要求,就通過CAS算法,將WorkerCount加1,成功就跳出retry自旋
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
retry inner loop
}
}
//線程啟動標誌
boolean workerStarted = false;
//線程添加進集合workers標誌
boolean workerAdded = false;
Worker w = null;
try {
//由(Runnable 構造Worker對象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//獲取線程池的重入鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//獲取線程池狀態
int rs = runStateOf(ctl.get());
//如果狀態滿足,將Worker對象添加到workers集合
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//啟動Worker中的線程開始執行任務

if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//線程啟動失敗,執行addWorkerFailed方法
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

addWorker執行流程

大概就是判斷線程池狀態是否OK,如果OK,在判斷當前工作中的線程數量是否滿足(小於coreSize/maximumPoolSize),如果不滿足,不添加,如果滿足,就將執行任務添加到工作集合workers,,並啟動執行該線程。

再看一下workers的類型:

/**

  • Set containing all worker threads in pool. Accessed only when
  • holding mainLock.
  • */
  • private final HashSet workers = new HashSet();

workers是一個HashSet集合,它由coreSize/maximumPoolSize控制著,那麼addWorker方法會導致OOM?結合實例代碼demo,coreSize=maximumPoolSize=10,如果超過10,不會再添加到workers了,所以它不是導致newFixedThreadPool內存飆升的原因。那麼,問題應該就在於workQueue.offer(command) 方法了。為了讓整個流程清晰,我們畫一下execute執行的流程圖。

線程池執行方法execute的流程

根據以上execute以及addWork源碼分析,我們把流程圖畫出來:

源碼角度分析-newFixedThreadPool線程池導致的內存飆升問題

  • 提交一個任務command,線程池裡存活的核心線程數小於線程數corePoolSize時,調用addWorker方法,線程池會創建一個核心線程去處理提交的任務。
  • 如果線程池核心線程數已滿,即線程數已經等於corePoolSize,一個新提交的任務,會被放進任務隊列workQueue排隊等待執行。
  • 當線程池裡面存活的線程數已經等於corePoolSize了,並且任務隊列workQueue也滿,判斷線程數是否達到maximumPoolSize,即最大線程數是否已滿,如果沒到達,創建一個非核心線程執行提交的任務。
  • 如果當前的線程數達到了maximumPoolSize,還有新的任務過來的話,直接採用拒絕策略處理 。

看完execute的執行流程,我猜測,內存飆升問題就是workQueue塞滿了。接下來,進行阻塞隊列源碼分析,揭開內存飆升問題的神秘面紗。

阻塞隊列源碼分析

源碼角度分析-newFixedThreadPool線程池導致的內存飆升問題

回到newFixedThreadPool構造函數,發現阻塞隊列就是LinkedBlockingQueue,而且是個無參的LinkedBlockingQueue隊列。OK,那我們直接分析LinkedBlockingQueue源碼。

LinkedBlockingQueue類圖

源碼角度分析-newFixedThreadPool線程池導致的內存飆升問題

由類圖可以看到:

  • LinkedBlockingQueue 是使用單向鏈表實現的,其有兩個 Node,分別用來存放首、尾節點, 並且還有一個初始值為 0 的原子變量 count,用來記錄 隊列元素個數。
  • 另外還有兩個 ReentrantLock 的實例,分別用來控制元素入隊和出隊的原 子性,其中 takeLock 用來控制同時只有一個線程可以從隊列頭獲取元素,其他線程必須 等待, putLock 控制同時只能有一個線程可以獲取鎖,在隊列尾部添加元素,其他線程必 須等待。
  • 另外, notEmpty 和 notFull 是條件變量,它們內部都有一個條件隊列用來存放進 隊和出隊時被阻塞的線程,其實這是生產者一消費者模型。

LinkedBlockingQueue無參構造函數

public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node(null);
}

LinkedBlockingQueue無參構造函數,默認構造Integer.MAX_VALUE(那麼大) 的鏈表,看到這裡,你回想一下execute流程,是不是阻塞隊列一直不會滿了,這隊列來者不拒,把所有阻塞任務收於麾下。。。是不是內存飆升問題水落石出啦。

LinkedBlockingQueue的offer函數

源碼角度分析-newFixedThreadPool線程池導致的內存飆升問題

線程池中,插入隊列用了offer方法,我們來看一下阻塞隊列LinkedBlockingQueue的offer騷操作吧

public boolean offer(E e) {
//為空元素則拋出空指針異常
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
//如採當前隊列滿則丟棄將要放入的元素, 然後返回false
if (count.get() == capacity)
return false;
int c = -1;
//構造新節點,獲取putLock獨佔鎖
Node
node = new Node(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//如採隊列不滿則進隊列,並遞增元素計數
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
//新元素入隊後隊列還有空閒空間,則
喚醒 notFull 的條件隊列中一條阻塞線程
if (c + 1 < capacity)
notFull.signal();
}
} finally {
//釋放鎖
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}

offer操作向隊列尾部插入一個元素,如果隊列中有空閒則插入成功後返回 true,如果隊列己滿 則丟棄當前元素然後返回 false。 如果 e 元素為 null 則拋出 Nul!PointerException 異常。另外, 該方法是非阻塞的。

內存飆升問題結果揭曉

newFixedThreadPool線程池的核心線程數是固定的,它使用了近乎於無界的LinkedBlockingQueue阻塞隊列。當核心線程用完後,任務會入隊到阻塞隊列,如果任務執行的時間比較長,沒有釋放,會導致越來越多的任務堆積到阻塞隊列,最後導致機器的內存使用不停的飆升,造成JVM OOM。


分享到:


相關文章: