Netty-RocketMQ底層通信的利器

這節介紹RocketMQ底層通信的原理

在之前的內容中有介紹過RocketMQ底層用了Netty來進行通信,下圖為RocketMQ通信的大致過程,主要分為Server端和Client端。

Netty-RocketMQ底層通信的利器

客戶端通過invokeSyncImpl、invokeAsyncImpl、invokeOnewayImpl這幾個方法同服務端交互。

1. NettyRemotingServer

Server啟動主要是初始化ServerBootstrap,主要配置如下:

Netty-RocketMQ底層通信的利器

  1. 設置tcp的參數,包括SO_BACKLOG、SO_REUSEADDR、SO_KEEPALIVE、TCP_NODELAY等。
  2. 設置pipeline處理鏈,包括編碼、解碼、空閒處理、連接管理、請求分發。

啟動完ServerBootstrap後會啟動一個定時器,每3秒清除超時的請求。

這裡介紹下面幾個處理器:

  1. NettyEncoder
  2. NettyDecoder
  3. NettyConnectManageHandler
  4. NettyServerHandler

1.1. NettyEncoder

NettyEncoder繼承自LengthFieldBasedFrameDecoder,主要有用於解碼入站數據流,並將數據流解碼為RemotingCommand對象。

LengthFieldBasedFrameDecoder(自定義長度解碼器)的構造器,涉及5個參數,都與長度域(數據包中的長度字段)相關,具體介紹如下:

  1. maxFrameLength:發送的數據包最大長度;
  2. lengthFieldOffset:長度域偏移量,指的是長度域位於整個數據包字節數組中的下標;
  3. lengthFieldLength:長度域的自己的字節數長度。
  4. lengthAdjustment:長度域的偏移量矯正。 如果長度域的值,除了包含有效數據域的長度外,還包含了其他域(如長度域自身)長度,那麼,就需要進行矯正。矯正的值為:包長 - 長度域的值 – 長度域偏移 – 長度域長。
  5. initialBytesToStrip:丟棄的起始字節數。丟棄處於有效數據前面的字節數量。比如前面有4個節點的長度域,則它的值為4。

以NettyEncoder為例,器構造構造方法為

<code>public NettyDecoder() {
super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
}
/<code>

即數據流中前4個字節的值表示有效數據域的長度,除開前4個字節外的內容都是有效數據域的內容,不存在偏移量。

接收到數據域的內容後,便會調用RemotingCommand.decode方法,將數據流轉為RemotingCommand對象。

RemotingCommand對象分為Header部分和Body部分。Header部分包括固定的一組字段,已經長度不定的擴展字段;Body部分為byte[],不進行具體的細分。

Netty-RocketMQ底層通信的利器

數據域的解析過程同上面的類似,數據域中前4個自己為Header域的長度,取到Header長度後便能計算出Body長度,從而進行讀取。RemotingCommand的內容如下:

Netty-RocketMQ底層通信的利器

根據serializerType的不同,Header的編碼會分為Json或者二進制的方式。

1.2. NettyDecoder

NettyEncoder的反過程,將RemotingCommand對象序列化為ByteBuffer對象。根據serializerType的不同,Header會編碼為JSON或者二進制。

1.3. NettyConnectManageHandler

NettyConnectManageHandler繼承自ChannelDuplexHandler,用於監聽pipeline中入站/出站的事件,主要進行日誌記錄。

1.4. NettyServerHandler

NettyServerHandler繼承自SimpleChannelInboundHandler,重寫了channelRead0方法,在裡面調用了父類NettyRemotingAbstract的processMessageReceived方法,如下:

Netty-RocketMQ底層通信的利器

該方法定義了請求和響應的處理過程。

1.processRequestCommand

處理請求過程,先根據RemotingCommand中的code值判斷當前請求是否能夠處理,如果不能處理則直接響應不支持。如果可以支持,則會找到對應的處理器,新起線程來處理當前請求。需要說明的是,NettyRemotingServer內部維護這一個processorTable,表示該server可以處理的command,對應的Processor以及對應的線程池。

<code>protected final HashMap<integer>> processorTable =
new HashMap<integer>>(64);
/<integer>/<integer>/<code>

Processor的定義如下,對於具體的command,會由對應的Processor來處理

<code>public interface NettyRequestProcessor {
RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws Exception;

boolean rejectRequest();
}
/<code>

RocketMQ提供的Processor如下,其中一個Processor可能會處理一個或者多個code.

Netty-RocketMQ底層通信的利器

2.processResponseCommand

客戶端發起一次調用時,會根據請求id,構造一個ResponseFuture,並將其緩存在responseTable字段中,用來表示目前正在進行中的請求。

<code>protected final ConcurrentMap<integer> responseTable =
new ConcurrentHashMap<integer>(256);
/<integer>/<integer>/<code>

當有響應的時候,會根據請求id,獲取對應的ResponseFuture,再進行後置處理,包括執行回調、釋放資源等。

2. NettyRemotingClient

Client啟動主要是初始化Bootstrap,主要配置如下:

Netty-RocketMQ底層通信的利器

  1. 設置tcp的參數,包括TCP_NODELAY、SO_KEEPALIVE、CONNECT_TIMEOUT_MILLIS等。
  2. 設置pipeline處理鏈,包括編碼、解碼、空閒處理、連接管理、請求分發。

啟動完ServerBootstrap後會啟動一個定時器,每3秒清除超時的請求。

Client端處理鏈上的幾個處理器,除了NettyClientHandler外都同Server端的一樣。而NettyClientHandler也繼承自SimpleChannelInboundHandler,並重寫了channelRead0方法,在裡面調用了父類NettyRemotingAbstract的processMessageReceived方法,過程跟Server端類似。

3. 調用流程

上面介紹了Server端和Client端的啟動過程,以及消息的編解碼,這裡介紹消息的具體請求過程。主要是開頭提到的invokeSyncImpl、invokeAsyncImpl和invokeOnewayImpl這幾個方法。

3.1. invokeSyncImpl 同步調用

內部是通過countdownlatch等待來模擬的同步調用,如下圖:

Netty-RocketMQ底層通信的利器

  1. 客戶端調用invokeSyncImpl後,client會構造ResponseFeature對象,並根據請求id將其緩存起來,然後調用Netty發送請求後在ChannelFutureListener中等待回調。
  2. 這時候客戶端會通過countdownlatch等待一定的時間,如果客戶端請求成功,則在ChannelFutureListener中直接返回,等待超時時間到達;如果請求失敗,則直接通知countdownlatch,不再等待,直接返回
  3. 請求到達服務端,經過NettyDecoder、NettyServerHandler後,會調用processRequestCommand方法,最終在對應類型的線程池中提交任務,任務執行完後通過執行糊掉,返回結果
  4. 客戶端接收到響應後,通過NettyClientHandler,會加油processResponseCommand方法處理,這時會根據請求id獲取之前的ResponseReature對象,執行回調,最後清除緩存。

3.2. invokeAsyncImpl 異步調用

相比同步調用,少了等待超時時間,但是增加了semaphore信號量控制最多有多少個連接同時執行。請求發起後,將結果對象緩存起來,結果將通過InvokeCallback進行回調,如果有設置回調函數,結果返回,在回調線程發起後就會將信號量回收,如果沒有設置回調函數,結果返回後就會將信號量回收。其餘過程大致同同步調用類似。

Netty-RocketMQ底層通信的利器

3.3. invokeOnewayImpl 單步調用

單向請求,無結果,請求成功後不等待結果,直接釋放信號量,服務端也不會返回結果。

Netty-RocketMQ底層通信的利器

3.4. MQClientAPIImpl

MQClientAPIImpl在之前介紹過,主要為Producer和Consumer提供遠程通信調用的功能,內部主要是對NettyRemotingClient的封裝,以對外提供服務,如:

  1. createSubscriptionGroup,請求broker創建group
  2. createTopic,請求broker創建創建topic
  3. sendMessage,發送消息,單步、異步、同步

等多種服務的封裝。同時MQClientAPIImpl也能夠接收服務端的主動請求,從而進行響應,對外提供的具體功能如下,通過調用registerProcessor來添加:

Netty-RocketMQ底層通信的利器

以NOTIFY_CONSUMER_IDS_CHANGED為例,當Broker發現Group中的Consumer實例發生改變的時候,會遍歷客戶的連接Channel,然後逐一通知到客戶端。這時候客戶端的角色轉變為”服務端“,服務端轉變為"客戶端",兩端都會觸發processResponseCommand方法。


分享到:


相關文章: