RocketMQ的心臟:Broker

RocketMQ的心臟:Broker

這節介紹下RocketMQ中最後的一個部分,也是內容較多的一部分:Broker。

Broker的啟動同其他幾個組件一樣,從XXXStartup(BrokerStartup)類的main方法開始,首先加載對應的配置文件XXXConfig(BrokerConfig、NettyServerConfig、NettyClientConfig、MessageStoreConfig),然後實例化XXXController(BrokerController),接著調用Controller的initialize方法,最後註冊ShutdownHook。

Broker在構造方法中,會進行如下的實例化動作:

RocketMQ的心臟:Broker


包括:

  1. 配置類:BrokerConfig、NettyServerConfig、NettyClientConfig、MessageStoreConfig
  2. 管理類:ConsumerOffsetManager(偏移量管理)、TopicConfigManager(topic配置管理)、ConsumerManager(消費者管理)、ConsumerFilterManager(消費者過濾管理)、ProducerManager(生產者管理)、SubscriptionGroupManager(訂閱組管理)、FilterServerManager(服務端過濾管理)、BrokerStatsManager(broker狀態管理)
  3. 服務類:PullMessageProcessor(拉取消息處理器)、PullRequestHoldService(拉取請求緩存服務)、ClientHousekeepingService(客戶端長連接服務)、SlaveSynchronize(slave同步)
  4. 監聽類:NotifyMessageArrivingListener(通知消息到達監聽器)、DefaultConsumerIdsChangeListener(客戶端id改變監聽器)
  5. 工具類:Broker2Client、BrokerOuterAPI
  6. 線程隊列類:發送線程、拉取線程、查詢線程、客戶端管理線程、消費者管理線程、心跳線程、事務線程

Broker的初始化和啟動過程如下:

RocketMQ的心臟:Broker


初始化的步驟為:

  1. 加載配置文件,由TopicConfigManager處理:存儲每個topic的配置信息;ConsumerOffsetManager:緩存所有topic@group對應的queue的偏移量;SubscriptionGroupManager:存儲每個group的配置信息;ConsumerFilterManager:Topic的過濾表達式信息
  2. 加載消息存儲內容,由MessageStore處理
  3. 註冊請求處理器,由NettyRequestProcesser處理,根據RequestCode,分發遠程請求給對應的Processer,包括SendMessageProcessor、QueryMessageProcessor、ClientManageProcessor、ConsumerManageProcessor、AdminBrokerProcessor
  4. 每隔一天記錄broker狀態,BrokerStats會打印當天接受/處理的消息總數
  5. 每一段時間(默認5s)記錄消費者消費的偏移量,ConsumerOffsetManager會將緩存topic下每個q對應的消費偏移量進行持久化存儲為json
  6. 每10s記錄消費者過濾情況,ConsumerFilterManager會將緩存topic的過濾表達式持久化存儲為json
  7. 每3min“保護”broker,BrokerStatsManager會每秒/分鐘/小時記錄各字表的次數和消費消息的大小,如果記錄的客戶端消費失敗字節數大於配置的參數,會將該topic設置為不可消費
  8. 每1s打印打印消息隊列水位線,包括髮送Queue數量、拉取Queue數量、查詢Queue數量
  9. 每1分鐘打印處理的消息量
  10. 同步namesrv地址列表,由BrokerOutterAPI處理,如果指定了namesrv地址,則使用該列表;否則從配置的網絡路徑上同步
  11. (slave)每1min同步slave,SlaveSynchronize會通過BrokerOutterApi向master請求數據同步
  12. (master)每1min打印slave比master落後的消息數,MessageStore會打印最大的消費偏移量-已經同步給slave的偏移量
  13. 初始化事務服務

啟動步驟包括:

  1. 啟動消息存儲服務MessageStore
  2. 啟動Netty服務RemotingServer
  3. 啟動文件監聽服務FileWatchService,監聽tls證書,在發生變化時重新加載
  4. 啟動API服務BrokerOuterAPI
  5. 啟動緩存pull請求處理服務PullRequestHoldService,對每個topic下的q的pull請求,定時再次執行
  6. 啟動客戶端連接服務ClientHouseKeepingService,每10s清除超時沒有消息的客戶端連接
  7. 啟動服務端過濾服務FilterServerManager,每30s執行本地腳本
  8. 註冊broker,後面每一段時間(不超過1分鐘)註冊broker,首次啟動會向所有namesrv同步broker信息和所持有的topic信息,後面每隔一段時間同步一次
  9. 啟動broker狀態管理BrokerStatsManager
  10. 啟動請求過期清理任務BrokerFastFailure,如果開啟了快速失效的配置,會定時清理緩存中的過期請求
  11. 啟動事務消息檢查服務TransactionalMessageCheckService

下面將介紹其中幾塊內容。

1. SendMessageProcessor

消息發送處理器,主要處理Producer發送的消息和Consumer的重試消息。

1.1. 處理發送消息請求

主要過程為:

  1. 解析請求頭,得到SendMessageRequestHeader
  2. 構建上下文對象,SendMessageContext
  3. 執行前置hook,調用SendMessageHook的sendMessageBefore方法
  4. 執行核心處理邏輯,分為單消息和批量消息的處理,主要是先進行前置檢查,如判斷MesageStore是否已經啟動、Queue是否正確等,然後將請求內容包裝為Broker內部處理的形式,交由MessageStore處理放入CommitLog中。其中單消息包裝為MessageExtBrokerInner、批量消息包裝為MessageExtBatch
  5. 執行後置hook,調用SendMessageHook的sendMessageAfter方法
RocketMQ的心臟:Broker

1.2. 處理Consumer的重試消息

在介紹Consumer消費消息過程時提到過,在Push模式下,如果消息消費失敗,可以將消息重新返回Consumer實例的內存緩存隊裡中等待消費,也可以由Consumer模擬Producer角色,將消息發送到Broker,等待再次消費。

主要過程為:

  1. 解析請求頭,得到ConsumerSendMsgBackRequestHeader
  2. 如果原消息Id不為空,執行消費後置hook,調用ConsumeMessageHook.consumeMessageAfter
  3. 檢查前置判斷條件,包括所在Group是否存在、是否有權限等
  4. 默認放到Retry重試隊列中,分配新的topic,格式為 ``%RETRY%+Group````
  5. 檢查重試隊列配置,包括重試隊列配置是否為空、重置隊列是否可寫等
  6. 根據原消息的偏移量offset在CommitLog中查找原消息內容(MessageExt格式)
  7. 判斷該消息的重試次數是否已經超過設定的最大值(默認16次),如果是,則將放到DLQ死信隊列中,將topic格式更新為``%DLQ%+Group````
  8. 將原消息重新包裝為MessageExtBrokerInner對象,並調用MessageStore放到CommitLog中
RocketMQ的心臟:Broker

2. PullMessageProcessor

消息拉取處理器,在介紹消費者消費消息過程時提到過,RocketMQ內部都是通過Pull方式從Broker拉取消息的,Push模式是通過包裝Pull方式,由RocketMQ定時發起,並自動處理offset、消息重試動作。

PullMessageProcessor主要的工作是根據客戶端提供的offset,從ConsumeQueue中獲取到該topic-queueId在CommitLog中的起始位置,每次讀取消息都會從ConsumeQueue中儘可能多的讀取消息,並計算出客戶端下次的offset,把結果返回。如果提供的offset過大,會將請求暫緩,在超時時間內或者ConsumeQueue的數據符合後,再次處理請求。

具體的處理流程如下:

RocketMQ的心臟:Broker


上面過程比較清晰,大多是前置檢查,主要看"判斷GetMessageResult"的過程,如下:

RocketMQ的心臟:Broker


該過程主要是設置返回結果RemotingCommand的值,包括:

  1. 偏移量:下次消費的起始偏移量、下次可消費的最小偏移量、下次可消費的最大偏移量
  2. 消費節點:下次從master節點還是slave節點消費
  3. 消費結果:成功(SUCCESS)、未找到符合條件的消息(PULL_NOT_FOUND)、直接重試(PULL_RETRY_IMMEDIATELY)、消息被移動(PULL_OFFSET_MOVED)

該過程有幾點需要說明:

  1. 在判斷response狀態碼進行後續處理前會執行一次前置調用,調用consumerMessageBefore,而後置調用則發送在客戶端或者消息重試的時候,詳見之前Consumer消費過程一節和本節前面內容。
  2. 向response寫內容時的實現


RocketMQ的心臟:Broker


用戶態方式直接複製GetMessageResult的結果,設置到response中,由response序列化後返回;零拷貝方式則通過封裝為Netty的FileRegion,以mmap的方式返回。

3. 未找到消息時暫緩響應由PullRequestHoldService完成,會等待一段時間後再處理

3. ClientManageProcessor

客戶端管理處理,主要包括:接收客戶端心跳、去註冊客戶端、檢查客戶端配置,這裡主要介紹接收客戶端心跳的情況。

3.1. 接收客戶端心跳

Broker通過解碼客戶端上報的心跳信息,會得到HeartbeatData對象,該對象包括客戶端實例的標識clientID以及該實例的生產者和消費者信息,如下:

RocketMQ的心臟:Broker


對於沒一個客戶端,會根據clientID和連接對象Channel關聯到一個ClientChannelInfo,用於標識該客戶端實例。獲得生產者和消費者信息後,就會調用用ConsumerManager和ProducerManager註冊消費者和生產者。

ConsumerManager對象如下,主要持有每個Group對應的ConsumerGroupInfo,用於存儲消費者的連接和訂閱信息;以及一個ConsumerIdsChangeListener,用於監聽消費者變化情況。

RocketMQ的心臟:Broker


當消費者上線(
ConsumerManager.registerConsumer)時,如果是新客戶端連接,或者消費者訂閱的topic列表發生了改變時,就會觸發ConsumerIdsChangeListener的CHANGE事件;另外,當消費者下線(
ConsumerManager.unregisterConsumer)時,也會觸發CHANGE事件。ConsumerIdsChangeListener的默認實現如下:

RocketMQ的心臟:Broker


對於CHANGE事件,會遍歷Broker所持有的消費客戶端連接(
ConsumerGroupInfo.channelInfoTable),通知該Group下的消費者列表發生了改變。即在介紹Rebalace那節介紹過的Broker端主動通知Consumer的情況。

ProducerManager類似,如下,比較簡單,只持有生成者客戶端鏈接。

RocketMQ的心臟:Broker


上面只介紹了Broker對外提供的幾個有代表行的接口,主要用於同其他組件交互。其中有提及用於Producer和Consuemr中轉的消息存儲服務MessageStore,該部分內容會在下節介紹。


分享到:


相關文章: