09.13 Java併發之線程池ThreadPoolExecutor源碼分析學習


線程池學習

以下所有內容以及源碼分析都是基於JDK1.8的,請知悉。

我寫博客就真的比較沒有順序了,這可能跟我的學習方式有關,我自己也覺得這樣挺不好的,但是沒辦法說服自己去改變,所以也只能這樣想到什麼學什麼了。

池化技術真的是一門在我看來非常牛逼的技術,因為它做到了在有限資源內實現了資源利用的最大化,這讓我想到了一門課程,那就是運籌學,當時在上運籌學的時候就經常做這種類似的問題。

言歸正傳吧,我接下來會進行一次線程池方面知識點的學習,也會記錄下來分享給大家。

線程池的內容當中有涉及到AQS同步器的知識點,如果對AQS同步器知識點感覺有點薄弱。

線程池的優勢

既然說到線程池了,而且大多數的大牛也都會建議我們使用池化技術來管理一些資源,那線程池肯定也是有它的好處的,要不然怎麼會那麼出名並且讓大家使用呢?

我們就來看看它究竟有什麼優勢?

  • 資源可控性:使用線程池可以避免創建大量線程而導致內存的消耗
  • 提高響應速度:線程池地創建實際上是很消耗時間和性能的,由線程池創建好有任務就運行,提升響應速度。
  • 便於管理:池化技術最突出的一個特點就是可以幫助我們對池子裡的資源進行管理。由線程池統一分配和管理。

線程池的創建

我們要用線程池來統一分配和管理我們的線程,那首先我們要創建一個線程池出來,還是有很多大牛已經幫我們寫好了很多方面的代碼的,Executors的工廠方法就給我們提供了創建多種不同線程池的方法。因為這個類只是一個創建對象的工廠,並沒有涉及到很多的具體實現,所以我不會過於詳細地去說明。

老規矩,還是直接上代碼吧。

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,

0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<runnable>());
}
/<runnable>

這裡也就舉出一個方法的例子來進行之後的講解吧,我們可以看出,Executors只是個工廠而已,方法也只是來實例化不同的對象,實際上實例化出來的關鍵類就是 ThreadPoolExecutor。現在我們就先來簡單地對 ThreadPoolExecutor 構造函數內的每個參數進行解釋一下吧。

  • corePoolSize(核心線程池大小):當提交一個任務到線程池時,線程池會創建一個線程來執行任務,即使其他空閒的基本線程能夠執行新任務也會創建線程,當任務數大於核心線程數的時候就不會再創建。在這裡要注意一點,線程池剛創建的時候,其中並沒有創建任何線程,而是等任務來才去創建線程,除非調用了 prestartAllCoreThreads() 或者 prestartCoreThread() 方法 ,這樣才會預先創建好 corePoolSize 個線程或者一個線程。
  • maximumPoolSize(線程池最大線程數):線程池允許創建的最大線程數,如果隊列滿了,並且已創建的線程數小於最大線程數,則線程池會再創建新的線程執行任務。值得注意的是,如果使用了×××隊列,此參數就沒有意義了。
  • keepAliveTime(線程活動保持時間):此參數默認在線程數大於 corePoolSize 的情況下才會起作用, 當線程的空閒時間達到 keepAliveTime 的時候就會終止,直至線程數目小於 corePoolSize 。不過如果調用了 allowCoreThreadTimeOut 方法,則當線程數目小於 corePoolSize 的時候也會起作用.
  • unit(keelAliveTime的時間單位):keelAliveTime的時間單位,一共有7種,在這裡就不列舉了。
  • workQueue(阻塞隊列):阻塞隊列,用來存儲等待執行的任務,這個參數也是非常重要的,在這裡簡單介紹一下幾個阻塞隊列。
  • ArrayBlockingQueue:這是一個基於數組結構的有界阻塞隊列,此隊列按照FIFO的原則對元素進行排序。
  • LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按照FIFO排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法 Executors.newFixedThreadPool()就是使用了這個隊列。
  • SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態。吞吐量通常要高於LinkedBlockingQueue,靜態工廠方法 Executors.newCachedThreadPool() 就使用了這個隊列。
  • PriorityBlockingQueue:一個具有優先級的無阻塞隊列。
  • handler(飽和策略);當線程池和隊列都滿了,說明線程池已經處於飽和狀態了,那麼必須採取一種策略來處理還在提交過來的新任務。這個飽和策略默認情況下是 AbortPolicy ,表示無法處理新任務時拋出異常。共有四種飽和策略提供,當然我們也可以選擇自己實現飽和策略。
  • AbortPolicy:直接丟棄並且拋出 RejectedExecutionException 異常
  • CallerRunsPolicy:只用調用者所在線程來運行任務。
  • DiscardOldestPolicy:丟棄隊列裡最近的一個任務,並執行當前任務。
  • DiscardPolicy:丟棄任務並且不拋出異常。

線程池的執行流程就用參考資料裡的圖介紹一下了,具體我們還是通過代碼去講解。

Java併發之線程池ThreadPoolExecutor源碼分析學習

在上面我們簡單的講解了一下 Executors 這個工廠類裡的工廠方法,並且講述了一下創建線程池的一些參數以及它們的作用,當然上面的講解並不是很深入,因為想要弄懂的話是需要持續地花時間去看去理解的,而博主自己也還是沒有完全弄懂,不過博主的學習方法是先學了個大概,再回頭來看看之前的知識點,可能會更加好理解,所以我們接著往下面講吧。

ThreadPoolExecutor源碼分析

在上面我們就發現了, Executors 的工廠方法主要就返回了 ThreadPoolExecutor 對象,至於另一個在這裡暫時不講,也就是說,要學習線程池,其實關鍵的還是得學會分析 ThreadPoolExecutor 這個對象裡面的源碼,我們接下來就會對 ThreadPoolExecutor 裡的關鍵代碼進行分析。

AtomicInteger ctl

ctl 是主要的控制狀態,是一個複合類型的變量,其中包括了兩個概念。

  • workerCount:表示有效的線程數目
  • runState:線程池裡線程的運行狀態

我們來分析一下跟 ctl 有關的一些源代碼吧,直接上代碼

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//用來表示線程池數量的位數,很明顯是29,Integer.SIZE=32
private static final int COUNT_BITS = Integer.SIZE - 3;
//線程池最大數量,2^29 - 1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
//我們可以看出有5種runState狀態,證明至少需要3位來表示runState狀態
//所以高三位就是表示runState了
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
//用於存放線程任務的阻塞隊列
private final BlockingQueue<runnable> workQueue;
//重入鎖
private final ReentrantLock mainLock = new ReentrantLock();
//線程池當中的線程集合,只有當擁有mainLock鎖的時候,才可以進行訪問
private final HashSet<worker> workers = new HashSet<worker>();
//等待條件支持終止
private final Condition termination = mainLock.newCondition();
//創建新線程的線程工廠
private volatile ThreadFactory threadFactory;
//飽和策略
private volatile RejectedExecutionHandler handler;
/<worker>/<worker>/<runnable>
  1. CAPACITY
  2. 在這裡我們講一下這個線程池最大數量的計算吧,因為這裡涉及到源碼以及位移之類的操作,我感覺大多數人都還是不太會這個,因為我一開始看的時候也是不太會的。
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

從代碼我們可以看出,是需要 1往左移29位 ,然後再減去1,那個 1往左移29位 是怎麼計算的呢?

1 << COUNT_BITS

1的32位2進制是
00000000 00000000 00000000 00000001

左移29位的話就是
00100000 00000000 00000000 00000000

再進行減一的操作
000 11111 11111111 11111111 11111111

也就是說線程池最大數目就是
000 11111 11111111 11111111 11111111

2.runState

正數的原碼、反碼、補碼都是一樣的

在計算機底層,是用補碼來表示的

private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;

private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
  • RUNNING
  • 可以接受新任務並且處理已經在阻塞隊列的任務
  • 高3位全部是1的話,就是RUNNING狀態
-1 << COUNT_BITS
這裡是-1往左移29位,稍微有點不一樣,-1的話需要我們自己算出補碼來

-1的原碼
10000000 00000000 00000000 00000001

-1的反碼,負數的反碼是將原碼除符號位以外全部取反
11111111 11111111 11111111 11111110

-1的補碼,負數的補碼就是將反碼+1
11111111 11111111 11111111 11111111

關鍵了,往左移29位,所以高3位全是1就是RUNNING狀態
111 00000 00000000 00000000 00000000
  • SHUTDOWN
  • 不接受新任務,但是處理已經在阻塞隊列的任務
  • 高3位全是0,就是SHUTDOWN狀態
0 << COUNT_BITS

0的表示
00000000 00000000 00000000 00000000

往左移29位
00000000 00000000 00000000 00000000
  • STOP
  • 不接受新任務,也不處理阻塞隊列裡的任務,並且會中斷正在處理的任務
  • 所以高3位是001,就是STOP狀態
1 << COUNT_BITS

1的表示
00000000 00000000 00000000 00000001

往左移29位
00100000 00000000 00000000 00000000
  • TIDYING
  • 所有任務都被中止,workerCount是0,線程狀態轉化為TIDYING並且調用terminated()鉤子方法
  • 所以高3位是010,就是TIDYING狀態
2 << COUNT_BITS

2的32位2進制
00000000 00000000 00000000 00000010

往左移29位
01000000 00000000 00000000 00000000
  • TERMINATED
  • terminated()鉤子方法已經完成
  • 所以高3位是110,就是TERMINATED狀態
3 << COUNT_BITS

3的32位2進制
00000000 00000000 00000000 00000011

往左移29位
11000000 00000000 00000000 00000000

3.部分方法介紹

  • runStateOf(int c)

實時獲取runState的方法

private static int runStateOf(int c) { return c & ~CAPACITY; }
~CAPACITY
~是按位取反的意思
&是按位與的意思

而CAPACITY是,高位3個0,低29位都是1,所以是
000 11111 11111111 11111111 11111111

取反的話就是
111 00000 00000000 00000000 00000000

傳進來的c參數與取反的CAPACITY進行按位與操作

1、低位29個0進行按位與,還是29個0
2、高位3個1,既保持c參數的高3位
既高位保持原樣,低29位都是0,這也就獲得了線程池的運行狀態runState
  • workerCountOf(int c)

獲取線程池的當前有效線程數目

private static int workerCountOf(int c) { return c & CAPACITY; }
CAPACITY的32位2進制是
000 11111 11111111 11111111 11111111

用入參c跟CAPACITY進行按位與操作
1、低29位都是1,所以保留c的低29位,也就是有效線程數
2、高3位都是0,所以c的高3位也是0

這樣獲取出來的便是workerCount的值
  • ctlOf(int rs, int wc)
  • 原子整型變量ctl的初始化方法
//結合這幾句代碼來看
private static final int RUNNING = -1 << COUNT_BITS;
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static int ctlOf(int rs, int wc) { return rs | wc; }
RUNNING是
111 00000 00000000 00000000 00000000


ctlOf是將rs和wc進行按位或的操作

初始化的時候是將RUNNING和0進行按位或
0的32位2進制是
00000000 00000000 00000000 00000000

所以初始化的ctl是
111 00000 00000000 00000000 00000000

核心方法源碼分析

  1. execute(Runnable command)方法
public void execute(Runnable command) {
//需要執行的任務command為空,拋出空指針異常
if (command == null) // 1
throw new NullPointerException();
/*
*執行的流程實際上分為三步
*1、如果運行的線程小於corePoolSize,以用戶給定的Runable對象新開一個線程去執行
* 並且執行addWorker方法會以原子性操作去檢查runState和workerCount,以防止當返回false的
* 時候添加了不應該添加的線程
*2、 如果任務能夠成功添加到隊列當中,我們仍需要對添加的線程進行雙重檢查,有可能添加的線程在前
* 一次檢查時已經死亡,又或者在進入該方法的時候線程池關閉了。所以我們需要複查狀態,並有有必

* 要的話需要在停止時回滾入列操作,或者在沒有線程的時候新開一個線程
*3、如果任務無法入列,那我們需要嘗試新增一個線程,如果新建線程失敗了,我們就知道線程可能關閉了
* 或者飽和了,就需要拒絕這個任務
*
*/
//獲取線程池的控制狀態
int c = ctl.get(); // 2
//通過workCountOf方法算workerCount值,小於corePoolSize
if (workerCountOf(c) < corePoolSize) {
//添加任務到worker集合當中
if (addWorker(command, true))
return; //成功返回
//失敗的話再次獲取線程池的控制狀態
c = ctl.get();
}
/*
*判斷線程池是否正處於RUNNING狀態
*是的話添加Runnable對象到workQueue隊列當中
*/
if (isRunning(c) && workQueue.offer(command)) { // 3
//再次獲取線程池的狀態
int recheck = ctl.get();
//再次檢查狀態
//線程池不處於RUNNING狀態,將任務從workQueue隊列中移除
if (! isRunning(recheck) && remove(command))
//拒絕任務
reject(command);
//workerCount等於0
else if (workerCountOf(recheck) == 0) // 4
//添加worker

addWorker(null, false);
}
//加入阻塞隊列失敗,則嘗試以線程池最大線程數新開線程去執行該任務
else if (!addWorker(command, false)) // 5
//執行失敗則拒絕任務
reject(command);
}

我們來說一下上面這個代碼的流程:

1、首先判斷任務是否為空,空則拋出空指針異常

2、不為空則獲取線程池控制狀態,判斷小於corePoolSize,添加到worker集合當中執行,

  • 如成功,則返回
  • 失敗的話再接著獲取線程池控制狀態,因為只有狀態變了才會失敗,所以重新獲取
  • 3、判斷線程池是否處於運行狀態,是的話則添加command到阻塞隊列,加入時也會再次獲取狀態並且檢測
  • 狀態是否不處於運行狀態,不處於的話則將command從阻塞隊列移除,並且拒絕任務
  • 4、如果線程池裡沒有了線程,則創建新的線程去執行獲取阻塞隊列的任務執行
  • 5、如果以上都沒執行成功,則需要開啟最大線程池裡的線程來執行任務,失敗的話就丟棄

有時候再多的文字也不如一個流程圖來的明白,所以還是畫了個execute的流程圖給大家方便理解。

Java併發之線程池ThreadPoolExecutor源碼分析學習

2.addWorker(Runnable firstTask, boolean core)

private boolean addWorker(Runnable firstTask, boolean core) {
//外部循環標記
retry:
//外層死循環
for (;;) {
//獲取線程池控制狀態
int c = ctl.get();
//獲取runState
int rs = runStateOf(c);

// Check if queue empty only if necessary.
/**
*1.如果線程池runState至少已經是SHUTDOWN
*2. 有一個是false則addWorker失敗,看false的情況
* - runState==SHUTDOWN,即狀態已經大於SHUTDOWN了
* - firstTask為null,即傳進來的任務為空,結合上面就是runState是SHUTDOWN,但是
* firstTask不為空,代表線程池已經關閉了還在傳任務進來
* - 隊列為空,既然任務已經為空,隊列為空,就不需要往線程池添加任務了
*/
if (rs >= SHUTDOWN && //runState大於等於SHUTDOWN,初始位RUNNING
! (rs == SHUTDOWN && //runState等於SHUTDOWN
firstTask == null && //firstTask為null
! workQueue.isEmpty())) //workQueue隊列不為空
return false;

//內層死循環
for (;;) {
//獲取線程池的workerCount數量
int wc = workerCountOf(c);
//如果workerCount超出最大值或者大於corePoolSize/maximumPoolSize

//返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//通過CAS操作,使workerCount數量+1,成功則跳出循環,回到retry標記
if (compareAndIncrementWorkerCount(c))
break retry;
//CAS操作失敗,再次獲取線程池的控制狀態
c = ctl.get(); // Re-read ctl
//如果當前runState不等於剛開始獲取的runState,則跳出內層循環,繼續外層循環
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
//CAS由於更改workerCount而失敗,繼續內層循環
}
}

//通過以上循環,能執行到這是workerCount成功+1了
//worker開始標記
boolean workerStarted = false;
//worker添加標記
boolean workerAdded = false;
//初始化worker為null
Worker w = null;
try {
//初始化一個當前Runnable對象的worker對象
w = new Worker(firstTask);
//獲取該worker對應的線程
final Thread t = w.thread;
//如果線程不為null
if (t != null) {
//初始線程池的鎖
final ReentrantLock mainLock = this.mainLock;
//獲取鎖
mainLock.lock();
try {

// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//獲取鎖後再次檢查,獲取線程池runState
int rs = runStateOf(ctl.get());

//當runState小於SHUTDOWN或者runState等於SHUTDOWN並且firstTask為null
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//線程已存活
if (t.isAlive()) // precheck that t is startable
//線程未啟動就存活,拋出IllegalThreadStateException異常
throw new IllegalThreadStateException();
//將worker對象添加到workers集合當中
workers.add(w);
//獲取workers集合的大小
int s = workers.size();
//如果大小超過largestPoolSize
if (s > largestPoolSize)
//重新設置largestPoolSize
largestPoolSize = s;
//標記worker已經被添加
workerAdded = true;
}
} finally {
//釋放鎖
mainLock.unlock();
}
//如果worker添加成功
if (workerAdded) {
//啟動線程
t.start();
//標記worker已經啟動
workerStarted = true;
}
}
} finally {
//如果worker沒有啟動成功
if (! workerStarted)
//workerCount-1的操作

addWorkerFailed(w);
}
//返回worker是否啟動的標記
return workerStarted;
}

我們也簡單說一下這個代碼的流程吧,還真的是挺難的,博主寫的時候都停了好多次,想砸鍵盤的說:

1、獲取線程池的控制狀態,進行判斷,不符合則返回false,符合則下一步

2、死循環,判斷workerCount是否大於上限,或者大於corePoolSize/maximumPoolSize,沒有的話則對workerCount+1操作,

3、如果不符合上述判斷或+1操作失敗,再次獲取線程池的控制狀態,獲取runState與剛開始獲取的runState相比,不一致則跳出內層循環繼續外層循環,否則繼續內層循環

4、+1操作成功後,使用重入鎖ReentrantLock來保證往workers當中添加worker實例,添加成功就啟動該實例。

接下來看看流程圖來理解一下上面代碼的一個執行流程

Java併發之線程池ThreadPoolExecutor源碼分析學習

3.addWorkerFailed(Worker w)

addWorker方法添加worker失敗,並且沒有成功啟動任務的時候,就會調用此方法,將任務從workers中移除,並且workerCount做-1操作。

private void addWorkerFailed(Worker w) {
//重入鎖
final ReentrantLock mainLock = this.mainLock;
//獲取鎖
mainLock.lock();
try {
//如果worker不為null
if (w != null)
//workers移除worker
workers.remove(w);
//通過CAS操作,workerCount-1
decrementWorkerCount();
tryTerminate();
} finally {
//釋放鎖
mainLock.unlock();
}
}

4.tryTerminate()

當對線程池執行了非正常成功邏輯的操作時,都會需要執行tryTerminate嘗試終止線程池

final void tryTerminate() {
//死循環
for (;;) {
//獲取線程池控制狀態
int c = ctl.get();
/*
*線程池處於RUNNING狀態
*線程池狀態最小大於TIDYING

*線程池==SHUTDOWN並且workQUeue不為空
*直接return,不能終止
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//如果workerCount不為0
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}

//獲取線程池的鎖
final ReentrantLock mainLock = this.mainLock;
//獲取鎖
mainLock.lock();
try {
//通過CAS操作,設置線程池狀態為TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
//設置線程池的狀態為TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
//發送釋放信號給在termination條件上等待的線程
termination.signalAll();
}
return;
}
} finally {
//釋放鎖
mainLock.unlock();
}
// else retry on failed CAS
}
}

5.runWorker(Worker w)

該方法的作用就是去執行任務

final void runWorker(Worker w) {
//獲取當前線程
Thread wt = Thread.currentThread();
//獲取worker裡的任務
Runnable task = w.firstTask;
//將worker實例的任務賦值為null
w.firstTask = null;
/*
*unlock方法會調用AQS的release方法
*release方法會調用具體實現類也就是Worker的tryRelease方法
*也就是將AQS狀態置為0,允許中斷
*/
w.unlock(); // allow interrupts
//是否突然完成
boolean completedAbruptly = true;
try {
//worker實例的task不為空,或者通過getTask獲取的不為空
while (task != null || (task = getTask()) != null) {
//獲取鎖
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
/*
*獲取線程池的控制狀態,至少要大於STOP狀態
*如果狀態不對,檢查當前線程是否中斷並清除中斷狀態,並且再次檢查線程池狀態是否大於STOP
*如果上述滿足,檢查該對象是否處於中斷狀態,不清除中斷標記
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&

!wt.isInterrupted())
//中斷改對象
wt.interrupt();
try {
//執行前的方法,由子類具體實現
beforeExecute(wt, task);
Throwable thrown = null;
try {
//執行任務
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//執行完後調用的方法,也是由子類具體實現
afterExecute(task, thrown);
}
} finally {//執行完後
//task設置為null
task = null;
//已完成任務數+1
w.completedTasks++;
//釋放鎖
w.unlock();
}
}
completedAbruptly = false;
} finally {
//處理並退出當前worker
processWorkerExit(w, completedAbruptly);
}
}

接下來我們用文字來說明一下執行任務這個方法的具體邏輯和流程。

  1. 首先在方法一進來,就執行了w.unlock(),這是為了將AQS的狀態改為0,因為只有getState() >= 0的時候,線程才可以被中斷;
  2. 判斷firstTask是否為空,為空則通過getTask()獲取任務,不為空接著往下執行
  3. 判斷是否符合中斷狀態,符合的話設置中斷標記
  4. 執行beforeExecute(),task.run(),afterExecute()方法
  5. 任何一個出異常都會導致任務執行的終止;進入processWorkerExit來退出任務
  6. 正常執行的話會接著回到步驟2

附上一副簡單的流程圖:

Java併發之線程池ThreadPoolExecutor源碼分析學習

6.getTask()

在上面的runWorker方法當中我們可以看出,當firstTask為空的時候,會通過該方法來接著獲取任務去執行,那我們就看看獲取任務這個方法到底是怎麼樣的?

private Runnable getTask() {
//標誌是否獲取任務超時
boolean timedOut = false; // Did the last poll() time out?

//死循環
for (;;) {
//獲取線程池的控制狀態
int c = ctl.get();
//獲取線程池的runState
int rs = runStateOf(c);

// Check if queue empty only if necessary.
/*
*判斷線程池的狀態,出現以下兩種情況
*1、runState大於等於SHUTDOWN狀態
*2、runState大於等於STOP或者阻塞隊列為空
*將會通過CAS操作,進行workerCount-1並返回null
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

//獲取線程池的workerCount
int wc = workerCountOf(c);

// Are workers subject to culling?
/*
*allowCoreThreadTimeOut:是否允許core Thread超時,默認false

*workerCount是否大於核心核心線程池
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

/*
*1、wc大於maximumPoolSize或者已超時
*2、隊列不為空時保證至少有一個任務
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
/*
*通過CAS操作,workerCount-1
*能進行-1操作,證明wc大於maximumPoolSize或者已經超時
*/
if (compareAndDecrementWorkerCount(c))
//-1操作成功,返回null
return null;
//-1操作失敗,繼續循環
continue;
}

try {
/*
*wc大於核心線程池
*執行poll方法
*小於核心線程池
*執行take方法
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//判斷任務不為空返回任務
if (r != null)
return r;
//獲取一段時間沒有獲取到,獲取超時
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;

}
}
}

還是文字解說一下上面的代碼邏輯和流程:

  1. 獲取線程池控制狀態和runState,判斷線程池是否已經關閉或者正在關閉,是的話則workerCount-1操作返回null
  2. 獲取workerCount判斷是否大於核心線程池
  3. 判斷workerCount是否大於最大線程池數目或者已經超時,是的話workerCount-1,-1成功則返回null,不成功則回到步驟1重新繼續
  4. 判斷workerCount是否大於核心線程池,大於則用poll方法從隊列獲取任務,否則用take方法從隊列獲取任務
  5. 判斷任務是否為空,不為空則返回獲取的任務,否則回到步驟1重新繼續

接下來依然有一副流程圖:

Java併發之線程池ThreadPoolExecutor源碼分析學習

7.processWorkerExit

明顯的,在執行任務當中,會去獲取任務進行執行,那既然是執行任務,肯定就會有執行完或者出現異常中斷執行的時候,那這時候肯定也會有相對應的操作,至於具體操作是怎麼樣的,我們還是直接去看源碼最實際。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
/*
*completedAbruptly:在runWorker出現,代表是否突然完成的意思
*也就是在執行任務過程當中出現異常,就會突然完成,傳true
*
*如果是突然完成,需要通過CAS操作,workerCount-1
*不是突然完成,則不需要-1,因為getTask方法當中已經-1
*
*下面的代碼註釋貌似與代碼意思相反了
*/
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

//生成重入鎖
final ReentrantLock mainLock = this.mainLock;
//獲取鎖
mainLock.lock();
try {
//線程池統計的完成任務數completedTaskCount加上worker當中完成的任務數
completedTaskCount += w.completedTasks;
//從HashSet<worker>中移除
workers.remove(w);

} finally {
//釋放鎖
mainLock.unlock();
}

//因為上述操作是釋放任務或線程,所以會判斷線程池狀態,嘗試終止線程池
tryTerminate();

//獲取線程池的控制狀態
int c = ctl.get();
//判斷runState是否小魚STOP,即是RUNNING或者SHUTDOWN
//如果是RUNNING或者SHUTDOWN,代表沒有成功終止線程池
if (runStateLessThan(c, STOP)) {
/*
*是否突然完成
*如若不是,代表已經沒有任務可獲取完成,因為getTask當中是while循環
*/
if (!completedAbruptly) {
/*
*allowCoreThreadTimeOut:是否允許core thread超時,默認false
*min-默認是corePoolSize
*/
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//允許core thread超時並且隊列不為空
//min為0,即允許core thread超時,這樣就不需要維護核心核心線程池了
//如果workQueue不為空,則至少保持一個線程存活
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//如果workerCount大於min,則表示滿足所需,可以直接返回
if (workerCountOf(c) >= min)
return; // replacement not needed
}

//如果是突然完成,添加一個空任務的worker線程--這裡我也不太理解
addWorker(null, false);
}
}
/<worker>
  1. 首先判斷線程是否突然終止,如果是突然終止,通過CAS,workerCount-1
  2. 統計線程池完成任務數,並將worker從workers當中移除
  3. 判斷線程池狀態,嘗試終止線程池
  4. 線程池沒有成功終止
  • 判斷是否突然完成任務,不是則進行下一步,是則進行第三步
  • 如允許核心線程超時,隊列不為空,則至少保證一個線程存活
  • 添加一個空任務的worker線程

Worker內部類

我們在上面已經算是挺詳細地講了線程池執行任務 execute 的執行流程和一些細節,在上面頻繁地出現了一個字眼,那就是worker實例,那麼這個worker究竟是什麼呢?裡面都包含了一些什麼信息,以及worker這個任務究竟是怎麼執行的呢?

我們就在這個部分來介紹一下吧,還是直接上源碼:

我們可以看到Worker內部類繼承AQS同步器並且實現了Runnable接口,所以Worker很明顯就是一個可執行任務並且又可以控制中斷、起到鎖效果的類。

private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;

/** 工作線程,如果工廠失敗則為空. */
final Thread thread;
/** 初始化任務,有可能為空 */
Runnable firstTask;
/** 已完成的任務計數 */
volatile long completedTasks;

/**
* 創建並初始化第一個任務,使用線程工廠來創建線程
* 初始化有3步
*1、設置AQS的同步狀態為-1,表示該對象需要被喚醒
*2、初始化第一個任務
*3、調用ThreadFactory來使自身創建一個線程,並賦值給worker的成員變量thread
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker

this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

//重寫Runnable的run方法
/** Delegates main run loop to outer runWorker */
public void run() {
//調用ThreadPoolExecutor的runWorker方法
runWorker(this);
}

// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
//代表是否獨佔鎖,0-非獨佔 1-獨佔
protected boolean isHeldExclusively() {
return getState() != 0;
}
//重寫AQS的tryAcquire方法嘗試獲取鎖
protected boolean tryAcquire(int unused) {
//嘗試將AQS的同步狀態從0改為1
if (compareAndSetState(0, 1)) {
//如果改變成,則將當前獨佔模式的線程設置為當前線程並返回true
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
//否則返回false
return false;
}

//重寫AQS的tryRelease嘗試釋放鎖
protected boolean tryRelease(int unused) {
//設置當前獨佔模式的線程為null
setExclusiveOwnerThread(null);
//設置AQS同步狀態為0
setState(0);
//返回true
return true;
}


//獲取鎖
public void lock() { acquire(1); }
//嘗試獲取鎖
public boolean tryLock() { return tryAcquire(1); }
//釋放鎖
public void unlock() { release(1); }
//是否被獨佔
public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

小結

寫這個線程池就真的是不容易了,歷時兩個星期,中途有很多的地方不懂,而且《Java併發編程的藝術》的這本書當中對線程池的介紹其實並不算多,所以自己看起來也挺痛苦的,還經常會看了這個方法就不知道為什麼要調用這個以及調用這個方法是出何用意。而且在這學習的過程當中,有在懷疑自己的學習方法對不對,因為也有人跟我說不需要一句句去看去分析源碼,只需要知道流程就可以了,但是後來還是想想按照自己的學習路線走,多讀源碼總是有好處的,在這裡我也給程序猿一些建議,有自己的學習方法的時候,按照自己的方式堅定走下去。


分享到:


相關文章: