RabbitMQ使用以及原理解析

RabbitMQ使用以及原理解析

RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue)的開源實現;在RabbitMQ官網上主要有這樣的模塊信息, Work queues消息隊列,Publish/Subscribe發佈訂閱服務,Routing, Topics, RPC等主要應用的模塊功能.

幾個概念說明:

Broker:它提供一種傳輸服務,它的角色就是維護一條從生產者到消費者的路線,保證數據能按照指定的方式進行傳輸,

Exchange:消息交換機,它指定消息按什麼規則,路由到哪個隊列。

Queue:消息的載體,每個消息都會被投到一個或多個隊列。

Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來.

Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。

vhost:虛擬主機,一個broker裡可以有多個vhost,用作不同用戶的權限分離。

Producer:消息生產者,就是投遞消息的程序.

Consumer:消息消費者,就是接受消息的程序.

Channel:消息通道,在客戶端的每個連接裡,可建立多個channel.

RabbitMQ的流程圖


RabbitMQ使用以及原理解析

AMQP(高級消息隊列協議 Advanced Message Queue Protocol)

Rabbitmq系統最核心的組件是Exchange和Queue,上圖是系統簡單的示意圖。Exchange和Queue是在rabbitmq server(又叫做broker)端,producer和consumer在應用端。


流程思路

左邊的Client向右邊的Client發送消息,流程:

1, 獲取Conection

2, 獲取Channel

3, 定義Exchange,Queue

4, 使用一個RoutingKey將Queue Binding到一個Exchange上

5, 通過指定一個Exchange和一個RoutingKey來將消息發送到對應的Queue上,

6, 接收方在接收時也是獲取connection,接著獲取channel,然後指定一個Queue直接到它關心的Queue上取消息,它對Exchange,RoutingKey及如何binding都不關心,到對應的Queue上去取消息就OK了;

通信過程

假設P1和C1註冊了相同的Broker,Exchange和Queue。P1發送的消息最終會被C1消費。基本的通信流程大概如下所示:

P1生產消息,發送給服務器端的Exchange

Exchange收到消息,根據ROUTINKEY,將消息轉發給匹配的Queue1

Queue1收到消息,將消息發送給訂閱者C1

C1收到消息,發送ACK給隊列確認收到消息

Queue1收到ACK,刪除隊列中緩存的此條消息

注意要點:

Consumer收到消息時需要顯式的向rabbit broker發送basic.ack消息或者consumer訂閱消息時設置auto_ack參數為true。在通信過程中,隊列對ACK的處理有以下幾種情況:

如果consumer接收了消息,發送ack,rabbitmq會刪除隊列中這個消息,發送另一條消息給consumer。

如果cosumer接受了消息, 但在發送ack之前斷開連接,rabbitmq會認為這條消息沒有被deliver,在consumer在次連接的時候,這條消息會被redeliver。

如果consumer接受了消息,但是程序中有bug,忘記了ack,rabbitmq不會重複發送消息。

rabbitmq2.0.0和之後的版本支持consumer reject某條(類)消息,可以通過設置requeue參數中的reject為true達到目地,那麼rabbitmq將會把消息發送給下一個註冊的consumer。


vhosts(broker)

一個RabbitMQ的實體上可以有多個vhosts,用戶與權限設置就是依附於vhosts。

在rabbitmq server上可以創建多個虛擬的message broker,又叫做virtual hosts (vhosts)。每一個vhost本質上是一個mini-rabbitmq server,分別管理各自的exchange,和bindings。vhost相當於物理的server,可以為不同app提供邊界隔離,使得應用安全的運行在不同的vhost實例上,相互之間不會干擾。producer和consumer連接rabbit server需要指定一個vhost。

connection 與 channel(連接與信道)

connection是指物理的連接,一個client與一個server之間有一個連接;一個連接上可以建立多個channel,可以理解為邏輯上的連接。一般應用的情況下,有一個channel就夠用了,不需要創建更多的channel。

exchange 與 routingkey(交換機與路由鍵)

Exchange類似於數據通信網絡中的交換機,提供消息路由策略。rabbitmq中,producer不是通過信道直接將消息發送給queue,而是先發送給Exchange。一個Exchange可以和多個Queue進行綁定,producer在傳遞消息的時候,會傳遞一個ROUTING_KEY,Exchange會根據這個ROUTING_KEY按照特定的路由算法,將消息路由給指定的queue。和Queue一樣,Exchange也可設置為持久化,臨時或者自動刪除。

Exchange有4種類型:direct(默認),fanout, topic, 和headers,不同類型的Exchange轉發消息的策略有所區別:

Direct 直接交換器,工作方式類似於單播,Exchange會將消息發送完全匹配ROUTING_KEY的Queue;


RabbitMQ使用以及原理解析

fanout 廣播是式交換器,不管消息的ROUTING_KEY設置為什麼,Exchange都會將消息轉發給所有綁定的Queue。

topic 主題交換器,工作方式類似於組播,Exchange會將消息轉發和ROUTING_KEY匹配模式相同的所有隊列,比如,ROUTING_KEY為user.stock的Message會轉發給綁定匹配模式為 * .stock,user.stock, * . * 和#.user.stock.#的隊列。( * 表是匹配一個任意詞組,#表示匹配0個或多個詞組)


RabbitMQ使用以及原理解析

headers 消息體的header匹配(ignore)


queue(隊列)

消息隊列,提供了FIFO的處理機制,具有緩存消息的能力。rabbitmq中,隊列消息可以設置為持久化,臨時或者自動刪除。

設置為持久化的隊列,queue中的消息會在server本地硬盤存儲一份,防止系統crash,數據丟失

設置為臨時隊列,queue中的數據在系統重啟之後就會丟失

設置為自動刪除的隊列,當不存在用戶連接到server,隊列中的數據會被自動刪除;


Binding(綁定)

所謂綁定就是將一個特定的Exchange和一個特定的 Queue 綁定起來。Exchange和Queue的綁定可以是多對多的關係。


client(Producer&Consumer)

producer指的是消息生產者,consumer消息的消費者。

Rabbit的消息任務機制

1.Round-robin dispathching循環分發

RabbbitMQ的分發機制非常適合擴展,而且它是專門為併發程序設計的,如果現在load加重,那麼只需要創建更多的Consumer來進行任務處理。

2.Message acknowledgment消息確認

為了保證數據不被丟失,RabbitMQ支持消息確認機制,為了保證數據能被正確處理而不僅僅是被Consumer收到,那麼我們不能採用no-ack,而應該是在處理完數據之後發送ack.

在處理完數據之後發送ack,就是告訴RabbitMQ數據已經被接收,處理完成,RabbitMQ可以安強調內容全的刪除它了.

如果Consumer退出了但是沒有發送ack,那麼RabbitMQ就會把這個Message發送到下一個Consumer,這樣就保證在Consumer異常退出情況下數據也不會丟失.

RabbitMQ它沒有用到超時機制.RabbitMQ僅僅通過Consumer的連接中斷來確認該Message並沒有正確處理,也就是說RabbitMQ給了Consumer足夠長的時間做數據處理。

如果忘記ack,那麼當Consumer退出時,Mesage會重新分發,然後RabbitMQ會佔用越來越多的內存.

消息序列化

RabbitMQ使用ProtoBuf序列化消息,它可作為RabbitMQ的Message的數據格式進行傳輸,由於是結構化的數據,這樣就極大的方便了Consumer的數據高效處理,當然也可以使用XML,與XML相比,ProtoBuf有以下優勢:

1.簡單

2.size小了3-10倍

3.速度快了20-100倍

4.易於編程

6.減少了語義的歧義.

另外,ProtoBuf具有速度和空間的優勢,使得它現在應用非常廣泛;


rabbitmq組件斷鏈重連機制

方案一:

Rabbitmq在啟動時,為rabbitmq設置一個status,在第一次建立連接的時候將其變為true,rabbitmq client在初始化時啟動一個定時器,每隔一段時間開啟一個線程,查詢當前status的狀態,如果status變為false,重新建立連接(包括connection、channel的連接)。

方案二:

Implement shutdown listener,如果rabbitmq斷線,在shutdown方法執行相應的重連方法。


關於消息的重複執行

首先我們可以確認的是,觸發消息重複執行的條件會是很苛刻的! 也就說 在大多數場景下不會觸發該條件!!! 一般出在任務超時,或者沒有及時返回狀態,引起任務重新入隊列,重新消費! 在rabbtimq裡連接的斷開也會觸發消息重新入隊列。

消費任務類型最好要支持冪等性,這樣的好處是 任務執行多少次都沒關係,頂多消耗一些性能! 如果不支持冪等,比如發送信息? 那麼需要構建一個map來記錄任務的執行情況! 不僅僅是成功和失敗,還要有心跳!!! 這個map在消費端實現就可以了!!! 這裡會出現一個問題,有兩個消費者 c1, c2 ,一個任務有可能被c1消費,如果再來一次,被c2執行? 那麼如何得知任務的情況? 任務派發! 任務做成hash,固定消費者!

堅決不要想方設法在mq擴展這個future。

一句話,要不保證消息冪等性,要不就用map記錄任務狀態.

我這裡還有一份Java進階寶典《Java核心知識點整理.pdf》,覆蓋了JVM、Dubbo、鎖、高併發、反射、mybatis、Spring原理、微服務、Zookeeper、數據庫、數據結構等等,需要的朋友關注私信我:【806】,免費領取!


分享到:


相關文章: