分布式系統消息中間件——RabbitMQ的使用基礎篇

前言

我是在解決分佈式事務的一致性問題時瞭解到RabbitMQ的,當時主要是要基於RabbitMQ來實現我們分佈式系統之間對有事務可靠性要求的系統間通信的。關於分佈式事務一致性問題及其常見的解決方案,可以看我另一篇博客。提到RabbitMQ,不難想到的幾個關鍵字:消息中間件、消息隊列。而消息隊列不由讓我想到,當時在大學學習操作系統這門課,消息隊列不難想到生產者消費者模式。(PS:操作系統這門課程真的很好也很重要,其中的一些思想在我工作的很長一段一時間內給了我很大幫助和啟發,給我提供了許多解決問題的思路。強烈建議每一個程序員都去學一學操作系統!)

分佈式系統消息中間件——RabbitMQ的使用基礎篇

一 消息中間件

1.1 簡介

消息中間件也可以稱消息隊列,是指用高效可靠的消息傳遞機制進行與平臺無關的數據交流,並基於數據通信來進行分佈式系統的集成。通過提供消息傳遞和消息隊列模型,可以在分佈式環境下擴展進程的通信。當下主流的消息中間件有RabbitMQ、Kafka、ActiveMQ、RocketMQ等。其能在不同平臺之間進行通信,常用來屏蔽各種平臺協議之間的特性,實現應用程序之間的協同。其優點在於能夠在客戶端和服務器之間進行同步和異步的連接,並且在任何時刻都可以將消息進行傳送和轉發。是分佈式系統中非常重要的組件,主要用來解決應用耦合、異步通信、流量削峰等問題。

1.2 作用

消息中間件幾大主要作用如下:

  • 解耦
  • 冗餘(存儲)
  • 擴展性
  • 削峰
  • 可恢復性
  • 順序保證
  • 緩衝
  • 異步通信

1.3 消息中間件的兩種模式

1.3.1 P2P模式

P2P模式包含三個角色:消息隊列(Queue),發送者(Sender),接收者(Receiver)。每個消息都被髮送到一個特定的隊列,接收者從隊列中獲取消息。隊列保留著消息,直到他們被消費或超時。

P2P的特點:

  • 每個消息只有一個消費者(Consumer)(即一旦被消費,消息就不再在消息隊列中)
  • 發送者和接收者之間在時間上沒有依賴性,也就是說當發送者發送了消息之後,不管接收者有沒有正在運行它不會影響到消息被髮送到隊列
  • 接收者在成功接收消息之後需向隊列應答成功
  • 如果希望發送的每個消息都會被成功處理的話,那麼需要P2P模式

1.3.2 Pub/Sub模式

Pub/Sub模式包含三個角色主題(Topic),發佈者(Publisher),訂閱者(Subscriber) 。多個發佈者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者。

Pub/Sub的特點

  • 每個消息可以有多個消費者
  • 發佈者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須創建一個訂閱者之後,才能消費發佈者的消息。
  • 為了消費消息,訂閱者必須保持運行的狀態。
  • 如果希望發送的消息可以不被做任何處理、或者只被一個消息者處理、或者可以被多個消費者處理的話,那麼可以採用Pub/Sub模型。

1.4 常用中間件介紹與對比

  • Kafka是LinkedIn開源的分佈式發佈-訂閱消息系統,目前歸屬於Apache定級項目。Kafka主要特點是基於Pull的模式來處理消息消費,追求高吞吐量,一開始的目的就是用於日誌收集和傳輸。0.8版本開始支持複製,不支持事務,對消息的重複、丟失、錯誤沒有嚴格要求,適合產生大量數據的互聯網服務的數據收集業務。
  • RabbitMQ是使用Erlang語言開發的開源消息隊列系統,基於AMQP協議來實現。AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。AMQP協議更多用在企業系統內,對數據一致性、穩定性和可靠性要求很高的場景,對性能和吞吐量的要求還在其次。
  • RocketMQ是阿里開源的消息中間件,它是純Java開發,具有高吞吐量、高可用性、適合大規模分佈式系統應用的特點。RocketMQ思路起源於Kafka,但並不是Kafka的一個Copy,它對消息的可靠傳輸及事務性做了優化,目前在阿里集團被廣泛應用於交易、充值、流計算、消息推送、日誌流式處理、binglog分發等場景。

RabbitMQ比Kafka可靠,kafka更適合IO高吞吐的處理,一般應用在大數據日誌處理或對實時性(少量延遲),可靠性(少量丟數據)要求稍低的場景使用,比如ELK日誌收集。

二 RabbitMQ瞭解

2.1 簡介

RabbitMQ是流行的開源消息隊列系統。RabbitMQ是AMQP(高級消息隊列協議)的標準實現。支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX,持久化。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。是使用Erlang編寫的一個開源的消息隊列,本身支持很多的協議:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它變的非常重量級,更適合於企業級的開發。同時實現了一個Broker構架,這意味著消息在發送給客戶端時先在中心隊列排隊。對路由(Routing),負載均衡(Load balance)或者數據持久化都有很好的支持。其主要特點如下:

  • 可靠性
  • 靈活的路由
  • 擴展性
  • 高可用性
  • 多種協議
  • 多語言客戶端
  • 管理界面
  • 插件機制

2.2 概念

RabbitMQ從整體上來看是一個典型的生產者消費者模型,主要負責接收、存儲和轉發消息。其整體模型架構如下圖所示:

分佈式系統消息中間件——RabbitMQ的使用基礎篇

我們先來看一個RabbitMQ的運轉流程,稍後會對這個流程中所涉及到的一些概念進行詳細的解釋。

生產者:

(1)生產者連接到RabbitMQ Broker,建立一個連接( Connection)開啟一個信道(Channel)

(2)生產者聲明一個交換器,並設置相關屬性,比如交換機類型、是否持久化等

(3)生產者聲明一個隊列井設置相關屬性,比如是否排他、是否持久化、是否自動刪除等

(4)生產者通過路由鍵將交換器和隊列綁定起來

(5)生產者發送消息至RabbitMQ Broker,其中包含路由鍵、交換器等信息。

(6)相應的交換器根據接收到的路由鍵查找相匹配的隊列。

(7)如果找到,則將從生產者發送過來的消息存入相應的隊列中。

(8)如果沒有找到,則根據生產者配置的屬性選擇丟棄還是回退給生產者

(9)關閉信道。

(10)關閉連接。'

消費者:

(1)消費者連接到RabbitMQ Broker ,建立一個連接(Connection),開啟一個信道(Channel) 。

(2)消費者向RabbitMQ Broker 請求消費相應隊列中的消息,可能會設置相應的回調函數,

(3)等待RabbitMQ Broker 回應並投遞相應隊列中的消息,消費者接收消息。

(4)消費者確認(ack) 接收到的消息。

(5)RabbitMQ 從隊列中刪除相應己經被確認的消息。

(6)關閉信道。

(7)關閉連接。

2.2.1 信道

這裡我們主要討論兩個問題:

為何要有信道?

主要原因還是在於TCP連接的"昂貴"性。無論是生產者還是消費者,都需要和RabbitMQ Broker 建立連接,這個連接就是一條TCP 連接。而操作系統對於TCP連接的創建與銷燬是非常昂貴的開銷。假設消費者要消費消息,並根據服務需求合理調度線程,若只進行TCP連接,那麼當高併發的時候,每秒可能都有成千上萬的TCP連接,不僅僅是對TCP連接的浪費,也很快會超過操作系統每秒所能建立連接的數量。如果能在一條TCP連接上操作,又能保證各個線程之間的私密性就完美了,於是信道的概念出現了。

信道為何?

信道是建立在Connection 之上的虛擬連接。當應用程序與Rabbit Broker建立TCP連接的時候,客戶端緊接著可以創建一個AMQP 信道(Channel) ,每個信道都會被指派一個唯一的D。RabbitMQ 處理的每條AMQP 指令都是通過信道完成的。信道就像電纜裡的光纖束。一條電纜內含有許多光纖束,允許所有的連接通過多條光線束進行傳輸和接收。

2.2.2 生產者消費者

關於生產者消費者我們需要了解幾個概念:

  • Producer:生產者,即消息投遞者一方。
  • 消息:消息一般分兩個部分:消息體(payload)和標籤。標籤用來描述這條消息,如:一個交換器的名稱或者一個路由Key,Rabbit通過解析標籤來確定消息的去向,payload是消息內容可以使一個json,數組等等。
  • Consumer:消費者,就是接收消息的一方。消費者訂閱RabbitMQ的隊列,當消費者消費一條消息時,只是消費消息的消息體。在消息路由的過程中,會丟棄標籤,存入到隊列中的只有消息體。
  • Broker:消息中間件的服務節點。

2.2.3 隊列、交換器、路由key、綁定

從RabbitMQ的運轉流程我們可以知道生產者的消息是發佈到交換器上的。而消費者則是從隊列上獲取消息的。那麼消息到底是如何從交換器到隊列的呢?我們先具體瞭解一下這幾個概念。

Queue:隊列,是RabbitMQ的內部對象,用於存儲消息。RabbitMQ中消息只能存儲在隊列中。生產者投遞消息到隊列,消費者從隊列中獲取消息並消費。多個消費者可以訂閱同一個隊列,這時隊列中的消息會被平均分攤(輪詢)給多個消費者進行消費,而不是每個消費者都收到所有的消息進行消費。(注意:RabbitMQ不支持隊列層面的廣播消費,如果需要廣播消費,可以採用一個交換器通過路由Key綁定多個隊列,由多個消費者來訂閱這些隊列的方式。)

Exchange:交換器。在RabbitMQ中,生產者並非直接將消息投遞到隊列中。真實情況是,生產者將消息發送到Exchange(交換器),由交換器將消息路由到一個或多個隊列中。如果路由不到,或返回給生產者,或直接丟棄,或做其它處理。

RoutingKey:路由Key。生產者將消息發送給交換器的時候,一般會指定一個RoutingKey,用來指定這個消息的路由規則。這個路由Key需要與交換器類型和綁定鍵(BindingKey)聯合使用才能最終生效。在交換器類型和綁定鍵固定的情況下,生產者可以在發送消息給交換器時通過指定RoutingKey來決定消息流向哪裡。

Binding:RabbitMQ通過綁定將交換器和隊列關聯起來,在綁定的時候一般會指定一個綁定鍵,這樣RabbitMQ就可以指定如何正確的路由到隊列了。

從這裡我們可以看到在RabbitMQ中交換器和隊列實際上可以是一對多,也可以是多對多關係。交換器和隊列就像我們關係數據庫中的兩張表。他們同歸BindingKey做關聯(多對多關係表)。在我們投遞消息時,可以通過Exchange和RoutingKey(對應BindingKey)就可以找到相對應的隊列。

分佈式系統消息中間件——RabbitMQ的使用基礎篇

需要關於分佈式架構方面資料的讀者可以關注我然後私信回覆我“分佈式”即可獲取免費的資料鏈接。

RabbitMQ主要有四種類型的交換器:

  • fanout:扇形交換器,它會把發送到該交換器的消息路由到所有與該交換器綁定的隊列中。如果使用扇形交換器,則不會匹配路由Key。
分佈式系統消息中間件——RabbitMQ的使用基礎篇


  • direct:direct交換器,會把消息路由到RoutingKey與BindingKey完全匹配的隊列中。
分佈式系統消息中間件——RabbitMQ的使用基礎篇


  • topic:完全匹配BindingKey和RoutingKey的direct交換器 有些時候並不能滿足實際業務的需求。topic 類型的交換器在匹配規則上進行了擴展,它與direct 類型的交換器相似,也是將消息路由到BindingKey 和RoutingKey 相匹配的隊
  • 列中,但這裡的匹配規則有些不同,它約定:
  • RoutingKey 為一個點號"."分隔的字符串(被點號"."分隔開的每一段獨立的字符
  • 串稱為一個單詞)λ,如"hs.rabbitmq.client","com.rabbit.client"等。
  • BindingKey 和RoutingKey 一樣也是點號"."分隔的字符串;
  • BindingKey 中可以存在兩種特殊字符串"*"和"#",用於做模糊匹配,其中"*"用於匹配一個單詞,"#"用於匹配多規格單詞(可以是零個)。
分佈式系統消息中間件——RabbitMQ的使用基礎篇

如圖:

​ · 路由鍵為" apple.rabbit.client" 的消息會同時路由到Queuel 和Queue2;

​ · 路由鍵為" orange.mq.client" 的消息只會路由到Queue2 中:

​ · 路由鍵為" apple.mq.demo" 的消息只會路由到Queue2 中:

​ · 路由鍵為" banana.rabbit.demo" 的消息只會路由到Queuel 中:

​ · 路由鍵為" orange.apple.banana" 的消息將會被丟棄或者返回給生產者因為它沒有匹配任何路由鍵。

  • header:headers 類型的交換器不依賴於路由鍵的匹配規則來路由消息,而是根據發送的消息內容中
  • 的headers 屬性進行匹配。在綁定隊列和交換器時制定一組鍵值對, 當發送消息到交換器時,
  • RabbitMQ 會獲取到該消息的headers (也是一個鍵值對的形式) ,對比其中的鍵值對是否完全
  • 匹配隊列和交換器綁定時指定的鍵值對,如果完全匹配則消息會路由到該隊列,否則不會路由
  • 到該隊列。(注:該交換器類型性能較差且不實用,因此一般不會用到)。

瞭解了上面的概念,我們再來思考消息是如何從交換器到隊列的。首先Rabbit在接收到消息時,會解析消息的標籤從而得到消息的交換器與路由key信息。然後根據交換器的類型、路由key以及該交換器和隊列的綁定關係來決定消息最終投遞到哪個隊列裡面。

三 RabbitMQ使用

3.1 RabbitMQ安裝

這裡我們基於docker來安裝。

3.1.1 拉取鏡像

docker pull rabbitmq:management

3.1.2 啟動容器

docker run -d --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management

3.2 RabbitMQ 客戶端開發使用

這裡我們以dotnet平臺下RabbitMQ.Client3.6.9(可以從nuget中下載)為示例,簡單介紹dotnet平臺下對RabbitMQ的簡單操作。更詳細的內容可以從nuget中下載源碼和文檔進行查看。

3.2.1 連接Rabbit

 ConnectionFactory factory = new ConnectionFactory();
factory.UserName = "admin";//用戶名
factory.Password = "admin";//密碼
factory.HostName = "192.168.17.205";//主機名
factory.VirtualHost = "";//虛擬主機(這個暫時不需要,稍後的文章裡會介紹虛擬主機的概念)
factory.Port = 15672;//端口
IConnection conn = factory.CreateConnection();//創建連接

3.2.2 創建信道

 IModel channel = conn.CreateModel();

說明:Connection 可以用來創建多個Channel 實例,但是Channel 實例不能在線程問共享,應用程序應該為每一個線程開闢一個Channel 。某些情況下Channel 的操作可以併發運行,但是在其他情況下會導致在網絡上出現錯誤的通信幀交錯,同時也會影響友送方確認( publisherconfrrm)機制的運行,所以多線程問共享Channel實例是非線程安全的。

3.2.3 交換器、隊列和綁定

 channel.ExchangeDeclare("exchangeName", "direct", true);
String queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queueName, "exchangeName", "routingKey");

如上創建了一個持久化的、非自動刪除的、綁定類型為direct 的交換器,同時也創建了一個非持久化的、排他的、自動刪除的隊列(此隊列的名稱由RabbitMQ 自動生成)。這裡的交換器和隊列也都沒有設置特殊的參數。

上面的代碼也展示瞭如何使用路由鍵將隊列和交換器綁定起來。上面聲明的隊列具備如下特性: 只對當前應用中同一個Connection 層面可用,同一個Connection 的不同Channel可共用,並且也會在應用連接斷開時自動刪除。

上述方法根據參數不同,可以有不同的重載形式,根據自身的需要進行調用。

ExchangeDeclare方法詳解:

ExchangeDeclare有多個重載方法,這些重載方法都是由下面這個方法中缺省的某些參數構成的。

void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary arguments);
  • exchange : 交換器的名稱。
  • type : 交換器的類型,常見的如fanout、direct 、topic
  • durable: 設置是否持久化。durab l e 設置為true 表示持久化, 反之是非持久化。持久化可以將交換器存盤,在服務器重啟的時候不會丟失相關信息。
  • autoDelete : 設置是否自動刪除。autoDelete 設置為true 則表示自動刪除。自動刪除的前提是至少有一個隊列或者交換器與這個交換器綁定,之後所有與這個交換器綁定的隊列或者交換器都與此解綁。注意不能錯誤地把這個參數理解為:"當與此交換器連接的客戶端都斷開時, RabbitMQ 會自動刪除本交換器"。
  • internal : 設置是否是內置的。如果設置為true ,則表示是內置的交換器,客戶端程序無法直接發送消息到這個交換器中,只能通過交換器路由到交換器這種方式。
  • argument : 其他一些結構化參數,比如alternate - exchange。

QueueDeclare方法詳解:

QueueDeclare只有兩個重載。

 QueueDeclareOk QueueDeclare();

QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary arguments);

不帶任何參數的queueDeclare 方法默認創建一個由RabbitMQ 命名的(類似這種amq.gen-LhQzlgv3GhDOv8PIDabOXA 名稱,這種隊列也稱之為匿名隊列〉、排他的、自動刪除的、非持久化的隊列。

  • queue : 隊列的名稱。
  • durable: 設置是否持久化。為true 則設置隊列為持久化。持久化的隊列會存盤,在服務器重啟的時候可以保證不丟失相關信息。
  • exclusive : 設置是否排他。為true 則設置隊列為排他的。如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明它的連接可見,並在連接斷開時自動刪除。這裡需要注意三點:排他隊列是基於連接( Connection) 可見的,同一個連接的不同信道(Channel)是可以同時訪問同一連接創建的排他隊列; "首次"是指如果一個連接己經聲明瞭一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同:即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除,這種隊列適用於一個客戶端同時發送和讀取消息的應用場景。
  • autoDelete: 設置是否自動刪除。為true 則設置隊列為自動刪除。自動刪除的前提是:至少有一個消費者連接到這個隊列,之後所有與這個隊列連接的消費者都斷開時,才會自動刪除。不能把這個參數錯誤地理解為:當連接到此隊列的所有客戶端斷開時,這個隊列自動刪除",因為生產者客戶端創建這個隊列,或者沒有消費者客戶端與這個隊列連接時,都不會自動刪除這個隊列。
  • argurnents: 設置隊列的其他一些參數,如x-rnessage-ttl、x-expires、x-rnax-length、x-rnax-length-bytes、x-dead-letter-exchange、x-deadletter-routing-key, x-rnax-priority等。

注意:生產者和消費者都能夠使用queueDeclare 來聲明一個隊列,但是如果消費者在同一個信道上訂閱了另一個隊列,就無法再聲明隊列了。必須先取消訂閱,然後將信道直為"傳輸"模式,之後才能聲明隊列。

QueueBind 方法詳解:

將隊列和交換器綁定的方法如下:

void QueueBind(string queue, string exchange, string routingKey, IDictionary arguments);
  • queue: 隊列名稱:
  • exchange: 交換器的名稱:
  • routingKey: 用來綁定隊列和交換器的路由鍵;
  • argument: 定義綁定的一些參數。

將隊列與交換器解綁的方法如下:

QueueUnbind(string queue, string exchange, string routingKey, IDictionary arguments);

其參數與綁定意義相同。

注:除隊列可以綁定交換器外,交換器同樣可以綁定隊列。即:ExchangeBind方法,其使用方式與隊列綁定相似。

3.2.4 發送消息

發送消息可以使用BasicPublish方法。

void BasicPublish(string exchange, string routingKey, bool mandatory,IBasicProperties basicProperties, byte[] body);
  • exchange: 交換器的名稱,指明消息需要發送到哪個交換器中。如果設置為空字符串,則消息會被髮送到RabbitMQ 默認的交換器中。
  • routingKey : 路由鍵,交換器根據路由鍵將消息存儲到相應的隊列之中。
  • basicProperties: 消息的基本屬性集。
  • body : 消息體( pay1oad ),真正需要發送的消息。
  • mandatory: 是否將消息返回給生產者(會在後續的文章中介紹這個參數).

3.2.5 消費消息

RabbitMQ 的消費模式分兩種: 推(Push)模式和拉(Pull)模式。推模式採用BasicConsume

進行消費,而拉模式則是調用BasicGet進行消費。

推模式:

 EventingBasicConsumer consumer = new EventingBasicConsumer(channel);//定義消費者對象
consumer.Received += (model, ea) =>
{
//do someting;
channel.BasicAck(ea.DeliveryTag, multiple: false);//確認
};
channel.BasicConsume(queue: "queueName",
noAck: false,
consumer: consumer);//訂閱消息
string BasicConsume(string queue, bool noAck, string consumerTag, bool noLocal, bool exclusive, IDictionary arguments, IBasicConsumer consumer);
  • queue : 隊列的名稱:
  • noAck : 設置是否需要確認,false為需要確認。
  • consumerTag: 消費者標籤,用來區分多個消費者:
  • noLocal : 設置為true 則表示不能將同一個Connection中生產者發送的消息傳送給這個Connection中的消費者:
  • exclusive : 設置是否排他
  • arguments : 設置消費者的其他參數
  • consumer: 指定處理消息的消費者對象。

拉模式

BasicGetResult result = channel.BasicGet("queueName", noAck: false);//獲取消息
channel.BasicAck(result.DeliveryTag, multiple: false);//確認

3.2.6 關閉連接

在應用程序使用完之後,需要關閉連接,釋放資源:

channel.close();
conn.close() ;

顯式地關閉Channel 是個好習慣,但這不是必須的,在Connection 關閉的時候,Channel 也會自動關閉。

結束語

以上簡單介紹了分佈式系統中消息中間件的概念與作用,以及RabbitMQ的一些基本概念與簡單使用。下一篇文章將繼續針對RabbitMQ進行總結。主要內容包括何時創建隊列、RabbitMQ的確認機制、過期時間的使用、死信隊列、以及利用RabbitMQ實現延遲隊列......

分佈式系統消息中間件——RabbitMQ的使用基礎篇

需要關於分佈式架構方面資料的讀者可以關注我然後私信回覆我“分佈式”即可獲取免費的資料鏈接。


分享到:


相關文章: