使用Java8的CompletableFuture實現函數式的回調

最近,在準備一個關於 Java 並行流相關的演講時,我意識到“The Free Lunch is Over”(TFLiO)這篇經典的文章已經有超過十年的歷史了。對於大多數程序員來說,這篇文章的廣泛傳播使他們第一次認識到持續四十年的處理器呈指數增長的趨勢將要終結——實際上,它已經終結了。取而代之的是另外一種趨勢,那就是在每個芯片上增加處理器的數量,按照 Herb Sutter 的話來講,程序員必須要“從根本上轉向併發”。

在 TFLiO 這篇文章中,Sutter 觀察到絕大多數的程序員“並沒有深刻了解併發”,但是他接著說到“一旦瞭解之後,其實基於鎖的編程並不會比 OO 難太多”。毫無疑問,第一句話是完全正確的,但是十年來,基於鎖的併發體驗並沒有證明第二句話的正確性。幸好,Java 程序員可以在很大程度上避免這樣的驗證,因為 TFLiO 發佈的時候,Java 5 也剛剛可用,它所提供的一些高層級的併發工具開始得到運用。藉助它們,能夠讓 Java 開發人員避免細粒度地分析同步功能和關鍵的代碼片段。

併發與並行

Java 5 併發庫主要關注於異步任務的處理,它採用了這樣一種模式,producer 線程創建任務並且利用阻塞隊列將其傳遞給任務的 consumer。這種模型在 Java 7 和 8 中進一步發展,並且開始支持另外一種風格的任務執行,那就是將任務的數據集分解為子集,每個子集都可以由獨立且同質的子任務來負責處理。

這種風格的基礎庫也就是 fork/join 框架,它允許程序員規定數據集該如何進行分割,並且支持將子任務提交到默認的標準線程池中,也就是“通用的”ForkJoinPool。(在本文中,非全限定的類和接口名指的都是 java.util.concurrent 包中的類型。)在 Java 8 中,fork/join 並行功能借助並行流的機制變得更加具有可用性。但是,不是所有的問題都適合這種風格的並行處理:所處理的元素必須是獨立的,數據集要足夠大,並且在並行加速方面,每個元素的處理成本要足夠高,這樣才能補償建立 fork/join 框架所消耗的成本。

同時,Java 8 在並行流方面的革新得到了廣泛的關注,這導致大家忽略了併發庫中一項新增的重要功能,那就是CompletableFuture類。本文將會探討CompletableFuture類,有一些系統會依賴於不同類型的異步執行任務,本文將會闡述該類為什麼會對這種類型的系統如此重要,並介紹了它是如何補充 fork/join 風格的並行機制和並行流的。

頁面渲染器

我們的起始點將是“Java Concurrency in Practice”(JCiP)一書中的樣例,這個樣例非常經典地闡述了 Java 5 中的併發工具類。在 JCiP 的第 6.3 節中,Brian Goetz 探討了如何開發一個 Web 頁面的渲染器,對於每個頁面來說,它的任務就是渲染文本,並下載和渲染圖片。圖片的下載會耗費較長的時間,在這段時間內 CPU 無事可做,只能等待。所以,在渲染頁面時,一個很明顯的策略就是首先初始化所有圖片的下載,然後利用它們完成之前的這段時間渲染頁面文本,最後渲染下載的圖片。

在 JCiP 中,第一版本的頁面渲染器使用了Future的理念,這個接口暴露了一些方法,允許客戶端監控任務的執行進度,這裡的任務是在一個不同的進程中執行的。在程序清單 1 中,Callable代表了下載頁面中所有圖片的任務,它被提交到了 Executor 中,然後返回一個 Future 對象,通過它就能詢問下載任務的狀態。當主線程渲染完頁面的文本後,會調用Future.get方法,這個方法會一直阻塞直到所有下載的結果均可用為止,在本例中這個結果是以List<imagedata>的形式來表示的。這種方式一個很明顯的缺點在於下載任務的粗粒度性,在所有的圖片下載完成之前,我們一張圖片也不能渲染。接下來,我們看一下如何緩解這個問題。/<imagedata>

public void renderPage(CharSequence source) {
List<imageinfo> info = scanForImageInfo(source);
// 創建 Callable,它代表了下載所有的圖片
final Callable<list>> task = () ->
info.stream()
.map(ImageInfo::downloadImage)
.collect(Collectors.toList());
// 將下載任務提交到 executor
Future<list>> images = executor.submit(task);
// renderText(source);
try {
// 獲得所有下載的圖片(在所有圖片可用之前會一直阻塞)
final List<imagedata> imageDatas = images.get();
// 渲染圖片
imageDatas.forEach(this::renderImage);
} catch (InterruptedException e) {
// 重新維護線程的中斷狀態
Thread.currentThread().interrupt();
// 我們不需要結果,所以取消任務
images.cancel(true);
} catch (ExecutionException e) {
throw launderThrowable(e.getCause()); }
}
/<imagedata>/<list>/<list>/<imageinfo>

程序清單 1:使用 Future 等待所有的圖片下載完成

為了讓這個樣例及其後面的變種易於管理,這裡有一個前提條件:我們假設類型ImageInfo(簡單來講,就是一個 URL)和ImageData(圖片的二進制數據)以及方法scanForImageInfo、downloadImage、renderText、renderImage、launderThrowable和ImageInfo.downloadImage都已經存在了。實例變量executor是通過ExecutorService類型聲明的並進行了恰當的初始化。在本文中,我將 JCiP 中最初的樣例利用 Java 8 lambda 表達式和流進行了現代化。

在這段代碼中,必須要等待所有下載都完成的原因在於,它使用Future接口來代表下載任務,作為異步執行的任務模型,它有很大的侷限性。Future允許客戶端詢問任務執行的結果,如果必要的話,將會產生阻塞,另外還可以詢問任務的狀態,判斷它已經完成還是被取消了。但是,Future本身並不能提供回調方法,假設能夠這樣做的話,當每個圖片下載完成的時候,就能通知頁面的渲染線程了。

程序清單 2 改善了之前樣例中粒度較粗的問題,它將頁面下載的任務提交到了CompletionService類中,這個類的poll和take方法會產生對應的Future實例,這些實例是按照任務完成的順序排列的,而不是像程序清單 1 那樣任務是按照提交的順序處理的。在ExecutorCompletionService接口的平臺實現中,為了實現該功能,每項任務都會包裝在一個FutureTask中,FutureTask是Future的一個實現,它允許提供完成時的回調。Future 的回調行為是在 ExecutorCompletionService 中創建的,完成的任務會封裝到一個隊列中,供客戶端詢問時使用。

 public void renderPage(CharSequence source) { 
List<imageinfo> info = scanForImageInfo(source);
CompletionService<imagedata> completionService =
new ExecutorCompletionService<>(executor);
// 將每個下載任務提交到 completion service 中
info.forEach(imageInfo ->
completionService.submit(imageInfo::downloadImage));
renderText(source);
// 當每個 RunnableFuture 可用時(並且我們也準備處理它的時候),

// 將它們檢索出來
for (int t = 0; t < info.size(); t++) {
Future<imagedata> imageFuture = completionService.take();
renderImage(imageFuture.get());
}
}
/<imagedata>/<imagedata>/<imageinfo>

程序清單 2:藉助CompletionService,當圖片可用時立即將其渲染出來(為了保持簡潔性,省略掉了中斷和錯誤處理的代碼)

CompletableFuture 簡介

程序清單 2 代表了 Java 5 所能達到的水準,不過 2014 年之後,在 Java 中,編寫異步系統的表現性得到了巨大的提升,這是通過引入CompletableFuture (CF)類實現的。這個類是Future的實現,它能夠將回調放到與任務不同的線程中執行,也能將回調作為繼續執行的同步函數,在與任務相同的線程中執行。它避免了傳統回調最大的問題,那就是能夠將控制流分離到不同的事件處理器中,而這是通過允許CF實例與回調方法進行組合形成新的CF來實現的。

作為樣例,可以參考thenAccept方法,它接受一個 Consumer(用戶提供的且沒有返回值的函數)並返回一個新的CF。這個新CF所能達到的效果就是在最初CF完成時所得到的結果上,運用Consumer。與很多其他的CF方法類似,thenAccept有兩個變種形式,在第一個中,Consumer會由通用 fork/join 池中的某一個線程來執行;在第二個中,它會由Executor中的某一個線程來負責執行,而Executor是我們在調用時提供的。這形成了三種重載形式:同步運行、在ForkJoinPool中異步運行以及在調用時所提供的線程池中異步運行,CompletableFuture中有近 60 個方法,上述的這三種重載形式佔了絕大多數。

如下是thenAccept的一個樣例,藉助它重新實現了頁面渲染器的功能:

public void renderPage(CharSequence source) { 
List<imageinfo> info = scanForImageInfo(source);
info.forEach(imageInfo ->
CompletableFuture
.supplyAsync(imageInfo::downloadImage)
.thenAccept(this::renderImage));
renderText(source);
}
/<imageinfo>

程序清單 3:使用CompletableFuture來實現頁面渲染功能

儘管程序清單 3 比前面的形式更加簡潔,但是我們需要練習一下才能更好地閱讀它。工廠方法supplyAsync返回一個新的CF,它會在通用的 ForkJoinPool中運行指定的Supplier,完成時,Supplier的結果將會作為CF的結果。方法thenAccept會返回一個新的CF,它將會執行指定的Consumer,在本例中也就是渲染給定的圖片,即supplyAsync方法所產生的CF的結果。

需要澄清的是,thenAccept並不是將CF與函數組合起來的唯一方式。將CF與函數組合起來可以接受如下的參數:

  • 應用於CF操作結果的函數。此時,可以採用的方法包括:
  • thenCompose:針對返回值為CompletableFuture的函數;
  • thenApply:針對返回值為其他類型的函數;
  • thenAccept:針對返回值為 void 的函數;
  • Runnable。通過thenRun方法,可以接受Runnable參數;
  • 函數在處理的過程中,可能正常結束也可能異常退出。CF能夠通過方法來分別組合這兩種情況:
  • handle,針對接受一個值和一個 Throwable,並有返回值的函數;
  • whenComplete,針對接受一個值和一個 Throwable,並返回 void 的函數。

擴展頁面渲染器

擴展該樣例能夠闡述CompletableFuture的其他特性。比如,當圖片下載超時或失敗時,我們想使用一個圖標作為可見的指示器。CF暴露了一個名為get(long, TimeUnit)的方法,如果 CF在指定的時間內沒有完成的話,將會拋出TimeoutException異常。我們可以使用它來定義一個函數,這個函數會將 ImageInfo轉換為ImageData (程序清單 4)。

Function<imageinfo> infoToData = imageInfo -> { 
CompletableFuture<imagedata> imageDataFuture =
CompletableFuture.supplyAsync(imageInfo::downloadImage, executor);
try {
return imageDataFuture.get(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
imageDataFuture.cancel(true);
return ImageData.createIcon(e);
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
} catch (TimeoutException e) {

return ImageData.createIcon(e);
}
}
/<imagedata>/<imageinfo>

程序清單 4:使用CompletableFuture.get來實現超時

現在,頁面可以通過連續調用infoToData來進行渲染。其中每個調用都會同步返回一個下載的圖片,所以要並行下載的話,需要為它們各自創建一個新的異步任務。要實現這一功能,合適的工廠方法是CompletableFuture.runAsync(),它與supplyAsync類似,但是接受的參數是Runnable而不是Supplier:

public void renderPage(CharSequence source) throws InterruptedException { 
List<imageinfo> info = scanForImageInfo(source);
info.forEach(imageInfo ->
CompletableFuture.runAsync(() ->
renderImage(infoToData.apply(imageInfo)), executor));
}
/<imageinfo>

現在,我們考慮進一步的需求,當所有的請求完成或超時後,在頁面上顯示一個指示器,如果對應的所有CompletableFuture都從join方法中返回,就能表示出現了這種場景。靜態方法allOf就是為這種需求而提供的,它能夠創建一個返回值為空的CompletableFuture,當其所有的組件均完成時,它也會達到完成狀態。(join方法通常用來返回某個CF的結果,為了查看allOf方法所組合起來的所有CF的結果,必須要對其進行單獨地查詢。)

public void renderPage(CharSequence source) { 
List<imageinfo> info = scanForImageInfo(source);

CompletableFuture[] cfs = info.stream()
.map(ii -> CompletableFuture.runAsync(
() -> renderImage(mapper.apply(ii)), executor))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(cfs).join();
renderImage(ImageData.createDoneIcon());
}
/<imageinfo>

聯合多個 CompletableFuture

另外一組方法允許將多個CF聯合在一起。我們已經看見過靜態方法allOf,當其所有的組件均完成時,它就會處於完成狀態,與之對應的方法也就是anyOf,返回值同樣是 void,當其任意一個組件完成時,它就會完成。除了這兩個方法以外,這個組中其他的方法都是實例方法,它們能夠將 receiver 按照某種方式與另外一個CF聯合在一起,然後將結果傳遞到給定的函數中。

為了展現它們是如何運行的,我們擴展一下 JCiP 中的另一個例子,這是一個旅行預訂的門戶,我們將互相關聯的訂購過程記錄在TripPlan對象中,它包含了總價以及所使用服務供應商的列表:

 interface TripPlan { 
List<servicesupplier> getSuppliers();
int getPrice();
TripPlan combine(TripPlan);
}
/<servicesupplier>

ServiceSupplier(比如說某個航線或酒店)能夠創建一個TripPlan:(當然,在現實中,ServiceSupplier.createPlan 將會接受參數,來反映對應的目的地、旅行等級等信息。)

interface ServiceSupplier { 
TripPlan createPlan();
String getAlliance(); // 稍後使用
}

為了選擇最佳的旅行計劃,需要查詢每個服務供應商為我們的旅行所給定的規劃,然後使用Comparator來對比每個規劃結果,這個Comparator反映了我們的選擇標準(在本例中,只是簡單的選擇價格最低者):

TripPlan selectBestTripPlan(List<servicesupplier> serviceList) { 
List<completablefuture>> tripPlanFutures = serviceList.stream()
.map(svc -> CompletableFuture.supplyAsync(svc::createPlan, executor))
.collect(toList());
return tripPlanFutures.stream()
.min(Comparator.comparing(cf -> cf.join().getPrice()))
.get().join();
}
/<completablefuture>/<servicesupplier>

請注意中間的collect操作,在流處理裡面,由於中間操作的延遲性(laziness of intermediate operation),它就變得非常必要了。如果沒有它的話,流的終止操作(terminal operation)將會是min,它如果要執行的話,首先需要針對tripPlanFutures的每個元素執行join操作。如上述的代碼所示,我們並沒有這樣做,終止操作是collect,它會將map操作所形成的CF值累積起來,這個過程中沒有阻塞,因此允許底層的任務併發執行。

如果獲取航線和酒店最佳旅行計劃的任務是獨立的,那麼我們會希望它們能夠同時初始化,就像前文所述的圖片下載一樣。要將兩個CF按照這種方式聯合在一起,我們需要使用CompletableFuture.thenCombine方法,它會並行地執行 receiver 以及所提供的CF,然後將它們的結果使用給定的函數組合起來(在這裡,假設變量 airlines、hotels和(稍後使用的)cars都是以List<travelservice> 類型進行聲明的,並且已經進行了恰當的初始化):/<travelservice>

CompletableFuture 
.supplyAsync(() -> selectBestTripPlan(airlines))
.thenCombine(
CompletableFuture.supplyAsync(() -> selectBestTripPlan(hotels)),
TripPlan::combine);

對這個樣例進行擴展,我們將會學到更多的內容。假設每個服務供應商都屬於某一個旅行聯盟(travel alliance),通過String類型的屬性alliance來表示。在獨立訂購完航線和酒店後,我們將會確定它們是否屬於同一個聯盟,如果是的話,那麼只有屬於同一聯盟的租車服務,才在我們的考慮範圍之內:

 private TripPlan addCarHire(TripPlan p) { 
List<string> alliances = p.getSuppliers().stream()
.map(ServiceSupplier::getAlliance)
.distinct()
.collect(toList());
if (alliances.size() == 1) {
return p.combine(selectBestTripPlan(cars, alliances.get(0)));
} else {
return p.combine(selectBestTripPlan(cars));
}
}
/<string>

selectBestTripPlan方法新的重載形式將會接受一個String類型作為偏愛的聯盟,如果這個值存在的話,會使用它來過濾流中的服務:

 private TripPlan selectBestTripPlan( 
List<servicesupplier> serviceList, String favoredAlliance) {
List<completablefuture>> tripPlanFutures = serviceList.stream()
.filter(ts ->
favoredAlliance == null || ts.getAlliance().equals(favoredAlliance))
.map(svc -> CompletableFuture.supplyAsync(svc::createPlan, executor))
.collect(toList());
...
}

/<completablefuture>/<servicesupplier>

在本例中,選擇租車服務的CF要依賴於航班和酒店預訂任務組合所形成的CF。只有航班和酒店都預訂之後,它才能完成。實現這種關聯關係的方法就是thenCompose:

CompletableFuture.supplyAsync(() -> selectBestTripPlan(airlines)) 
.thenCombine(
CompletableFuture.supplyAsync(() -> selectBestTripPlan(hotels)),
TripPlan::combine)
.thenCompose(p -> CompletableFuture.supplyAsync(() -> addCarHire(p)));

預訂航班和酒店聯合形成的CF會執行,並且它的結果,也就是聯合後的TripPlan,將會作為thenCompose函數參數的輸入。結果形成的CF非常簡潔地封裝了不同異步服務之間的依賴關係。這段代碼如此簡潔的原因在於,儘管thenCompose聯合了兩個CF,但是它所返回的並不是我們預期的CompletableFuture<completablefuture>>,而是CompletableFuture<tripplan>。所以,不管在創建CF的時候使用了多少層級的組合,它並不是嵌套的,而是扁平的,要獲取它的結果只需要一步操作。這是 monad“綁定(bind)”操作(這個名稱來源於 Haskell)的特性,CF就是這種 monad,並且闡明瞭 monad 一些非常積極的特徵:比如,在本例中,我們能夠按照函數式的形式進行編寫,如果沒有這項功能的話,就需要在各個回調中非常繁瑣地顯式編寫任務定義。/<tripplan>/<completablefuture>

thenCombine方法只是將兩個CF聯合起來的方法之一,其他的方法包括:

  • thenAcceptBoth:與thenCombine類似,但是它接受一個返回值為 void 的函數;
  • runAfterBoth:接受一個 Runnable,在兩個CF都完成後執行;
  • applyToEither:接受一個一元函數(unary function),會將首先完成的 CF 的結果提供給它;
  • acceptEither:與 applyToEither 類似,接受一個一元函數,但是結果為 void;
  • runAfterEither:接受一個 Runnable,在其中一個 CF 完成後就執行。

結論

我們不可能在一篇短文中,完整地闡述像CompletableFuture這樣的 API,但是我希望這裡的樣例能夠讓你對它所能實現的併發編程形式有一個直觀印象。將CompletableFuture與其他CompletableFuture組合,以及與其他函數組合,能夠為多項任務構建類似流水線的方案,這樣能夠控制同步和異步執行以及它們之間的依賴。你想更加詳細瞭解的內容可能會包括異常處理、選擇和配置 executor 的實際經驗以及設計異步 API 所面臨的挑戰。

我希望已經解釋清楚了 Java 8 所提供的兩種異步編程風格之間的聯繫。在使用 fork/join 並行機制(包括並行流)的場景中,能夠非常高效的將工作內容進行跨核心分發。但是,它的適用條件卻非常有限:數據集很大並且能夠高效地分割,對某個數據元素的操作與其他元素是(相對)獨立的,這些操作的成本應該是比較高昂的,並且應該是 CPU 密集型的。如果這些條件無法滿足的話,尤其是如果你的任務會花費很多時間阻塞在 I/O 或網絡請求上的話,那麼 CompletableFuture是更好的替代方案。作為 Java 程序員,我們非常幸運地有這樣一個平臺庫,它將這些補充的方式集成在了一起。

關於作者

Maurice Naftalin在軟件領域已經工作了四十年,擔任過開發者、研究員、培訓師以及圖書作者。他是一位 Java Champion,曾經作為作者和合著者編寫過介紹 Java 5 和 Java 8 特性的圖書,並且是 2013 和 2014 年度的 JavaOne Rockstar。Maurice 對 Java Lambdas 的理解包含在了他的新書Mastering Lambdas - Java Programming in a Multicore World中。

查看英文原文:Functional-Style Callbacks Using Java 8's CompletableFuture

http://www.infoq.com/articles/Functional-Style-Callbacks-Using-CompletableFuture


分享到:


相關文章: