線程池的實現原理與源碼分析

什麼是線程池

在 Java 中,如果每個請求到達就創建一個新線程,創建和銷燬線程花費的時間和消耗的系統資源都相當大,甚至可能要比在處理實際的用戶請求的時間和資源要多的多。如果在一個 Jvm 裡創建太多的線程,可能會使系統由於過度消耗內存或“切換過度”而導致系統資源不足為了解決這個問題,就有了線程池的概念,線程池的核心邏輯是提前創建好若干個線程放在一個容器中。如果有任務需要處理,則將任務直接分配給線程池中的線程來執行就行,任務處理完以後這個線程不會被銷燬,而是等待後續分配任務。同時通過線程池來重複管理線程還可以避免創建大量線程增加開銷。

線程池的優勢

1. 降低創建線程和銷燬線程的性能開銷

2. 提高響應速度,當有新任務需要執行是不需要等待線程創建就可以立馬執行

3. 合理的設置線程池大小可以避免因為線程數超過硬件資源瓶頸帶來的問題

線程池的構造參數

創建線程池需要使用 ThreadPoolExecutor 類,它的構造函數參數如下:

線程池的實現原理與源碼分析

線程池的創建方式

newFixedThreadPool:

線程池的實現原理與源碼分析

FixedThreadPool 的核心線程數和最大線程數都是指定值,也就是說當線程池中的線程數超過核心線程數後,任務都會被放到阻塞隊列中。另外 keepAliveTime 為 0,也就是超出核心線程數量以外的線程空餘存活時間,而這裡選用的阻塞隊列是LinkedBlockingQueue,使用的是默認容量 Integer.MAX_VALUE,相當於沒有上限這個線程池執行任務的流程如下:

1. 線程數少於核心線程數,也就是設置的線程數時,新建線程執行任務

2. 線程數等於核心線程數後,將任務加入阻塞隊列

3. 由於隊列容量非常大,可以一直添加

4. 執行完任務的線程反覆去隊列中取任務執行

用途:FixedThreadPool 用於負載比較大的服務器,為了資源的合理利用,需要限制當前線程數量

newCachedThreadPool:

線程池的實現原理與源碼分析

CachedThreadPool 創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閒線程,若無可回收,則新建線程; 並且沒有核心線程,非核心線程數無上限,但是每個空閒的時間只有 60 秒,超過後就會被回收。它的執行流程如下:

1. 沒有核心線程,直接向 SynchronousQueue 中提交任務

2. 如果有空閒線程,就去取出任務執行;如果沒有空閒線程,就新建一個

3. 執行完任務的線程有 60 秒生存時間,如果在這個時間內可以接到新任務,就可以繼續活下去,否則就被回收

newScheduledThreadPool: 創建一個可以指定線程的數量的線程池,但是這個線程池還帶有延遲和週期性執行 任務的功能,類似定時器。

newSingleThreadExecutor: 創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行

線程池的實現原理分析

線程池初始化時是沒有創建線程的,線程池裡的線程的初始化與其他線程一樣,但是在完成任務以後,該線程不會自行銷燬,而是以掛起的狀態返回到線程池。直到應用程序再次向線程池發出請求時,線程池裡掛起的線程就會再度激活執行任務。這樣既節省了建立線程所造成的性能損耗,也可以讓多個任務反覆重用同一線程,從而在應用程序生存期內節約大量開銷。

執行流程

線程池的實現原理與源碼分析

源碼分析

execute方法是提交任務的入口

線程池的實現原理與源碼分析

ctl 的作用

在線程池中,ctl 貫穿在線程池的整個生命週期中

線程池的實現原理與源碼分析

它是一個原子類,主要作用是用來保存線程數量和線程池的狀態。我們來分析一下這段代碼,其實比較有意思,他用到了位運算,一個 int 數值是 32 個 bit 位,這裡採用高 3 位來保存運行狀態,低 29 位來保存線程數量。

我們來分析默認情況下,也就是 ctlOf(RUNNING)運行狀態,調用了 ctlOf(int rs,int wc)方法;

private static int ctlOf(int rs, int wc) { return rs | wc; }

其中 RUNNING =-1 << COUNT_BITS ; -1 左移 29 位. -1 的二進制是 32 個 1(1111 1111 11111111 1111 1111 1111 1111)

-1 的二進制計算方法:

原碼是 1000…001, 高位 1 表示符號位。

然後對原碼取反,高位不變得到 1111…110

然後對反碼進行+1 ,也就是補碼操作, 最後得到 1111…1111

那麼-1 <

狀態轉化

線程池的實現原理與源碼分析

addWorker

如果工作線程數小於核心線程數的話,會調用 addWorker,顧名思義,其實就是要創建一個工作線程。我們來看看源碼的實現,源碼比較長,看起來比較唬人,其實就做了兩件事。

  • 用循環 CAS 操作來將線程數加 1;
  • 新建一個線程並啟用。
線程池的實現原理與源碼分析

Worker 類說明

我們發現 addWorker 方法只是構造了一個 Worker,並且把 firstTask 封裝到 worker 中,它是做什麼的呢?我們來看看

1. 每個 worker,都是一條線程,同時裡面包含了一個 firstTask,即初始化時要被首先執行的任務.

2. 最終執行任務的,是 runWorker()方法

Worker 類繼承了 AQS,並實現了 Runnable 接口,注意其中的 firstTask 和 thread 屬性:firstTask 用它來保存傳入的任務;thread 是在調用構造方法時通過ThreadFactory 來創建的線程,是用來處理任務的線程。在調用構造方法時,需要傳入任務,這裡通過 getThreadFactory().newThread(this);來新建一個線程,newThread 方法傳入的參數是 this,因為 Worker 本身繼承了 Runnable 接口,也就是一個線程,所以一個 Worker 對象在啟動的時候會調用 Worker 類中的 run 方法。Worker 繼承了 AQS,使用 AQS 來實現獨佔鎖的功能。為什麼不使用 ReentrantLock 來實現呢?可以看到 tryAcquire 方法,它是不允許重入的,而 ReentrantLock 是允許重入的:lock 方法一旦獲取了獨佔鎖,表示當前線程正在執行任務中;那麼它會有以下幾個作用

  • 如果正在執行任務,則不應該中斷線程;
  • 如果該線程現在不是獨佔鎖的狀態,也就是空閒的狀態,說明它沒有在處理任務,這時可以對該線程進行中斷;
  • 線程池在執行 shutdown 方法或 tryTerminate 方法時會調用 interruptIdleWorkers 方法來中斷空閒的線程,interruptIdleWorkers 方法會使用 tryLock 方法來判斷線程池中的線程是否是空閒狀態
  • 之所以設置為不可重入,是因為我們不希望任務在調用像 setCorePoolSize 這樣的線程池控制方法時重新獲取鎖,這樣會中斷正在運行的線程
線程池的實現原理與源碼分析

addWorkerFailed

addWorker 方法中,如果添加 Worker 並且啟動線程失敗,則會做失敗後的處理。這個方法主要做兩件事

1. 如果 worker 已經構造好了,則從 workers 集合中移除這個 worker

2. 原子遞減核心線程數(因為在 addWorker 方法中先做了原子增加)

3. 嘗試結束線程池

線程池的實現原理與源碼分析

runWorker 方法

前面已經瞭解了 ThreadPoolExecutor 的核心方法 addWorker,主要作用是增加工作線程,而 Worker 簡單理解其實就是一個線程,裡面重寫了 run 方法,這塊是線程池中執行任務的真正處理邏輯,也就是 runWorker 方法,這個方法主要做幾件事

1. 如果 task 不為空,則開始執行 task

2. 如果 task 為空,則通過 getTask()再去取任務,並賦值給 task,如果取到的 Runnable 不為空,則執行該任務

3. 執行完畢後,通過 while 循環繼續 getTask()取任務

4. 如果 getTask()取到的任務依然是空,那麼整個 runWorker()方法執行完畢

線程池的實現原理與源碼分析

getTask

worker 線程會從阻塞隊列中獲取需要執行的任務,這個方法不是簡單的 take 數據,我們來分析下他的源碼實現,

你也許好奇是怎樣判斷線程有多久沒有活動了,是不是以為線程池會啟動一個監控線程,專門監控哪個線程正在偷懶?想太多,其實只是在線程從工作隊列 poll 任務時,加上了超時限制,如果線程在 keepAliveTime 的時間內 poll 不到任務,那我就認為這條線程沒事做,可以幹掉了,看看這個代碼片段你就清楚了。

線程池的實現原理與源碼分析

這裡重要的地方是第二個 if 判斷,目的是控制線程池的有效線程數量。由上文中的分析可以知道,在執行 execute 方法時,如果當前線程池的線程數量超過了 corePoolSize且小於maximumPoolSize,並且 workQueue 已滿時,則可以增加工作線程,但這時如果超時沒有獲取到任務,也就是 timedOut 為 true 的情況,說明 workQueue 已經為空了,也就說明了當前線程池中不需要那麼多線程來執行任務了,可以把多於 corePoolSize 數量的線程銷燬掉,保持線程數量在 corePoolSize 即可。什麼時候會銷燬?當然是 runWorker 方法執行完之後,也就是 Worker 中的 run 方法執行完,由 JVM 自動回收。getTask 方法返回 null 時,在 runWorker 方法中會跳出 while 循環,然後會執行

processWorkerExit 方法。

processWorkerExit

runWorker 的 while 循環執行完畢以後,在 finally 中會調用 processWorkerExit, 來銷燬工作線程。到目前為止,我們已經從 execute 方法中輸入了 worker 線程的創建到執行以及最後到銷燬的全部過程。那麼我們繼續回到 execute 方法.我們只分析完addWorker 這段邏輯,繼續來看後面的判斷

execute 後續邏輯分析

如果核心線程數已滿,說明這個時候不能再創建核心線程了,於是走第二個判斷第二個判斷邏輯比較簡單,如果線程池處於運行狀態並且任務隊列沒有滿,則將任務添加到隊列中,第三個判斷,核心線程數滿了,隊列也滿了,那麼這個時候創建新的線程也就是(非核心線程)如果非核心線程數也達到了最大線程數大小,則直接拒絕任務

線程池的實現原理與源碼分析

拒絕策略

1、AbortPolicy:直接拋出異常,默認策略;

2、CallerRunsPolicy:用調用者所在的線程來執行任務;

3、DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,並執行當前任務;

4、DiscardPolicy:直接丟棄任務;

當然也可以根據應用場景實現 RejectedExecutionHandler 接口,自定義飽和策略,如記錄日誌或持久化存儲不能處理的任務

阿里開發手冊不建議使用線程池

阿里巴巴開發手冊上是說線程池的構建不允許使用 Executors 去創建,而是通過 ThreadPoolExecutor 的方式。分析完原理以後,大家自己一定要有一個答案。我來簡單分析下,用 Executors 使得用戶不需要關心線程池的參數配置,意味著大家對於線程池的運行規則也會慢慢的忽略。這會導致一個問題,比如我們用 newFixdThreadPool 或者 singleThreadPool.允許的隊列長度為Integer.MAX_VALUE,如果使用不當會導致大量請求堆積到隊列中導致 OOM 的風險而 newCachedThreadPool,允許創建線程數量為 Integer.MAX_VALUE,也可能會導致大量線程的創建出現 CPU 使用過高或者 OOM 的問題,而如果我們通過 ThreadPoolExecutor 來構造線程池的話,我們勢必要了解線程池構造中每個參數的具體含義,使得開發者在配置參數的時候能夠更加謹慎。

線程池中的線程初始化

默認情況下,創建線程池之後,線程池中是沒有線程的,需要提交任務之後才會創建線程。在實際中如果需要線程池創建之後立即創建線程,可以通過以下兩個方法辦到:

prestartCoreThread():初始化一個核心線程; prestartAllCoreThreads():初始化所有核心線程。

線程池的關閉

ThreadPoolExecutor 提供了兩個方法,用於線程池的關閉,分別是 shutdown()和shutdownNow(),其中:shutdown():不會立即終止線程池,而是要等所有任務緩存隊列中的任務都執行完後才終止,但再也不會接受新的任務 shutdownNow():立即終止線程池,並嘗試打斷正在執行的任務,並且清空任務緩存隊列,返回尚未執行的任務

線程池容量的動態調整

ThreadPoolExecutor 提供了動態調整線程池容量大小的方法: setCorePoolSize()和setMaximumPoolSize(),setCorePoolSize:設置核心池大小 setMaximumPoolSize:設置線程池最大能創建的線程數目大小。

任務緩存隊列及排隊策略

在前面我們多次提到了任務緩存隊列,即 workQueue,它用來存放等待執行的任務。workQueue 的類型為 BlockingQueue,通常可以取下面三種類型:

  • ArrayBlockingQueue:基於數組的先進先出隊列,此隊列創建時必須指定大小;
  • LinkedBlockingQueue:基於鏈表的先進先出隊列,如果創建時沒有指定此隊列大小,則默認為 Integer.MAX_VALUE;
  • SynchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。

線程池的監控

如果在項目中大規模的使用了線程池,那麼必須要有一套監控體系,來指導當前線程池的狀態,當出現問題的時候可以快速定位到問題。而線程池提供了相應的擴展方法,我們通過重寫線程池的 beforeExecute、afterExecute 和 shutdown 等方式就可以實現對線程的監控,簡單給大家演示一個案例

線程池的實現原理與源碼分析

測試

線程池的實現原理與源碼分析

運行效果:

線程池的實現原理與源碼分析

提交線程submit和execute的區別

  • execute只能接受Runnable類型的任務
  • execute 如果出現異常會拋出
  • execute 沒有返回值
  • submit不管是Runnable還是Callable類型的任務都可以接受
  • 對於 submit 方法,如果傳入一個 Callable,可以得到一個 Future 的返回值
  • submit 方法調用不會拋異常,除非調用 Future.get

最後總結任務提交給線程池之後的處理策略

任務提交給線程池之後的處理策略:

  1. 如果當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會創建一個線程去執行這個任務;
  2. 如果當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閒線程將其取出去執行;若添加失敗(一般來說是任務緩存隊列已滿),則會嘗試創建新的線程去執行這個任務;
  3. 如果當前線程池中的線程數目達到maximumPoolSize,則會採取任務拒絕策略進行處理;
  4. 如果線程池中的線程數量大於 corePoolSize時,如果某線程空閒時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於corePoolSize;如果允許 為核心池中的線程設置存活時間,那麼核心池中的線程空閒時間超過keepAliveTime,線程也會被終止。


分享到:


相關文章: