消息隊列基礎知識整理

大部分內容參考https://github.com/doocs/advanced-java和楊開元 - RocketMQ實戰與原理解析 (雲棲社區系列)。

為什麼使用消息隊列

優點:解耦異步削峰、消息分發

解耦主要就是消息隊列可以實現一個帶存儲,可以分佈式的發佈訂閱消息的事件模式。

異步主要就是當直接調用外部接口比較慢時,可以發送消息到消息隊列,然後慢慢消費處理。

削峰主要就是防止某一時間大量請求同時過來,導致系統崩潰。

實例1:報表導出時,發一個消息到RocketMQ。消費端訂閱消息,去數據庫查詢報表數據,導出到並上傳到OSS,用戶可以在界面下載報表文件。

實例2:新同步的數據需要調用爬蟲系統去爬取信息。可以把數據發送到RocketMQ。爬蟲系統訂閱消息,然後執行實際的爬取行為。

實例3:同時幾百人搶紅包,接收到用戶請求之後,發送消息到消息隊列。後臺一條條消息進行消費。

實例4:各個子系統將日誌數據不停地寫入消息隊列,不同的數據處理系統有各自的Offset,互不影響。甚至某個團隊處理完的結果數據也可以寫入消息隊列,作為數據的產生方,供其他團隊使用,避免重複計算。在大數據時代,消息隊列已經成為數據處理系統不可或缺的一部分。

實例5:用戶創建訂單後,如果耦合調用庫存系統、物流系統、支付系統,任何一個子系統出了故障或者因為升級等原因暫時不可用,都會造成下單操作異常,影響用戶使用體驗。當轉變成基於消息隊列的方式後,系統可用性就高多了,比如物流系統因為發生故障,需要幾分鐘的時間來修復,在這幾分鐘的時間裡,物流系統要處理的內容被緩存在消息隊列裡,用戶的下單操作可以正常完成。當物流系統恢復後,補充處理存儲在消息隊列裡的訂單信息即可,終端用戶感知不到物流系統發生過幾分鐘的故障。

缺點:系統可用性降低、系統複雜度提高、一致性問題

消息隊列對比


消息隊列基礎知識整理

消息隊列高可用

RocketMQ的高可用性

消息隊列基礎知識整理

消息隊列基礎知識整理

RocketMQ的最基本架構認識:Name Server是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。NameServer是整個消息隊列中的狀態服務器,集群的各個組件通過它來了解全局的信息。同時,各個角色的機器都要定期向NameServer上報自己的狀態,超時不上報的話,NameServer會認為某個機器出故障不可用了,其他的組件會把這個機器從可用列表裡移除。BrokerLiveTable存儲的內容是Broker機器的實時狀態,包括上次更新狀態的時間戳,NameServer會定期檢查這個時間戳,超時沒有更新就認為Broker無效了,將其從Broker列表裡清除。默認每10秒檢查一次,時間戳超過2分鐘則認為Broker已失效。Broker分為Master與Slave,一個Master可以對應多個Slave,但是一個Slave只能對應一個Master,Master與Slave的對應關係通過指定相同的BrokerName,不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。

每個Broker與Name Server集群中的所有節點建立長連接,定時註冊Topic信息到所有Name ServerProducer與Name Server集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息,並向提供Topic服務的Master建立長連接,且定時向Master發送心跳。Producer完全無狀態,可集群部署。Consumer與Name Server集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息,並向提供Topic服務的Master、Slave建立長連接,且定時向Master、Slave發送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,訂閱規則由Broker配置決定。

先啟動NameServer,再啟動Broker,這時候消息隊列已經可以提供服務了,想發送消息就使用Producer來發送,想接收消息就使用Consumer來接收。很多應用程序既要發送,又要接收,可以啟動多個Producer和Consumer來發送多種消息,同時接收多種消息。為了消除單點故障,增加可靠性或增大吞吐量,可以在多臺機器上部署多個NameServer和Broker,為每個Broker部署一個或多個Slave。

同步複製是指當Slave和Master消息同步完成後,再返回發送成功的狀態。同步刷盤情況下,消息真正寫入磁盤後再返回成功狀態;異步刷盤情況下,消息寫入page_cache後就返回成功狀態。

RocketMQ的消息是存儲到磁盤上的,這樣既能保證斷電後恢復,又可以讓存儲的消息量超出內存的限制。RocketMQ為了提高性能,會盡可能地保證磁盤的順序寫。

發送端的高可用性是指在創建Topic的時候,把Topic的多個Message Queue創建在多個Broker組上(相同Broker名稱,不同brokerId的機器組成一個Broker組),這樣當一個Broker組的Master不可用後,其他組的Master仍然可用,Producer仍然可以發送消息。RocketMQ目前還不支持把Slave自動轉成Master,如果機器資源不足,需要把Slave轉成Master,則要手動停止Slave角色的Broker,更改配置文件,用新的配置文件啟動Broker。

在Consumer的配置文件中,並不需要設置是從Master讀還是從Slave讀,當Master不可用或者繁忙的時候,Consumer會被自動切換到從Slave讀。有了自動切換Consumer這種機制,當一個Master角色的機器出現故障後,Consumer仍然可以從Slave讀取消息,不影響Consumer程序。這就達到了消費端的高可用性。

通常情況下,應該把Master和Save配置成ASYNC_FLUSH的刷盤方式,主從之間配置成SYNC_MASTER的複製方式,這樣即使有一臺機器出故障,仍然能保證數據不丟,是個不錯的選擇。

最高的可用性:多Master,每個Master帶有Slave;主從之間設置成SYNC_MASTER;Producer用同步方式寫;刷盤策略設置成SYNC_FLUSH。可以消除單點依賴,即使某臺機器出現極端故障也不會丟消息。

Kafka的高可用性

Kafka 一個最基本的架構認識:由多個 broker 組成,每個 broker 是一個節點;你創建一個 topic,這個 topic 可以劃分為多個 partition,每個 partition 可以存在於不同的 broker 上,每個 partition 就放一部分數據。這就是

天然的分佈式消息隊列,就是說一個 topic 的數據,是分散放在多個機器上的,每個機器就放一部分數據。每個 partition 的數據都會同步到其它機器上,形成自己的多個 replica 副本。所有 replica 會選舉一個 leader 出來,那麼生產和消費都跟這個 leader 打交道,然後其他 replica 就是 follower。寫的時候,leader 會負責把數據同步到所有 follower 上去,讀的時候就直接讀 leader 上的數據即可。只能讀寫 leader?很簡單,要是你可以隨意讀寫每個 follower,那麼就要 care 數據一致性的問題,系統複雜度太高,很容易出問題。Kafka 會均勻地將一個 partition 的所有 replica 分佈在不同的機器上,這樣才可以提高容錯性。

這麼搞,就有所謂的高可用性了,因為如果某個 broker 宕機了,沒事兒,那個 broker上面的 partition 在其他機器上都有副本的。如果這個宕機的 broker 上面有某個 partition 的 leader,那麼此時會從 follower 中重新選舉一個新的 leader 出來,大家繼續讀寫那個新的 leader 即可。這就有所謂的高可用性了。

寫數據的時候,生產者就寫 leader,然後 leader 將數據落地寫本地磁盤,接著其他 follower 自己主動從 leader 來 pull 數據。一旦所有 follower 同步好數據了,就會發送 ack 給 leader,leader 收到所有 follower 的 ack 之後,就會返回寫成功的消息給生產者。(當然,這只是其中一種模式,還可以適當調整這個行為)

消費的時候,只會從 leader 去讀,但是隻有當一個消息已經被所有 follower 都同步成功返回 ack 的時候,這個消息才會被消費者讀到。

RabbitMQ 的高可用性

普通集群模式,意思就是在多臺機器上啟動多個 RabbitMQ 實例,每個機器啟動一個。你創建的 queue,只會放在一個 RabbitMQ 實例上,但是每個實例都同步 queue 的元數據(元數據可以認為是 queue 的一些配置信息,通過元數據,可以找到 queue 所在實例)。你消費的時候,實際上如果連接到了另外一個實例,那麼那個實例會從 queue 所在實例上拉取數據過來。這方案沒有高可用性,主要是提高吞吐量的,就是說讓集群中多個節點來服務某個 queue 的讀寫操作。

在鏡像集群模式下,你創建的 queue,無論元數據還是 queue 裡的消息都會存在於多個實例上,就是說,每個 RabbitMQ 節點都有這個 queue 的一個完整鏡像,包含 queue 的全部數據的意思。然後每次你寫消息到 queue 的時候,都會自動把

消息同步到多個實例的 queue 上。有高可用性,但是性能開銷太大,消息需要同步到所有機器上,導致網絡帶寬壓力和消耗很重。

重複消費

對分佈式消息隊列來說,同時做到確保一定投遞和不重複投遞是很難的,也就是所謂的“有且僅有一次”。

RocketMQ中,一種類型的消息會放到一個Topic裡,為了能夠並行,一般一個Topic會有多個Message Queue(也可以設置成一個),Offset是指某個Topic下的一條消息在某個Message Queue裡的位置,通過Offset的值可以定位到這條消息,或者指示Consumer從這條消息開始向後繼續處理。對於DefaultMQPushConsumer來說,默認是CLUSTERING模式,也就是同一個Consumer group裡的多個消費者每人消費一部分,各自收到的消息內容不一樣。這種情況下,由Broker端存儲和控制Offset的值,使用RemoteBrokerOffsetStore結構。在DefaultMQPushConsumer裡的BROADCASTING模式下,每個Consumer都收到這個Topic的全部消息,各個Consumer間相互沒有干擾,RocketMQ使用LocalFileOffsetStore,把Offset存到本地。如果使用DefaultMQPullConsumer,我們就要自己處理OffsetStore。

消息重複一般情況下不會發生,但是如果消息量大,網絡有波動,消息重複就是個大概率事件。比如Producer有個函數setRetryTimesWhenSendFailed,設置在同步方式下自動重試的次數,默認值是2,這樣當第一次發送消息時,Broker端接收到了消息但是沒有正確返回發送成功的狀態,就造成了消息重複。

解決消息重複有兩種方法:第一種方法是保證消費邏輯的冪等性(多次調用和一次調用效果相同);另一種方法是維護一個已消費消息的記錄,消費前查詢這個消息是否被消費過。這兩種方法都需要使用者自己實現。

Kafka 實際上有個 offset 的概念,就是每個消息寫進去,都有一個 offset,代表消息的序號,然後 consumer 消費了數據之後,每隔一段時間(定時定期),會把自己消費過的消息的 offset 提交一下,表示“我已經消費過了,下次我要是重啟啥的,你就讓我繼續從上次消費到的 offset 來繼續消費吧”。

但是凡事總有意外,比如我們之前生產經常遇到的,就是你有時候重啟系統,看你怎麼重啟了,如果碰到點著急的,直接 kill 進程了,再重啟。這會導致 consumer 有些消息處理了,但是沒來得及提交 offset,尷尬了。重啟之後,少數消息會再次消費一次。

重複消費需要保證冪等性

  • 比如你拿個數據要寫庫,你先根據主鍵查一下,如果這數據都有了,你就別插入了,update 一下好吧。
  • 比如你是寫 Redis,那沒問題了,反正每次都是 set,天然冪等性。
  • 比如你不是上面兩個場景,那做的稍微複雜一點,你需要讓生產者發送每條數據的時候,裡面加一個全局唯一的 id,類似訂單 id 之類的東西,然後你這裡消費到了之後,先根據這個 id 去比如 Redis 裡查一下,之前消費過嗎?如果沒有消費過,你就處理,然後這個 id 寫 Redis。如果消費過了,那你就別處理了,保證別重複處理相同的消息即可。
  • 比如基於數據庫的唯一鍵來保證重複數據不會重複插入多條。因為有唯一鍵約束了,重複數據插入只會報錯,不會導致數據庫中出現髒數據。

消息丟失

RabbitMQ消息丟失


消息隊列基礎知識整理


生產者弄丟了數據

生產者將數據發送到 RabbitMQ 的時候,可能數據就在半路給搞丟了,因為網絡問題啥的,都有可能。

此時可以選擇用 RabbitMQ 提供的事務功能,就是生產者發送數據之前開啟 RabbitMQ 事務channel.txSelect,然後發送消息,如果消息沒有成功被 RabbitMQ 接收到,那麼生產者會收到異常報錯,此時就可以回滾事務channel.txRollback,然後重試發送消息;如果收到了消息,那麼可以提交事務channel.txCommit。

但是問題是,RabbitMQ 事務機制(同步)一搞,基本上吞吐量會下來,因為太耗性能

所以一般來說,如果你要確保說寫 RabbitMQ 的消息別丟,可以開啟 confirm 模式,在生產者那裡設置開啟 confirm 模式之後,你每次寫的消息都會分配一個唯一的 id,然後如果寫入了 RabbitMQ 中,RabbitMQ 會給你回傳一個 ack 消息,告訴你說這個消息 ok 了。如果 RabbitMQ 沒能處理這個消息,會回調你的一個 nack 接口,告訴你這個消息接收失敗,你可以重試。而且你可以結合這個機制自己在內存裡維護每個消息 id 的狀態,如果超過一定時間還沒接收到這個消息的回調,那麼你可以重發。

事務機制和 confirm 機制最大的不同在於,事務機制是同步的,你提交一個事務之後會阻塞在那兒,但是 confirm 機制是異步的,你發送個消息之後就可以發送下一個消息,然後那個消息 RabbitMQ 接收了之後會異步回調你的一個接口通知你這個消息接收到了。

所以一般在生產者這塊避免數據丟失,都是用 confirm 機制的。

RabbitMQ 弄丟了數據

就是 RabbitMQ 自己弄丟了數據,這個你必須開啟 RabbitMQ 的持久化,就是消息寫入之後會持久化到磁盤,哪怕是 RabbitMQ 自己掛了,恢復之後會自動讀取之前存儲的數據,一般數據不會丟。除非極其罕見的是,RabbitMQ 還沒持久化,自己就掛了,可能導致少量數據丟失,但是這個概率較小。

設置持久化有兩個步驟

  • 創建 queue 的時候將其設置為持久化
    這樣就可以保證 RabbitMQ 持久化 queue 的元數據,但是它是不會持久化 queue 裡的數據的。
  • 第二個是發送消息的時候將消息的 deliveryMode 設置為 2
    就是將消息設置為持久化的,此時 RabbitMQ 就會將消息持久化到磁盤上去。

必須要同時設置這兩個持久化才行,RabbitMQ 哪怕是掛了,再次重啟,也會從磁盤上重啟恢復 queue,恢復這個 queue 裡的數據。

注意,哪怕是你給 RabbitMQ 開啟了持久化機制,也有一種可能,就是這個消息寫到了 RabbitMQ 中,但是還沒來得及持久化到磁盤上,結果不巧,此時 RabbitMQ 掛了,就會導致內存裡的一點點數據丟失。

所以,持久化可以跟生產者那邊的 confirm 機制配合起來,只有消息被持久化到磁盤之後,才會通知生產者 ack 了,所以哪怕是在持久化到磁盤之前,RabbitMQ 掛了,數據丟了,生產者收不到 ack,你也是可以自己重發的。

消費端弄丟了數據

RabbitMQ 如果丟失了數據,主要是因為你消費的時候,

剛消費到,還沒處理,結果進程掛了,比如重啟了,那麼就尷尬了,RabbitMQ 認為你都消費了,這數據就丟了。

這個時候得用 RabbitMQ 提供的 ack 機制,簡單來說,就是你必須關閉 RabbitMQ 的自動 ack,可以通過一個 api 來調用就行,然後每次你自己代碼裡確保處理完的時候,再在程序裡 ack 一把。這樣的話,如果你還沒處理完,不就沒有 ack 了?那 RabbitMQ 就認為你還沒處理完,這個時候 RabbitMQ 會把這個消費分配給別的 consumer 去處理,消息是不會丟的。

KafKa消息丟失

消費端弄丟了數據

唯一可能導致消費者弄丟數據的情況,就是說,你消費到了這個消息,然後消費者那邊自動提交了 offset,讓 Kafka 以為你已經消費好了這個消息,但其實你才剛準備處理這個消息,你還沒處理,你自己就掛了,此時這條消息就丟咯。

這不是跟 RabbitMQ 差不多嗎,大家都知道 Kafka 會自動提交 offset,那麼只要關閉自動提交 offset,在處理完之後自己手動提交 offset,就可以保證數據不會丟。但是此時確實還是

可能會有重複消費,比如你剛處理完,還沒提交 offset,結果自己掛了,此時肯定會重複消費一次,自己保證冪等性就好了。

生產環境碰到的一個問題,就是說我們的 Kafka 消費者消費到了數據之後是寫到一個內存的 queue 裡先緩衝一下,結果有的時候,你剛把消息寫入內存 queue,然後消費者會自動提交 offset。然後此時我們重啟了系統,就會導致內存 queue 裡還沒來得及處理的數據就丟失了。

Kafka 弄丟了數據

這塊比較常見的一個場景,就是 Kafka 某個 broker 宕機,然後重新選舉 partition 的 leader。大家想想,要是此時其他的 follower 剛好還有些數據沒有同步,結果此時 leader 掛了,然後選舉某個 follower 成 leader 之後,不就少了一些數據?這就丟了一些數據啊。

生產環境也遇到過,我們也是,之前 Kafka 的 leader 機器宕機了,將 follower 切換為 leader 之後,就會發現說這個數據就丟了。

所以此時一般是要求起碼設置如下 4 個參數:

  • 給 topic 設置 replication.factor 參數:這個值必須大於 1,要求每個 partition 必須有至少 2 個副本。
  • 在 Kafka 服務端設置 min.insync.replicas 參數:這個值必須大於 1,這個是要求一個 leader 至少感知到有至少一個 follower 還跟自己保持聯繫,沒掉隊,這樣才能確保 leader 掛了還有一個 follower 吧。
  • 在 producer 端設置 acks=all:這個是要求每條數據,必須是寫入所有 replica 之後,才能認為是寫成功了
  • 在 producer 端設置 retries=MAX(很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗,就無限重試,卡在這裡了。

我們生產環境就是按照上述要求配置的,這樣配置之後,至少在 Kafka broker 端就可以保證在 leader 所在 broker 發生故障,進行 leader 切換時,數據不會丟失。

生產者會不會弄丟數據?

如果按照上述的思路設置了 acks=all,一定不會丟,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之後,才認為本次寫成功了。如果沒滿足這個條件,生產者會自動不斷的重試,重試無限次。

順序消息

順序消息是指消息的消費順序和產生順序相同,在有些業務邏輯下,必須保證順序。比如訂單的生成、付款、發貨,這3個消息必須按順序處理才行。順序消息分為全局順序消息和部分順序消息,全局順序消息指某個Topic下的所有消息都要保證順序;部分順序消息只要保證每一組消息被順序消費即可,比如上面訂單消息的例子,只要保證同一個訂單ID的三個消息能按順序消費即可。

RocketMQ

RocketMQ在默認情況下不保證順序,比如創建一個Topic,默認八個寫隊列,八個讀隊列。這時候一條消息可能被寫入任意一個隊列裡;在數據的讀取過程中,可能有多個Consumer,每個Consumer也可能啟動多個線程並行處理,所以消息被哪個Consumer消費,被消費的順序和寫入的順序是否一致是不確定的。

要保證全局順序消息,需要先把Topic的讀寫隊列數設置為一,然後Producer和Consumer的併發設置也要是一。簡單來說,為了保證整個Topic的全局消息有序,只能消除所有的併發處理,各部分都設置成單線程處理。這時高併發、高吞吐量的功能完全用不上了。在實際應用中,更多的是像訂單類消息那樣,只需要部分有序即可。

要保證部分消息有序,需要發送端和消費端配合處理。在發送端,要做到把同一業務ID的消息發送到同一個Message Queue;在消費過程中,要做到從同一個Message Queue讀取的消息不被併發處理,這樣才能達到部分有序。

發送端使用MessageQueueSelector類來控制把消息發往哪個Message Queue。

消費端通過使用MessageListenerOrderly類來解決單Message Queue的消息被併發處理的問題。在MessageListenerOrderly的實現中,為每個Consumer Queue加個鎖,消費每個消息前,需要先獲得這個消息對應的Consumer Queue所對應的鎖,這樣保證了同一時間,同一個Consumer Queue的消息不被併發消費,但不同Consumer Queue的消息可以併發處理。

RocketMQ採用了局部順序一致性的機制,實現了單個隊列中的消息嚴格有序。也就是說,如果想要保證順序消費,必須將一組消息發送到同一個隊列中,然後再由消費者進行逐一消費。

RocketMQ推薦的順序消費解決方案是:按照業務劃分不同的隊列,然後將需要順序消費的消息發往同一隊列中即可,不同業務之間的消息仍採用併發消費。這種方式在滿足順序消費的同時提高了消息的處理速度,在一定程度上避免了消息堆積問題。

想要實現順序消費,發送方式必須為同步發送,異步發送無法保證消息的發送順序!

在消費端,消息完全按照發送的順序進行了消費,保證了消息的順序性。

在多Consumer的情況下,不同Queue上的消息可以併發消費,同一個Queue上的消息仍然可以保證順序消費。

Kafka

kafka的順序消息僅僅是通過partitionKey,將某類消息寫入同一個partition,一個partition只能對應一個消費線程,以保證數據有序。除了發送消息需要指定partitionKey外,producer和consumer實例化無區別。kafka broker宕機,kafka會有自選擇,所以宕機不會減少partition數量,也就不會影響partitionKey的sharding。

生產者在寫的時候,可以指定一個 key,比如說我們指定了某個訂單 id 作為 key,那麼這個訂單相關的數據,一定會被分發到同一個 partition 中去,而且這個 partition 中的數據一定是有順序的。消費者從 partition 中取出來數據的時候,也一定是有順序的。但是消費者裡可能會有多個線程來併發來處理消息。因為如果消費者是單線程消費數據,那麼這個吞吐量太低了。而多個線程併發的話,順序可能就亂掉了。解決方案是寫N個queue,將具有相同key的數據都存儲在同一個queue,然後對於N個線程,每個線程分別消費一個queue即可。

RabbitMQ

rocketmq保證同一個訂單的消息,一定要發送到同一個隊列。並且該隊列只有一個消費者,也就是說 同一個隊列,不能出現多個消費者並行消費的情況。

雖然同一個隊列不能並行消費,但是可以並行消費不同的隊列。就是說同時多筆訂單之間是可以並行消費。

ActiveMQ

Queue中的消息是按照順序發送給Consumers的。然而,當你有多個Consumer同時從相同的Queue提取消息時,順序將不能得到保證。因為這些消息時被多個線程併發的處理。但是,有時候保證消息的順序是很重要的。例如,你可能不希望插入訂單操作結束之前執行更新訂單的操作。那麼我們可以通過Exclusive Consumer和Message Groups來實現這一目的。從ActiveMQ4.X版本開始支持ExclusiveConsumer(或者說是Exclusive Queues)。Broker會從多個Consumer中挑選一個Consumer來處理所有的消息,從而保證消息的有序處理。如果這個Consumer失效,那麼Broker會自動切換到其他的Consumer。

消息隊列的延時以及過期失效

消息隊列滿了以後該怎麼處理?有幾百萬消息持續積壓幾小時,說說怎麼解決?舉個例子,消費端每次消費之後要寫 mysql,結果 mysql 掛了,消費端 hang 那兒了,不動了;或者是消費端出了個什麼岔子,導致消費速度極其慢。

大量消息在 mq 裡積壓了幾個小時了還沒解決

  • 先修復 consumer 的問題,確保其恢復消費速度,然後將現有 consumer 都停掉。
  • 新建一個 topic,partition 是原來的 10 倍,臨時建立好原先 10 倍的 queue 數量。
  • 然後寫一個臨時的分發數據的 consumer 程序,這個程序部署上去消費積壓的數據,消費之後不做耗時的處理,直接均勻輪詢寫入臨時建立好的 10 倍數量的 queue。
  • 接著臨時徵用 10 倍的機器來部署 consumer,每一批 consumer 消費一個臨時 queue 的數據。這種做法相當於是臨時將 queue 資源和 consumer 資源擴大 10 倍,以正常的 10 倍速度來消費數據。
  • 等快速消費完積壓數據之後,
    得恢復原先部署的架構重新用原先的 consumer 機器來消費消息。

mq 中的消息過期失效了

假設你用的是 RabbitMQ,RabbtiMQ 是可以設置過期時間的,也就是 TTL。如果消息在 queue 中積壓超過一定的時間就會被 RabbitMQ 給清理掉,這個數據就沒了。那這就是第二個坑了。這就不是說數據會大量積壓在 mq 裡,而是大量的數據會直接搞丟

這個情況下,就不是說要增加 consumer 消費積壓的消息,因為實際上沒啥積壓,而是丟了大量的消息。我們可以採取一個方案,就是批量重導,這個我們之前線上也有類似的場景幹過。就是大量積壓的時候,我們當時就直接丟棄數據了,然後等過了高峰期以後,比如大家一起喝咖啡熬夜到晚上12點以後,用戶都睡覺了。這個時候我們就開始寫程序,將丟失的那批數據,寫個臨時程序,一點一點的查出來,然後重新灌入 mq 裡面去,把白天丟的數據給他補回來。也只能是這樣了。

假設 1 萬個訂單積壓在 mq 裡面,沒有處理,其中 1000 個訂單都丟了,你只能手動寫程序把那 1000 個訂單給查出來,手動發到 mq 裡去再補一次。

mq 都快寫滿了

如果消息積壓在 mq 裡,你很長時間都沒有處理掉,此時導致 mq 都快寫滿了,咋辦?這個還有別的辦法嗎?沒有,誰讓你第一個方案執行的太慢了,你臨時寫程序,接入數據來消費,消費一個丟棄一個,都不要了,快速消費掉所有的消息。然後走第二個方案,到了晚上再補數據吧。

集群擴容和縮容

RocketMQ動態擴容和縮容

需要對集群進行擴容的時候,可以動態增加Broker角色的機器。只增加Broker不會對原有的Topic產生影響,原來創建好的Topic中數據的讀寫依然在原來的那些Broker上進行。集群擴容後,一是可以把新建的Topic指定到新的Broker機器上,均衡利用資源;另一種方式是通過updateTopic命令更改現有的Topic配置,在新加的Broker上創建新的隊列。

當某個Topic有多個Master Broker,停了其中一個,這時候是否會丟失消息呢?答案和Producer使用的發送消息的方式有關,如果使用同步方式send(msg)發送,在DefaultMQProducer內部有個自動重試邏輯,其中一個Broker停了,會自動向另一個Broker發消息,不會發生丟消息現象。如果使用異步方式發送send(msg,callback),或者用sendOneWay方式,會丟失切換過程中的消息。因為在異步和sendOneWay這兩種發送方式下,Producer.setRetryTimesWhenSendFailed設置不起作用,發送失敗不會重試。DefaultMQProducer默認每30秒到NameServer請求最新的路由消息,Producer如果獲取不到已停止的Broker下的隊列信息,後續就自動不再向這些隊列發送消息。

性能優化

RocketMQ性能優化

消息過濾

  • 在服務端進行消息過濾可以減少無效消息傳輸造成的帶寬浪費,Tag是最常用的一種高效過濾方式,此外還可以用SQL表達式、FilterServer來過濾消息。

另一個提高吞吐量的方法是增加集群的機器數量,提高併發性,要根據實際場景增加Broker、Consumer或Producer角色的機器數量。

Consumer處理能力提高

  • 在同一個ConsumerGroup下(Clustering方式),可以通過增加Consumer實例的數量來提高並行度,通過加機器,或者在已有機器中啟動多個Consumer進程都可以增加Consumer實例數。注意總的Consumer數量不要超過Topic下Read Queue數量,超過的Consumer實例接收不到消息(負載均衡)。此外,通過提高單個Consumer實例中的並行處理的線程數,可以在同一個Consumer內增加並行度來提高吞吐量(設置方法是修改consumeThreadMin和consumeThreadMax)。
  • 可以通過批量方式消費來提高消費的吞吐量。
  • Consumer在消費的過程中,如果發現由於某種原因發生嚴重的消息堆積,短時間無法消除堆積,這個時候可以選擇丟棄不重要的消息,使Consumer儘快追上Producer的進度。

Producer的發送速度提高

發送一條消息出去要經過三步,一是客戶端發送請求到服務器,二是服務器處理該請求,三是服務器向客戶端返回應答,一次消息的發送耗時是上述三個步驟的總和。

  • 在一些對速度要求高,但是可靠性要求不高的場景下,比如日誌收集類應用,可以採用Oneway方式發送,Oneway方式只發送請求不等待應答,即將數據寫入客戶端的Socket緩衝區就返回,不等待對方返回結果,用這種方式發送消息的耗時可以縮短到微秒級。
  • 另一種提高發送速度的方法是增加Producer的併發量,使用多個Producer同時發送,我們不用擔心多Producer同時寫會降低消息寫磁盤的效率,RocketMQ引入了一個併發窗口,在窗口內消息可以併發地寫入DirectMem中,然後異步地將連續一段無空洞的數據刷入文件系統當中。順序寫CommitLog可讓RocketMQ無論在HDD還是SSD磁盤情況下都能保持較高的寫入性能。
  • 在Linux操作系統層級進行調優,推薦使用EXT4文件系統,IO調度算法使用deadline算法。EXT4創建/刪除文件的性能比EXT3及其他文件系統要好,RocketMQ的CommitLog會有頻繁的創建/刪除動作。deadline算法大致思想如下:實現四個隊列,其中兩個處理正常的read和write操作,另外兩個處理超時的read和write操作。正常的read和write隊列中,元素按扇區號排序,進行正常的IO合併處理以提高吞吐量。因為IO請求可能會集中在某些磁盤位置,這樣會導致新來的請求一直被合併,可能會有其他磁盤位置的IO請求被餓死。超時的read和write的隊列中,元素按請求創建時間排序,如果有超時的請求出現,就放進這兩個隊列,調度算法保證超時(達到最終期限時間)的隊列中的IO請求會優先被處理。

主從同步

RocketMQ的主從同步

RocketMQ的Broker分為Master和Slave兩個角色,為了保證高可用性,Master角色的機器接收到消息後,要把內容同步到Slave機器上,這樣一旦Master宕機,Slave機器依然可以提供服務。

需要同步的信息分為兩種類型,實現方式各不相同:一種是元數據信息,採用基於Netty的command方式來同步消息;另一種是commitLog信息,同步方式是直接基於Java NIO來實現。

CommitLog和元數據信息不同:首先,CommitLog的數據量比元數據要大;其次,對實時性和可靠性要求也不一樣。元數據信息是定時同步的,在兩次同步的時間差裡,如果出現異常可能會造成Master上的元數據內容和Slave上的元數據內容不一致,不過這種情況還可以補救(手動調整Offset,重啟Consumer等)。CommitLog在高可靠性場景下如果沒有及時同步,一旦Master機器出故障,消息就徹底丟失了。所以有專門的代碼來實現Master和Slave之間消息體內容的同步。CommitLog的同步,不是經過netty command的方式,而是直接進行TCP連接,這樣效率更高。連接成功以後,通過對比Master和Slave的Offset,不斷進行同步。

sync_master是同步方式,也就是Master角色Broker中的消息要立刻同步過去;async_master是異步方式,也就是Master角色Broker中的消息是通過異步處理的方式同步到Slave角色的機器上的。

寫一個消息隊列,該如何進行架構設計

比如說這個消息隊列系統,我們從以下幾個角度來考慮一下:

  • 首先這個 mq 得支持可伸縮性吧,就是需要的時候快速擴容,就可以增加吞吐量和容量,那怎麼搞?設計個分佈式的系統唄,參照一下 kafka 的設計理念,broker -> topic -> partition,每個 partition 放一個機器,就存一部分數據。如果現在資源不夠了,簡單啊,給 topic 增加 partition,然後做數據遷移,增加機器,不就可以存放更多數據,提供更高的吞吐量了?
  • 其次你得考慮一下這個 mq 的數據要不要落地磁盤吧?那肯定要了,落磁盤才能保證別進程掛了數據就丟了。那落磁盤的時候怎麼落啊?順序寫,這樣就沒有磁盤隨機讀寫的尋址開銷,磁盤順序讀寫的性能是很高的,這就是 kafka 的思路。
  • 其次你考慮一下你的 mq 的可用性啊?這個事兒,具體參考之前可用性那個環節講解的 kafka 的高可用保障機制。多副本 -> leader & follower -> broker 掛了重新選舉 leader 即可對外服務。
  • 能不能支持數據 0 丟失啊?可以的,參考我們之前說的那個 kafka 數據零丟失方案。

零拷貝的原理

避免了從用戶態到內核態,以及內核態到用戶態的複製,沒有CPU參與。零拷貝主要包含以下兩種方式:

  • 使用 mmap + write 方式 優點:即使頻繁調用,使用小塊文件傳輸,效率也很高 缺點:不能很好的利用 DMA 方式,會比 sendfile 多消耗CPU,內存安全性控制複雜,需要避免 JVM Crash 問題。
  • 使用 sendfile 方式 優點:可以利用 DMA 方式,消耗 CPU 較少,大塊文件傳輸效率高,無內存安全新問題。 缺點:小塊文件效率低亍 mmap 方式,只能是 BIO 方式傳輸,不能使用 NIO。 RocketMQ 選擇了第一種方式,mmap+write 方式,因為有小塊數據傳輸的需求,效果會比 sendfile 更好。

Linux操作系統分為“用戶態”和“內核態”,文件操作、網絡操作需要涉及這兩種形態的切換,免不了進行數據複製,一臺服務器把本機磁盤文件的內容發送到客戶端,一般分為兩個步驟:

1)read(file,tmp_buf,len),讀取本地文件內容;

2)write(socket,tmp_buf,len),將讀取的內容通過網絡發送出去。

tmp_buf是預先申請的內存,這兩個看似簡單的操作,實際進行了4次數據複製,分別是:從磁盤複製數據到內核態內存,從內核態內存複製到用戶態內存(完成了read(file,tmp_buf,len));然後從用戶態內存複製到網絡驅動的內核態內存,最後是從網絡驅動的內核態內存複製到網卡中進行傳輸(完成write(socket,tmp_buf,len))。

通過使用mmap的方式,可以省去向用戶態的內存複製,提高速度。

rocketMQ 在消費消息時,使用了 mmap。kafka 使用了 sendFile。

參考銜接

  1. https://blog.csdn.net/qq_34021712/article/details/79273641
  2. https://github.com/doocs/advanced-java/blob/master/docs/high-concurrency/why-mq.md
  3. https://blog.csdn.net/weixin_34452850/article/details/82664799
  4. https://blog.csdn.net/qq_31329893/article/details/90451889
  5. https://blog.csdn.net/qincidong/article/details/89156732
  6. https://github.com/doocs/advanced-java/blob/master/docs/high-concurrency/how-to-ensure-high-availability-of-message-queues.md
  7. https://github.com/doocs/advanced-java/blob/master/docs/high-concurrency/how-to-ensure-that-messages-are-not-repeatedly-consumed.md
  8. https://github.com/doocs/advanced-java/blob/master/docs/high-concurrency/how-to-ensure-the-reliable-transmission-of-messages.md
  9. https://github.com/doocs/advanced-java
  10. https://github.com/doocs/advanced-java/blob/master/docs/high-concurrency/how-to-ensure-the-order-of-messages.md
  11. https://github.com/doocs/advanced-java/blob/master/docs/high-concurrency/mq-time-delay-and-expired-failure.md
  12. RocketMQ實戰與原理解析書籍
  13. https://juejin.im/post/5d231b9d5188251b201d619a
  14. http://jm.taobao.org/2017/01/12/rocketmq-quick-start-in-10-minutes/


分享到:


相關文章: