關於Spark的RPC機制

一直以來,基於 Akka 實現的 RPC 通信框架是 Spark 引以為豪的主要特性,也是與 Hadoop 等分佈式計算框架對比過程中一大亮點,但是時代和技術都在演化,從 Spark1.3.1 版本開始,為了解決大塊數據(如 Shuffle )的傳輸問題, Spark 引入了 Netty 通信框架,到了 1.6.0 版本, Netty 完成取代了 Akka ,承擔 Spark 內部所有的 RPC 通信以及數據流傳輸。

Spark RPC 中最為重要的三個抽象(“三劍客”)為: RpcEnv 、 RpcEndpoint 、 RpcEndpointRef ,這樣做的好處有:

l 對上層的 API 來說,屏蔽了底層的具體實現,使用方便

l 可以通過不同的實現來完成指定的功能,方便擴展

l 促進了底層實現層的良性競爭, Spark 1.6.3 中默認使用了 Netty 作為底層的實現,但 Akka 的依賴仍然存在;而 Spark 2.1.0 中的底層實現只有 Netty ,這樣用戶可以方便的使用不同版本的 Akka 或者將來某種更好的底層實現。

關於Spark的RPC機制

Send a message locally

通過 Spark 源碼中的一個 Test ( RpcEnvSuite.scala )來分析一下發送本地消息的具體流程,源碼如下(對源碼做了一些修改):

<code>test( "send a message locally" ) {

@volatile var message: String = null

val rpcEndpointRef = env.setupEndpoint( "send-locally" , new RpcEndpoint {

override val rpcEnv = env


override def receive = {

//case msg: String => message = msg

case msg: String => println(message) // 我們直接將接收到的消息打印出來

}

})

rpcEndpointRef.send( "hello" )

// 下面是原來的代碼

//eventually(timeout(5 seconds), interval(10 millis)) {

// assert("hello" === message)

//}

}/<code>

首先是 RpcEndpoint 創建並註冊的流程:

· 1 、 創建 RpcEndpoint ,並初始化 rpcEnv 的引用( RpcEnv 已經創建好,底層實際上是實例化了一個 NettyRpcEnv ,而 NettyRpcEnv 是通過工廠方法 NettyRpcEnvFactory 創建的)

· 2 、 實例化 RpcEndpoint 之後需要向 RpcEnv 註冊該 RpcEndpoint ,底層實現是向 NettyRpcEnv 進行註冊,而實際上是通過調用 Dispatcher 的 registerRpcEndpoint 方法向 Dispatcher 進行註冊

· 3 、 具體的註冊就是向 endpoints 、 endpointRefs 、 receivers 中插入記錄:而 receivers 中插入的信息會被 Dispatcher 中的線程池中的線程執行:會將記錄 take 出來然後調用 Inbox 的 process 方法通過模式匹配的方法進行處理,註冊的時候通過匹配到 OnStart 類型的 message ,去執行 RpcEndpoint 的 onStart 方法(例如 Master 、 Worker 註冊時,就要執行各自的 onStart 方法),本例中未做任何操作

· 4 、 註冊完成後返回 RpcEndpointRef ,我們通過 RpcEndpointRef 就可以向其代表的 RpcEndpoint 發送消息


下面就是通過 RpcEndpointRef 向其代表的 RpcEndpoint 發送消息的具體流程:

· 1 、 2 、調用 RpcEndpointRef 的 send 方法,底層實現是調用 Netty 的 NettyRpcEndpointRef 的 send 方法,而實際上又是調用的 NettyRpcEnv 的 send 方法,發送的消息使用 RequestMessage 進行封裝:

<code>nettyEnv.send( RequestMessage (nettyEnv.address, this , message))/<code>

· 3 、 4 、 NettyRpcEnv 的 send 方法首先會根據 RpcAddress 判斷是本地還是遠程調用,此處是同一個 RpcEnv ,所以是本地調用,即調用 Dispatcher 的 postOneWayMessage 方法

· 5 、 postOneWayMessage 方法內部調用 Dispatcher 的 postMessage 方法

· 6 、 postMessage 會向具體的 RpcEndpoint 發送消息,首先通過 endpointName 從 endpoints 中獲得註冊時的 EndpointData ,如果不為空就執行 EndpointData 中 Inbox 的 post(message) 方法,向 Inbox 的 mesages 中插入一條 InboxMessage ,同時向 receivers 中插入一條記錄,此處將 Inbox 單獨畫出來是為了方便大家理解

· 7 、 Dispatcher 中的線程池會拿出一條線程用來循環 receivers 中的消息,首先使用 take 方法獲得 receivers 中的一條記錄,然後調用 Inbox 的 process 方法來執行這條記錄,而 process 將 messages 中的一條 InboxMessage (第 6 步中插入的)拿出來進行處理,具體的處理方法就是通過模式匹配的方法,匹配到消息的類型(此處是 OneWayMessage ),然後來執行 RpcEndpoint 中對應的 receive 方法,在此例中我們只打印出這條消息(步驟 8 )

至此,一個簡單的發送本地消息的流程執行完成。


分享到:


相關文章: