《Java核心技術系列二》ThreadPoolExecutor 使用注意事項

該系列統一使用java8的源碼進行講解

上一篇中對ThreadPoolExecutor的源碼以及工作原理進行了講解。今天來講解一下在使用的過程中我們應該注意哪些問題

一. 參數衝突問題

在上一篇文章中,我們對ThreadPoolExecutor的構造函數的參數進行了講解。其中

  1. 當corePoolSize==maximumPoolSize,且核心線程數不允許超時時,設置keepAliveTime與unit是沒有意義的(因為沒有需要超時的Worker)
  2. 當workQueue為無界隊列時,其永遠也放不滿,這種情況maximumPoolSize就不會生效

二. Exectors 提供的三種線程池能不能用

2.1. 固定大小的線程池
<code>    public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<runnable>());
}
/<runnable>/<code>

看實現可知,線程數固定,且採用的是無界阻塞隊列。當任務提交的速度快Worker消耗任務的速度時,任務會在workQueue中不斷擠壓,最終撐垮整個進程的內存空間,造成OOM,因此線上謹慎使用

2.2 單個線程的線程池
<code>    public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<runnable>()));
}
/<runnable>/<code>

與1相比都是固定大小的線程池,唯一不同的是該線程數為1,存在於1一樣的問題。

2.3 Cached線程池
<code>    public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<runnable>());
}
/<runnable>/<code>

與固定大小的線程池正好相反,核心線程數為0,阻塞隊列容量為1,有任務就開闢新的Worker執行,空閒時Worker會自動銷燬。如果任務提交的任務非常的多,應用進程達到開闢線程的最大上限開闢不了線程拋出OOM,或者因為線程佔用太多的內存空間,導致內存被耗盡拋出OOM(一個線程棧的默認大小為1m),或者因為線程太多,線程上下文切換的開銷不可忽略,導致任務執行緩存從而拖垮整個應用進程。

綜上:上面三種線程池都不推薦使用,而是應該由我們自己制定線程池參數,保證線程個數與任務隊列的長度都在可控的範圍內,而不是任由其無限制增長,在線上應用中,是不推薦存在這種不可控的情況出現的。

資源都可控制了,那就延伸出另一種問題,達到了線程池的最大處理能力了怎麼辦?

三. 拒絕策略 RejectedExecutionHandler

當線程池達到自己最大處理能力時(任務隊列滿,每個Worker都在不停工作),再添加新任務時就會觸發拒絕策略。JDK提供了4中拒絕策略

3.1. AbortPolicy 默認

直接拒絕,就是會拋出異常

<code>public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
/<code>

注意是誰拋出異常,是執行提交任務的線程拋出異常。如果提交任務的線程正在循環的處理一批數據,且沒有對異常進行捕獲處理,那麼在執行拒絕策略拋出異常後,後面的任務都不會被執行。如果提交任務的線程是一個用戶請求線程,且業務中沒有捕獲該異常進行處理,用戶請求會因為該異常而請求失敗。並且這是默認的拒絕策略,初學者很容易掉進這個坑

3.2 DiscardPolicy

直接丟棄,實現就是什麼都不做

<code>public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
/<code>

針對不能對任務的場景是不能使用的

3.3 DiscardOldestPolicy

丟棄最老的一個任務

<code>       public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
/<code>

先從workQueue中彈出一個任務,然後再提交任務。同樣會丟失數據,在不允許丟數據的場景不能使用

3.4 CallerRunsPolicy

調用者線程執行,誰提交任務,誰執行

<code>        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
/<code>

通過直接調用任務run方法的方式自己執行,雖然會延長線程的執行時間,但是這樣不會丟失數據

四. 線程池如何關閉

4.1 shutdown

將線程池的狀態修改為SHUTDOWN狀態,在該狀態不允許提交新任務

<code>    public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//修改線程池的狀態
advanceRunState(SHUTDOWN);
//終端空閒的worker
interruptIdleWorkers();
//空方法
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
/<code>

首先將線程池狀態修改為shutdown狀態,然後中斷所有空閒的Worker。為什麼這裡需要中斷線程呢,因為調用線程的interrupt方法,能夠讓阻塞在getTask中的take方法與poll方法上的Worker喚醒,喚醒之後,重新檢查線程池的狀態為shutdown狀態,getTash方法就會返回null,從而Thread的run方法,線程銷燬。最後調用tryTerminate嘗試終止,只要有任務沒有執行完成,有Worker在運行就不會轉為terminate狀態。看下tryTerimate的源碼:

<code>    final void tryTerminate() {
for (;;) {

int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// stop 狀態 或者shutdown狀態,且workQueue沒有內容
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
// stop狀態 或者shutdown狀態,worker都已經銷燬
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//將狀態修改為tidying狀態
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
//將狀態修改為terminate狀態
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
/<code>
  1. 首先判斷狀態,如果是running, tidying,terminate或者shutdown狀態且workQueue不為空,就直接返回。總結就是:只有狀態為stop 或者 shutdown且workQueue為空了,才能往下走。
  2. 如果Worker還存在,則中斷一個空閒的worker,然後返回。
  3. 將狀態修改為tidying狀態,接著修改為terminate狀態,在ThreadPoolExecutor中由於terminated方法時空方法,所以這兩個狀態可以認為是一樣的。

什麼樣的狀態才能轉化為tidying狀態, 首先前提是Worker都關閉了,其次是 stop,或者 shutdown且隊列中沒有任務了

4.2 shutdownNow

不同於shutdown的是,這裡將狀態修改為stop狀態

<code>    public List<runnable> shutdownNow() {
List<runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
/<runnable>/<runnable>/<code>

首先將線程池狀態修改為stop狀態,然後喚醒所有的worker,然後調用tryTerminate方法,該方法已經講過了。shutdownNow方法會返回阻塞隊列中所有的任務。這些任務都沒有被執行。

首先我們瞭解了shutdown與shutdownNow兩個方法一個將狀態修改為shutdown,一個將狀態修改為stop,一個沒有返回值,一直返回了workQueue中的任務列表。為了能夠更加清晰的認清這兩個方法,我們還是有必要再看一次getTask方法對於shutdown狀態與stop狀態有什麼不同的處理操作。

<code>// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
/<code>

如果是shutdown狀態,且任務隊列中有任務,則繼續執行。如果是stop狀態,直接返回null,銷燬worker。總結:兩個方法修改任務狀態後都不允許再提交新的任務,shutdown方法將線程池的狀態修改為shutdown狀態,會等待所有已經提交的任務全部執行完成然後各個worker自動退出,shutdownNow不會執行完成所有的任務,但是會將沒有執行完成的任務列表返回,其在getTask中會直接返回null也會引導各個worker自動退出。最終Worker全部銷燬,整個線程池關閉

4.3 線程池都是事後自行退出,那是誰在修改線程池的狀態到最終terminate狀態?

這個問題的答案隱藏在Worker的run方法中,run方法調用了runWorker方法,在runWorker方法中,退出循環後會執行finnaly語句,在這裡會調用processWorkerExit方法,而在這個方法中會調用tryTerminate方法,也就是說每有一個Worker執行完成,都會去執行下tryTerminate方法,嘗試將線程池的狀態修改為terminate狀態。代碼如下:

<code>private void processWorkerExit(Worker w, boolean completedAbruptly) {
... 省略
tryTerminate();
... 省略

}
/<code>

在我們調用shutdown或者shutdownNow方法後,可以調用awaitTerminate方法,等待線程池關閉。

總結

針對ThreadPoolExecutor的使用事項,就先總結到這,大家如果有好的意見也可以提出來,我們一起探討。


分享到:


相關文章: