《Java核心技术系列一》ThreadPoolExecutor 源码剖析

该系列统一使用java8的源码进行讲解。

由于线程的创建与销毁是存在开销的,为了避免频繁的创建与销毁线程,Java采用了池化技术来管理线程资源。只要涉及到多线程、异步的场景,基本就会有线程池的存在。因此掌握好线程池实现原理对程序员来说非常的重要,也是通往高级程序员以及架构师的必经之路。 本文主要从以下几个方面对线程池技术进行讲解。

  • 剖析线程池的源码实现
  • 讲解使用线程池的注意事项
  • 线程池的变异使用方式(Tomcat与Netty如何使用线程池)
  • 面试中的线程池问答

一. 源码剖析

为了使线程池可以适用于多种场景,对于线程池的创建提供了多个参数,进行控制。各个参数的含义必须要非常的明确。

1.1 构造方法

  • corePoolSize 核心线程数
  • maximumPoolSize 最大线程数
  • keepAliveTime 保活时间
  • unit 保活时间的单位
  • workQueue 任务队列
  • threadFactory 线程工厂
  • handler 拒绝策略

结合参数描述一下线程的工作原理,以新来一个任务为例:

1. 新来任务后,如果线程数<corepoolsize>

除了上面步骤提到的参数外,还有

  1. keepAliveTime, unit 保活时间,如果Worker阻塞在从workQueue中获取任务的时间超过该时间,且线程数>corePoolSize,那么就会对该Worker进行销毁,避免过多的线程阻塞,浪费资源。
  2. threadFactory 线程工厂,用于创建线程对象

连接了各个参数的含义,看下构造函数的源码:

<code>    public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

/<runnable>/<code>

源码中只是进行参数取值范围控制,并赋值。

1.2 execute 提交任务

创建好线程池之后,我们就需要往线程池中提交任务,提交任务有两个方法(低级的面试也会问这两个方法有什么区别):

  1. submit() 有返回值,返回Future对象(Future后面再将)
  2. execute() 无返回值

其中 submit也只是任务包装成Future之后,调用execute,所以这里我们只需要看execute方法的实现即可。

<code>    public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//线程数小于核心线程数则新增worker执行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//否则,扔到阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//扔进阻塞队列后判断状态,如果线程池状态处于非运行状态,则执行拒绝策略handler
if (! isRunning(recheck) && remove(command))
reject(command);

//如果运行着,但是没有worker,那么新增worker执行,为什么会出现这种情况?
//因为有参数可以控制核心线程数也可以在超时的情况下被销毁:allowCoreThreadTimeOut这个参数控制
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 队列慢,则新增worker执行任务
else if (!addWorker(command, false))
//worker也达到上限,则执行拒绝策略
reject(command);
}
/<code>

其中,ctl一个线程安全的AtomicInteger变量,用一个整数来记录了线程池的状态(高三位)和目前线程池中线程(Worker)的个数(低29位)举例说明:ctl的值为:1000 0000 0000 0000 0000 0000 0000 0001 高三位100代表线程池处于运行状态,低29位为1,说明目前线程池中只有1个线程。workerCountOf(c) 返回的就是低29位表示的数,即线程个数isRuning(c) 就是判断高3位是否为100,100位运行状态然后上面的代码逻辑就是我们一开始整理的新来一个任务时,线程池的执行逻辑。非常的重要,几乎每次面试都会被问。

1.3 Worker 线程池中的工作者

线程池中的工作者是Worker,Worker不仅对Thread进行了包装,还继承了AbstractQueuedSynchronizer(AQS相关的知识简单讲,后面会有文章细讲)实现了Runnable,下面我们就带着问题一起来认识下Worker。

1.3.1. Worker为什么要实现Runnable接口?

Worker中封装了Thread,也就是在构造Worker的时候,会创建Thread对象,Thread对象又要关联一个任务去执行,那这个任务就是Worker自己本身。也就是说:Worker中的线程对象Thread执行的是Worker的run方法。这样的话,thread一旦执行,执行的就是Worker的run方法,看下Worker的构造方法:

<code>        Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/<code>

从构造方法中即可以看出,thread一旦启动,调用的就是Worker的run方法。

1.3.2 Worker为什么还实现了AbstractQueuedSynchronizer

这里主要是为了实现Worker的中断。从1.3.1 Worker的构造函数中可以看到,设置状态为-1, 相当于给Worker加了一把锁。那什么时候会解锁呢?简单看下runWork方法(也就是Worker的run方法),代码如下:

<code>final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
//...省略
}
/<code>

其中unlock()方法就是解锁,unlock方法会调用Worker的release方法,将state的值+1,这样state值就为0了。

因为Worker创建并不代表Thread执行,只有Thread线程真正执行了,才会响应中断。此外,在执行每一个task的过程中也不允许中断。响应中断的方法如下:

<code>      void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
/<code>

首先会判断getState(), 这个state就是AQS的值,当Thread线程开始执行后,该值就会变为0,那么在这个中断方法中就可以进入进行中断了。

1.3.3 Worker线程都做了哪些事情

这就要看runWork方法了,代码如下:

<code>    final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//首次task不为null执行自己的任务,此后从workQueue中去任务
while (task != null || (task = getTask()) != null) {
//上锁,不允许中断
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())

wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
/<code>

当worker创建时,firstTask是被赋值了的,所以先执行自己的任务,此后所有的任务都是通过getTask()从workQueue中获取。拿到任务后先lock加锁,然后通过调用task.run方法执行任务,执行完成后,解锁。从这里可以看出来,在一个任务任务的执行过程中是不需要中断的通过getTask方法,如果返回的是null,那么就要执行processWorkerExit,对该Worker进行退出

1.3.4 getTask只是从workerQueue中获取任务吗?

getTask除了从workerQueue中获取任务外,还会对worker的等待时间进行判断,释放掉多余的worker。看下getTask的实现:

<code>    private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 线程池关闭状态下,如果workQueue空,则减少Worker
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// 判断是否需要因为worker数>corePoolSize 而销毁worker
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//超时,且要多余1个线程,且目前没有任务需要处理,则进行销毁Worker
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//仅仅从数量上-1,销毁Worker的事情让runWork方法去做
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
//去队列中获取数据,如果需要考虑超时,则按照超时返回的策略去获取任务
//如果不需要考虑超时,则直接使用take方法阻塞在workQueue上
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//任务存在,直接将任务返回,执行任务
if (r != null)
return r;

timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
/<code>

getTask方法会根据当前线程池的状态,去判断该Worker是否需要有限超时从workQueue中获取任务,这样可以让getTask提前退出,销毁多余的Worker。从这里也可以看出来并不会说先创建的线程就是核心线程,线程池只关心线程的数量,不关心哪些线程是因为<corepoolsize>=corePoolSize创建的,在销毁的时候是随机销毁的。/<corepoolsize>

1.4 Worker何时被启动的

当一个新任务被提交到线程池后,有三种情况会创建新的worker并启动worker

  1. 线程数<corepoolsize>
  2. 线程数>=corePoolSize,且workQueue满时
  3. 任务添加到阻塞队列后,发现线程数为0时
  4. /<corepoolsize>

会调用addWorker方法完成Worker的新增,代码如下:

<code>   private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//先通过死循环,保证在ctl上把worker数加上
for (;;) {

int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//构造一个worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//保证线程安全,只能有一个线程执行这段代码
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)

largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//添加成功后,通过线程启动worker
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
/<code>

addWorker做了几件事情

  1. 在死循环中完成对ctl数值+1,这里为什么不用加锁?因为这里使用的是cas操作,属于乐观锁,不需要加锁也能保证线程安全的修改ctl
  2. 创建worker,并加锁将worker放到workers列表中,然后通过执行线程的start方法,调用Worker的run方法,然后执行runWork方法,Worker就开始工作了

到此,关于线程池的核心源码部分就基本完成了,关于更细致的源码剖析,线程池各个状态的转换细节可以参考我的另一篇简书上的文章 https://www.jianshu.com/p/a52f438c16be,有关线程池相关的剩余部分限于篇幅问题,放在下一篇中继续剖析。如有问题欢迎大家指正,我们一起学习,共同进步。


分享到:


相關文章: