0、引言
上次課我們僅說了RabbitMQ是一種消息中間件,可以異步的處理客戶端發送的消息,但是我們並沒有將什麼是消息中間件,使用消息中間件會給我們的系統帶來怎樣的性能提升?這次課我們就來說一下什麼是消息中間件,我們通過一個案例來引出消息中間件,說明消息中間件在我們這個系統中扮演什麼樣重要的角色?
如果你對文章有什麼疑惑或者不理解的地方,可以參考如下視頻講解:對本文內容有更加詳盡的介紹,分析,並手把手演示
1、什麼是消息中間件
上節課我們僅說了什麼是RabbitMQ,但並沒有說什麼是消息中間件,為什麼要使用消息中間件?
1)問題引入:
假設我們現在需要設計這樣一個用戶註冊系統:用戶註冊完成後,需要給用戶發送激活郵件,開通用戶賬號,記錄用戶IP、用戶設備、時間等信息。
起初的設計:
2)但存在的問題是:
由於多個系統強耦合在一起,用戶註冊響應會非常慢,嚴重影響了用戶的體驗,當流量大的時候,性能會更差。
3)引入消息中間件:
為了解決上述問題,我們引入消息中間件,來實現系統的解耦,多個系統間通過消息中間件進行異步通信,最終的設計圖如下:
即實現了系統解耦,又提升了系統響應的速度
4)消息中間件介紹:
消息中間件(Message Queue Middleware,簡稱MQ)又稱為消息隊列,是指利用高效可靠的消息傳遞機制進行與平臺無關的數據交流,並基於數據通信來進行分佈式系統的構建。
2、應用場景分析
1)異步通信
在很多時候,為了加快應用系統整體運轉速度,並不需要立即響應某些請求,消息中間件提供了異步處理機制,允許將一些請求信息放入消息中間件中,但並不立即處理它,而是慢慢處理。在有限資源下,使用消息中間件能夠使系統性能從容倍增!
如:用戶註冊成功的郵件通知;用戶購物下單的信息通知;大數據日誌收集處理
2)削峰
以防突發劇增流量瞬間沖垮系統,使用消息中間件可以支撐突發訪問壓力
3)業務系統解耦
系統間的耦合關係太強,會對系統的設計產生束縛,也會增加系統的複雜性,通過消息中間件可以更好的設計系統,是一個系統完成指定的功能,而不是將所有的功能融合在同一個系統中。
3、回顧下上節課的內容
關於上節課說的binding key、routing key,我這裡再解釋下,有點難以理解
其實兩者是一樣的,只不過是不同的叫法而已,我們可以統一叫它routing key
4、詳解RabbitMQ運作流程
相對於上節課講的概念,又增加了virtual host、connection、channel
Virtual Host: 每個virtual host本質上都是一個RabbitMQ Server(但是一個server中可以有多個virtual host),擁有它自己若干的個Exchange、Queue和bings rule等等。其實這是一個虛擬概念,類似於權限控制組。Virtual Host是權限控制的最小粒度。
Connection: 就是一個TCP的連接。Producer和Consumer都是通過TCP連接到RabbitMQ Server的。接下來的實踐案例中我們就可以看到,producer和consumer與exchange的通信的前提是先建立TCP連接。
僅僅創建了TCP連接,producer和consumer與exchange還是不能通信的。我們還需要為每一個Connection創建Channel。
Channel: 它是建立在上述TCP連接之上的虛擬連接。數據傳輸都是在Channel中進行的。AMQP協議規定只有通過Channel才能執行AMQP的命令。一個Connection可以包含多個Channel。
有人要問了,為什麼要使用Channel呢,直接用TCP連接不就好了麼?
對於一個消息服務器來說,它的任務是處理海量的消息,當有很多線程需要從RabbitMQ中消費消息或者生產消息,那麼必須建立很多個connection,也就是許多個TCP連接。然而對於操作系統而言,建立和關閉TCP連接是非常昂貴的開銷,而且TCP的連接數也有限制,頻繁的建立關閉TCP連接對於系統的性能有很大的影響,如果遇到高峰,性能瓶頸也隨之顯現。RabbitMQ採用類似NIO的做法,選擇TCP連接服用,不僅可以減少性能開銷,同時也便於管理。在TCP連接中建立Channel是沒有上述代價的,可以複用TCP連接。對於Producer或者Consumer來說,可以併發的使用多個Channel進行Publish或者Receive。
有實驗表明,在Channel中,1秒可以Publish10K的數據包。對於普通的Consumer或者Producer來說,這已經足夠了。除非有非常大的流量時,一個connection可能會產生性能瓶頸,此時就需要開闢多個connection。
5、實戰演練生產、消費,串講整個運作流程
代碼倉庫:https://gitee.com/jikeh/JiKeHCN-RELEASE.git
項目名:rabbitmqdemo
項目代碼中有詳細的運轉流程註釋,這裡就不細講了……
查看Tcp連接:
查看信道連接:
6、拓展:使用默認交換器完成最簡單的案例
代碼倉庫:https://gitee.com/jikeh/JiKeHCN-RELEASE.git
項目名:rabbitmq-base-demo
生產者(producer)把消息發送到一個名為“hello”的隊列中。
消費者(consumer)從這個隊列中獲取消息。
(1)引入jar包
(2)生產者發送消息
private static final String QUUE_NAME = "hello";
private static final String IP_ADDRESS = "localhost";
private static final int PORT = 5672;//默認端口號為5672
private static final String USER_NAME = "guest";
private static final String USER_PWD = "guest";
1)創建和RabbitMQ的連接
//初始化TCP連接參數
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername(USER_NAME);
factory.setPassword(USER_PWD);
factory.setVirtualHost("/");
//創建連接
Connection connection = factory.newConnection();
//創建信道
Channel channel = connection.createChannel();
/**
* 語法格式:queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Maparguments)
* 創建一個持久化、非排他的、非自動刪除的隊列
* exclusive:排他的 由該客戶端獨佔,一般設為false
* autoDelete:消息消費完,該隊列就會被刪除,一般為false
*/
channel.queueDeclare(QUUE_NAME, true, false, false, null);
3)發送消息
//發送一條持久化的消息
//MessageProperties.PERSISTENT_TEXT_PLAIN:是以純文本格式發送
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
4)關閉連接,信道
//關閉信道
channel.close();
//關閉連接
connection.close();
(3)消費者獲取消息
private static final String QUUE_NAME = "hello";
private static final String IP_ADDRESS = "localhost";
private static final int PORT = 5672;//默認端口號為5672
private static final String USER_NAME = "guest";
private static final String USER_PWD = "guest";
1)創建和RabbitMQ的連接,並且聲明隊列
//初始化TCP連接參數
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(USER_NAME);
factory.setPassword(USER_PWD);
Address[] addresses = new Address[]{
new Address(IP_ADDRESS, PORT)
};
//創建連接
Connection connection = factory.newConnection(addresses);
//創建信道
final Channel channel = connection.createChannel();
2)需要為隊列定義一個回調(callback)函數。當我們獲取到消息的時候,就會調用此回調函數
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.printf("receive message:" + new String(body));
/**
* true to acknowledge all messages up to and including the supplied delivery tag; false to acknowledge just the supplied delivery tag.
* true:確認包括傳輸標記之前的所有信息;
* false:僅確認該傳輸標記的消息
*/
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
3)我們需要告訴RabbitMQ這個回調函數將會從名為"hello"的隊列中接收消息
//channel.basicConsume(QUEUE_NAME, true, consumer);//自動確認消息
channel.basicConsume(QUUE_NAME, consumer);
4)關閉連接,信道
channel.close();
connection.close();
默認交換器,默認綁定關係:
閱讀更多 極客慧 的文章