《Java核心技術系列一》ThreadPoolExecutor 源碼剖析

該系列統一使用java8的源碼進行講解。

由於線程的創建與銷燬是存在開銷的,為了避免頻繁的創建與銷燬線程,Java採用了池化技術來管理線程資源。只要涉及到多線程、異步的場景,基本就會有線程池的存在。因此掌握好線程池實現原理對程序員來說非常的重要,也是通往高級程序員以及架構師的必經之路。 本文主要從以下幾個方面對線程池技術進行講解。

  • 剖析線程池的源碼實現
  • 講解使用線程池的注意事項
  • 線程池的變異使用方式(Tomcat與Netty如何使用線程池)
  • 面試中的線程池問答

一. 源碼剖析

為了使線程池可以適用於多種場景,對於線程池的創建提供了多個參數,進行控制。各個參數的含義必須要非常的明確。

1.1 構造方法

  • corePoolSize 核心線程數
  • maximumPoolSize 最大線程數
  • keepAliveTime 保活時間
  • unit 保活時間的單位
  • workQueue 任務隊列
  • threadFactory 線程工廠
  • handler 拒絕策略

結合參數描述一下線程的工作原理,以新來一個任務為例:

1. 新來任務後,如果線程數<corepoolsize>

除了上面步驟提到的參數外,還有

  1. keepAliveTime, unit 保活時間,如果Worker阻塞在從workQueue中獲取任務的時間超過該時間,且線程數>corePoolSize,那麼就會對該Worker進行銷燬,避免過多的線程阻塞,浪費資源。
  2. threadFactory 線程工廠,用於創建線程對象

連接了各個參數的含義,看下構造函數的源碼:

<code>    public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

/<runnable>/<code>

源碼中只是進行參數取值範圍控制,並賦值。

1.2 execute 提交任務

創建好線程池之後,我們就需要往線程池中提交任務,提交任務有兩個方法(低級的面試也會問這兩個方法有什麼區別):

  1. submit() 有返回值,返回Future對象(Future後面再將)
  2. execute() 無返回值

其中 submit也只是任務包裝成Future之後,調用execute,所以這裡我們只需要看execute方法的實現即可。

<code>    public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//線程數小於核心線程數則新增worker執行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//否則,扔到阻塞隊列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//扔進阻塞隊列後判斷狀態,如果線程池狀態處於非運行狀態,則執行拒絕策略handler
if (! isRunning(recheck) && remove(command))
reject(command);

//如果運行著,但是沒有worker,那麼新增worker執行,為什麼會出現這種情況?
//因為有參數可以控制核心線程數也可以在超時的情況下被銷燬:allowCoreThreadTimeOut這個參數控制
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 隊列慢,則新增worker執行任務
else if (!addWorker(command, false))
//worker也達到上限,則執行拒絕策略
reject(command);
}
/<code>

其中,ctl一個線程安全的AtomicInteger變量,用一個整數來記錄了線程池的狀態(高三位)和目前線程池中線程(Worker)的個數(低29位)舉例說明:ctl的值為:1000 0000 0000 0000 0000 0000 0000 0001 高三位100代表線程池處於運行狀態,低29位為1,說明目前線程池中只有1個線程。workerCountOf(c) 返回的就是低29位表示的數,即線程個數isRuning(c) 就是判斷高3位是否為100,100位運行狀態然後上面的代碼邏輯就是我們一開始整理的新來一個任務時,線程池的執行邏輯。非常的重要,幾乎每次面試都會被問。

1.3 Worker 線程池中的工作者

線程池中的工作者是Worker,Worker不僅對Thread進行了包裝,還繼承了AbstractQueuedSynchronizer(AQS相關的知識簡單講,後面會有文章細講)實現了Runnable,下面我們就帶著問題一起來認識下Worker。

1.3.1. Worker為什麼要實現Runnable接口?

Worker中封裝了Thread,也就是在構造Worker的時候,會創建Thread對象,Thread對象又要關聯一個任務去執行,那這個任務就是Worker自己本身。也就是說:Worker中的線程對象Thread執行的是Worker的run方法。這樣的話,thread一旦執行,執行的就是Worker的run方法,看下Worker的構造方法:

<code>        Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/<code>

從構造方法中即可以看出,thread一旦啟動,調用的就是Worker的run方法。

1.3.2 Worker為什麼還實現了AbstractQueuedSynchronizer

這裡主要是為了實現Worker的中斷。從1.3.1 Worker的構造函數中可以看到,設置狀態為-1, 相當於給Worker加了一把鎖。那什麼時候會解鎖呢?簡單看下runWork方法(也就是Worker的run方法),代碼如下:

<code>final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
//...省略
}
/<code>

其中unlock()方法就是解鎖,unlock方法會調用Worker的release方法,將state的值+1,這樣state值就為0了。

因為Worker創建並不代表Thread執行,只有Thread線程真正執行了,才會響應中斷。此外,在執行每一個task的過程中也不允許中斷。響應中斷的方法如下:

<code>      void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
/<code>

首先會判斷getState(), 這個state就是AQS的值,當Thread線程開始執行後,該值就會變為0,那麼在這個中斷方法中就可以進入進行中斷了。

1.3.3 Worker線程都做了哪些事情

這就要看runWork方法了,代碼如下:

<code>    final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//首次task不為null執行自己的任務,此後從workQueue中去任務
while (task != null || (task = getTask()) != null) {
//上鎖,不允許中斷
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())

wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
/<code>

當worker創建時,firstTask是被賦值了的,所以先執行自己的任務,此後所有的任務都是通過getTask()從workQueue中獲取。拿到任務後先lock加鎖,然後通過調用task.run方法執行任務,執行完成後,解鎖。從這裡可以看出來,在一個任務任務的執行過程中是不需要中斷的通過getTask方法,如果返回的是null,那麼就要執行processWorkerExit,對該Worker進行退出

1.3.4 getTask只是從workerQueue中獲取任務嗎?

getTask除了從workerQueue中獲取任務外,還會對worker的等待時間進行判斷,釋放掉多餘的worker。看下getTask的實現:

<code>    private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 線程池關閉狀態下,如果workQueue空,則減少Worker
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// 判斷是否需要因為worker數>corePoolSize 而銷燬worker
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//超時,且要多餘1個線程,且目前沒有任務需要處理,則進行銷燬Worker
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//僅僅從數量上-1,銷燬Worker的事情讓runWork方法去做
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
//去隊列中獲取數據,如果需要考慮超時,則按照超時返回的策略去獲取任務
//如果不需要考慮超時,則直接使用take方法阻塞在workQueue上
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//任務存在,直接將任務返回,執行任務
if (r != null)
return r;

timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
/<code>

getTask方法會根據當前線程池的狀態,去判斷該Worker是否需要有限超時從workQueue中獲取任務,這樣可以讓getTask提前退出,銷燬多餘的Worker。從這裡也可以看出來並不會說先創建的線程就是核心線程,線程池只關心線程的數量,不關心哪些線程是因為<corepoolsize>=corePoolSize創建的,在銷燬的時候是隨機銷燬的。/<corepoolsize>

1.4 Worker何時被啟動的

當一個新任務被提交到線程池後,有三種情況會創建新的worker並啟動worker

  1. 線程數<corepoolsize>
  2. 線程數>=corePoolSize,且workQueue滿時
  3. 任務添加到阻塞隊列後,發現線程數為0時
  4. /<corepoolsize>

會調用addWorker方法完成Worker的新增,代碼如下:

<code>   private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//先通過死循環,保證在ctl上把worker數加上
for (;;) {

int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//構造一個worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//保證線程安全,只能有一個線程執行這段代碼
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)

largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//添加成功後,通過線程啟動worker
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
/<code>

addWorker做了幾件事情

  1. 在死循環中完成對ctl數值+1,這裡為什麼不用加鎖?因為這裡使用的是cas操作,屬於樂觀鎖,不需要加鎖也能保證線程安全的修改ctl
  2. 創建worker,並加鎖將worker放到workers列表中,然後通過執行線程的start方法,調用Worker的run方法,然後執行runWork方法,Worker就開始工作了

到此,關於線程池的核心源碼部分就基本完成了,關於更細緻的源碼剖析,線程池各個狀態的轉換細節可以參考我的另一篇簡書上的文章 https://www.jianshu.com/p/a52f438c16be,有關線程池相關的剩餘部分限於篇幅問題,放在下一篇中繼續剖析。如有問題歡迎大家指正,我們一起學習,共同進步。


分享到:


相關文章: