Java多線程開發之Callable與線程池

Java多線程開發之Callable與線程池

鎮樓小姐姐

可獲得兩大新人禮包

36份一線互聯網Java面試電子書

84個Java稀缺面試題視頻

一、前言

我們常見的創建線程的方式有 2 種:繼承 Thread 和 實現 Runnable 接口。

其實,在 JDK 中還提供了另外 2 種 API 讓開發者使用。

二、簡單介紹

2.1 Callable

Java 5.0 在 java.util.concurrent 提供了一個新的創建執行線程的方式: 實現 Callable 接口。

Callable 接口類似於 Runnable,但是 Runnable 不會返回結果,並且無法拋出經過檢查的異常,而 Callable 依賴 FutureTask 類獲取返回結果。

代碼演示:

public class CallableTest {

public static void main(String[] args) throws Exception {

MyThread mt = new MyThread();

FutureTask result = new FutureTask(mt);

new Thread(result).start();

// 獲取運算結果是同步過程,即 call 方法執行完成,才能獲取結果

Integer sum = result.get();

System.out.println(sum);

}

}

class MyThread implements Callable {

@Override

public Integer call() throws Exception {

int sum = 0;

for (int i = 1; i <= 100; i++) {

sum += i;

}

return sum;

}

}

當某個請求需要在後端完成 N 次統計結果時,我們就可以使用該方式創建 N 個線程進行(並行)統計,而不需要同步等待其他統計操作完成後才統計另一個結果。

2.2 線程池

第四種獲取線程的方法:線程池。

通過重用現有的線程而不是創建新的線程可以在處理多個請求時分攤在線程創建和銷燬過程中產生的巨大開銷,同時當請求到達時,工作線程已經存在,因此不會由於等待創建線程而延遲任何的執行,從而提高系統的響應性。

2.2.1 線程池體系結構

線程池體系結構:

Java多線程開發之Callable與線程池

2.2.2 ThreadPoolExecutor API

ThreadPoolExecutor 用於創建線程池,它有 4 個重載構造器,我們以最多參數的構造器講解:

ThreadPoolExecutor(int corePoolSize, # 線程池核心線程個數,默認線程池線程個數為 0,只有接到任務才新建線程


int maximumPoolSize, # 線程池最大線程數量


long keepAliveTime, # 線程池空閒時,線程存活的時間,當線程池中的線程數大於 corePoolSize 時才會起作用


TimeUnit unit, # 時間單位


BlockingQueue workQueue, # 阻塞隊列,當達到線程數達到 corePoolSize 時,將任務放入隊列等待線程處理


ThreadFactory threadFactory, # 線程工廠


  1. RejectedExecutionHandler handler) # 線程拒絕策略,當隊列滿了並且線程個數達到 maximumPoolSize 後採取的策略

阻塞隊列有以下 4 種:

ArrayBlockingQueue:基於數組、有界,按 FIFO(先進先出)原則對元素進行排序;


LinkedBlockingQueue:基於鏈表,按FIFO (先進先出) 排序元素,吞吐量通常要高於 ArrayBlockingQueue;


SynchronousQueue:每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於 LinkedBlockingQueue;


  1. PriorityBlockingQueue:具有優先級的、無限阻塞隊列。

線程拒絕策略有以下 4 種:

CallerRunsPolicy:如果發現線程池還在運行,就直接運行這個線程;


DiscardOldestPolicy:在線程池的等待隊列中,將頭取出一個拋棄,然後將當前線程放進去;


DiscardPolicy:默默丟棄,不拋出異常;


  1. AbortPolicy:java默認,拋出一個異常(RejectedExecutionException)。

實現原則:

如果當前池大小 poolSize 小於 corePoolSize ,則創建新線程執行任務;


如果當前池大小 poolSize 大於 corePoolSize ,且等待隊列未滿,則進入等待隊列;


如果當前池大小 poolSize 大於 corePoolSize 且小於 maximumPoolSize ,且等待隊列已滿,則創建新線程執行任務;


如果當前池大小 poolSize 大於 corePoolSize 且大於 maximumPoolSize ,且等待隊列已滿,則調用拒絕策略來處理該任務;


  1. 線程池裡的每個線程執行完任務後不會立刻退出,而是會去檢查下等待隊列裡是否還有線程任務需要執行,如果在 keepAliveTime 裡等不到新的任務了,那麼線程就會退出。

2.2.3 內置線程池

在 java.util.concurrent 包中已經提供為大多數使用場景的內置線程池:

Executors.newSingleThreadExecutor() # 單條線程


Executors.newFixedThreadPool(int n) # 固定數目線程的線程池


Executors.newCachedThreadPool() # 創建一個可緩存的線程池,調用execute 將重用以前構造的線程(如果線程可用)。如果現有線程沒有可用的,則創建一個新線程並添加到池中。終止並從緩存中移除那些已有 60 秒鐘未被使用的線程。


  1. Executors.newScheduledThreadPool(int n) # 支持定時及週期性的任務執行的線程池,多數情況下可用來替代 Timer 類。

上述 4 種線程池底層都是通過創建 ThreadPoolExecutor 獲取線程池。

1) newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {

return new Executors.FinalizableDelegatedExecutorService(

new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())

);

}

主要用於串行(順序執行)操作場景。

2) newFixedThreadPool

public static ExecutorService newFixedThreadPool(int var0) {

return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());

}

主要用於負載比較重的場景,為了資源的合理利用,需要限制當前線程數量。

3) newCachedThreadPool

public static ExecutorService newCachedThreadPool() {

return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());

}

主要用於併發執行大量短期的小任務,或者是負載較輕的服務器。

4) newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int var0) {

return new ScheduledThreadPoolExecutor(var0);

}

其中,ScheduledExecutorService 繼承 ThreadPoolExecutor。

2.2.4 提交任務

獲取 ExecutorService 對象後,我們需要提交任務來讓線程池中的線程執行,提交任務的方法有 2 種:

void execute():提交不需要返回值的任務

Future submit(): 提交需要返回值的任務

代碼演示:

// 創建 1 個線程

ExecutorService service = Executors.newSingleThreadExecutor();

service.execute(new Runnable() {

@Override

public void run() {

System.out.println(Thread.currentThread().getName() + ":hello world");

}

});

// 關閉線程池

service.shutdown();

// 創建 5 個線程

ExecutorService service = Executors.newFixedThreadPool(5);

List> list = new ArrayList<>(5);

for (int i = 0; i < 5; i++) {

Future future = service.submit(new Callable() {

@Override

public String call() throws Exception {

return Thread.currentThread().getName() + ":hello world";

}

});

list.add(future);

}

// 打印結果

for (Future future : list) {

System.out.println(future.get());

}

// 關閉線程池

service.shutdown();

// 創建 1 個線程

ScheduledExecutorService service = Executors.newScheduledThreadPool(1);

service.schedule(new Runnable() {

@Override

public void run() {

System.out.println(Thread.currentThread().getName() + ":hello world");

}

}, 2, TimeUnit.SECONDS);

其他線程池使用方式類似,此處不再列舉代碼。


分享到:


相關文章: