通過源碼深入分析Java 8 中的Stream實現原理(下)

上一片文章我們介紹了創建Stream部分的實現原理,今天我們接著分析Stream剩下的內容。

首先我們看看和Stream相關的內容都有哪些:

通過源碼深入分析Java 8 中的Stream實現原理(下)

Stream上的操作

前面我們介紹過Stream上的操作分為兩大類:中間操作和終端操作。其中中間操作的作用是把Stream轉換為另外一個Stream,終端操作是開啟Stream的計算過程。所以要區分一個操作是中間操作還是終端操作,只需要看看它的方法返回值是不是Stream,返回值是Stream的就是中間操作了,不是的話就是終端操作了。

下面我們就分別對中間操作和終端操作進行分析。

中間操作的實現原理

我們隨便找一箇中間操作來看看它的實現過程,就以filter為例吧。Stream是一個接口類,它的filter方法具體的實現實際上是ReferencePipeline.filter,下面就是它的實現代碼:

通過源碼深入分析Java 8 中的Stream實現原理(下)

我們先忽略掉那些周邊代碼,直接去看最為核心的一小段代碼:

通過源碼深入分析Java 8 中的Stream實現原理(下)

這段代碼邏輯非常的簡單:調用predicate來判斷元素u是否需要過濾,如果不需要過濾就調用下一個Stream的accept方法,參數predicte就是我們調用filter方法是傳入的參數。對於熟悉Java職責鏈模式的朋友一定非常清楚這個過程的作用。

filter操作的核心就這麼兩三行代碼,接下來我們開看看核心代碼周圍的那些代碼是幹嘛的。分析代碼後我們發現:

  • accept是Sink.ChainedReference類的一個方法。
  • Sink.ChainedReference實例是在opWrapSink方法中被創建的。
  • opWrapSink方法則是StatelessOp類的一個方法。
  • StatelessOp實例則是我們在調用filter方法的時候創建的。

所以當我們調用filter方法的時候實際是創建了一個StatelessOp的實例,並被返回了,毫無疑問StatelessOp類是實現了Stream接口的。

終端操作

講完了中間操作的內容後,我們在看看終端操作又是個什麼樣子的,就以count操作為例。

首先我們看看count操作的源碼:

通過源碼深入分析Java 8 中的Stream實現原理(下)

count操作其實是有兩個操作組成的,mapToLong操作是一箇中間操作,作用是把每一個元素都轉換為1,sum操作則是一個終端操作,作用是對所有元素求和。所以要分析終端操作,我們只需要分析sum操作就可以了。

下面是sum操作所對應的源碼:

通過源碼深入分析Java 8 中的Stream實現原理(下)

sum方法最終調用的是reduce方法,在這個方法中會做兩個事情:

  1. 創建一個ReduceOps實例,ReduceOps實際是TerminalOp接口的一個實現。
  2. 調用evaluate方法執行執行終端操作。

從這裡我們就發現終端操作和中間操作的區別了,在終端操作中不僅僅會創建一個操作實例,還會去執行它,在中間操作中是不會有這個執行過程的。

Stream的執行過程

分析完Stream的兩大類操作後,我們就需要進一步分析一下它們是如何被執行並最終操作集合中的元素的。上面的分析我們知道中間操作是不會觸發任何執行的,只有終端操作才會調用evaluate方法執行整個操作鏈,這就是延遲執行了。首先我們看看evaluate方法的實現代碼:

通過源碼深入分析Java 8 中的Stream實現原理(下)

這段代碼主要功能有:判斷Stream是否計算過了,如果沒有計算過才會放行並標記(所以一個流只能被執行一次),然後根據是否標記了並行處理來調用不同的方法(並行與否可以進行多次設置,以最後一次為準)。

我們先分析串行的情況,並行的內容後續詳細來分析。
ReduceOp.evaluateSequential相關的源碼:

通過源碼深入分析Java 8 中的Stream實現原理(下)

一路跟著代碼走,我們可以發現整個執行過程包括兩部分:

  • 調用wrapSink方法構建整個職責鏈。這個方法中會反向遍歷操作鏈表,並調用每一個操作的opWrapSink方法,這個方法我們在前面講中間操作的時候已經介紹過了,它會創建一個Sink.ChainedReference實例。
  • 構建完職責鏈以後,就從職責鏈最前端啟動它。職責鏈調用分為三個階段:begin--操作開始,forEachRemaining--操作元素,會去調用sink的accept方法,end--操作結束。

下面我們用一個流程圖來直觀的描述整個過程:

通過源碼深入分析Java 8 中的Stream實現原理(下)

並行操作

講完了基礎的串行流,我們接下來講一講並行流。所謂的並行流,就是我們的流操作上的任務是被拆分成多個子任務,然後同時執行。Stream的並行功能是基於Java的Fork/Join框架的,什麼是Fork/Join框架呢?我們這裡就不詳細介紹了,只需要知道它是一個把任務拆分執行的框架就可以了,後續有機會再做詳細的介紹。

Fork/Join框架使用的關鍵就是定義一個ForkJoinTask的派生類,然後調用它的invoke方法。

下面是並行流的執行方法:

通過源碼深入分析Java 8 中的Stream實現原理(下)

這個ReduceTask就是ForkJoinTask的派生類。Fork/Join框架的task最為關鍵的就是大任務拆分的過程了(fork過程),我們看看ReduceTask的拆分過程的源碼:

通過源碼深入分析Java 8 中的Stream實現原理(下)

這段代碼有一丟丟長,其主要操作包括下面幾個部分:

  1. 計算元素個數,以及分片閾值,只有元素個數大於這個閾值才會進行任務拆分。
  2. 構建任務二叉樹,並進行fork
  3. 最後對最小的分片進行計算並join結果

任務拆分執行過程:

通過源碼深入分析Java 8 中的Stream實現原理(下)


分享到:


相關文章: