RabbitMQ詳解

RabbitMQ的優點:

  • 開源, 性能有效, 穩定性好
  • 提供可靠性消息投遞模式(confirm), 返回模式(return)等
  • 與Spring完美整合, API豐富
  • 集群模式豐富, 支持表達式配置, 高可用HA模式, 鏡像隊列模型
  • 可以保證數據不丟失的前提下做到高可靠性, 可用性

RabbitMQ高性能原因:

  • 由Erlang語言開發,繼承其天生的併發性,穩定性和安全性有保障

RabbitMQ的協議:

AMQP(Advanced Message Queuing Protocol)高級消息隊列協議,是一個異步消息傳遞所使用應用層協議規範,為面向消息中間件設計,基於此協議的客戶端與消息中間件可以無視消息來源傳遞消息,不受客戶端、消息中間件、不同的開發語言環境等條件的限制。

RabbitMQ詳解

設計概念解釋:

  • Server : 又稱Broker, 接受客戶端連接, 實現AMQP實體服務
  • Connection : 連接, 應用程序與Broker的網絡連接
  • Channel : 網絡信道, 幾乎所有的操作都在Channel中進行, Channel是進行消息讀寫的通道。客戶端可以建立多個Channel, 每個Channel代表一個會話任務。
  • Message : 消息, 服務器和應用程序之間傳送的數據, 有Properties和Body組成。Properties可以對消息進行修飾, 比如消息的優先級, 延遲等高級特性; Body就是消息體內容。
  • Virtual Host : 虛擬地址, 用於進行邏輯隔離, 最上層的消息路由。一個Virtual Host裡面可以有若干個Exchange和Queue, 同一個Virtual Host裡面不能有相同名稱的Exchange或Queue
  • Exchange : 交換機, 用於接收消息, 根據路由鍵轉發消息到綁定的隊列
  • Binding : Exchange和Queue之間的虛擬連接, binding中可以包含routing key
  • Routing Key : 一個路由規則, 虛擬機可用它來確定如何路由一個特定消息
  • Queue : 也成Message Queue, 消息隊列, 用於保存消息並將它們轉發給消費者

RabbitMQ整體架構

RabbitMQ詳解

RabbitMQ成員簡介

Binding-綁定

  • Exchange和Exchange, Queue之間的連接關係
  • 綁定中可以包含RoutingKey或者參數

Queue-消息隊列

  • 消息隊列, 實際存儲消息數據
  • Durability : 是否持久化
  • Auto delete : 如選yes,代表當最後一個監聽被移除之後, 該Queue會自動被刪除

Message-消息

  • 服務和應用程序之間傳送的數據
  • 本質上就是一段數據, 由Properties和Payload(Body)組成
  • 常用屬性 : delivery mode, headers(自定義屬性)
  • 其他屬性content_type, content_encoding, prioritycorrelation_id : 可以認為是消息的唯一idreplay_to : 重回隊列設定expiration : 消息過期時間message_id : 消息idtimestamp, type, user_id, app_id, cluster_id

Virtual Host-虛擬主機

  • 虛擬地址, 用於進行邏輯隔離, 最上層的消息路由
  • 一個Virtual Host裡面可以有若干個Exchange和Queue
  • 同一個Virtual Host裡面不能有相同名稱的Exchange或Queue

Exchange交換機

接收消息,並根據路由鍵轉發消息到所綁定的隊列

注:交換機不會存儲消息,如果消息發送到沒有綁定消費隊列的交換機,消息則丟失。

RabbitMQ詳解

交換機的屬性

  • Name : 交換機名稱
  • Type : 交換機類型, direct, topic, fanout, headers
  • Durability : 是否需要持久化, true為持久化
  • Auto Delete : 當最後一個綁定到Exchange上的隊列刪除後, 自動刪除該Exchange
  • Internal : 當前Exchange是否用於RabbitMQ內部使用, 默認為False, 這個屬性很少會用到
  • Arguments : 擴展參數, 用於擴展AMQP協議制定化使用

交換機的四種類型

  • Direct exchange(直連交換機)是根據消息攜帶的路由鍵(routing key)將消息投遞給對應隊列的注意 : Direct模式可以使用RabbitMQ自帶的Exchange(default Exchange), 所以不需要將Exchange進行任何綁定(binding)操作, 消息傳遞時, RoutingKey必須完全匹配才會被隊列接收, 否則該消息會被拋棄
RabbitMQ詳解

  • Fanout exchange(扇型交換機)將消息路由給綁定到它身上的所有隊列不處理路由鍵, 只需要簡單的將隊列綁定到交換機上發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上Fanout交換機轉發消息是最快的
RabbitMQ詳解

  • Topic exchange(主題交換機)隊列通過路由鍵綁定到交換機上,然後,交換機根據消息裡的路由值,將消息路由給一個或多個綁定隊列(模糊匹配)“#” : 匹配一個或多個詞“*” : 匹配一個詞
RabbitMQ詳解

  • Headers exchange(頭交換機)類似主題交換機,但是頭交換機使用多個消息屬性來代替路由鍵建立路由規則。通過判斷消息頭的值能否與指定的綁定相匹配來確立路由規則。

RabbitMQ常用的5種工作模式

1、點對點(簡單)的隊列

RabbitMQ詳解

  • 不需要交換機
  • 一個生產者,一個消費者

2、工作隊列(公平性)

RabbitMQ詳解

  • 不需要交換機
  • 一個生產者,多個消費者,但是一個消息只會發送給一個隊列(競爭的消費者模式)
  • 默認是輪詢,即會將消息輪流發給多個消費者,但這樣對消費得比較慢的消費者不公平
  • 可採用公平分配,即能者多勞channel.basicQos(1);// 限定:發送一條信息給消費者A,消費者A未反饋處理結果之前,不會再次發送信息給消費者Aboolean autoAck = false;// 取消自動反饋 channel.basicConsume(QUEUE_NAME, autoAck, consumer);// 接收信息channel.basicAck(envelope.getDeliveryTag(), false);// 反饋消息處理完畢

3、發佈/訂閱

RabbitMQ詳解

  • 一個生產者,多個消費者
  • 每一個消費者都有自己的一個隊列
  • 生產者沒有直接發消息到隊列中,而是發送到交換機
  • 每個消費者的隊列都綁定到交換機上
  • 消息通過交換機到達每個消費者的隊列

該模式就是Fanout Exchange(扇型交換機)將消息路由給綁定到它身上的所有隊列

4、路由

RabbitMQ詳解

生產者發送消息到交換機並指定一個路由key,消費者隊列綁定到交換機時要制定路由key(key匹配就能接受消息,key不匹配就不能接受消息)

該模式採用Direct exchange(直連交換機)

5、主題(通配符)

RabbitMQ詳解

此模式實在路由key模式的基礎上,使用了通配符來管理消費者接收消息。生產者P發送消息到交換機X,交換機根據綁定隊列的routing key的值進行通配符匹配

符號#:匹配一個或者多個詞lazy.# 可以匹配lazy.irs或者lazy.irs.cor

符號*:只能匹配一個詞lazy.* 可以匹配lazy.irs或者lazy.cor

該模式採用Topic exchange(主題交換機)

消息可靠性傳遞或回退(生產者端)

生產者發送消息出去之後,不知道到底有沒有發送到RabbitMQ服務器, 默認是不知道的。而且有的時候我們在發送消息之後,後面的邏輯出問題了,我們不想要發送之前的消息了,需要撤回該怎麼做。

AMQP 事務機制

  • txSelect 將當前channel設置為transaction模式
  • txCommit 提交當前事務
  • txRollback 事務回滾

Confirm 模式

消息的確認, 是指生產者投遞消息後, 如果Broker收到消息, 則會給我們產生一個應答

生產者進行接收應答, 用來確定這條消息是否正常發送到Broker, 這種方式也是消息的可靠性投遞的核心保障

  • 在channel上開啟確認模式 : channel.confirmSelect()
  • 在channel上添加監聽 : addConfirmListener, 監聽成功和失敗的返回結果, 根據具體的結果對消息進行重新發送, 或記錄日誌等後續處理

Return消息機制

Return Listener用於處理一些不可路由的消息

正常情況下消息生產者通過指定一個Exchange和RoutingKey, 把消息送到某一個隊列中去, 然後消費者監聽隊列, 進行消費,但在某些情況下, 如果在發送消息的時候, 當前的exchange不存在或者指定的路由key路由不到,這個時候如果我們需要監聽這種不可達的消息, 就要使用Return Listener。

在基礎API中有一個關鍵的配置項Mandatory : 如果為true, 則監聽器會接收到路由不可達的消息, 然後進行後續處理(補償或人工處理), 如果為false, 那麼broker端自動刪除該消息。

如何保障消息可靠傳遞

  • 保障消息的成功發出
  • 保障MQ節點的成功接收
  • 發送端收到MQ節點(Broker)的確認應答
  • 完善的消息補償機制

方案:

1、消息落庫, 對消息狀態進行標記

RabbitMQ詳解

  • step1:消息入庫
  • step2:消息發送
  • step3:消費端消息確認
  • step4:更新庫中消息狀態為已確認
  • step5:定時任務讀取數據庫中未確認的消息
  • step6:未收到確認結果的消息重新發送
  • step7:如果重試幾次之後仍然失敗, 則將消息狀態更改為投遞失敗的終態, 後面需要人工介入

2、消息的延遲投遞, 做二次確認, 回調檢查

RabbitMQ詳解

  • step1 : 第一次消息發送, 必須業務數據落庫之後才能進行消息發送
  • step2 : 第二次消息延遲發送, 設定延遲一段時間發送第二次check消息
  • step3 : 消費端監聽Broker, 進行消息消費
  • step4 : 消費成功之後, 發送確認消息到確認消息隊列
  • step5 : Callback Service監聽step4中的確認消息隊列, 維護消息狀態, 是否消費成功等狀態
  • step6 : Callback Service監聽step2發送的Delay Check的消息隊列, 檢測內部的消息狀態, 如果消息是發送成功狀態, 則流程結束, 如果消息是失敗狀態, 或者查不到當前消息狀態時, 會通知生產者, 進行消息重發, 重新上述步驟

重試機制和冪等性保障(消費者端)

重試機制

消費者在消費消息的時候,如果消費者業務邏輯出現程序異常,會使用消息重試機制。

  • 情況1: 消費者獲取到消息後,調用第三方接口,但接口暫時無法訪問,是否需要重試? (需要重試機制)
  • 情況2: 消費者獲取到消息後,拋出數據轉換異常,是否需要重試?(不需要重試機制)需要發佈進行解決。

對於情況2,如果消費者代碼拋出異常是需要發佈新版本才能解決的問題,那麼不需要重試,重試也無濟於事。應該採用日誌記錄+定時任務job健康檢查+人工進行補償

重試機制的實現

在SpringBoot中,@RabbitListener(queue="")用於消費者監聽隊列。底層使用Aop進行攔截,如果程序沒有拋出異常,則自動提交事務。如果拋出異常,該消息會緩存到RabbitMQ服務器,自動實施重試機制,一直到成功為止。可以配置重試間隔時間和重試的次數。

冪等性保障

冪等性:多次執行, 結果保持一致

網絡延遲傳輸中,消費出現異常或者是消費延遲消費,會造成MQ進行重試補償,在重試過程中,可能會造成重複消費。

解決方案:

  • 唯一ID+指紋碼機制唯一ID + 指紋碼機制,利用數據庫主鍵去重SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一ID +指紋碼好處:實現簡單壞處:高併發下有數據庫寫入的性能瓶頸解決方案:跟進ID進行分庫分表進行算法路由
  • 利用Redis的原子性去實現在接收到消息後將消息ID作為key執行 setnx 命令,如果執行成功就表示沒有處理過這條消息,可以進行消費了,執行失敗表示消息已經被消費了。

自動簽收與手動簽收(消費端)

默認是自動簽收

<code>channel.basicConsume(QUEUE_NAME, false, defaultConsumer);//關閉自動簽收,變為手動簽收/<code>
<code>channel.basicAck(envelope.getDeliveryTag(), false);// 手工簽收, 第二個參數表示是否批量簽收/<code>

消費端限流

消息隊列中囤積了大量的消息, 或者某些時刻生產的消息遠遠大於消費者處理能力的時候, 這個時候如果消費者一次取出大量的消息, 但是客戶端又無法處理, 就會出現問題, 甚至可能導致服務崩潰, 所以需要對消費端進行限流

RabbitMQ提供了一種qos(服務質量保證)功能, 即在非自動確認消息的前提下, 如果一定數目的消息(通過consumer或者channel設置qos的值)未被確認前, 不進行消費新的消息

  • 自動簽收要設置成false, 建議實際工作中也設置成false
  • void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;prefetchSize : 消息大小限制, 一般設置為0, 消費端不做限制prefetchCount : 會告訴RabbitMQ不要同時給一個消費者推送多於N個消息, 即一旦有N個消息還沒有ack, 則該consumer將block(阻塞), 直到有消息ackglobal : true/false 是否將上面設置應用於channel, 簡單來說就是上面的限制是channel級別的還是consumer級別 注意 :

prefetchSize和global這兩項,RabbitMQ沒有實現,暫且不關注,prefetchCount在autoAck設置false的情況下生效,即在自動確認的情況下這個值是不生效的

限流可實現公平隊列。


分享到:


相關文章: