阿里p8架構師談:Java8 線程池源碼學習

在分享這篇文章之前呢,我先分享下我總結的一些乾貨。

今年年初我花了一個月整理了一份最適合2018年學習的java乾貨,包括基礎+源碼+資料+視頻,都是免費的,在這裡相信有許多

想要學習Java的同學,關注小編頭條號,轉發本篇文章,私信"架構資料"即可

線程池

  • 2017年05月22日 19:13 第一次編輯
  • 2017年12月07日 15:18 完善了一些內容
  • 2018年12月09日 17:25:21 修改了在addWork的邏輯錯理解錯誤,明確worker和Task的區別,以及為什麼會創建一個firstTask為null的工作線程

0、前言

線程池,顧名思義就是線程的池子,在每次需要取線程去執行任務的時候,沒必要每次都創建新線程執行,線程池就是起著維護線程的作用,當有任務的時候就取出一個線程執行,如果任務執行完成則把線程放回到池子中,完成線程的複用,沒必要每次都去創建和銷燬現場,提高效率。

線程池為線程的生命週期的開銷和資源不足提供的解決方案。通過對多個任務的重用線程

那麼什麼是時候使用多線程呢?

  • 單個任務處理時間比較短
  • 處理的任務數比較大,創建和銷燬線程消耗資源過多

1、Exector 接口

阿里p8架構師談:Java8 線程池源碼學習

image.png

  • Exector 接口:運行新任務的簡單接口
  • ExectorService 接口:擴展了Exector接口,添加了一些用來管理執行器生命週期和任務生命週期的方法
  • ScheduledExecutorService 接口:擴展自ExectorService接口,支持Future和定期執行任務
  • Exectors 類:包裝了具體的幾個常用的線程池的定義,便於使用

2、ThreadPoolExecutor 類

java的線程都是在JUC包中,其中java.uitl.concurrent.ThreadPoolExecutor是最為核心的類

其包中包含的四種創建線程池的方法

public class ThreadPoolExecutor extends AbstractExecutorService {
.....
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<runnable> workQueue);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<runnable> workQueue,ThreadFactory threadFactory);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<runnable> workQueue,RejectedExecutionHandler handler);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
...
}
/<runnable>/<runnable>/<runnable>/<runnable>

具體的調用方法又可以是在Exectors類中,只是提供更加便利的線程池調用方法,如果希望使用更加靈活自定義的線程池,還是建議使用ThreadPoolExecutor

注意到默認由executors創建的線程池的工作隊列都是基於鏈表的阻塞隊列,沒有具體長度

public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<runnable>(),
threadFactory);
}

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<runnable>()));
}

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<runnable>(),
threadFactory));
}

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<runnable>());
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<runnable>(),
threadFactory);
}
/<runnable>/<runnable>/<runnable>/<runnable>/<runnable>/<runnable>

2.1 參數說明

  • private volatile int corePoolSize;:核心池大小,噹噹前線程數小於核心池數字,則新加一個任務就創建一個線程處理,當超過了核心池大小則加入到任務隊列中,在創建了線程池後,默認情況下,線程池中並沒有任何線程,而是等待有任務到來才創建線程去執行任務,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預創建線程的意思,即在沒有任務到來之前就創建corePoolSize個線程或者一個線程。默認情況下,在創建了線程池後,線程池中的線程數為0,當有任務來之後,就會創建一個線程去執行任務,當線程池中的線程數目達到corePoolSize後,就會把到達的任務放到緩存隊列當中
  • private volatile int maximumPoolSize;:最大線程數,線程池最大能創建的線程數目
  • private volatile long keepAliveTime;:線程沒有需要處理的任務時最多保持多久會終止,默認情況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數不大於corePoolSize,即當線程池中的線程數大於corePoolSize時,如果一個線程空閒的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0
  • private volatile ThreadFactory threadFactory;:創建線程的工廠bean,其中
    Executors.defaultThreadFactory()提供默認的線程工廠bean
  • private final BlockingQueue<runnable> workQueue;:任務隊列/<runnable>
  • private volatile RejectedExecutionHandler handler:拒絕處理任務時的策略

ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。

ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。

ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然後重新嘗試執行任務(重複此過程)

ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

2.2 重要方法

  • execute()

Executor中聲明的方法,在ThreadPoolExecutor進行了具體的實現,這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向線程池提交一個任務,交由線程池去執行。

  • submit()

ExecutorService中聲明的方法,在AbstractExecutorService就已經有了具體的實現,在ThreadPoolExecutor中並沒有對其進行重寫,這個方法也是用來向線程池提交任務的,但是它和execute()方法不同,它能夠返回任務執行的結果,去看submit()方法的實現,會發現它實際上還是調用的execute()方法

  • shutdown()

關閉線程池的方法,不會接受新的任務,會等到已經存在的任務執行完成

  • shutdownNow()

關閉線程池的方法,不會接受新的任務,已經運行的任務也會被中斷執行

注意!當線程處於關閉、關閉中的狀態時,不會再接收新的任務,接收到的任務也是會被直接拒絕的,後續由拒絕Policy執行調用

3、線程池實現原理

3.1 線程池狀態

線程池的狀態,表示當前線程池的工作狀態,例如會不會接受新的任務,之前存在的任務是如何處理的

 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits

private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

ctl字段是組合為線程池的運行狀態和有效線程數異或生成的,高三位保存了runState(運行狀態),低29位保存了workerCount(工作線程個數)

  • RUNNING : 能接受新提交的任務,並且能處理阻塞隊列中的任務
  • SHUTDOWN : 關閉線程池,不再接受新提交的任務,但是可以繼續處理在阻塞線程中已經存在的任務,線程在 RUNNING 狀態調用shutdown()方法進入到SHUTDOWN狀態
  • STOP : 不接受新任務,也不處理阻塞隊列的任務,中斷正在處理的線程,線程處於 RUNNING 或者 SHUTDOWN 狀態 調用 shutdownNow()方法進入到STOP 狀態
  • TIDYING 所有的任務都終止了,workCount為0,線程池如果處於該情況下,調用terminated()方法,進入到 TERMINATED 狀態
  • TERMINATED TODO

3.2 任務添加執行

3.2.1 execute(Runnable command)

 public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

1、當前有效線程數小於核心線程數時,應該添加一個新線程addWorker(command, true),成功則退出,否則繼續進程

2、再次獲取ctl值(狀態、線程數可能發生變化)

3、如果線程池處於運行狀態,並且當前任務成功加入到workQueue中,再次進行double-check,理由同上

如果發現線程池不再處於運行狀態,但是任務已經添加到隊列中了,嘗試使用remove移除,如果移除成功,(存在已經被執行的情況)就拒絕該任務(這裡就是上面說的當前狀態是關閉時或者關閉,則拒絕該任務)

否則噹噹前有效線程數為0時,創建一個空的線程,會自動從workQueue中獲取任務去執行

4、運行最後一個else則有如下情況

線程不處於RUNNING狀態

線程是RUNNING狀態,但是工作線程數目已經超過了核心線程數,而且阻塞隊列已經滿了

這個時候通過調用addWorker(command, false),false表示線程上線設置為最大線程數去添加該任務

5、如果沒有正常添加,則拒絕該任務

這就是整個的線程池工作流程,先是核心線程,再是任務隊列,最後是最大線程

3.2.2 addWorker(Runnable firstTask, boolean core)

創建一個新線程去執行任務

firstTask表示需要執行的任務,如果為null,則是單獨的創建一個新線程

core 則是判斷線程數是否需要超過核心線程數目,true表示新增線程數前判斷是否小於核心線程數,false表示新增線程數前判斷是否小於最大線程數

 private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

/*
如果狀態 > SHUTDOWN 直接返回false,此時不執行新任務,也不執行阻塞隊列的任務
但是 在 = SHUTDOWN的狀態,不會處理新任務,還是可以繼續執行在阻塞隊列的任務
情況列為如下三種
1、如果狀態 > SHUTDOWN 直接返回false
2、如果狀態 = SHUTDOWN 而且 firstTask 不為 null 直接返回false(拒絕處理)
3、如果狀態 = SHUTDOWN 而且 firstTask 為 null 而且workQueue為空 直接返回false(workQueue都已經沒有數據了,也就沒必要創建新的空工作線程)

*/

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}

/*
很明顯採用的是經典的**死循環加CAS模式**去判斷當前是否可以新加線程
如果當前工作線程數目超過了最大值,就返回false,不再處理
採取CAS去添加線程數目,如果成功就跳出第一個for循環,這裡需要注意下,這裡**線程池狀態可能會發生變化**,所以有如下代碼

```java
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
```
在進行CAS操作的時候,是針對整個的ctl值進行操作,如果成功,**肯定能夠確保線程狀態沒有發生變化**!

CAS失敗,再次獲取ctl值,如果當前線程池狀態不等於rs,則說明線程池狀態發生變化,需要從for循環重新開始,否則一直在這個for循環內運行

for循環break之後就是表示當前任務可以被創建成一個線程去執行操作了


*/
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
/*
```java
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
```
新建一個Worker對象,其中Thread線程是通過線程工廠bean調用生成的
*/
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
// 利用可重入的排他鎖,確保線程安全
// 需要注意到添加了鎖確實不錯,但是隻是這線程池一塊添加了鎖,其他沒有加鎖的部分依舊可用
// 如果看線程池的showdown和shutdownnow方法,都使用了該鎖,那也就意味著,進入到這裡
// 就不會發生線程池關閉的情況了,AQS已經保證了該請求操作的安全性
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果線程池在RUNNING 運行狀態或者
// 在 SHUTDOWN 而且 firstTask為null,則需要創建一個新線程

// 其實在這裡有個疑問,為什麼在shutdown了之後,不檢查workQueue的長度?
if (t.isAlive()) // precheck that t is startable
// 如果該線程已經存活,則拋出異常,肯定不正常,線程還是剛創建,壓根沒啟動
throw new IllegalThreadStateException();
workers.add(w);
// workers是一個HashSet<worker>()
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// largestPoolSize 表示線程池中存在的最大線程數的情況
workerAdded = true;

// 往工作線程中加入了新的worker成功
}
} finally {
mainLock.unlock();
// 鎖被釋放,其他的線程可以進入了
}
if (workerAdded) {
t.start();
// 這裡才是真正的線程啟動的地方,由於Worker本身是繼承Runnable
// 所以這個start最終運行的也是Worker的run方法,也就是`runWorker`方法
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 沒有正常添加線程到worker中或者線程啟動失敗
addWorkerFailed(w);
/*
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
// 依舊可能存在線程安全問題

// 不得不說,這代碼對線程安全的粒度做的確實很細
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
// 嘗試更新線程池狀態到TERMINATED 狀態
} finally {
mainLock.unlock();
}
}
*/
}
return workerStarted;
}
/<worker>

3.2.3 runWorker(Worker w)

新創建了一個worker,然後執行,如果worker中不包含對於的任務,可以從阻塞隊列中獲取,一旦一個work開始啟動了,如果workQueue中一直存在數據,則其不會退出,會在while中一直循環的

 final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 當前需要執行的線程
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
// 解綁workder和當前任務的關係,需要執行的是task任務
// 線程已經創建完成wt,wt需要執行task
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
// 這是一個死循環,直到無法獲得到有效的task
w.lock();

// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&& !wt.isInterrupted())
wt.interrupt();
// STOP 狀態 是調用了 shutdownNow() 之後的線程池狀態
try {
beforeExecute(wt, task);
// 這個可以自定義加一些線程監控mirror
Throwable thrown = null;
try {
task.run();
// 真正的運行線程
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
// 當前這個workder有效完成的任務數
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

3.2.4 getTask()

從阻塞隊列workQueue中獲取有效的任務

 private Runnable getTask() { 

boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
/*
1、如果當前線程並未STOP,但是卻進入了SHUTDOWN狀態,而且阻塞隊列也沒有任何,則減少workers數目,並且返回null
2、如果當前線程池>=STOP,則直接返回返回null
*/

int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/*
這個allowCoreThreadTimeOut 就是配套 keepAliveTime 使用
/**
* If false (default), core threads stay alive even when idle.
* If true, core threads use keepAliveTime to time out waiting
* for work.
*/
private volatile boolean allowCoreThreadTimeOut;
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
/*
wc > maximumPoolSize 工作線程數大於最大線程數,可能是在此時之前設置了最大線程數
(timed && timedOut) 需要設置超時
如果這種情況下,線程數>1或者阻塞隊列為空,則需要減少一個worker
成功了,則返回null,無有效task


線程池中無論是否存在有效任務,必須是存在一個線程在
*/
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 從阻塞隊列中獲取任務,可能是需要設置超時,如果一旦時間超時,則需要在規定的時間內獲取到任務
if (r != null)
return r;
// 這一步也感覺很妙,如果沒有有效的獲取到任務,(可能是這個時候阻塞隊列為空,也有可能是因為沒有設置超時而沒有獲取到),自動的設置這個timeOut為true,加上超時設置
timedOut = true;
} catch (InterruptedException retry) {
// 如果發生了中斷,則回覆這個timeOut設置,進行重試操作
timedOut = false;
}
}
}

至此核心的函數都已經介紹完畢,提交任務,可以立刻處理任務就立馬創建線程處理,如果當前無法馬上處理則加入到阻塞隊列中,如果加入到阻塞隊列都失敗,則調用相關的拒絕策略處理任務

Worker作為處理線程任務則一直在循環接受到新的任務或者從阻塞隊列獲取任務

3.3 其他方法

3.3.1 processWorkerExit(Worker w, boolean completedAbruptly)

此方法是在runWorker中被調用的,當runWorker無法獲取到有效執行的task或者執行任務期間拋出各種異常導致的此worker無法在繼續執行下去,該worker需要被消耗

 private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
// 如果這個值為true,則不是沒有task導致的,而是在執行任務期間出現問題,故減1
// 如果是正常執行到這裡的,則意味著task=null,在getTask方法中已經進行減一操作的
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// worker是HashSet ,非線程安全,加鎖保證安全
completedTaskCount += w.completedTasks;
workers.remove(w);
// 移除該worker,並彙總共完成的有效任務數,當然這需要確保線程安全
} finally {
mainLock.unlock();
}
tryTerminate();
// 嘗試使得線程池狀態變更為TERMINATED
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
// 線程池狀態位RUNNING 或者 SHUTDOWN
if (!completedAbruptly) {
// 如果當前worker是由於沒有有效task而退出的
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())

// 如果設置了超時,而且有工作隊列還有任務存在,則最少保存min=1個worker
min = 1;
// 這裡min=1或者corePoolSize
if (workerCountOf(c) >= min)
// 當前的工作線程個數超過了min,那麼就無需操作工作線程格式,否則需要添加一個空的工作線程,所以min=1
return; // replacement not needed
}
// 直接創建一個新的空的worker
// 防止工作線程 減少到0了,在
addWorker(null, false);
}
}

3.3.2 tryTerminate()

嘗試讓線程池狀態變更為 TERMINATED,結束線程池

 final void tryTerminate() {
for (;;) {
int c = ctl.get();
/*
1、線程池在RUNNING 階段
2、線程池在TIDYING 或者 TERMINATED 階段
3、線程池在SHUTDOWN 而且工作線程不為空

以上幾種情況都直接返回,所有的情況都未發生變化
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;

// 工作線程數不為0,則中斷一個空閒的工作線程(因為工作線程每次工作時都會獲取鎖)

if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
// 到這裡來的可能就是SHUTDOWN 而且工作線程為空的情況

final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 加鎖操作
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
// 狀態和工作線程數變更成功
try {
terminated(); // 這是一個空方法,可以自定義實現
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}

3.3.3 interruptIdleWorkers(boolean onlyOne)

中斷空閒的線程,如果onlyOne為true,則中斷其中的一個

 private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
// 如果t線程不處於中斷狀態,嘗試獲取線程成功
// 能獲取鎖成功,至少能夠說明其沒有進行工作,也就是一個空閒的工作線程

try {
// 調用中斷操作
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

3.4 線程池初始化

如上代碼所示,起初創建線程池的時候,是不包含任何線程的,但是可以人為的創建線程


// 創建一個線程
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
// 添加一個沒有任務的空現場
}
// 創建核心線程數目個線程,直到addWorker返回false
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}

3.5 任務隊列

workQueue的類型為BlockingQueue<runnable>,通常可以取下面三種類型:/<runnable>

  • ArrayBlockingQueue:基於數組的先進先出隊列,此隊列創建時必須指定大小;
  • LinkedBlockingQueue:基於鏈表的先進先出隊列,如果創建時沒有指定此隊列大小,則默認為Integer.MAX_VALUE;
  • synchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。

3.6 線程池監控

  • getTaskCount : 線程池已經執行和未執行的任務總數
  • getCompletedTaskCount : 線程池已經完成的任務數目
  • getLargestPoolSize : 線程池曾創建過線程數最多的線程數
  • getPoolSize : 線程池當前線程數目
  • getActiveCount : 當前線程池正在執行的任務數目

3.7 配置線程池大小

  • 如果是CPU密集型任務,就需要儘量壓榨CPU,參考值可以設為 NCPU+1
  • 如果是IO密集型任務,參考值可以設置為2*NCPU

當然,這只是一個參考值,具體的設置還需要根據實際情況進行調整,比如可以先將線程池大小設置為參考值,再觀察任務運行情況和系統負載、資源利用率來進行適當調整。


分享到:


相關文章: