asyncTool解決任意的多線程並行、串行、阻塞、依賴、回調的框架

該框架目前正在 京東App後臺 接受苛刻、高併發、海量用戶等複雜場景業務的檢驗測試,隨時會根據實際情況發佈更新和bugFix。

有對區塊鏈感興趣的,可以參考項目作者另一個GVP項目,java區塊鏈底層入門。

如果只是需要用這個框架,請往下看即可。如果需要深入瞭解這個框架是如何一步一步實現的,從接到需求,到每一步的思考,每個類為什麼這麼設計,為什麼有這些方法,也就是如何從0到1開發出這個框架,作者在csdn開了專欄專門講中間件如何從0開發,包括並不限於這個小框架。京東內部同事可在cf上搜索erp也能看到。

並行常見的場景

1 客戶端請求服務端接口,該接口需要調用其他N個微服務的接口

譬如 請求我的訂單,那麼就需要去調用用戶的rpc、商品詳情的rpc、庫存rpc、優惠券等等好多個服務。同時,這些服務還有相互依賴關係,譬如必須先拿到用戶的某個字段後,再去某rpc服務請求數據。 最終全部獲取完畢後,或超時了,就彙總結果,返回給客戶端。

2 並行執行N個任務,後續根據這1-N個任務的執行結果來決定是否繼續執行下一個任務

如用戶可以通過郵箱、手機號、用戶名登錄,登錄接口只有一個,那麼當用戶發起登錄請求後,我們需要並行根據郵箱、手機號、用戶名來同時查數據庫,只要有一個成功了,都算成功,就可以繼續執行下一步。而不是先試郵箱能否成功、再試手機號……

再如某接口限制了每個批次的傳參數量,每次最多查詢10個商品的信息,我有45個商品需要查詢,就可以分5堆並行去查詢,後續就是統計這5堆的查詢結果。就看你是否強制要求全部查成功,還是不管有幾堆查成功都給客戶做返回

再如某個接口,有5個前置任務需要處理。其中有3個是必須要執行完畢才能執行後續的,另外2個是非強制的,只要這3個執行完就可以進行下一步,到時另外2個如果成功了就有值,如果還沒執行完,就是默認值。

3 需要進行線程隔離的多批次任務

如多組任務, 各組任務之間彼此不相關,每組都需要一個獨立的線程池,每組都是獨立的一套執行單元的組合。有點類似於hystrix的線程池隔離策略。

4 單機工作流任務編排

5 其他有順序編排的需求

並行場景之核心——任意編排

1 多個執行單元的串行請求

asyncTool解決任意的多線程並行、串行、阻塞、依賴、回調的框架

2 多個執行單元的並行請求

asyncTool解決任意的多線程並行、串行、阻塞、依賴、回調的框架

3 阻塞等待,串行的後面跟多個並行

asyncTool解決任意的多線程並行、串行、阻塞、依賴、回調的框架

4 阻塞等待,多個並行的執行完畢後才執行某個

asyncTool解決任意的多線程並行、串行、阻塞、依賴、回調的框架

5 串並行相互依賴

asyncTool解決任意的多線程並行、串行、阻塞、依賴、回調的框架

6 複雜場景

asyncTool解決任意的多線程並行、串行、阻塞、依賴、回調的框架

並行場景可能存在的需求之——每個執行結果的回調

傳統的Future、CompleteableFuture一定程度上可以完成任務編排,並可以把結果傳遞到下一個任務。如CompletableFuture有then方法,但是卻無法做到對每一個執行單元的回調。譬如A執行完畢成功了,後面是B,我希望A在執行完後就有個回調結果,方便我監控當前的執行狀況,或者打個日誌什麼的。失敗了,我也可以記錄個異常信息什麼的。

此時,CompleteableFuture就無能為力了。

我的框架提供了這樣的回調功能。並且,如果執行異常、超時,可以在定義這個執行單元時就設定默認值。

並行場景可能存在的需求之——執行順序的強依賴和弱依賴

如上圖的3,A和B併發執行,最後是C。

有些場景下,我們希望A和B都執行完畢後,才能執行C,CompletableFuture裡有個allOf(futures...).then()方法可以做到。

有些場景下,我們希望A或者B任何一個執行完畢,就執行C,CompletableFuture裡有個anyOf(futures...).then()方法可以做到。

我的框架同樣提供了類似的功能,通過設定wrapper裡的addDepend依賴時,可以指定依賴的任務是否must執行完畢。如果依賴的是must要執行的,那麼就一定會等待所有的must依賴項全執行完畢,才執行自己。

如果依賴的都不是must,那麼就可以任意一個依賴項執行完畢,就可以執行自己了。

注意:這個依賴關係是有必須和非必須之分的,還有一個重要的東西是執行單元不能重複執行。譬如圖4,如果B執行完畢,然後執行了A,此時C終於執行完了,然後也到了A,此時就會發現A已經在執行,或者已經完畢(失敗),那麼就不應該再重複執行A。

還有一種場景,如下圖,A和D並行開始,D先執行完了,開始執行Result任務,此時B和C都還沒開始,然後Result執行完了,雖然B和C都還沒執行,但是已經沒必要執行了。B和C這些任務是可以被跳過的,跳過的原則是他們的NextWrapper已經有結果了或者已經在執行了。我提供了checkNextWrapperResult方法來控制,當後面的任務已經執行了,自己還要不要執行的邏輯控制。當然,這個控制僅限於nextWrapper只有一個時才有成立。

asyncTool解決任意的多線程並行、串行、阻塞、依賴、回調的框架

併發場景可能存在的需求之——依賴上游的執行結果作為入參

譬如A-B-C三個執行單元,A的入參是String,出參是int,B呢它需要用A的結果作為自己的入參。也就是說A、B並不是獨立的,而是有結果依賴關係的。

在A執行完畢之前,B是取不到結果的,只是知道A的結果類型。

那麼,我的框架也支持這樣的場景。可以在編排時,就取A的結果包裝類,作為B的入參。雖然此時尚未執行,必然是空,但可以保證A執行完畢後,B的入參會被賦值。

併發場景可能存在的需求之——全組任務的超時

一組任務,雖然內部的各個執行單元的時間不可控,但是我可以控制全組的執行時間不超過某個值。通過設置timeOut,來控制全組的執行閾值。

併發場景可能存在的需求之——高性能、低線程數

該框架全程無鎖,沒有一個加鎖的地方。

創建線程量少。

asyncTool解決任意的多線程並行、串行、阻塞、依賴、回調的框架

如這樣的,A會運行在B、C執行更慢的那個單元的線程上,而不會額外創建線程。

asyncTool特點

解決任意的多線程並行、串行、阻塞、依賴、回調的併發框架,可以任意組合各線程的執行順序,帶全鏈路回調和超時控制。

其中的A、B、C分別是一個最小執行單元(worker),可以是一段耗時代碼、一次Rpc調用等,不侷限於你做什麼。

該框架可以將這些worker,按照你想要的各種執行順序,加以組合編排。最終得到結果。

並且,該框架 為每一個worker都提供了執行結果的回調和執行失敗後自定義默認值 。譬如A執行完畢後,A的監聽器會收到回調,帶著A的執行結果(成功、超時、異常)。

根據你的需求,將各個執行單元組合完畢後,開始在主線程執行並阻塞,直到最後一個執行完畢。並且 可以設置全組的超時時間

該框架支持後面的執行單元以前面的執行單元的結果為自己的入參 。譬如你的執行單元B的入參是ResultA,ResultA就是A的執行結果,那也可以支持。在編排時,就可以預先設定B或C的入參為A的result,即便此時A尚未開始執行。當A執行完畢後,自然會把結果傳遞到B的入參去。

該框架全程無鎖。


分享到:


相關文章: