分析源碼,學會正確使用 Java 線程池

在日常的開發工作當中,線程池往往承載著一個應用中最重要的業務邏輯,因此我們有必要更多地去關注線程池的執行情況,包括異常的處理和分析等。本文主要聚焦在如何正確使用線程池上,以及提供一些實用的建議。文中會稍微涉及到一些線程池實現原理方面的知識,但是不會過多展開。

線程池的異常處理

UncaughtExceptionHandler

我們都知道Runnable接口中的run方法是不允許拋出異常的,因此派生出這個線程的主線程可能無法直接獲得該線程在執行過程中的異常信息。如下例:

public static void main(String[] args) throws Exception {

Thread thread = new Thread(() -> {

Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);

System.out.println(1 / 0); // 這行會導致報錯!

});

thread.setUncaughtExceptionHandler((t, e) -> {

e.printStackTrace(); //如果你把這一行註釋掉,這個程序將不會拋出任何異常.

});

thread.start();

}

為什麼會這樣呢?其實我們看一下Thread中的源碼就會發現,Thread在執行過程中如果遇到了異常,會先判斷當前線程是否有設置UncaughtExceptionHandler,如果沒有,則會從線程所在的ThreadGroup中獲取。

注意:每個線程都有自己的ThreadGroup,即使你沒有指定,並且它實現了UncaughtExceptionHandler接口。

我們看下ThreadGroup中默認的對UncaughtExceptionHandler接口的實現:

public void uncaughtException(Thread t, Throwable e) {
if (parent != null) {
parent.uncaughtException(t, e);
} else {
Thread.UncaughtExceptionHandler ueh =
Thread.getDefaultUncaughtExceptionHandler();
if (ueh != null) {
ueh.uncaughtException(t, e);
} else if (!(e instanceof ThreadDeath)) {
System.err.print("Exception in thread \\""
+ t.getName() + "\\" ");
e.printStackTrace(System.err);
}
}
}

這個ThreadGroup如果有父ThreadGroup,則調用父ThreadGroup的uncaughtException,否則調用全局默認的Thread.DefaultUncaughtExceptionHandler,如果全局的handler也沒有設置,則只是簡單地將異常信息定位到System.err中,這就是為什麼我們應當在創建線程的時候,去實現它的UncaughtExceptionHandler接口的原因,這麼做可以讓你更方便地去排查問題。

通過execute提交任務給線程池

回到線程池這個話題,如果我們向線程池提交的任務中,沒有對異常進行try...catch處理,並且運行的時候出現了異常,那會對線程池造成什麼影響呢?答案是沒有影響,線程池依舊可以正常工作,但是異常卻被吞掉了。這通常來說不是一個好事情,因為我們需要拿到原始的異常對象去分析問題。

那麼怎樣才能拿到原始的異常對象呢?我們從線程池的源碼著手開始研究這個問題。當然網上關於線程池的源碼解析文章有很多,這裡限於篇幅,直接給出最相關的部分代碼:

final void runWorker(Worker w) {

Thread wt = Thread.currentThread();

Runnable task = w.firstTask;

w.firstTask = null;

w.unlock(); // allow interrupts

boolean completedAbruptly = true;

try {

while (task != null || (task = getTask()) != null) {

w.lock();

// If pool is stopping, ensure thread is interrupted;

// if not, ensure thread is not interrupted. This

// requires a recheck in second case to deal with

// shutdownNow race while clearing interrupt

if ((runStateAtLeast(ctl.get(), STOP) ||

(Thread.interrupted() &&

runStateAtLeast(ctl.get(), STOP))) &&

!wt.isInterrupted())

wt.interrupt();

try {

beforeExecute(wt, task);

Throwable thrown = null;

try {

task.run();

} catch (RuntimeException x) {

thrown = x; throw x;

} catch (Error x) {

thrown = x; throw x;

} catch (Throwable x) {

thrown = x; throw new Error(x);

} finally {

afterExecute(task, thrown);

}

} finally {

task = null;

w.completedTasks++;

w.unlock();

}

}

completedAbruptly = false;

} finally {

processWorkerExit(w, completedAbruptly);

}

}

這個方法就是真正去執行提交給線程池的任務的代碼。

這裡我們略去其中不相關的邏輯,重點關注第19行到第32行的邏輯,其中第23行是真正開始執行提交給線程池的任務,那麼第20行是幹什麼的呢?其實就是在執行提交給線程池的任務之前可以做一些前置工作,同樣的,我們看到第31行,這個是在執行完提交的任務之後,可以做一些後置工作。

beforeExecute這個我們暫且不管,重點關注下afterExecute這個方法。我們可以看到,在執行任務過程中,一旦拋出任何類型的異常,都會提交給afterExecute這個方法,然而查看線程池的源代碼我們可以發現,默認的afterExecute是個空實現,因此,我們有必要繼承ThreadPoolExecutor去實現這個afterExecute方法。

看源碼我們可以發現這個afterExecute方法是protected類型的,從官方註釋上也可以看到,這個方法就是推薦子類去實現的。

當然,這個方法不能隨意去實現,需要遵循一定的步驟,具體的官方註釋也有講,這裡摘抄如下:


*
 {@code
* class ExtendedExecutor extends ThreadPoolExecutor {
* // ...
* protected void afterExecute(Runnable r, Throwable t) {
* super.afterExecute(r, t);
* if (t == null && r instanceof Future>) {
* try {
* Object result = ((Future>) r).get();
* } catch (CancellationException ce) {

* t = ce;
* } catch (ExecutionException ee) {
* t = ee.getCause();
* } catch (InterruptedException ie) {
* Thread.currentThread().interrupt(); // ignore/reset
* }
* }
* if (t != null)
* System.out.println(t);
* }
* }}

那麼通過這種方式,就可以將原先可能被線程池吞掉的異常成功捕獲到,從而便於排查問題。

但是這裡還有個小問題,我們注意到在runWorker方法中,執行task.run();語句之後,各種類型的異常都被拋出了,那這些被拋出的異常去了哪裡?事實上這裡的異常對象最終會被傳入到Thread的dispatchUncaughtException方法中,源碼如下:

private void dispatchUncaughtException(Throwable e) {

getUncaughtExceptionHandler().uncaughtException(this, e);

}

可以看到它會去獲取UncaughtExceptionHandler的實現類,然後調用其中的uncaughtException方法,這也就回到了我們上一小節所分析的UncaughtExceptionHandler實現的具體邏輯。那麼為了拿到最原始的異常對象,除了實現UncaughtExceptionHandler接口之外,也可以考慮實現afterExecute方法。

通過submit提交任務到線程池

這個同樣很簡單,我們還是先回到submit方法的源碼:

public Future submit(Callable task) {

if (task == null) throw new NullPointerException();

RunnableFuture ftask = newTaskFor(task);

execute(ftask);

return ftask;

}

這裡的execute方法調用的是ThreadPoolExecutor中的execute方法,執行邏輯跟通過execute提交任務到線程池是一樣的。我們先重點關注這裡的newTaskFor方法,其源碼如下:

protected  RunnableFuture newTaskFor(Callable callable) {
return new FutureTask(callable);
}

可以看到提交的Callable對象用FutureTask封裝起來了。我們知道最終會執行到上述runWorker這個方法中,並且最核心的執行邏輯就是task.run();這行代碼。我們知道這裡的task其實是FutureTask類型,因此我們有必要看一下FutureTask中的run方法的實現:

public void run() {

if (state != NEW ||

!UNSAFE.compareAndSwapObject(this, runnerOffset,

null, Thread.currentThread()))

return;

try {

Callable c = callable;

if (c != null && state == NEW) {

V result;

boolean ran;

try {

result = c.call();

ran = true;

} catch (Throwable ex) {

result = null;

ran = false;

setException(ex);

}

if (ran)

set(result);

}

} finally {

// runner must be non-null until state is settled to

// prevent concurrent calls to run()

runner = null;

// state must be re-read after nulling runner to prevent

// leaked interrupts

int s = state;

if (s >= INTERRUPTING)

handlePossibleCancellationInterrupt(s);

}

}

可以看到這其中跟異常相關的最關鍵的代碼就在第17行,也就是setException(ex);這個地方。我們看一下這個地方的實現:

protected void setException(Throwable t) {

if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

outcome = t;

UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state

finishCompletion();

}

}

這裡最關鍵的地方就是將異常對象賦值給了outcome,outcome是FutureTask中的成員變量,我們通過調用submit方法,拿到一個Future對象之後,再調用它的get方法,其中最核心的方法就是report方法,下面給出每個方法的源碼:

首先是get方法:

public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}

可以看到最終調用了report方法,其源碼如下:

private V report(int s) throws ExecutionException {

Object x = outcome;

if (s == NORMAL)

return (V)x;

if (s >= CANCELLED)

throw new CancellationException();

throw new ExecutionException((Throwable)x);

}

上面是一些狀態判斷,如果當前任務不是正常執行完畢,或者被取消的話,那麼這裡的x其實就是原始的異常對象,可以看到會被ExecutionException包裝。因此在你調用get方法時,可能會拋出ExecutionException異常,那麼調用它的getCause方法就可以拿到最原始的異常對象了。

綜上所述,針對提交給線程池的任務可能會拋出異常這一問題,主要有以下兩種處理思路:

1:在提交的任務當中自行try...catch,但這裡有個不好的地方就是如果你會提交多種類型的任務到線程池中,每種類型的任務都需要自行將異常try...catch住,比較繁瑣。而且如果你只是catch(Exception e),可能依然會漏掉一些包括Error類型的異常,那為了保險起見,可以考慮catch(Throwable t)。

2:自行實現線程池的afterExecute方法,或者實現Thread的UncaughtExceptionHandler接口。

下面給出我個人創建線程池的一個示例,供大家參考:

BlockingQueue<runnable> queue = new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE);
statisticsThreadPool = new ThreadPoolExecutor(DEFAULT_CORE_POOL_SIZE, DEFAULT_MAX_POOL_SIZE,
60, TimeUnit.SECONDS, queue, new ThreadFactoryBuilder()
.setThreadFactory(new ThreadFactory() {
private int count = 0;
private String prefix = "StatisticsTask";
@Override
public Thread newThread(Runnable r) {
return new Thread(r, prefix + "-" + count++);
}
}).setUncaughtExceptionHandler((t, e) -> {
String threadName = t.getName();
logger.error("statisticsThreadPool error occurred! threadName: {}, error msg: {}", threadName, e.getMessage(), e);
}).build(), (r, executor) -> {
if (!executor.isShutdown()) {
logger.warn("statisticsThreadPool is too busy! waiting to insert task to queue! ");
Uninterruptibles.putUninterruptibly(executor.getQueue(), r);
}
}) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future>) {
try {
Future> future = (Future>) r;
future.get();
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null) {
logger.error("statisticsThreadPool error msg: {}", t.getMessage(), t);
}
}
};
statisticsThreadPool.prestartAllCoreThreads();
/<runnable>

線程數的設置

我們知道任務一般有兩種:CPU密集型和IO密集型。那麼面對CPU密集型的任務,線程數不宜過多,一般選擇CPU核心數+1或者核心數的2倍是比較合理的一個值。因此我們可以考慮將corePoolSize設置為CPU核心數+1,maxPoolSize設置為核心數的2倍。

同樣的,面對IO密集型任務時,我們可以考慮以核心數乘以4倍作為核心線程數,然後核心數乘以5倍作為最大線程數的方式去設置線程數,這樣的設置會比直接拍腦袋設置一個值會更合理一些。

當然總的線程數不宜過多,控制在100個線程以內比較合理,否則線程數過多可能會導致頻繁地上下文切換,導致系統性能反不如前。


如何正確關閉一個線程池

說到如何正確去關閉一個線程池,這裡面也有點講究。為了實現優雅停機的目標,我們應當先調用shutdown方法,調用這個方法也就意味著,這個線程池不會再接收任何新的任務,但是已經提交的任務還會繼續執行,包括隊列中的。所以,之後你還應當調用awaitTermination方法,這個方法可以設定線程池在關閉之前的最大超時時間,如果在超時時間結束之前線程池能夠正常關閉,這個方法會返回true,否則,一旦超時,就會返回false。通常來說我們不可能無限制地等待下去,因此需要我們事先預估一個合理的超時時間,然後去使用這個方法。

如果awaitTermination方法返回false,你又希望儘可能在線程池關閉之後再做其他資源回收工作,可以考慮再調用一下shutdownNow方法,此時隊列中所有尚未被處理的任務都會被丟棄,同時會設置線程池中每個線程的中斷標誌位。shutdownNow並不保證一定可以讓正在運行的線程停止工作,除非提交給線程的任務能夠正確響應中斷。到了這一步,可以考慮繼續調用awaitTermination方法,或者直接放棄,去做接下來要做的事情。


線程池中的其他有用方法

大家可能有留意到,我在創建線程池的時候,還調用了這個方法:prestartAllCoreThreads。這個方法有什麼作用呢?我們知道一個線程池創建出來之後,在沒有給它提交任何任務之前,這個線程池中的線程數為0。有時候我們事先知道會有很多任務會提交給這個線程池,但是等它一個個去創建新線程開銷太大,影響系統性能,因此可以考慮在創建線程池的時候就將所有的核心線程全部一次性創建完畢,這樣系統起來之後就可以直接使用了。

其實線程池中還提供了其他一些比較有意思的方法。比如我們現在設想一個場景,當一個線程池負載很高,快要撐爆導致觸發拒絕策略時,有沒有什麼辦法可以緩解這一問題?其實是有的,因為線程池提供了設置核心線程數和最大線程數的方法,它們分別是

setCorePoolSize方法setMaximumPoolSize方法。是的,線程池創建完畢之後也是可以更改其線程數的!因此,面對線程池高負荷運行的情況,我們可以這麼處理:

  1. 起一個定時輪詢線程(守護類型),定時檢測線程池中的線程數,具體來說就是調用getActiveCount方法。
  2. 當發現線程數超過了核心線程數大小時,可以考慮將CorePoolSize和MaximumPoolSize的數值同時乘以2,當然這裡不建議設置很大的線程數,因為並不是線程越多越好的,可以考慮設置一個上限值,比如50、100之類的。
  3. 同時,去獲取隊列中的任務數,具體來說是調用getQueue方法再調用size方法。當隊列中的任務數少於隊列大小的二分之一時,我們可以認為現在線程池的負載沒有那麼高了,因此可以考慮在線程池先前有擴容過的情況下,將CorePoolSize和MaximumPoolSize還原回去,也就是除以2。

具體來說如下圖:


分析源碼,學會正確使用 Java 線程池


以上是我個人建議的一種使用線程池的方式.


線程池一定是最佳方案嗎?

線程池並非在任何情況下都是性能最優的方案。如果是一個追求極致性能的場景,可以考慮使用Disruptor,這是一個高性能隊列。排除Disruptor不談,單純基於JDK的話會不會有更好的方案?答案是有的。

我們知道在一個線程池中,多個線程是共用一個隊列的,因此在任務很多的情況下,需要對這個隊列進行頻繁讀寫,為了防止衝突因此需要加鎖。事實上在閱讀線程池源代碼的時候就可以發現,裡面充斥著各種加鎖的代碼,那有沒有更好的實現方式呢?

其實我們可以考慮創建一個由單線程線程池構成的列表,每個線程池都使用有界隊列這種方式去實現多線程。這麼做的好處是,每個線程池中的隊列都只會被一個線程去操作,這樣就沒有競爭的問題。

其實這種用空間換時間的思路借鑑了Netty中EventLoop的實現機制。試想,如果線程池的性能真的有那麼好,為什麼Netty不用呢?


其他需要注意的地方

1:任何情況下都不應該使用可伸縮線程池(線程的創建和銷燬開銷是很大的)。

2:任何情況下都不應該使用無界隊列,單測除外。有界隊列常用的有ArrayBlockingQueue和LinkedBlockingQueue,前者基於數組實現,後者基於鏈表。從性能表現上來看,LinkedBlockingQueue的吞吐量更高但是性能並不穩定,實際情況下應當使用哪一種建議自行測試之後決定。順便說一句,Executors的newFixedThreadPool採用的是LinkedBlockingQueue。

3:推薦自行實現RejectedExecutionHandler,JDK自帶的都不是很好用,你可以在裡面實現自己的邏輯。如果需要一些特定的上下文信息,可以在Runnable實現類中添加一些自己的東西,這樣在RejectedExecutionHandler中就可以直接使用了。


怎樣做到不丟任務

這裡其實指的是一種特殊情況,就是比如突然遇到了一股流量尖峰,導致線程池負載已經非常高了,即快要觸發拒絕策略的時候,我們可以怎麼做來儘量防止提交的任務丟失。一般來說當遇到這種情況的時候,應當儘快觸發報警通知研發人員來處理。之後不管是限流也好,還是增加機器也好,甚至是上Kafka、Redis甚至是數據庫用來暫存任務數據也是可以的,但畢竟遠水救不了近火,如果我們希望在正式解決這個問題之前,先儘可能地緩解,可以考慮怎麼做呢?

首先可以考慮的就是我前面提到的動態增大線程池中的線程數,但是假如已經擴容過了,此時不應繼續擴容,否則可能導致系統的吞吐量更低。在這種情況下,應當自行實現RejectedExecutionHandler,具體來說就是在實現類中,單獨開一個單線程的線程池,然後調用原線程池的getQueue方法的put方法,將塞不進去的任務再次嘗試塞進去。當然在隊列滿的時候是塞不進去的,但那至少也只是阻塞了這個單獨的線程而已,並不影響主流程。

當然,這種方案是治標不治本的,面對流量激增這種場景其實業界有很多成熟的做法,只是單純從線程池的角度來看的話,這種方式不失為一種臨時有效的解決方案。

分析源碼,學會正確使用 Java 線程池

"


分享到:


相關文章: