Java並發之Executor框架

Executor框架簡介

Java的線程既是工作單元,也是執行機制。從JDK5開始,把工作單元和執行機制分離開來。

Executor框架由3大部分組成

任務。

被執行任務需要實現的接口:Runnable接口或Callable接口

異步計算的結果。Future接口和FutureTask類。

任務的執行。兩個關鍵類ThreadPoolExecutor和SeheduledThreadPoolExecutor。

任務

Runnable接口

不含有運行結果

public interface Runnable {

public abstract void run();

}

Callable接口

含有運行結果

public interface Callable {

V call() throws Exception;

}

Runnable與Callable的區別

Runnable和Callable的區別是,

(1)Callable規定的方法是call(),Runnable規定的方法是run().

(2)Callable的任務執行後可返回值,而Runnable的任務是不能返回值的

(3)call方法可以拋出異常,run方法不可以

(4)運行Callable任務可以拿到一個Future對象,Future 表示異步計算的結果。

Future接口

Future接口代表異步計算的結果,通過Future接口提供的方法可以查看異步計算是否執行完成,或者等待執行結果並獲取執行結果,同時還可以取消執行。

public interface Future {

boolean cancel(boolean mayInterruptIfRunning);//取消任務

boolean isCancelled();//是否取消了

boolean isDone();//任務是否完成

//獲取任務執行結果,如果任務還沒完成則會阻塞等待直到任務執行完成

V get() throws InterruptedException, ExecutionException;

//等待一段時間嘗試獲取執行結果,

V get(long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutException;

}

一個使用Runnable的簡單例子

import java.util.concurrent.*;

public class Test {

public static void main(String[] args) throws InterruptedException, ExecutionException {

final ExecutorService exec = Executors.newFixedThreadPool(5);

Runnable task = new Runnable() {

public void run() {

try {

System.out.println(Thread.currentThread().getName() + " is running");

Thread.sleep(1000 * 10);//休眠指定的時間,此處表示該操作比較耗時

} catch (InterruptedException e) {

e.printStackTrace();

}

}

};

exec.submit(task);

//關閉線程池

exec.shutdown();

}

}

一個使用Callable的簡單例子

import java.util.concurrent.*;

public class Test {

public static void main(String[] args) throws InterruptedException, ExecutionException {

final ExecutorService exec = Executors.newFixedThreadPool(5);

Callable call = new Callable() {

public String call() throws Exception {

Thread.sleep(1000 * 10);//休眠指定的時間,此處表示該操作比較耗時

return "Other less important but longtime things.";

}

};

Future task = exec.submit(call);

//重要的事情

System.out.println("Let's do important things. start");

Thread.sleep(1000 * 3);

System.out.println("Let's do important things. end");

//不重要的事情

while(!task.isDone()){

System.out.println("still waiting....");

Thread.sleep(1000 * 1);

}

System.out.println("get sth....");

String obj = task.get();

System.out.println(obj);

//關閉線程池

exec.shutdown();

}

}

運行結果

Let's do important things. start

Let's do important things. end

still waiting....

still waiting....

still waiting....

still waiting....

still waiting....

still waiting....

still waiting....

get sth....

Other less important but longtime things.

任務的執行機制

ThreadPoolExecutor

ThreadPoolExecutor是Executor框架最核心的類,是線程池的實現類。

核心配置參數包括

corePoolSize:核心線程池的大小

maximumPoolSize:最大線程池的大小

BlockingQueue:暫時保存任務的工作隊列

RejectedExecutionHandler:當ThreadPoolExecutor已經飽和時(達到了最大線程池大小且工作隊列已滿)將執行的Handler。

public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue workQueue) {

this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,

Executors.defaultThreadFactory(), defaultHandler);

}

可以創建3種類型的ThreadPoolExecutor。

FixedThreadPool

SingleThreadExecutor

CachedThreadPool

1.FixedThreadPool

FixedThreadPool是固定線程數的線程池,最多線程池中有nThreads個線程。

public static ExecutorService newFixedThreadPool(int nThreads) {

return new ThreadPoolExecutor(nThreads, nThreads, //nThreads為固定線程數

0L, TimeUnit.MILLISECONDS, //空閒線程的等待時間為0ms,表示立刻被終止

new LinkedBlockingQueue()); //工作隊列

}

Java併發之Executor框架

FixedThreadPool的execute()方法內部執行過程

當新任務被提交時,如果當前運行線程數小於nTheads,創建新線程執行任務

如果當前運行線程數等於設置的最大線程數nThreads,將新任務加入到工作隊列LinkedBlockingQueue中

線程執行完任務後會反覆從LinkedBlockingQueue中獲取新任務執行

LinkedBlockingQueue中沒有新任務,線程空閒,線程將被終止。

注意點:

由於工作隊列使用的是無界隊列LinkedBlockingQueue,FixedThreadPool不會拒絕任務(不會調用RejectedExecutionHandler.rejectedExecution()方法)。

2.SingleThreadExecutor

SingleThreadExecutor是FixedThreadPool的特例,線程池中線程的固定數量為1,即最多有一個線程。

public static ExecutorService newSingleThreadExecutor() {

return new FinalizableDelegatedExecutorService

(new ThreadPoolExecutor(1, 1, //nThreads為固定線程數

0L, TimeUnit.MILLISECONDS, //空閒線程的等待時間為0ms,表示立刻被終止

new LinkedBlockingQueue())); //工作隊列

}

Java併發之Executor框架

SingleThreadExecutor的execute()方法內部執行過程與注意事項可參考FixedThreadPool的。

3.CachedThreadPool

CachedThreadPool是根據需要創建新線程的線程池。

public static ExecutorService newCachedThreadPool() {

return new ThreadPoolExecutor(0, Integer.MAX_VALUE, //coolPoolSize為0,maxinumPoolSize為Integer.MAX_VALUE

60L, TimeUnit.SECONDS, //空閒線程的等待時間,空閒60s後被終止

new SynchronousQueue()); //工作隊列

}

Java併發之Executor框架

CachedThreadPool的execute()方法的內部運行過程

當新任務被提交時,主線程將任務插入到工作隊列中(SynchronousQueue的offer()方法),如果線程池有空閒線程在等待任務,新任務交給空閒線程處理。

如果線程池中沒有在等待任務的空閒線程,創建新線程執行任務

線程執行完任務後,等待60s(SynchronousQueue.poll(60, TimeUnit.SECONDS)方法),如果沒有等待新任務,線程終止

SynchronousQueue是一個沒有容量的BlockingQueue。每一個插入操作必須等待另一個線程的移除操作。

CachedThreadPool使用SynchronousQueue,把主線程提交的任務傳遞給空閒線程。

Java併發之Executor框架

注意點:

CachedThreadPool的線程池是無解的,沒有限制數量,如果主線程提交任務的速度高於線程處理任務的速度,將不斷創建新線程。

極端情況下,會因創建過多線程耗盡CPU和內存。

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor主要用於定時任務(定期執行,給定延遲後執行)

執行方式

1.scheduleAtFixedRate

任務按照固定週期執行,比如設定每10分鐘執行一次,第8分鐘時候執行了第一次,後續執行時間點為第18分鐘,第28分鐘,第38分鐘

public ScheduledFuture scheduleAtFixedRate(Runnable command, //任務

long initialDelay, //初始延遲

long period, //任務執行週期

TimeUnit unit)

2.scheduleWithFixedDelay

任務按照固定延遲執行,比如設定延遲時間為10是分鐘,第8分鐘時候執行了第一次,任務執行完成後,再等待10分鐘,執行下一次。如果任務執行了2分鐘,則下一次為第20分鐘

public ScheduledFuture scheduleWithFixedDelay(Runnable command,//任務

long initialDelay, //初始延遲

long delay, //任務執行延遲

TimeUnit unit)

實現原理

Java併發之Executor框架

數據結構

ScheduledThreadPoolExecutor:定時任務執行器

DelayQueue:使用DelayQueue作為任務隊列,保存待調度的任務,任務按照執行的時間點排序。DelayQueue內部是用PriorityQueue實現。

ScheduledFutureTask:待調度任務。

ScheduledFutureTask的成員變量:

time:任務將被執行的具體時間

sequenceNumber:任務序號,time相同時,序號小的先執行

period:任務執行的間隔週期


分享到:


相關文章: