09.04 Java 併發包之線程池綜述

■ 線程池的創建

在Java中,您可以通過調整-Xss參數來調節每個線程棧的大小(64bit系統默認1024KB),當減小該值時意味著可以創建更多的線程數,但問題是JVM資源是有限的,線程不能

無限創建!

從筆者開發經驗來看,線程池應該是併發包中使用頻率和運用場景最多的併發框架,幾乎所有併發/異步執行任務的需求都需要用到線程池,線程複用,以內部線程池的形式對外提供管理任務執行,線程調度,線程池管理等等服務。合理的使用線程池可以帶來如下三個好處:

1.降低資源消耗:通過重用已創建的線程來降低線程創建和銷燬的消耗

2.提高響應速度:任務到達時不需要等待線程創建就可以立即執行

3.提高線程的可管理性:線程池可以統一管理、分配、調優和監控

■ ThreadPoolExecutor —— 線程池最核心的類

- 類定義: 實現了 AbstractExecutorService 類,ExecutorService,Executor 接口

public class ThreadPoolExecutor extends AbstractExecutorService implements ExecutorService,Executor {

- 構造器:通過觀察每個構造器的源碼具體實現,發現前面三個構造器都是調用的第四個構造器進行的初始化工作

Java 併發包之線程池綜述

/**
* 線程工廠默認為DefaultThreadFactory
* 飽和策略默認為AbortPolicy
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
/**
* 線程工廠可配置
* 飽和策略默認為AbortPolicy
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
/**
* 線程工廠默認為DefaultThreadFactory
* 飽和策略可配置
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
/**
* 線程工廠可配置
* 飽和策略可配置
*/
public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null : AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
/<runnable>/<runnable>/<runnable>/<runnable>
Java 併發包之線程池綜述

- 重要變量

Java 併發包之線程池綜述

//線程池控制器
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//任務隊列
private final BlockingQueue<runnable> workQueue;
//全局鎖
private final ReentrantLock mainLock = new ReentrantLock();
//工作線程集合
private final HashSet<worker> workers = new HashSet<worker>();
//終止條件 - 用於等待任務完成後才終止線程池
private final Condition termination = mainLock.newCondition();
//曾創建過的最大線程數

private int largestPoolSize;
//線程池已完成總任務數
private long completedTaskCount;
//工作線程創建工廠
private volatile ThreadFactory threadFactory;
//飽和拒絕策略執行器
private volatile RejectedExecutionHandler handler;
//工作線程活動保持時間(超時後會被回收) - 納秒
private volatile long keepAliveTime;
/**
* 允許核心工作線程響應超時回收
* false:核心工作線程即使空閒超時依舊存活
* true:核心工作線程一旦超過keepAliveTime仍然空閒就被回收
*/
private volatile boolean allowCoreThreadTimeOut;
//核心工作線程數
private volatile int corePoolSize;
//最大工作線程數
private volatile int maximumPoolSize;
//默認飽和策略執行器 - AbortPolicy -> 直接拋出異常
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
/<worker>/<worker>/<runnable>
Java 併發包之線程池綜述

ThreadPoolExecutor 的使用

- 創建線城池實際上就是實例化一個線程池對象,這裡我們使用最完整的構造器來描述最完整的創建過程:

1. corePoolSize(核心工作線程數):無任務時,線程池允許(維護)的最小空閒線程池數;當一個任務被提交到線程池就新建一個工作線程來執行任務(即使此時有空閒的核心工作線程)直到(實際工作線程數 >= 核心工作線程數)為止;調用 prestartAllCoreThreads()方法會提前創建並啟動所有核心工作線程

2. maximumPoolSize(最大工作線程數):線程池允許創建的最大工作線程數;當(隊列已滿 && 實際工作線程數 < 最大工作線程數)時,線程池會創建新的工作線程(即使此時仍有空閒的工作線程)執行任務直到最大工作線程數為止;設置無界隊列時該參數其實無效

3. keepAliveTime(工作線程最大空閒時間):單位納秒,滿足超時條件且空閒的工作線程會被回收;超時的非核心工作線程會被回收,核心工作線程不會被回收;當allowCoreThreadTimeOut=true 時,則超時的核心工作線程也會被回收;若該值沒有設置則線程會永遠存活;建議當場景為任務短而多時,可以調高時間以提高線程利用率

4. unit(線程活動保持時間單位): 線程活動保持時間單位,可選的包括NANOSECONDS納秒、MICROSECONDS微秒、MILLISECONDS毫秒、SECONDS秒、MINUTES分、HOURS時、DAYS天

5. workQueue(任務隊列): 用來保存等待執行的任務的阻塞隊列;當 (實際工作線程數 >= 核心工作線程數) && (任務數 < 任務隊列長度)時,任務會offer()入隊等待;關於任務隊列詳見下文的任務隊列與排隊策略

6. threadFactory(線程創建工廠): 顧名思義,就是用於創建線程的工廠,允許自定義創建工廠,可以線程進行初始化配置,比如名字、守護線程、異常處理等等

7. handler(飽和策略執行器): 當線程池和隊列都已滿,此時說明線程已無力再接收更多的任務,即任務數飽和,沒法接單了;此時需要使用一種飽和策略處理新提交的任務,默認是Abort(直拋Reject異常),還包括Discard(LIFO規則丟棄)、DiscardOldest(LRU規則丟棄) 以及 CallerRuns(調用者線程執行),允許自定義執行器

從上面給出的 ThreadPoolExecutor 類的代碼可以知道,

ThreadPoolExecutor 繼承了 AbstractExecutorService,我們來看一下 AbstractExecutorService 的實現:

Java 併發包之線程池綜述

public abstract class AbstractExecutorService implements ExecutorService {
protected RunnableFuture newTaskFor(Runnable runnable, T value) { };
protected RunnableFuture newTaskFor(Callable callable) { };
public Future> submit(Runnable task) {};
public Future submit(Runnable task, T result) { };

public Future submit(Callable task) { };
private T doInvokeAny(Collection extends Callable> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
};
public T invokeAny(Collection extends Callable> tasks)
throws InterruptedException, ExecutionException {
};
public T invokeAny(Collection extends Callable> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
};
public List<future>> invokeAll(Collection extends Callable> tasks)
throws InterruptedException {
};
public List<future>> invokeAll(Collection extends Callable> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
};
}
/<future>
/<future>
Java 併發包之線程池綜述

AbstractExecutorService 是一個抽象類,它實現了ExecutorService 接口:

Java 併發包之線程池綜述

public interface ExecutorService extends Executor {
void shutdown();
boolean isShutdown();
boolean isTerminated();

boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;

Future submit(Callable task);
Future submit(Runnable task, T result);
Future> submit(Runnable task);

List<future>> invokeAll(Collection extends Callable> tasks)
throws InterruptedException;
List<future>> invokeAll(Collection extends Callable> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

T invokeAny(Collection extends Callable> tasks)
throws InterruptedException, ExecutionException;
T invokeAny(Collection extends Callable> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
/<future>
/<future>
Java 併發包之線程池綜述

ExecutorService 又是繼承了Executor 頂層接口:

public interface Executor {
void execute(Runnable command);
}

- 提交、執行和關閉任務 (重要方法)

1. execute(): 適用於提交無須返回值的任務

- 該方法是無法判斷任務是否被線程池執行成功

2. submit(): 適用於提交需要返回值的任務

-可以通過返回的Future對象得知任務是否已經執行成功

-get() 方法會阻塞當前線程直到任務完成,但要注意防範無限阻塞!!!

-使用 get(long timeout,TimeUnit unit) 方法會阻塞當前線程直到任務完成或超時,不會有無限阻塞的發生但需要注意超時後任務可能還沒完成!!!

3. shutdown() : 有序地關閉線程池,已提交的任務會被執行(包含正在執行和任務隊列中的),但會拒絕新任務

shutdownNow(): 立即(嘗試)停止執行所有任務(包含正在執行和任務隊列中的),並返回待執行任務列表

ThreadPoolExecutor 實現原理

- 流程圖

Java 併發包之線程池綜述

- 線程池的狀態

線程狀態的流轉遵循如下順序,即由小到大順序排列:

RUNNING -> SHUTDOWN -> STOP -> TIDYING -> TERMINATED

* 補充:數值的變遷感覺就好比我們的年齡,越大離上帝就越近 = =

Java 併發包之線程池綜述

//線程池狀態控制器,用於保證線程池狀態和工作線程數 ps:低29位為工作線程數量,高3位為線程池狀態
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//設定偏移量 Integer.SIZE = 32 -> 即COUNT_BITS = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
//確定最大的容量2^29-1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//獲取線程池狀態,取高3位
private static int runStateOf(int c) { return c & ~CAPACITY; }
//獲取工作線程數量,取低29位
private static int workerCountOf(int c) { return c & CAPACITY; }
/**
* 獲取線程池狀態控制器
* @param rs 表示runState 線程池狀態
* @param wc 表示workerCount 工作線程數量
*/
private static int ctlOf(int rs, int wc) { return rs | wc; }
Java 併發包之線程池綜述

這裡補充一點二進制運算符基礎知識方便忘卻的讀者理解一下:

&:與運算符,同位都為1才為1,否則為0

|:或運算符,同位有一個為1即為1,否則為0

~:非運算符,0和1互換,即若是0變成1,1則變成0

^:異或運算符,同位相同則為0,不同則為1

- 工人生產(生產者與消費者模式)

之前每個變量的作用都已經標明出來了,這裡通過實例展示其作用:

Java 併發包之線程池綜述

/** 
  假如有一個工廠,工廠裡面有10個工人,每個工人同時只能做一件任務。
  因此只要當10個工人中有工人是空閒的,來了任務就分配給空閒的工人做;
  當10個工人都有任務在做時,如果還來了任務,就把任務進行排隊等待;
  如果說新任務數目增長的速度遠遠大於工人做任務的速度,那麼此時工廠主管可能會想補救措施,比如重新招4個臨時工人進來;
  然後就將任務也分配給這4個臨時工人做;
  如果說著14個工人做任務的速度還是不夠,此時工廠主管可能就要考慮不再接收新的任務或者拋棄前面的一些任務了。
  當這14個工人當中有人空閒時,而新任務增長的速度又比較緩慢,工廠主管可能就考慮辭掉4個臨時工了,只保持原來的10個工人,畢竟請額外的工人是要花錢的。
**/
Java 併發包之線程池綜述

那麼我們知道其實線程就相當於工人,所以我們來看下線程池的內部類 Worker:

  1. 繼承AQS類: 實現簡單的不可重入互斥鎖,以提供便捷的鎖操作,目的用於處理中斷情況
  2. 實現Runnable接口: "投機取巧"的設計,主要是借用Runnable接口的統一寫法,好處是不用重新寫一個同功能接口
  3. 工作線程: Worker會通過thread變量綁定一個真正執行任務的工作線程(一對一),初始化時就由線程工廠分配好,它會反覆地獲取和執行任務
  4. 任務: Worker每次都會將新任務賦值給firstTask變量,工作線程每次通過該變量處理新獲取到的任務(初始化時該值允許為null,有特殊作用,下文會詳述)
Java 併發包之線程池綜述

/**
  Worker類封裝了 ( 鎖 + 線程 + 任務 ) 這三個部分,從而成為了一個多面手的存在
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{
/** 實際上真正的工作線程 - 幕後大佬,但可能因線程工廠創建失敗而為null */
final Thread thread;
/** 待執行任務,可能為null */
Runnable firstTask;
/** 該工作線程已完成的任務數 -- 論KPI的重要性 */
volatile long completedTasks;
Worker(Runnable firstTask) {
//設置鎖狀態為-1,目的是為了阻止在runWorker()之前被中斷
setState(-1);
/**
* 新任務,任務來源有兩個:
* 1.調用addWorker()方法新建線程時傳入的第一個任務
* 2.調用runWorker()方法時內部循環調用getTask() -- 這就是線程複用的具現
*/
this.firstTask = firstTask;
/**
* 創建一個新的線程 -> 這個是真正的工作線程
* 注意Worker本身就是個Runnable對象
* 因此newThread(this)中的this也是個Runnable對象
*/
this.thread = getThreadFactory().newThread(this);
}
}
Java 併發包之線程池綜述

- 執行任務

Java 併發包之線程池綜述

/**
* 工作線程運行
* runWorker方法內部會通過輪詢的方式
* 不停地獲取任務和執行任務直到線程被回收
*/
public void run() {
runWorker(this);
}
Java 併發包之線程池綜述

(重點) 這裡簡單介紹一下線程在線程池執行任務的

工作流程

1.工作線程開始執行前,需先對worker加鎖,任務完成解鎖

2.任務執行前後分別執行beforeExecute()和afterExecute()方法

3.執行中遇到異常會向外拋出,線程是否死亡取決於您對於異常的處理

4.每個任務執行完後,當前工作線程任務完成數自增,同時會循環調用getTask()從任務隊列中反覆獲取任務並執行,無任務可執行時線程會阻塞在該方法上

5.當工作線程因各種理由退出時,會執行processWorkerExit()回收線程(核心是將該worker從workers集合中移除,注意之前worker已經退出任務循環,因此已經不再做工了,從集合移除後就方便gc了)

- 鎖方法

Java 併發包之線程池綜述

// Lock methods
// The value 0 represents the unlocked state. 0表示未鎖定
// The value 1 represents the locked state. 1表示已鎖定
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
//鎖狀態非0即1,即不可重入
//特殊情況:只有初始化時才為-1,目的是防止線程初始化階段被中斷
if (compareAndSetState(0, 1)) {
//當前線程佔有鎖
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
//釋放鎖
setExclusiveOwnerThread(null);
//狀態恢復成未鎖定狀態
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null
&& !t.isInterrupted()){
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
Java 併發包之線程池綜述

- 動態控制

Java 併發包之線程池綜述

/**
* 設置核心工作線程數
* 1.若新值 * 2.若新值>當前值時,新創建的線程(若有必要)直接會處理隊列中的任務
*/
public void setCorePoolSize(int corePoolSize)
/**
* 設置是否響應核心工作線程超時處理
* 1.設置false時,核心工作線程不會因為任務數不足(空閒)而被終止

* 2.設置true時,核心工作線程和非核心工作線程待遇一樣,會因為超時而終止
* 注意:為了禁止出現持續性的線程替換,當設置true時,超時時間必須>0
* 注意:該方法通常應在線程池被使用之前調用
*/
public void allowCoreThreadTimeOut(boolean value)
/**
* 設置最大工作線程數
* 1.若新值 * 注意:當新值>當前值時是無需做任何處理的,跟設置核心工作線程數不一樣
*/
public void setMaximumPoolSize(int maximumPoolSize)
/**
* 設置超時時間,超時後工作線程將被終止
* 注意:若實際工作線程數只剩一個,除非線程池被終止,否則無須響應超時
*/
public void setKeepAliveTime(long time, TimeUnit unit)
Java 併發包之線程池綜述

■ 任務提交與執行

- execute() - 提交任務

Java 併發包之線程池綜述

/**
* 在未來的某個時刻執行給定的任務
* 這個任務由一個新線程執行,或者用一個線程池中已經存在的線程執行
* 如果任務無法被提交執行,要麼是因為這個Executor已經被shutdown關閉

* 要麼是已經達到其容量上限,任務會被當前的RejectedExecutionHandler處理
*/
public void execute(Runnable command) {
//新任務不允許為空,空則拋出NPE
if (command == null)
throw new NullPointerException();
/**
* 1.若實際工作線程數 < 核心工作線程數,會嘗試創建一個工作線程去執行該
* 任務,即該command會作為該線程的第一個任務,即第一個firstTask
*
* 2.若任務入隊成功,仍需要執行雙重校驗,原因有兩點:
* - 第一個是去確認是否需要新建一個工作線程,因為可能存在
* 在上次檢查後已經死亡died的工作線程
* - 第二個是可能在進入該方法後線程池被關閉了,
* 比如執行shutdown()
* 因此需要再次檢查state狀態,並分別處理以上兩種情況:
* - 若線程池中已無可用工作線程了,則需要新建一個工作線程
* - 若線程池已被關閉,則需要回滾入隊列(若有必要)
*
* 3.若任務入隊失敗(比如隊列已滿),則需要新建一個工作線程;
* 若新建線程失敗,說明線程池已停止或者已飽和,必須執行拒絕策略

*/
int c = ctl.get();
/**
* 情況一:當實際工作線程數 < 核心工作線程數時
* 執行方案:會創建一個新的工作線程去執行該任務
* 注意:此時即使有其他空閒的工作線程也還是會新增工作線程,
* 直到達到核心工作線程數為止
*/
if (workerCountOf(c) < corePoolSize) {
/**
* 新增工作線程,true表示要對比的是核心工作線程數
* 一旦新增成功就開始執行當前任務
* 期間也會通過自旋獲取隊列任務進行執行
*/
if (addWorker(command, true))
return;
/**
* 需要重新獲取控制器狀態,說明新增線程失敗
* 線程失敗的原因可能有兩種:
* - 1.線程池已被關閉,非RUNNING狀態的線程池是不允許接收新任務的
* - 2.併發時,假如都通過了workerCountOf(c) < corePoolSize校驗,但其他線程
* 可能會在addWorker先創建出線程,導致workerCountOf(c) >= corePoolSize,
* 即實際工作線程數 >= 核心工作線程數,此時需要進入情況二
*/
c = ctl.get();

}
/**
* 情況二:當實際工作線程數>=核心線程數時,新提交任務需要入隊
* 執行方案:一旦入隊成功,仍需要處理線程池狀態突變和工作線程死亡的情況
*/
if (isRunning(c) && workQueue.offer(command)) {
//雙重校驗
int recheck = ctl.get();
/**
* recheck的目的是為了防止線程池狀態的突變 - 即被關閉
* 一旦線程池非RUNNING狀態時,除了從隊列中移除該任務(回滾)外
* 還需要執行任務拒絕策略處理新提交的任務
*/
if (!isRunning(recheck) && remove(command))
//執行任務拒絕策略
reject(command);
/**
* 若線程池還是RUNNING狀態 或 隊列移除失敗(可能正好被一個工作線程拿到處理了)
* 此時需要確保至少有一個工作線程還可以幹活
* 補充一句:之所有無須與核心工作線程數或最大線程數相比,而只是比較0的原因是
* 只要保證有一個工作線程可以幹活就行,它會自動去獲取任務
*/
else if (workerCountOf(recheck) == 0)
/**
* 若工作線程都已死亡,需要新增一個工作線程去幹活

* 死亡原因可能是線程超時或者異常等等複雜情況
*
* 第一個參數為null指的是傳入一個空任務,
* 目的是創建一個新工作線程去處理隊列中的剩餘任務
* 第二個參數為false目的是提示可以擴容到最大工作線程數
*/
addWorker(null, false);
}
/**
* 情況三:一旦線程池被關閉 或者 新任務入隊失敗(隊列已滿)
* 執行方案:會嘗試創建一個新的工作線程,並允許擴容到最大工作線程數
* 注意:一旦創建失敗,比如超過最大工作線程數,需要執行任務拒絕策略
*/
else if (!addWorker(command, false))
//執行任務拒絕策略
reject(command);
}
Java 併發包之線程池綜述

- addWorker() - 新增工作線程

Java 併發包之線程池綜述

/**
* 新增工作線程需要遵守線程池控制狀態規定和邊界限制
*
* @param core core為true時允許擴容到核心工作線程數,否則為最大工作線程數
* @return 新增成功返回true,失敗返回false
*/
private boolean addWorker(Runnable firstTask, boolean core) {
//重試標籤

retry:
/***
* 外部自旋 -> 目的是確認是否能夠新增工作線程
* 允許新增線程的條件有兩個:
* 1.滿足線程池狀態條件 -> 條件一
* 2.實際工作線程滿足數量邊界條件 -> 條件二
* 不滿足條件時會直接返回false,表示新增工作線程失敗
*/
for (;;) {
//讀取原子控制量 - 包含workerCount(實際工作線程數)和runState(線程池狀態)
int c = ctl.get();
//讀取線程池狀態
int rs = runStateOf(c);
/**
* 條件一.判斷是否滿足線程池狀態條件
* 1.只有兩種情況允許新增線程:
* 1.1 線程池狀態==RUNNING
* 1.2 線程池狀態==SHUTDOWN且firstTask為null同時隊列非空
*
* 2.線程池狀態>=SHUTDOWN時不允許接收新任務,具體如下:
* 2.1 線程池狀態>SHUTDOWN,即為STOP、TIDYING、TERMINATED
* 2.2 線程池狀態==SHUTDOWN,但firstTask非空
* 2.3 線程池狀態==SHUTDOWN且firstTask為空,但隊列為空
* 補充:針對1.2、2.2、2.3的情況具體請參加後面的"小問答"環節
*/
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))

return false;
/***
* 內部自旋 -> 條件二.判斷實際工作線程數是否滿足數量邊界條件
* -數量邊界條件滿足會對嘗試workerCount實現CAS自增,否則新增失敗
* -當CAS失敗時會再次重新判斷是否滿足新增條件:
* 1.若此期間線程池狀態突變(被關閉),重新判斷線程池狀態條件和數量邊界條件
* 2.若此期間線程池狀態一致,則只需重新判斷數量邊界條件
*/
for (;;) {
//讀取實際工作線程數
int wc = workerCountOf(c);
/**
* 新增工作線程會因兩種實際工作線程數超標情況而失敗:
* 1.實際工作線程數 >= 最大容量
* 2.實際工作線程數 > 工作線程比較邊界數(當前最大擴容數)
* -若core = true,比較邊界數 = 核心工作線程數
* -若core = false,比較邊界數 = 最大工作線程數
*/
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
/**
* 實際工作線程計數CAS自增:
* 1.一旦成功直接退出整個retry循環,表明新增條件都滿足
* 2.因併發競爭導致CAS更新失敗的原因有三種:

* 2.1 線程池剛好已新增一個工作線程
* -> 計數增加,只需重新判斷數量邊界條件
* 2.2 剛好其他工作線程運行期發生錯誤或因超時被回收
* -> 計數減少,只需重新判斷數量邊界條件
* 2.3 剛好線程池被關閉
* -> 計數減少,工作線程被回收,
* 需重新判斷線程池狀態條件和數量邊界條件
*/
if (compareAndIncrementWorkerCount(c))
break retry;
//重新讀取原子控制量 -> 原因是在此期間可能線程池被關閉了
c = ctl.get();
/**
* 快速檢測是否發生線程池狀態突變
* 1.若狀態突變,重新判斷線程池狀態條件和數量邊界條件
* 2.若狀態一致,則只需重新判斷數量邊界條件
*/
if (runStateOf(c) != rs)
continue retry;
}
}
/**
* 這裡是addWorker方法的一個分割線
* 前面的代碼的作用是決定了線程池接受還是拒絕新增工作線程
* 後面的代碼的作用是真正開始新增工作線程並封裝成Worker接著執行後續操作

* PS:雖然筆者覺得這個方法其實可以拆分成兩個方法的(在break retry的位置)
*/
//記錄新增的工作線程是否開始工作
boolean workerStarted = false;
//記錄新增的worker是否成功添加到workers集合中
boolean workerAdded = false;
Worker w = null;
try {
//將新提交的任務和當前線程封裝成一個Worker
w = new Worker(firstTask);
//獲取新創建的實際工作線程
final Thread t = w.thread;
/**
* 檢測是否有可執行任務的線程,即是否成功創建了新的工作線程
* 1.若存在,則選擇執行任務
* 2.若不存在,則需要執行addWorkerFailed()方法
*/
if (t != null) {
/**
* 新增工作線程需要加全局鎖
* 目的是為了確保安全更新workers集合和largestPoolSize
*/
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
/**
* 獲得全局鎖後,需再次檢測當前線程池狀態
* 原因在於預防兩種非法情況:
* 1.線程工廠創建線程失敗
* 2.在鎖被獲取之前,線程池就被關閉了

*/
int rs = runStateOf(ctl.get());
/**
* 只有兩種情況是允許添加work進入works集合的
* 也只有進入workers集合後才是真正的工作線程,並開始執行任務
* 1.線程池狀態為RUNNING(即rs<shutdown> * 2.線程池狀態為SHUTDOWN且傳入一個空任務
* (理由參見:小問答之快速檢測線程池狀態?)
*/
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
/**
* 若線程處於活動狀態時,說明線程已啟動,需要立即拋出"線程狀態非法異常"
* 原因是線程是在後面才被start的,已被start的不允許再被添加到workers集合中
* 換句話說該方法新增線程時,而線程是新的,本身應該是初始狀態(new)
* 可能出現的場景:自定義線程工廠newThread有可能會提前啟動線程
*/
if (t.isAlive())
throw new IllegalThreadStateException();
//由於加鎖,所以可以放心的加入集合
workers.add(w);
int s = workers.size();
//更新最大工作線程數,由於持有鎖,所以無需CAS
if (s > largestPoolSize)
largestPoolSize = s;
//確認新建的worker已被添加到workers集合中
workerAdded = true;

}
} finally {
//千萬不要忘記主動解鎖
mainLock.unlock();
}
/**
* 一旦新建工作線程被加入工作線程集合中,就意味著其可以開始幹活了
* 有心的您肯定發現在線程start之前已經釋放鎖了
* 原因在於一旦workerAdded為true時,說明鎖的目的已經達到
* 根據最小化鎖作用域的原則,線程執行任務無須加鎖,這是種優化
* 也希望您在使用鎖時儘量保證鎖的作用域最小化
*/
if (workerAdded) {
/**
* 啟動線程,開始幹活啦
* 若您看過筆者的"併發番@Thread一文通"肯定知道start()後,
* 一旦線程初始化完成便會立即調用run()方法
*/
t.start();
//確認該工作線程開始幹活了
workerStarted = true;
}
}
} finally {
//若新建工作線程失敗或新建工作線程後沒有成功執行,需要做新增失敗處理
if (!workerStarted)
addWorkerFailed(w);
}

//返回結果表明新建的工作線程是否已啟動執行
return workerStarted;
}
/<shutdown>
Java 併發包之線程池綜述

結論之啟動調用會經歷一下過程

(1) worker = new Worker(Runnable) --> (2) thread = newThread(worker) --> (3) thread.start() --> (4) thread.run()[JVM自動調用] --> (5) worker.run() --> (6) threadPoolExecuter.runWorker(worker)

- runWorker() - 執行任務

Java 併發包之線程池綜述

final void runWorker(Worker w) {
//讀取當前線程 -即調用execute()方法的線程(一般是主線程)
Thread wt = Thread.currentThread();
//讀取待執行任務
Runnable task = w.firstTask;
//清空任務 -> 目的是用來接收下一個任務
w.firstTask = null;
/**
* 注意Worker本身也是一把不可重入的互斥鎖!
* 由於Worker初始化時state=-1,因此此處的解鎖的目的是:

* 將state-1變成0,因為只有state>=0時才允許中斷;
* 同時也側面說明在worker調用runWorker()之前是不允許被中斷的,
* 即運行前不允許被中斷
*/
w.unlock();
//記錄是否因異常/錯誤突然完成,默認有異常/錯誤發生
boolean completedAbruptly = true;
try {
/**
* 獲取任務並執行任務,取任務分兩種情況:
* 1.初始任務:Worker被初始化時賦予的第一個任務(firstTask)
* 2.隊列任務:當firstTask任務執行好後,線程不會被回收,而是之後自動自旋從任務隊列中取任務(getTask)
* 此時即體現了線程的複用
*/
while (task != null || (task = getTask()) != null) {
/**
* Worker加鎖的目的是為了在shutdown()時不要立即終止正在運行的worker,
* 因為需要先持有鎖才能終止,而不是為了處理併發情況(注意不是全局鎖)
* 在shutdownNow()時會立即終止worker,因為其無須持有鎖就能終止
* 關於關閉線程池下文會再具體詳述
*/
w.lock();
/**
* 當線程池被關閉且主線程非中斷狀態時,需要重新中斷它

* 由於調用線程一般是主線程,因此這裡是主線程代指調用線程
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
/**
* 每個任務執行前都會調用"前置方法",
* 在"前置方法"可能會拋出異常,
* 結果是退出循環且completedAbruptly=true,
* 從而線程死亡,任務未執行(並被丟棄)
*/
beforeExecute(wt, task);
Throwable thrown = null;
try {
//執行任務
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
/**
* 任務執行結束後,會調用"後置方法"
* 該方法也可能拋異常從而導致線程死亡
* 但值得注意的是任務已經執行完畢
*/
afterExecute(task, thrown);
}
} finally {
//清空任務 help gc
task = null;
//無論成功失敗任務數都要+1,由於持有鎖所以無須CAS

w.completedTasks++;
//必須要主動釋放鎖
w.unlock();
}
}
//無異常時需要清除異常狀態
completedAbruptly = false;
} finally {
/**
* 工作線程退出循環的原因有兩個:
* 1.因意外的錯誤/異常退出
* 2.getTask()返回空 -> 原因有四種,下文會詳述
* 工作線程退出循環後,需要執行相對應的回收處理
*/
processWorkerExit(w, completedAbruptly);
}
}
Java 併發包之線程池綜述

- getTask() - 獲取任務

造成getTask()方法返回null的原因有5種:

1.線程池被關閉,狀態為(STOP || TIDYING || TERMINATED)

2.線程池被關閉,狀態為SHUTDOWN且任務隊列為空

3.實際工作線程數超過最大工作線程數

4.工作線程滿足超時條件後,同時符合下述的任意一種情況:

4.1 線程池中還存在至少一個其他可用的工作線程

4.2 線程池中已沒有其他可用的工作線程但任務隊列為空

Java 併發包之線程池綜述

private Runnable getTask() {
// 記錄任務隊列的poll()是否超時,默認未超時
boolean timedOut = false;
//自旋獲取任務
for (;;) {
/**
* 線程池會依次判斷五種情況,滿足任意一種就返回null:
* 1.線程池被關閉,狀態為(STOP || TIDYING || TERMINATED)
* 2.線程池被關閉,狀態為SHUTDOWN且任務隊列為空
* 3.實際工作線程數超過最大工作線程數
* 4.工作線程滿足超時條件後,同時符合下述的任意一種情況:
* 4.1 線程池中還存在至少一個其他可用的工作線程
* 4.2 線程池中已沒有其他可用的工作線程但任務隊列為空
*/
int c = ctl.get();
int rs = runStateOf(c);
/**
* 判斷線程池狀態條件,有兩種情況直接返回null
* 1.線程池狀態大於SHUTDOWN(STOP||TIDYING||TERMINATED),說明不允許再執行任務
* - 因為>=STOP以上狀態時不允許接收新任務同時會中斷正在執行中的任務,任務隊列的任務也不執行了
*
* 2.線程池狀態為SHUTDOWN且任務隊列為空,說明已經無任務可執行

* - 因為SHUTDOWN時還需要執行任務隊列的剩餘任務,只有當無任務才可退出
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
/**
* 減少一個工作線程數
* 值得注意的是工作線程的回收是放在processWorkerExit()中進行的
* decrementWorkerCount()方法是內部不斷循環執行CAS的,保證最終一定會成功
* 補充:因線程池被關閉而計數減少可能與addWorker()的
* 計數CAS自增發生併發競爭
*/
decrementWorkerCount();
return null;
}
//讀取實際工作線程數
int wc = workerCountOf(c);
/**
* 判斷是否需要處理超時:
* 1.allowCoreThreadTimeOut = true 表示需要回收空閒超時的核心工作線程
* 2.wc > corePoolSize 表示存在空閒超時的非核心工作線程需要回收
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/**
* 有三種情況會實際工作線程計數-1且直接返回null
*
* 1.實際工作線程數超過最大線程數
* 2.該工作線程滿足空閒超時條件需要被回收:
* 2.1 當線程池中還存在至少一個其他可用的工作線程

* 2.2 線程池中已沒有其他可用的工作線程但任務隊列為空
*
* 結合2.1和2.2我們可以推導出:
*
* 1.當任務隊列非空時,線程池至少需要維護一個可用的工作線程,
* 因此此時即使該工作線程超時也不會被回收掉而是繼續獲取任務
*
* 2.當實際工作線程數超標或獲取任務超時時,線程池會因為
* 一直沒有新任務可執行,而逐漸減少線程直到核心線程數為止;
* 若設置allowCoreThreadTimeOut為true,則減少到1為止;
*
* 提示:由於wc > maximumPoolSize時必定wc > 1,因此無須比較
* (wc > maximumPoolSize && workQueue.isEmpty()) 這種情況
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
/**
* CAS失敗的原因還是出現併發競爭,具體參考上文
* 當CAS失敗後,說明實際工作線程數已經發生變化,
* 必須重新判斷實際工作線程數和超時情況
* 因此需要countinue
*/
if (compareAndDecrementWorkerCount(c))
return null;
/**
*/
continue;

}
//若滿足獲取任務條件,根據是否需要超時獲取會調用不同方法
try {
/**
* 從任務隊列中取任務分兩種:
* 1.timed=true 表明需要處理超時情況
* -> 調用poll(),超過keepAliveTime返回null
* 2.timed=fasle 表明無須處理超時情況
* -> 調用take(),無任務則掛起等待
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//一旦獲取到任務就返回該任務並退出循環
if (r != null)
return r;
//當任務為空時說明poll超時
timedOut = true;
/**
* 關於中斷異常獲取簡單講一些超出本章範疇的內容
* take()和poll(long timeout, TimeUnit unit)都會throws InterruptedException
* 原因在LockSupport.park(this)不會拋出異常但會響應中斷;
* 但ConditionObject的await()會通過reportInterruptAfterWait()響應中斷
* 具體內容筆者會在阻塞隊列相關番中進一步介紹
*/
} catch (InterruptedException retry) {
/**
* 一旦該工作線程被中斷,需要清除超時標記
* 這表明當工作線程在獲取隊列任務時被中斷,

* 若您不對中斷異常做任務處理,線程池就默認
* 您希望線程繼續執行,這樣就會重置之前的超時標記
*/
timedOut = false;
}
}
}
Java 併發包之線程池綜述

■ 關閉線程池

- 使用shutdown()關閉線程池最主要執行5個操作:

1.獲取全局鎖

2.CAS自旋變更線程池狀態為SHUTDOWN

3.中斷所有空閒工作線程(設置中斷標記) -> 注意是空閒

4.釋放全局鎖

5.嘗試終止線程池

Java 併發包之線程池綜述

/**
* 有序關閉線程池
* 在關閉過程中,之前已提交的任務將被執行(包括正在和隊列中的),
* 但新提交的任務會被拒絕
* 如果線程池已經被關閉,調用該方法不會有任何附加效果
*/
public void shutdown() {
//1.獲取全局鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//2.CAS自旋變更線程池狀態為SHUTDOWN
advanceRunState(SHUTDOWN);
//3.中斷所有空閒工作線程
interruptIdleWorkers();
//專門提供給ScheduledThreadPoolExecutor的鉤子方法
onShutdown();
} finally {
//4.釋放全局鎖
mainLock.unlock();
}
/**
* 5.嘗試終止線程池,此時線程池滿足兩個條件:
* 1.線程池狀態為SHUTDOWN
* 2.所有空閒工作線程已被中斷
*/
tryTerminate();
}
Java 併發包之線程池綜述

- 使用shutdownNow()關閉線程池最主要執行六個操作:

1.獲取全局鎖

2.CAS自旋變更線程池狀態為SHUTDOWN

3.中斷所有工作線程(設置中斷標記)

4.將剩餘任務重新放入一個list中並清空任務隊列

5.釋放全局鎖

6.嘗試終止線程池

Java 併發包之線程池綜述

/**
* 嘗試中斷所有工作線程,並返回待處理任務列表集合(從任務隊列中移除)
*
* 1.若想等待執行中的線程完成任務,可使用awaitTermination()
* 2.由於取消任務操作是通過Thread#interrupt實現,因此
* 響應中斷失敗的任務可能永遠都不會被終止(謹慎使用!!!)
* 響應中斷失敗指的是您選擇捕獲但不處理該中斷異常
*/
public List<runnable> shutdownNow() {
List<runnable> tasks;
//1.獲取全局鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//2.CAS自旋更新線程池狀態為STOP
advanceRunState(STOP);
//3.中斷所有工作線程
interruptWorkers();
//4.將剩餘任務重新放入一個list中並清空任務隊列
tasks = drainQueue();
} finally {
//5.釋放全局鎖
mainLock.unlock();
}
/**
* 6.嘗試終止線程池,此時線程池滿足兩個條件:
* 1.線程池狀態為STOP
* 2.任務隊列為空

* 注意:此時不一定所有工作線程都被中斷回收,詳述見
* 7.3 tryTerminate
*/
tryTerminate();
//5.返回待處理任務列表集合
return tasks;
}
/<runnable>/<runnable>
Java 併發包之線程池綜述

■ 飽和拒絕策略

線程池的飽和拒絕策略主要用於拒絕任務(但這並不意味著該任務不會被執行),線程池原生提供了

四種飽和拒絕策略,基本涵蓋常見的飽和處理場景:

AbortPolicy:默認策略,直接拋出異常

CallerRunsPolicy:只用調用線程執行該任務

DiscardPolicy:直接丟棄任務

DiscardOldestPolicy:丟棄隊尾任務並用線程池重新嘗試執行該任務

所有的拒絕策略都需要實現該拒絕處理器接口,以統一口徑:

Java 併發包之線程池綜述

/**
* 用於拒絕線程池任務的處理器
*/
public interface RejectedExecutionHandler {
/**
* 該方法用於拒絕接受線程池任務
*
* 有三種情況可能調用該方法:
* 1.沒有更多的工作線程可用
* 2.任務隊列已滿
* 3.關閉線程池
*
* 當沒有其他處理選擇時,該方法會選擇拋出RejectedExecutionException異常
* 該異常會向上拋出直到execute()的調用者
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
Java 併發包之線程池綜述

- CallerRunsPolicy

處理規則:新提交任務由調用者線程直接執行

 推薦:拒絕策略推薦使用CallerRunsPolicy,理由是該策略不會拋棄任務,也不會拋出異常,而是將任務回退到調用者線程中執行

Java 併發包之線程池綜述

/**
* 不會直接丟棄,而是直接用調用execute()方法的線程執行該方法
* 當然一旦線程池已經被關閉,還是要丟棄的
*
* 補充:值得注意的是所有策略類都是public的靜態內部類,
* 其目的應該是告知使用者 -> 該類與線程池相關但無需線程池實例便可直接使用
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
/**
* 直接使用調用該方法的線程執行任務
* 除非線程池被關閉時才會丟棄該任務
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//一旦線程池被關閉,丟棄該任務
if (!e.isShutdown()) {
//注意此時不是線程池執行該任務
r.run();
}
}
}
Java 併發包之線程池綜述

- AbortPolicy

處理規則:直接拋出RejectedExecutionException異常

Java 併發包之線程池綜述

/**
* 簡單、粗暴的直接拋出RejectedExecutionException異常
*/
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
/**
* 直接拋出異常,但r.toString()方法會告訴你哪個任務失敗了

* 更人性化的一點是 e.toString()方法還會告訴你:
* 線程池的狀態、工作線程數、隊列長度、已完成任務數
* 建議若是不處理異常起碼也要在日誌裡面打印一下,留個案底
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException(
"Task " + r.toString() + " rejected from " + e.toString());
}
}
Java 併發包之線程池綜述

- DiscardPolicy

處理規則:根據LIFO(後進先出)規則直接丟棄最新提交的任務

Java 併發包之線程池綜述

/**
* 直接丟棄任務
* 這個太狠了,連個案底都沒有,慎用啊
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
/**
* 無作為即為丟棄
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

}
}
Java 併發包之線程池綜述

- DiscardOldestPolicy

處理規則:根據LRU(最近最少使用)規則丟棄最後一個任務,然後嘗試執行新提交的任務

Java 併發包之線程池綜述

/**
* 比起直接丟棄,該類會丟棄隊列裡最後一個但仍未被處理的任務,
* 然後會重新調用execute()方法處理當前任務
* 除非線程池被關閉時才會丟棄該任務
* 此類充分證明了"來得早不如來的巧"
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
/**
* 丟棄隊列裡最近的一個任務,並執行當前任務
* 除非線程池被關閉時才會丟棄該任務
* 原因是隊列是遵循先進先出FIFO原則,poll()會彈出隊尾元素
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//一旦線程池被關閉,直接丟棄
if (!e.isShutdown()) {
//彈出隊尾元素
e.getQueue().poll();
//直接用線程池執行當前任務
e.execute(r);
}
}
}
Java 併發包之線程池綜述


分享到:


相關文章: