編程老司機帶你玩轉 CompletableFuture 異步編程

導讀

本文從實例出發,介紹 CompletableFuture 基本用法。不過講的再多,不如親自上手練習一下。所以建議各位小夥伴看完,上機練習一把,快速掌握 CompletableFuture。

全文摘要:

  • Future VS CompletableFuture
  • CompletableFuture 基本用法

原文鏈接https://juejin.im/post/5e6588fc6fb9a07cbe347747

0x00. 前言

一些業務場景我們需要使用多線程異步執行任務,加快任務執行速度。 Java 提供 Runnable Future 兩個接口用來實現異步任務邏輯。

雖然 Future 可以獲取任務執行結果,但是獲取方式十方不變。我們不得不使用Future#get 阻塞調用線程,或者使用輪詢方式判斷 Future#isDone 任務是否結束,再獲取結果。

這兩種處理方式都不是很優雅,JDK8 之前併發類庫沒有提供相關的異步回調實現方式。沒辦法,我們只好藉助第三方類庫,如 Guava,擴展 Future,增加支持回調功能。相關代碼如下:


編程老司機帶你玩轉 CompletableFuture 異步編程


雖然這種方式增強了 Java 異步編程能力,但是還是無法解決多個異步任務需要相互依賴的場景。

舉一個生活上的例子,假如我們需要出去旅遊,需要完成三個任務:

  • 任務一:訂購航班
  • 任務二:訂購酒店
  • 任務三:訂購租車服務

很顯然任務一和任務二沒有相關性,可以單獨執行。但是任務三必須等待任務一與任務二結束之後,才能訂購租車服務。

為了使任務三時執行時能獲取到任務一與任務二執行結果,我們還需要藉助 CountDownLatch 。


編程老司機帶你玩轉 CompletableFuture 異步編程


0x01. CompletableFuture

JDK8 之後,Java 新增一個功能十分強大的類:CompletableFuture。單獨使用這個類就可以輕鬆的完成上面的需求:


編程老司機帶你玩轉 CompletableFuture 異步編程


大家可以先不用管 CompletableFuture 相關 API,下面將會具體講解。

對比 Future,CompletableFuture 優點在於:

  • 不需要手工分配線程,JDK 自動分配
  • 代碼語義清晰,異步任務鏈式調用
  • 支持編排異步任務

怎麼樣,是不是功能很強大?接下來抓穩了,小黑哥要發車了。


編程老司機帶你玩轉 CompletableFuture 異步編程


1.1 方法一覽

首先來通過 IDE 查看下這個類提供的方法:


編程老司機帶你玩轉 CompletableFuture 異步編程


稍微數一下,這個類總共有 50 多個方法,我的天。。。


編程老司機帶你玩轉 CompletableFuture 異步編程


不過也不要怕,已經幫你們歸納好了,跟著我的節奏,帶你們掌握 CompletableFuture。


編程老司機帶你玩轉 CompletableFuture 異步編程


1.2 創建 CompletableFuture 實例

創建 CompletableFuture 對象實例我們可以使用如下幾個方法:


編程老司機帶你玩轉 CompletableFuture 異步編程


第一個方法創建一個具有默認結果的 CompletableFuture,這個沒啥好講。我們重點講述下下面四個異步方法。

前兩個方法 runAsync 不支持返回值,而 supplyAsync可以支持返回結果。

這個兩個方法默認將會使用公共的 ForkJoinPool 線程池執行,這個線程池默認線程數是 CPU 的核數。

可以設置 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 來設置 ForkJoinPool 線程池的線程數

使用共享線程池將會有個弊端,一旦有任務被阻塞,將會造成其他任務沒機會執行。所以強烈建議使用後兩個方法,根據任務類型不同,主動創建線程池,進行資源隔離,避免互相干擾。

1.3 設置任務結果

CompletableFuture 提供以下方法,可以主動設置任務結果。

<code> boolean complete(T value)
boolean completeExceptionally(Throwable ex)
複製代碼/<code>

第一個方法,主動設置 CompletableFuture 任務執行結果,若返回 true,表示設置成功。如果返回 false,設置失敗,這是因為任務已經執行結束,已經有了執行結果。

示例代碼如下:

<code>// 執行異步任務 

CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
System.out.println("cf 任務執行開始");
sleep(10, TimeUnit.SECONDS);
System.out.println("cf 任務執行結束");
return "小黑哥";
});
//
Executors.newSingleThreadScheduledExecutor().execute(() -> {
sleep(5, TimeUnit.SECONDS);
System.out.println("主動設置 cf 任務結果");
// 設置任務結果,由於 cf 任務未執行結束,結果返回 true
cf.complete("程序通事");
});
// 由於 cf 未執行結束,將會被阻塞。5 秒後,另外一個線程主動設置任務結果
System.out.println("get:" + cf.get());
// 等待 cf 任務執行結束
sleep(10, TimeUnit.SECONDS);
// 由於已經設置任務結果,cf 執行結束任務結果將會被拋棄
System.out.println("get:" + cf.get());
/***
* cf 任務執行開始
* 主動設置 cf 任務結果
* get:程序通事
* cf 任務執行結束
* get:程序通事
*/
複製代碼/<code>

這裡需要注意一點,一旦 complete 設置成功,CompletableFuture 返回結果就不會被更改,即使後續 CompletableFuture 任務執行結束。

第二個方法,給 CompletableFuture 設置異常對象。若設置成功,如果調用 get 等方法獲取結果,將會拋錯。

示例代碼如下:

<code>// 執行異步任務
CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
System.out.println("cf 任務執行開始");
sleep(10, TimeUnit.SECONDS);
System.out.println("cf 任務執行結束");
return "樓下小黑哥";
});
//
Executors.newSingleThreadScheduledExecutor().execute(() -> {
sleep(5, TimeUnit.SECONDS);
System.out.println("主動設置 cf 異常");
// 設置任務結果,由於 cf 任務未執行結束,結果返回 true
cf.completeExceptionally(new RuntimeException("啊,掛了"));
});
// 由於 cf 未執行結束,前 5 秒將會被阻塞。後續程序拋出異常,結束
System.out.println("get:" + cf.get());
/***
* cf 任務執行開始
* 主動設置 cf 異常
* java.util.concurrent.ExecutionException: java.lang.RuntimeException: 啊,掛了
* ......
*/
複製代碼/<code>

1.4 CompletionStage

CompletableFuture 分別實現兩個接口 Future與 CompletionStage。


編程老司機帶你玩轉 CompletableFuture 異步編程


Future 接口大家都比較熟悉,這裡主要講講 CompletionStage。

CompletableFuture 大部分方法來自CompletionStage 接口,正是因為這個接口,CompletableFuture才有如從強大功能。

想要理解 CompletionStage 接口,我們需要先了解任務的時序關係的。我們可以將任務時序關係分為以下幾種:

  • 串行執行關係
  • 並行執行關係
  • AND 匯聚關係
  • OR 匯聚關係

1.5 串行執行關係

任務串行執行,下一個任務必須等待上一個任務完成才可以繼續執行。


編程老司機帶你玩轉 CompletableFuture 異步編程


CompletionStage 有四組接口可以描述串行這種關係,分別為:


編程老司機帶你玩轉 CompletableFuture 異步編程


thenApply 方法需要傳入核心參數為 Function類型。這個類核心方法為:

<code> R apply(T t)
複製代碼/<code>

所以這個接口將會把上一個任務返回結果當做入參,執行結束將會返回結果。

thenAccept 方法需要傳入參數對象為 Consumer類型,這個類核心方法為:

<code>void accept(T t)
複製代碼/<code>

返回值 void 可以看出,這個方法不支持返回結果,但是需要將上一個任務執行結果當做參數傳入。

thenRun 方法需要傳入參數對象為 Runnable 類型,這個類大家應該都比較熟悉,核心方法既不支持傳入參數,也不會返回執行結果。

thenCompose 方法作用與 thenApply 一樣,只不過 thenCompose 需要返回新的 CompletionStage。這麼理解比較抽象,可以集合代碼一起理解。


編程老司機帶你玩轉 CompletableFuture 異步編程


方法中帶有 Async ,代表可以異步執行,這個系列還有重載方法,可以傳入自定義的線程池,上圖未展示,讀者只可以自行查看 API。

最後我們通過代碼展示 thenApply 使用方式:

<code>CompletableFuture<string> cf
= CompletableFuture.supplyAsync(() -> "hello,樓下小黑哥")// 1
.thenApply(s -> s + "@程序通事") // 2
.thenApply(String::toUpperCase); // 3
System.out.println(cf.join());
// 輸出結果 HELLO,樓下小黑哥@程序通事
複製代碼/<string>/<code>

這段代碼比較簡單,首先我們開啟一個異步任務,接著串行執行後續兩個任務。任務 2 需要等待任務1 執行完成,任務 3 需要等待任務 2。

上面方法,大家需要記住了 Function,Consumer,Runnable 三者區別,根據場景選擇使用。

1.6 AND 匯聚關係

AND 匯聚關係代表所有任務完成之後,才能進行下一個任務。


編程老司機帶你玩轉 CompletableFuture 異步編程


如上所示,只有任務 A 與任務 B 都完成之後,任務 C 才會開始執行。

CompletionStage 有以下接口描述這種關係。


編程老司機帶你玩轉 CompletableFuture 異步編程


thenCombine 方法核心參數 BiFunction ,作用與 Function一樣,只不過 BiFunction 可以接受兩個參數,而 Function 只能接受一個參數。

thenAcceptBoth 方法核心參數BiConsumer 作用也與 Consumer一樣,不過其需要接受兩個參數。

runAfterBoth 方法核心參數最簡單,上面已經介紹過,不再介紹。

這三組方法只能完成兩個任務 AND 匯聚關係,如果需要完成多個任務匯聚關係,需要使用 CompletableFuture#allOf,不過這裡需要注意,這個方法是不支持返回任務結果。

AND 匯聚關係相關示例代碼,開頭已經使用過了,這裡再粘貼一下,方便大家理解:


編程老司機帶你玩轉 CompletableFuture 異步編程


1.7 OR 匯聚關係

有 AND 匯聚關係,當然也存在 OR 匯聚關係。OR 匯聚關係代表只要多個任務中任一任務完成,就可以接著接著執行下一任務。


編程老司機帶你玩轉 CompletableFuture 異步編程


CompletionStage 有以下接口描述這種關係:


編程老司機帶你玩轉 CompletableFuture 異步編程


前面三組接口方法傳參與 AND 匯聚關係一致,這裡也不再詳細解釋了。

當然 OR 匯聚關係可以使用 CompletableFuture#anyOf 執行多個任務。

下面示例代碼展示如何使用 applyToEither 完成 OR 關係。

<code>CompletableFuture<string> cf
= CompletableFuture.supplyAsync(() -> {
sleep(5, TimeUnit.SECONDS);
return "hello,樓下小黑哥";
});// 1

CompletableFuture<string> cf2 = cf.supplyAsync(() -> {
sleep(3, TimeUnit.SECONDS);
return "hello,程序通事";
});
// 執行 OR 關係
CompletableFuture<string> cf3 = cf2.applyToEither(cf, s -> s);

// 輸出結果,由於 cf2 只休眠 3 秒,優先執行完畢
System.out.println(cf2.join());
// 結果:hello,程序通事
複製代碼/<string>/<string>/<string>/<code>

1.8 異常處理

CompletableFuture 方法執行過程若產生異常,當調用 get,join獲取任務結果才會拋出異常。


編程老司機帶你玩轉 CompletableFuture 異步編程


上面代碼我們顯示使用 try..catch 處理上面的異常。不過這種方式不太優雅,CompletionStage 提供幾個方法,可以優雅處理異常。


編程老司機帶你玩轉 CompletableFuture 異步編程


exceptionally 使用方式類似於 try..catch 中 catch代碼塊中異常處理。

whenComplete 與 handle 方法就類似於 try..catch..finanlly 中 finally 代碼塊。無論是否發生異常,都將會執行的。這兩個方法區別在於 handle 支持返回結果。

下面示例代碼展示 handle 用法:CompletableFuture<integer> f0 = CompletableFuture.supplyAsync(() -> (7 / 0)) .thenApply(r -> r * 10) .handle((integer, throwable) -> { // 如果異常存在,打印異常,並且返回默認值 if (throwable != null) { throwable.printStackTrace(); return 0; } else { // 如果 return integer; } });System.out.println(f0.join());/** *java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero * ..... * * 0 */複製代碼/<integer>

0x02. 總結

JDK8 提供 CompletableFuture 功能非常強大,可以編排異步任務,完成串行執行,並行執行,AND 匯聚關係,OR 匯聚關係。

不過這個類方法實在太多,且方法還需要傳入各種函數式接口,新手剛開始使用會直接會被弄懵逼。這裡幫大家在總結一下三類核心參數的作用

  • Function 這類函數接口既支持接收參數,也支持返回值
  • Consumer 這類接口函數只支持接受參數,不支持返回值
  • Runnable 這類接口不支持接受參數,也不支持返回值

搞清楚函數參數作用以後,然後根據串行,AND 匯聚關係,OR 匯聚關係歸納一下相關方法,這樣就比較好理解了

最後再貼一下,文章開頭的思維導圖,希望對你有幫助。


編程老司機帶你玩轉 CompletableFuture 異步編程


0x03. 幫助文檔

  1. 極客時間-併發編程專欄
  2. colobu.com/2016/02/29/…
  3. www.ibm.com/developerwo…

最後說一句(求關注)

CompletableFuture 很早之前就有關注,本以為跟 Future一樣,使用挺簡單,誰知道學的時候才發現好難。各種 API 方法看的頭有點大。

後來看到極客時間-『併發編程』專欄使用歸納方式分類 CompletableFuture 各種方法,一下子就看懂了。所這篇文章也參考這種歸納方式。

這篇文章找資料,整理一個星期,幸好今天順利產出。

才疏學淺,難免會有紕漏,如果你發現了錯誤的地方,還請你留言給我指出來,我對其加以修改。



分享到:


相關文章: