《Java核心技术系列二》ThreadPoolExecutor 使用注意事项

该系列统一使用java8的源码进行讲解

上一篇中对ThreadPoolExecutor的源码以及工作原理进行了讲解。今天来讲解一下在使用的过程中我们应该注意哪些问题

一. 参数冲突问题

在上一篇文章中,我们对ThreadPoolExecutor的构造函数的参数进行了讲解。其中

  1. 当corePoolSize==maximumPoolSize,且核心线程数不允许超时时,设置keepAliveTime与unit是没有意义的(因为没有需要超时的Worker)
  2. 当workQueue为无界队列时,其永远也放不满,这种情况maximumPoolSize就不会生效

二. Exectors 提供的三种线程池能不能用

2.1. 固定大小的线程池
<code>    public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<runnable>());
}
/<runnable>/<code>

看实现可知,线程数固定,且采用的是无界阻塞队列。当任务提交的速度快Worker消耗任务的速度时,任务会在workQueue中不断挤压,最终撑垮整个进程的内存空间,造成OOM,因此线上谨慎使用

2.2 单个线程的线程池
<code>    public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<runnable>()));
}
/<runnable>/<code>

与1相比都是固定大小的线程池,唯一不同的是该线程数为1,存在于1一样的问题。

2.3 Cached线程池
<code>    public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<runnable>());
}
/<runnable>/<code>

与固定大小的线程池正好相反,核心线程数为0,阻塞队列容量为1,有任务就开辟新的Worker执行,空闲时Worker会自动销毁。如果任务提交的任务非常的多,应用进程达到开辟线程的最大上限开辟不了线程抛出OOM,或者因为线程占用太多的内存空间,导致内存被耗尽抛出OOM(一个线程栈的默认大小为1m),或者因为线程太多,线程上下文切换的开销不可忽略,导致任务执行缓存从而拖垮整个应用进程。

综上:上面三种线程池都不推荐使用,而是应该由我们自己制定线程池参数,保证线程个数与任务队列的长度都在可控的范围内,而不是任由其无限制增长,在线上应用中,是不推荐存在这种不可控的情况出现的。

资源都可控制了,那就延伸出另一种问题,达到了线程池的最大处理能力了怎么办?

三. 拒绝策略 RejectedExecutionHandler

当线程池达到自己最大处理能力时(任务队列满,每个Worker都在不停工作),再添加新任务时就会触发拒绝策略。JDK提供了4中拒绝策略

3.1. AbortPolicy 默认

直接拒绝,就是会抛出异常

<code>public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
/<code>

注意是谁抛出异常,是执行提交任务的线程抛出异常。如果提交任务的线程正在循环的处理一批数据,且没有对异常进行捕获处理,那么在执行拒绝策略抛出异常后,后面的任务都不会被执行。如果提交任务的线程是一个用户请求线程,且业务中没有捕获该异常进行处理,用户请求会因为该异常而请求失败。并且这是默认的拒绝策略,初学者很容易掉进这个坑

3.2 DiscardPolicy

直接丢弃,实现就是什么都不做

<code>public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
/<code>

针对不能对任务的场景是不能使用的

3.3 DiscardOldestPolicy

丢弃最老的一个任务

<code>       public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
/<code>

先从workQueue中弹出一个任务,然后再提交任务。同样会丢失数据,在不允许丢数据的场景不能使用

3.4 CallerRunsPolicy

调用者线程执行,谁提交任务,谁执行

<code>        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
/<code>

通过直接调用任务run方法的方式自己执行,虽然会延长线程的执行时间,但是这样不会丢失数据

四. 线程池如何关闭

4.1 shutdown

将线程池的状态修改为SHUTDOWN状态,在该状态不允许提交新任务

<code>    public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//修改线程池的状态
advanceRunState(SHUTDOWN);
//终端空闲的worker
interruptIdleWorkers();
//空方法
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
/<code>

首先将线程池状态修改为shutdown状态,然后中断所有空闲的Worker。为什么这里需要中断线程呢,因为调用线程的interrupt方法,能够让阻塞在getTask中的take方法与poll方法上的Worker唤醒,唤醒之后,重新检查线程池的状态为shutdown状态,getTash方法就会返回null,从而Thread的run方法,线程销毁。最后调用tryTerminate尝试终止,只要有任务没有执行完成,有Worker在运行就不会转为terminate状态。看下tryTerimate的源码:

<code>    final void tryTerminate() {
for (;;) {

int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// stop 状态 或者shutdown状态,且workQueue没有内容
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
// stop状态 或者shutdown状态,worker都已经销毁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//将状态修改为tidying状态
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
//将状态修改为terminate状态
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
/<code>
  1. 首先判断状态,如果是running, tidying,terminate或者shutdown状态且workQueue不为空,就直接返回。总结就是:只有状态为stop 或者 shutdown且workQueue为空了,才能往下走。
  2. 如果Worker还存在,则中断一个空闲的worker,然后返回。
  3. 将状态修改为tidying状态,接着修改为terminate状态,在ThreadPoolExecutor中由于terminated方法时空方法,所以这两个状态可以认为是一样的。

什么样的状态才能转化为tidying状态, 首先前提是Worker都关闭了,其次是 stop,或者 shutdown且队列中没有任务了

4.2 shutdownNow

不同于shutdown的是,这里将状态修改为stop状态

<code>    public List<runnable> shutdownNow() {
List<runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
/<runnable>/<runnable>/<code>

首先将线程池状态修改为stop状态,然后唤醒所有的worker,然后调用tryTerminate方法,该方法已经讲过了。shutdownNow方法会返回阻塞队列中所有的任务。这些任务都没有被执行。

首先我们了解了shutdown与shutdownNow两个方法一个将状态修改为shutdown,一个将状态修改为stop,一个没有返回值,一直返回了workQueue中的任务列表。为了能够更加清晰的认清这两个方法,我们还是有必要再看一次getTask方法对于shutdown状态与stop状态有什么不同的处理操作。

<code>// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
/<code>

如果是shutdown状态,且任务队列中有任务,则继续执行。如果是stop状态,直接返回null,销毁worker。总结:两个方法修改任务状态后都不允许再提交新的任务,shutdown方法将线程池的状态修改为shutdown状态,会等待所有已经提交的任务全部执行完成然后各个worker自动退出,shutdownNow不会执行完成所有的任务,但是会将没有执行完成的任务列表返回,其在getTask中会直接返回null也会引导各个worker自动退出。最终Worker全部销毁,整个线程池关闭

4.3 线程池都是事后自行退出,那是谁在修改线程池的状态到最终terminate状态?

这个问题的答案隐藏在Worker的run方法中,run方法调用了runWorker方法,在runWorker方法中,退出循环后会执行finnaly语句,在这里会调用processWorkerExit方法,而在这个方法中会调用tryTerminate方法,也就是说每有一个Worker执行完成,都会去执行下tryTerminate方法,尝试将线程池的状态修改为terminate状态。代码如下:

<code>private void processWorkerExit(Worker w, boolean completedAbruptly) {
... 省略
tryTerminate();
... 省略

}
/<code>

在我们调用shutdown或者shutdownNow方法后,可以调用awaitTerminate方法,等待线程池关闭。

总结

针对ThreadPoolExecutor的使用事项,就先总结到这,大家如果有好的意见也可以提出来,我们一起探讨。


分享到:


相關文章: