RabbitMQ在分佈式系統的應用

由於之前做的項目中需要在多個節點之間可靠地通信,所以廢棄了之前使用的Redis pub/sub(因為集群有單點問題,且有諸多限制),改用了RabbitMQ。使用期間得到不少收穫,也踩了不少坑,所以在此分享下心得。

怎麼保證可靠性的?

RabbitMQ提供了幾種特性,犧牲了一點性能代價,提供了可靠性的保證。

持久化

當RabbitMQ退出時,默認會將消息和隊列都清除,所以需要在第一次聲明隊列和發送消息時指定其持久化屬性為true,這樣RabbitMQ會將隊列、消息和狀態存到RabbitMQ本地的數據庫,重啟後會恢復。

接收應答

客戶端接收消息的模式默認是自動應答,但是通過設置autoAck為false可以讓客戶端主動應答消息。當客戶端拒絕此消息或者未應答便斷開連接時,就會使得此消息重新入隊(在版本2.7.0以前是到重新加入到隊尾,2.7.0及以後是保留消息在隊列中的原來位置)。

發送確認

默認情況下,發送端不關注發出去的消息是否被消費掉了。可設置channel為confirm模式,所有發送的消息都會被確認一次,用戶可以自行根據server發回的確認消息查看狀態。詳細介紹見:confirms

事務:和confirm模式不能同時使用,而且會帶來大量的多餘開銷,導致吞吐量下降很多,故而不推薦。

消息隊列的高可用(主備模式)

相比於路由和綁定,可以視為是共享於所有的節點的,消息隊列默認只存在於第一次聲明它的節點上,這樣一旦這個節點掛了,這個隊列中未處理的消息就沒有了。 幸好,RabbitMQ提供了將它備份到其他節點的機制,任何時候都有一個master負責處理請求,其他slaves負責備份,當master掛掉,會將最早創建的那個slave提升為master。

命令:rabbitmqctl set_policy ha-all “^ha\\.” ‘{“ha-mode”:”all”}’:設置所有以’ha’開頭的queue在所有節點上擁有備份。詳細語法點這裡;也可以在界面上配置。

RabbitMQ在分佈式系統的應用

順序保證

直接上圖好了:

一些需要注意的地方

集群配置:

一個集群中多個節點共享一份.erlang.cookie文件;若是沒有啟用RABBITMQ_USE_LONGNAME,需要在每個節點的hosts文件中指定其他節點的地址,不然會找不到其他集群中的節點。

腦裂:

RabbitMQ集群對於網絡分區的處理和忍受能力不太好,推薦使用federation或者shovel插件去解決。federation詳見高級->Federation。但是,情況已經發生了,怎麼去解決呢?放心,還是有辦法恢復的。當網絡斷斷續續時,會使得節點之間的通信斷掉,進而造成集群被分隔開的情況。這樣,每個小集群之後便只處理各自本地的連接和消息,從而導致數據不同步。當重新恢復網絡連接時,它們彼此都認為是對方掛了-_-||,便可以判斷出有網絡分區出現了。但是RabbitMQ默認是忽略掉不處理的,造成兩個節點繼續各自為政(路由,綁定關係,隊列等可以獨立地創建刪除,甚至主備隊列也會每一方擁有自己的master)。可以更改配置使得連接恢復時,會根據配置自動恢復。

ignore:默認,不做任何處理

pause-minority:斷開連接時,判斷當前節點是否屬於少數派(節點數少於或者等於一半),如果是,則暫停直到恢復連接。

{pause_if_all_down, [nodes], ignore | autoheal}:斷開連接時,判斷當前集群中節點是否有節點在nodes中,如果有,則繼續運行,否則暫停直到恢復連接。這種策略下,當恢復連接時,可能會有多個分區存活,所以,最後一個參數決定它們怎麼合併。

autoheal:當恢復連接時,選擇客戶端連接數最多的節點狀態為主,重啟其他節點。

多次ack:客戶端多次應答同一條消息,會使得該客戶端收不到後續消息。

結合Docker使用

集群版本的實現:詳見我自己寫的一個例子rabbitmq-server-cluster

消息隊列中間件的比較

RabbitMQ:

優點:支持很多協議如:AMQP,XMPP,STMP,STOMP;靈活的路由;成熟穩定的集群方案;負載均衡;數據持久化等。

缺點:速度較慢;比較重量級,安裝需要依賴Erlang環境。

Redis:

優點:比較輕量級,易上手

缺點:單點問題,功能單一

Kafka:

優點:高吞吐;分佈式;快速持久化;負載均衡;輕量級

缺點:極端情況下會丟消息

最後附一張網上截取的測試結果:

RabbitMQ在分佈式系統的應用

更多性能參數見:RabbitMQ Performance Measurements

如果有興趣簡單瞭解下RabbitMQ,可以繼續往下看~

幾個重要的概念

Virtual Host:包含若干個Exchange和Queue,表示一個節點;

Exchange:接受客戶端發送的消息,並根據Binding將消息路由給服務器中的隊列,Exchange分為direct、fanout、topic三種;

Binding:連接Exchange和Queue,包含路由規則;

Queue:消息隊列,存儲還未被消費的消息;

Message:Header+Body;

Channel:通道,執行AMQP的命令;一個連接可創建多個通道以節省資源。

Client

RabbitMQ官方實現了很多熱門語言的客戶端,就不一一列舉啦,以java為例,直接開始正題:

建立連接:

可以加上斷開重試機制:

創建連接和通道:

一對一:一個生產者,一個消費者

RabbitMQ在分佈式系統的應用

生產者:

消費者:

一對多:一個生產者,多個消費者

RabbitMQ在分佈式系統的應用

代碼同上,只不過會有多個消費者,消息會輪序發給各個消費者。

如果設置了autoAck=false,那麼可以實現公平分發(即對於某個特定的消費者,每次最多隻發送指定條數的消息,直到其中一條消息應答後,再發送下一條)。需要在消費者中加上:

其他同上。

廣播

RabbitMQ在分佈式系統的應用

生產者:

消費者同上。

Routing:指定路由規則

RabbitMQ在分佈式系統的應用

生產者:

消費者同上。

Topics:支持通配符的Routing

RabbitMQ在分佈式系統的應用

生產者:

消費者同上。

RPC

RabbitMQ在分佈式系統的應用

其實就是一對一模式的一種用法:

首先,客戶端發送一條消息到服務端聲明的隊列,消息屬性中包含reply_to和correlation_id

然後,服務端接收到消息,處理,並返回一條結果到reply_to隊列中,最終,客戶端接收到返回消息,繼續向下處理。

Server

支持各大主流操作系統,這裡以Unix為例介紹下常用配置和命令:

安裝

由於RabbitMQ是依賴於Erlang的,所以得首先安裝最近版本的Erlang。

單點的安裝比較簡單,下載解壓即可。

下載地址:http://www.rabbitmq.com/download.html

配置:(一般的,用默認的即可。)

$RABBITMQ_HOME/etc/rabbitmq/rabbitmq-env.conf: 環境變量默認配置(也可在啟動腳本中設置,且以啟動命令中的配置為準)。常用的有:

RABBITMQ_NODENAME:節點名稱,默認是rabbit@$HOSTNAME。

RABBITMQ_NODE_PORT:協議端口號,默認5672。

RABBITMQ_SERVER_START_ARGS:覆蓋rabbitmq.config中的一些配置。

$RABBITMQ_HOME/etc/rabbitmq/rabbitmq.config: 核心組件,插件,erlang服務等配置,常用的有:

disk_free_limit:隊列持久化等信息都是存到RabbitMQ本地的數據庫中的,默認限制50000000(也就是最多隻讓它使用50M空間啦,不夠可以上調,也支持空閒空間百分比的配置)。要是超標了,它就罷工了……

vm_memory_high_watermark:內存使用,默認0.4(最多讓它使用40%的內存,超標罷工)

(注:若啟動失敗了,可以在啟動日誌中查看到具體的錯誤信息。)

命令:

集群

集群節點共享所有的狀態和數據,如:用戶、路由、綁定等信息(隊列有點特殊,雖然從所有節點都可達,但是隻存在於第一次聲明它的那個節點上,解決方案——詳見上文:消息隊列的高可用);每個節點都可以接收連接,處理數據。

集群節點有兩種,disc:默認,信息存在本地數據庫;ram:加入集群時,添加–ram參數,信息存在內存,可提高性能。

配置:(一般的,用默認的即可。)

$RABBITMQ_HOME/etc/rabbitmq/rabbitmq-env.conf:

RABBITMQ_USE_LONGNAME:默認false,(默Rene的,RABBITMQ_NODENAME中@後面的\\$HOSTNAME是主機名,所以需要集群中每個節點的hosts文件包含其他節點主機名到地址的映射。但是如果設置為true,就可以定義RABBITMQ_NODENAME中的$HOSTNAME為域名了)

RABBITMQ_DIST_PORT:集群端口號,默認RABBITMQ_NODE_PORT + 20000

$RABBITMQ_HOME/etc/rabbitmq/rabbitmq.config:

cluster_nodes:設置後,在啟動時會嘗試自動連接加入的節點並組成集群。

cluster_partition_handling:【詳見上文:網絡分區的處理】

更多詳細的配置見:配置

命令:

(注:如果加入集群失敗,可先查看)

每個節點的$HOME/.erlang.cookie內容一致;

如果hostname是主機名,那麼此hostname和地址的映射需要加入hosts文件中;

如果使用的是域名,那麼需要設置RABBITMQ_USE_LONGNAME為true。

(注:docker版集群的見:rabbitmq-server-cluster)

高級

AMQP協議簡介

RabbitMQ原生支持AMQP 0-9-1並擴展實現了了一些常用的功能:AMQP 0-9-1

包含三層:

模型層: 最高層,提供了客戶端調用的命令,如:queue.declare,basic.ack,consume等。

會話層:將命令從客戶端傳遞給服務器,再將服務器的應答傳遞給客戶端,會話層為這個傳遞過程提供可靠性、同步機制和錯誤處理。

傳輸層:主要傳輸二進制數據流,提供幀的處理、信道複用、錯誤檢測和數據表示。

RabbitMQ在分佈式系統的應用

注:其他協議的支持見:RabbitMQ支持的協議

常用插件

管理界面(神器)

RabbitMQ在分佈式系統的應用

啟動後,執行rabbitmq-plugins enable rabbitmq_management→訪問http://localhost:15672→查看節點狀態,隊列信息等等,甚至可以動態配置消息隊列的主備策略,如下圖:

RabbitMQ在分佈式系統的應用

Federation

啟用Federation插件,使得不同集群的節點之間可以傳遞消息,從而模擬出類似集群的效果。這樣可以有幾點好處:

松耦合:聯合在一起的不同集群可以有各自的用戶,權限等信息,無需一致;此外,這些集群的RabbitMQ和Erlang的版本可以不一致。

遠程網絡連接友好:由於通信是遵循AMQP協議的,故而對斷斷續續的網絡連接容忍度高。

自定義:可以自主選擇哪些組件啟用federation。

幾個概念:

Upstreams:定義上游節點信息,如下:

uri是其上游節點的地址,多個upstream的節點無需在同一集群中。

expires表示斷開連接3600000ms後其上游節點會緩存消息。

Upstream sets:多個Upstream的集合;默認有個all,會將所有的Upstream加進去。

Policies:定義哪些exchanges,queues關聯到哪個Upstream或者Upstream set,如:

將此節點所有以amq.開頭的exchange聯合到上游節點的同名exchange。
注:

由於下游節點的exchange可以繼續作為其他節點的上游,故可設置成循環,廣播等形式。

通過max_hops參數控制傳遞層數。

模擬集群,可以將多個節點兩兩互連,並設置max_hops=1。

RabbitMQ在分佈式系統的應用

如果啟用了管理界面,可以添加:

這樣就可以在界面配置Upstream和Policy了。


分享到:


相關文章: