java多線程系列:Executors框架

目錄

  1. ExecutorService常用接口介紹
  2. 創建線程池的一些方法介紹
  • 3.1newFixedThreadPool方法
  • 3.2newCachedThreadPool方法
  • 3.3newScheduledThreadPool方法
  • 4.1.Runable接口和Callable接口

Executor接口介紹

Executor是一個接口,裡面提供了一個execute方法,該方法接收一個Runable參數,如下

public interface Executor { void execute(Runnable command);}

Executor框架的常用類和接口結構圖

java多線程系列:Executors框架

線程對象及線程執行返回的對象

java多線程系列:Executors框架

線程對象

線程對象就是提交給線程池的任務,可以實現Runable接口或Callable接口。或許這邊會產生一個疑問,為什麼Runable接口和Callable接口沒有任何關聯,卻都能作為任務來執行?大家可以思考下,文章的結尾會對此進行說明

Future接口

Future接口和FutureTask類是用來接收線程異步執行後返回的結果,可以看到下方ExecutorService接口的submit方法返回的就是Future。

ExecutorService常用接口介紹

接下來我們來看看繼承了Executor接口的ExecutorService

public interface ExecutorService extends Executor { //正常關閉(不再接收新任務,執行完隊列中的任務) void shutdown(); //強行關閉(關閉當前正在執行的任務,返回所有尚未啟動的任務清單) List shutdownNow(); boolean isShutdown(); boolean isTerminated();  Future submit(Callable task);  Future submit(Runnable task, T result); Future> submit(Runnable task); ...} 

ThreadPoolExecutor構造函數介紹

在介紹穿件線程池的方法之前要先介紹一個類ThreadPoolExecutor,應為Executors工廠大部分方法都是返回ThreadPoolExecutor對象,先來看看它的構造函數吧

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {...}

參數介紹

參數類型含義corePoolSizeint核心線程數maximumPoolSizeint最大線程數keepAliveTimelong存活時間unitTimeUnit時間單位workQueueBlockingQueue存放線程的隊列threadFactoryThreadFactory創建線程的工廠handlerRejectedExecutionHandler多餘的的線程處理器(拒絕策略)

創建線程池的一些方法介紹

為什麼要講ExecutorService接口呢?是因為我們使用Executors的方法時返回的大部分都是ExecutorService。

Executors提供了幾個創建線程池方法,接下來我就介紹一下這些方法

newFixedThreadPool(int nThreads)創建一個線程的線程池,若空閒則執行,若沒有空閒線程則暫緩在任務隊列中。newWorkStealingPool()創建持有足夠線程的線程池來支持給定的並行級別,並通過使用多個隊列,減少競爭,它需要穿一個並行級別的參數,如果不傳,則被設定為默認的CPU數量。newSingleThreadExecutor()該方法返回一個固定數量的線程池 該方法的線程始終不變,當有一個任務提交時,若線程池空閒,則立即執行,若沒有,則會被暫緩在一個任務隊列只能怪等待有空閒的線程去執行。newCachedThreadPool() 返回一個可根據實際情況調整線程個數的線程池,不限制最大線程數量,若有空閒的線程則執行任務,若無任務則不創建線程,並且每一個空閒線程會在60秒後自動回收。newScheduledThreadPool(int corePoolSize)返回一個SchededExecutorService對象,但該線程池可以設置線程的數量,支持定時及週期性任務執行。 newSingleThreadScheduledExecutor()創建一個單例線程池,定期或延時執行任務。 

下面講解下幾個常用的方法,創建單個的就不說明了

newFixedThreadPool方法

該方法創建指定線程數量的線程池,沒有限制可存放的線程數量(無界隊列),適用於線程任務執行較快的場景。

java多線程系列:Executors框架

看看Executors工廠內部是如何實現的

public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue 
());}

可以看到返回的是一個ThreadPoolExecutor對象,核心線程數和是最大線程數都是傳入的參數,存活時間是0,時間單位是毫秒,阻塞隊列是無界隊列LinkedBlockingQueue。

由於隊列採用的是無界隊列LinkedBlockingQueue,最大線程數maximumPoolSize和keepAliveTime都是無效參數,拒絕策略也將無效,為什麼?

這裡又延伸出一個問題,無界隊列說明任務沒有上限,如果執行的任務比較耗時,那麼新的任務會一直存放在線程池中,線程池的任務會越來越多,將會導致什麼後果?下面的代碼可以試試

public class Main { public static void main(String[] args){ ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); while (true){ pool.submit(new Runnable() { @Override public void run() { try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } }}

示例代碼

public class Main { public static void main(String[] args){ ExecutorService pool = Executors.newFixedThreadPool(4); for (int i = 0; i < 8; i++) { int finalI = i + 1; pool.submit(() -> { try { System.out.println("任務"+ finalI +":開始等待2秒,時間:"+LocalTime.now()+",當前線程名:"+Thread.currentThread().getName()); Thread.sleep(2000); System.out.println("任務"+ finalI +":結束等待2秒,時間:"+LocalTime.now()+",當前線程名:"+Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } }); } pool.shutdown(); }}

輸出結果

任務4:開始等待2秒,時間:17:13:22.048,當前線程名:pool-1-thread-4任務2:開始等待2秒,時間:17:13:22.048,當前線程名:pool-1-thread-2任務3:開始等待2秒,時間:17:13:22.048,當前線程名:pool-1-thread-3任務1:開始等待2秒,時間:17:13:22.048,當前線程名:pool-1-thread-1任務2:結束等待2秒,時間:17:13:24.048,當前線程名:pool-1-thread-2任務3:結束等待2秒,時間:17:13:24.048,當前線程名:pool-1-thread-3任務1:結束等待2秒,時間:17:13:24.048,當前線程名:pool-1-thread-1任務4:結束等待2秒,時間:17:13:24.048,當前線程名:pool-1-thread-4任務6:開始等待2秒,時間:17:13:24.049,當前線程名:pool-1-thread-4任務7:開始等待2秒,時間:17:13:24.049,當前線程名:pool-1-thread-1任務5:開始等待2秒,時間:17:13:24.049,當前線程名:pool-1-thread-3任務8:開始等待2秒,時間:17:13:24.049,當前線程名:pool-1-thread-2任務5:結束等待2秒,時間:17:13:26.050,當前線程名:pool-1-thread-3任務7:結束等待2秒,時間:17:13:26.050,當前線程名:pool-1-thread-1任務8:結束等待2秒,時間:17:13:26.051,當前線程名:pool-1-thread-2任務6:結束等待2秒,時間:17:13:26.050,當前線程名:pool-1-thread-4 

可以看出任務1-4在同一時間執行,在2秒後執行完畢,同時開始執行任務5-8。說明方法內部只創建了4個線程,其他任務存放在隊列中等待執行。

newCachedThreadPool方法

newCachedThreadPool方法創建的線程池會根據需要自動創建新線程。

java多線程系列:Executors框架

看看Executors工廠內部是如何實現的

public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());}

newCachedThreadPool方法也是返回ThreadPoolExecutor對象,核心線程是0,最大線程數是Integer的最MAX_VALUE,存活時間是60,時間單位是秒,SynchronousQueue隊列。

從傳入的參數可以得知,在newCachedThreadPool方法中的空閒線程存活時間時60秒,一旦超過60秒線程就會被終止。這邊還隱含了一個問題,如果執行的線程較慢,而提交任務的速度快於線程執行的速度,那麼就會不斷的創建新的線程,從而導致cpu和內存的增長。

代碼和newFixedThreadPool一樣循環添加新的線程任務,我的電腦運行就會出現如下錯誤

An unrecoverable stack overflow has occurred.Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread at java.lang.Thread.start0(Native Method) at java.lang.Thread.start(Thread.java:714) at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1368) at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112) at com.learnConcurrency.executor.cachedThreadPool.Main.main(Main.java:11)Process finished with exit code -1073741571 (0xC00000FD)

關於SynchronousQueue隊列,它是一個沒有容量的阻塞隊列,任務傳遞的示意圖如下

java多線程系列:Executors框架

示例代碼

public class Main { public static void main(String[] args) throws Exception{ ExecutorService pool = Executors.newCachedThreadPool(); for (int i = 0; i < 8; i++) { int finalI = i + 1; pool.submit(() -> { try { System.out.println("任務"+ finalI +":開始等待60秒,時間:"+LocalTime.now()+",當前線程名:"+Thread.currentThread().getName()); Thread.sleep(60000); System.out.println("任務"+ finalI +":結束等待60秒,時間:"+LocalTime.now()+",當前線程名:"+Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } }); //睡眠10秒 Thread.sleep(10000); } pool.shutdown(); }}

執行結果

任務1:開始等待60秒,時間:17:15:21.570,當前線程名:pool-1-thread-1任務2:開始等待60秒,時間:17:15:31.553,當前線程名:pool-1-thread-2任務3:開始等待60秒,時間:17:15:41.555,當前線程名:pool-1-thread-3任務4:開始等待60秒,時間:17:15:51.554,當前線程名:pool-1-thread-4任務5:開始等待60秒,時間:17:16:01.554,當前線程名:pool-1-thread-5任務6:開始等待60秒,時間:17:16:11.555,當前線程名:pool-1-thread-6任務7:開始等待60秒,時間:17:16:21.555,當前線程名:pool-1-thread-7任務1:結束等待60秒,時間:17:16:21.570,當前線程名:pool-1-thread-1任務2:結束等待60秒,時間:17:16:31.554,當前線程名:pool-1-thread-2任務8:開始等待60秒,時間:17:16:31.556,當前線程名:pool-1-thread-2任務3:結束等待60秒,時間:17:16:41.555,當前線程名:pool-1-thread-3任務4:結束等待60秒,時間:17:16:51.556,當前線程名:pool-1-thread-4任務5:結束等待60秒,時間:17:17:01.556,當前線程名:pool-1-thread-5任務6:結束等待60秒,時間:17:17:11.555,當前線程名:pool-1-thread-6任務7:結束等待60秒,時間:17:17:21.556,當前線程名:pool-1-thread-7任務8:結束等待60秒,時間:17:17:31.557,當前線程名:pool-1-thread-2

示例代碼中每個任務都睡眠60秒,每次循環添加任務睡眠10秒,從執行結果來看,添加的7個任務都是由不同的線程來執行,而此時線程1和2都執行完畢,任務8添加進來由之前創建的pool-1-thread-2執行。

newScheduledThreadPool方法

這個線程池主要用來延遲執行任務或者定期執行任務。

看看Executors工廠內部是如何實現的

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize);}

這裡返回的是ScheduledThreadPoolExecutor對象,我們繼續深入進去看看

public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());}

這裡調用的是父類的構造函數,ScheduledThreadPoolExecutor的父類是ThreadPoolExecutor,所以返回的也是ThreadPoolExecutor對象。核心線程數是傳入的參數corePoolSize,線程最大值是Integer的MAX_VALUE,存活時間時0,時間單位是納秒,隊列是DelayedWorkQueue。

public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {}

下面是ScheduledExecutorService的一些方法

public interface ScheduledExecutorService extends ExecutorService { //delay延遲時間,unit延遲單位,只執行1次,在經過delay延遲時間之後開始執行 public ScheduledFuture> schedule(Runnable command,long delay, TimeUnit unit); public  ScheduledFuture schedule(Callable callable,long delay, TimeUnit unit); //首次執行時間時然後在initialDelay之後,然後在initialDelay+period 後執行,接著在 initialDelay + 2 * period 後執行,依此類推 public ScheduledFuture> scheduleAtFixedRate(Runnable command,long initialDelay, long period, TimeUnit unit); //首次執行時間時然後在initialDelay之後,然後延遲delay時間執行 public ScheduledFuture> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);}

疑問解答

Runable接口和Callable接口

那麼就從提交任務入口看看吧

submit方法是由抽象類AbstractExecutorService實現的

public Future> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task, null); execute(ftask); return ftask;}public  Future submit(Callable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task); execute(ftask); return ftask;}

可以看出將傳入的Runnable對象和Callable傳入一個newTaskFor方法,然後返回一個RunnableFuture對象

我們再來看看newTaskFor方法

protected  RunnableFuture newTaskFor(Runnable runnable, T value) { return new FutureTask(runnable, value);}protected  RunnableFuture newTaskFor(Callable callable) { return new FutureTask(callable);}

這裡都是調用FutureTask的構造函數,我們接著往下看

private Callable callable;public FutureTask(Callable callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; }public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; }

FutureTask類中有個成員變量callable,而傳入的Runnable對象則繼續調用Executors工廠類的callable方法返回一個Callable對象

public static  Callable callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter(task, result);}//適配器static final class RunnableAdapter implements Callable { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; }}

好了,到這裡也就真相大白了,Runnable對象經過一系列的方法調用,最終被RunnableAdapter適配器適配成Callable對象。方法調用圖如下

java多線程系列:Executors框架

GitHub地址

目錄

  1. ExecutorService常用接口介紹
  2. 創建線程池的一些方法介紹
  • 3.1newFixedThreadPool方法
  • 3.2newCachedThreadPool方法
  • 3.3newScheduledThreadPool方法
  • 4.1.Runable接口和Callable接口

Executor接口介紹

Executor是一個接口,裡面提供了一個execute方法,該方法接收一個Runable參數,如下

public interface Executor { void execute(Runnable command);}

Executor框架的常用類和接口結構圖

java多線程系列:Executors框架

線程對象及線程執行返回的對象

java多線程系列:Executors框架

線程對象

線程對象就是提交給線程池的任務,可以實現Runable接口或Callable接口。或許這邊會產生一個疑問,為什麼Runable接口和Callable接口沒有任何關聯,卻都能作為任務來執行?大家可以思考下,文章的結尾會對此進行說明

Future接口

Future接口和FutureTask類是用來接收線程異步執行後返回的結果,可以看到下方ExecutorService接口的submit方法返回的就是Future。

ExecutorService常用接口介紹

接下來我們來看看繼承了Executor接口的ExecutorService

public interface ExecutorService extends Executor { //正常關閉(不再接收新任務,執行完隊列中的任務) void shutdown(); //強行關閉(關閉當前正在執行的任務,返回所有尚未啟動的任務清單) List shutdownNow(); boolean isShutdown(); boolean isTerminated();  Future submit(Callable task);  Future submit(Runnable task, T result); Future> submit(Runnable task); ...}

ThreadPoolExecutor構造函數介紹

在介紹穿件線程池的方法之前要先介紹一個類ThreadPoolExecutor,應為Executors工廠大部分方法都是返回ThreadPoolExecutor對象,先來看看它的構造函數吧

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue 
workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {...}

參數介紹

參數類型含義corePoolSizeint核心線程數maximumPoolSizeint最大線程數keepAliveTimelong存活時間unitTimeUnit時間單位workQueueBlockingQueue存放線程的隊列threadFactoryThreadFactory創建線程的工廠handlerRejectedExecutionHandler多餘的的線程處理器(拒絕策略)

創建線程池的一些方法介紹

為什麼要講ExecutorService接口呢?是因為我們使用Executors的方法時返回的大部分都是ExecutorService。

Executors提供了幾個創建線程池方法,接下來我就介紹一下這些方法

newFixedThreadPool(int nThreads)創建一個線程的線程池,若空閒則執行,若沒有空閒線程則暫緩在任務隊列中。newWorkStealingPool()創建持有足夠線程的線程池來支持給定的並行級別,並通過使用多個隊列,減少競爭,它需要穿一個並行級別的參數,如果不傳,則被設定為默認的CPU數量。newSingleThreadExecutor()該方法返回一個固定數量的線程池 該方法的線程始終不變,當有一個任務提交時,若線程池空閒,則立即執行,若沒有,則會被暫緩在一個任務隊列只能怪等待有空閒的線程去執行。newCachedThreadPool() 返回一個可根據實際情況調整線程個數的線程池,不限制最大線程數量,若有空閒的線程則執行任務,若無任務則不創建線程,並且每一個空閒線程會在60秒後自動回收。newScheduledThreadPool(int corePoolSize)返回一個SchededExecutorService對象,但該線程池可以設置線程的數量,支持定時及週期性任務執行。 newSingleThreadScheduledExecutor()創建一個單例線程池,定期或延時執行任務。 

下面講解下幾個常用的方法,創建單個的就不說明了

newFixedThreadPool方法

該方法創建指定線程數量的線程池,沒有限制可存放的線程數量(無界隊列),適用於線程任務執行較快的場景。

java多線程系列:Executors框架

看看Executors工廠內部是如何實現的

public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue 
());}

可以看到返回的是一個ThreadPoolExecutor對象,核心線程數和是最大線程數都是傳入的參數,存活時間是0,時間單位是毫秒,阻塞隊列是無界隊列LinkedBlockingQueue。

由於隊列採用的是無界隊列LinkedBlockingQueue,最大線程數maximumPoolSize和keepAliveTime都是無效參數,拒絕策略也將無效,為什麼?

這裡又延伸出一個問題,無界隊列說明任務沒有上限,如果執行的任務比較耗時,那麼新的任務會一直存放在線程池中,線程池的任務會越來越多,將會導致什麼後果?下面的代碼可以試試

public class Main { public static void main(String[] args){ ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); while (true){ pool.submit(new Runnable() { @Override public void run() { try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } }}

示例代碼

public class Main { public static void main(String[] args){ ExecutorService pool = Executors.newFixedThreadPool(4); for (int i = 0; i < 8; i++) { int finalI = i + 1; pool.submit(() -> { try { System.out.println("任務"+ finalI +":開始等待2秒,時間:"+LocalTime.now()+",當前線程名:"+Thread.currentThread().getName()); Thread.sleep(2000); System.out.println("任務"+ finalI +":結束等待2秒,時間:"+LocalTime.now()+",當前線程名:"+Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } }); } pool.shutdown(); }}

輸出結果

任務4:開始等待2秒,時間:17:13:22.048,當前線程名:pool-1-thread-4任務2:開始等待2秒,時間:17:13:22.048,當前線程名:pool-1-thread-2任務3:開始等待2秒,時間:17:13:22.048,當前線程名:pool-1-thread-3任務1:開始等待2秒,時間:17:13:22.048,當前線程名:pool-1-thread-1任務2:結束等待2秒,時間:17:13:24.048,當前線程名:pool-1-thread-2任務3:結束等待2秒,時間:17:13:24.048,當前線程名:pool-1-thread-3任務1:結束等待2秒,時間:17:13:24.048,當前線程名:pool-1-thread-1任務4:結束等待2秒,時間:17:13:24.048,當前線程名:pool-1-thread-4任務6:開始等待2秒,時間:17:13:24.049,當前線程名:pool-1-thread-4任務7:開始等待2秒,時間:17:13:24.049,當前線程名:pool-1-thread-1任務5:開始等待2秒,時間:17:13:24.049,當前線程名:pool-1-thread-3任務8:開始等待2秒,時間:17:13:24.049,當前線程名:pool-1-thread-2任務5:結束等待2秒,時間:17:13:26.050,當前線程名:pool-1-thread-3任務7:結束等待2秒,時間:17:13:26.050,當前線程名:pool-1-thread-1任務8:結束等待2秒,時間:17:13:26.051,當前線程名:pool-1-thread-2任務6:結束等待2秒,時間:17:13:26.050,當前線程名:pool-1-thread-4 

可以看出任務1-4在同一時間執行,在2秒後執行完畢,同時開始執行任務5-8。說明方法內部只創建了4個線程,其他任務存放在隊列中等待執行。

newCachedThreadPool方法

newCachedThreadPool方法創建的線程池會根據需要自動創建新線程。

java多線程系列:Executors框架

看看Executors工廠內部是如何實現的

public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());}

newCachedThreadPool方法也是返回ThreadPoolExecutor對象,核心線程是0,最大線程數是Integer的最MAX_VALUE,存活時間是60,時間單位是秒,SynchronousQueue隊列。

從傳入的參數可以得知,在newCachedThreadPool方法中的空閒線程存活時間時60秒,一旦超過60秒線程就會被終止。這邊還隱含了一個問題,如果執行的線程較慢,而提交任務的速度快於線程執行的速度,那麼就會不斷的創建新的線程,從而導致cpu和內存的增長。

代碼和newFixedThreadPool一樣循環添加新的線程任務,我的電腦運行就會出現如下錯誤

An unrecoverable stack overflow has occurred.Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread at java.lang.Thread.start0(Native Method) at java.lang.Thread.start(Thread.java:714) at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1368) at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112) at com.learnConcurrency.executor.cachedThreadPool.Main.main(Main.java:11)Process finished with exit code -1073741571 (0xC00000FD)

關於SynchronousQueue隊列,它是一個沒有容量的阻塞隊列,任務傳遞的示意圖如下

java多線程系列:Executors框架

示例代碼

public class Main { public static void main(String[] args) throws Exception{ ExecutorService pool = Executors.newCachedThreadPool(); for (int i = 0; i < 8; i++) { int finalI = i + 1; pool.submit(() -> { try { System.out.println("任務"+ finalI +":開始等待60秒,時間:"+LocalTime.now()+",當前線程名:"+Thread.currentThread().getName()); Thread.sleep(60000); System.out.println("任務"+ finalI +":結束等待60秒,時間:"+LocalTime.now()+",當前線程名:"+Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } }); //睡眠10秒 Thread.sleep(10000); } pool.shutdown(); }}

執行結果

任務1:開始等待60秒,時間:17:15:21.570,當前線程名:pool-1-thread-1任務2:開始等待60秒,時間:17:15:31.553,當前線程名:pool-1-thread-2任務3:開始等待60秒,時間:17:15:41.555,當前線程名:pool-1-thread-3任務4:開始等待60秒,時間:17:15:51.554,當前線程名:pool-1-thread-4任務5:開始等待60秒,時間:17:16:01.554,當前線程名:pool-1-thread-5任務6:開始等待60秒,時間:17:16:11.555,當前線程名:pool-1-thread-6任務7:開始等待60秒,時間:17:16:21.555,當前線程名:pool-1-thread-7任務1:結束等待60秒,時間:17:16:21.570,當前線程名:pool-1-thread-1任務2:結束等待60秒,時間:17:16:31.554,當前線程名:pool-1-thread-2任務8:開始等待60秒,時間:17:16:31.556,當前線程名:pool-1-thread-2任務3:結束等待60秒,時間:17:16:41.555,當前線程名:pool-1-thread-3任務4:結束等待60秒,時間:17:16:51.556,當前線程名:pool-1-thread-4任務5:結束等待60秒,時間:17:17:01.556,當前線程名:pool-1-thread-5任務6:結束等待60秒,時間:17:17:11.555,當前線程名:pool-1-thread-6任務7:結束等待60秒,時間:17:17:21.556,當前線程名:pool-1-thread-7任務8:結束等待60秒,時間:17:17:31.557,當前線程名:pool-1-thread-2

示例代碼中每個任務都睡眠60秒,每次循環添加任務睡眠10秒,從執行結果來看,添加的7個任務都是由不同的線程來執行,而此時線程1和2都執行完畢,任務8添加進來由之前創建的pool-1-thread-2執行。

newScheduledThreadPool方法

這個線程池主要用來延遲執行任務或者定期執行任務。

看看Executors工廠內部是如何實現的

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize);}

這裡返回的是ScheduledThreadPoolExecutor對象,我們繼續深入進去看看

public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());}

這裡調用的是父類的構造函數,ScheduledThreadPoolExecutor的父類是ThreadPoolExecutor,所以返回的也是ThreadPoolExecutor對象。核心線程數是傳入的參數corePoolSize,線程最大值是Integer的MAX_VALUE,存活時間時0,時間單位是納秒,隊列是DelayedWorkQueue。

public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {}

下面是ScheduledExecutorService的一些方法

public interface ScheduledExecutorService extends ExecutorService { //delay延遲時間,unit延遲單位,只執行1次,在經過delay延遲時間之後開始執行 public ScheduledFuture> schedule(Runnable command,long delay, TimeUnit unit); public  ScheduledFuture schedule(Callable callable,long delay, TimeUnit unit); //首次執行時間時然後在initialDelay之後,然後在initialDelay+period 後執行,接著在 initialDelay + 2 * period 後執行,依此類推 public ScheduledFuture> scheduleAtFixedRate(Runnable command,long initialDelay, long period, TimeUnit unit); //首次執行時間時然後在initialDelay之後,然後延遲delay時間執行 public ScheduledFuture> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);}

疑問解答

Runable接口和Callable接口

那麼就從提交任務入口看看吧

submit方法是由抽象類AbstractExecutorService實現的

public Future> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task, null); execute(ftask); return ftask;}public  Future submit(Callable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task); execute(ftask); return ftask;}

可以看出將傳入的Runnable對象和Callable傳入一個newTaskFor方法,然後返回一個RunnableFuture對象

我們再來看看newTaskFor方法

protected  RunnableFuture newTaskFor(Runnable runnable, T value) { return new FutureTask(runnable, value);}protected  RunnableFuture newTaskFor(Callable callable) { return new FutureTask(callable);}

這裡都是調用FutureTask的構造函數,我們接著往下看

private Callable callable;public FutureTask(Callable callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; }public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; }

FutureTask類中有個成員變量callable,而傳入的Runnable對象則繼續調用Executors工廠類的callable方法返回一個Callable對象

public static  Callable callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter(task, result);}//適配器static final class RunnableAdapter implements Callable { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; }}

好了,到這裡也就真相大白了,Runnable對象經過一系列的方法調用,最終被RunnableAdapter適配器適配成Callable對象。方法調用圖如下

java多線程系列:Executors框架


分享到:


相關文章: