RabbitMQ實戰進階祕籍,應用場景分析,RabbitMQ如何提升系統性能

0、引言

上次課我們僅說了RabbitMQ是一種消息中間件,可以異步的處理客戶端發送的消息,但是我們並沒有將什麼是消息中間件,使用消息中間件會給我們的系統帶來怎樣的性能提升?這次課我們就來說一下什麼是消息中間件,我們通過一個案例來引出消息中間件,說明消息中間件在我們這個系統中扮演什麼樣重要的角色?

如果你對文章有什麼疑惑或者不理解的地方,可以參考如下視頻講解:對本文內容有更加詳盡的介紹,分析,並手把手演示

1、什麼是消息中間件

上節課我們僅說了什麼是RabbitMQ,但並沒有說什麼是消息中間件,為什麼要使用消息中間件?

1)問題引入:

假設我們現在需要設計這樣一個用戶註冊系統:用戶註冊完成後,需要給用戶發送激活郵件,開通用戶賬號,記錄用戶IP、用戶設備、時間等信息。

起初的設計:

RabbitMQ實戰進階秘籍,應用場景分析,RabbitMQ如何提升系統性能

2)但存在的問題是

由於多個系統強耦合在一起,用戶註冊響應會非常慢,嚴重影響了用戶的體驗,當流量大的時候,性能會更差。

3)引入消息中間件:

為了解決上述問題,我們引入消息中間件,來實現系統的解耦,多個系統間通過消息中間件進行異步通信,最終的設計圖如下:

RabbitMQ實戰進階秘籍,應用場景分析,RabbitMQ如何提升系統性能

即實現了系統解耦,又提升了系統響應的速度

4)消息中間件介紹:

消息中間件(Message Queue Middleware,簡稱MQ)又稱為消息隊列,是指利用高效可靠的消息傳遞機制進行與平臺無關的數據交流,並基於數據通信來進行分佈式系統的構建。

RabbitMQ實戰進階秘籍,應用場景分析,RabbitMQ如何提升系統性能

2、應用場景分析

1)異步通信

在很多時候,為了加快應用系統整體運轉速度,並不需要立即響應某些請求,消息中間件提供了異步處理機制,允許將一些請求信息放入消息中間件中,但並不立即處理它,而是慢慢處理。在有限資源下,使用消息中間件能夠使系統性能從容倍增!

如:用戶註冊成功的郵件通知;用戶購物下單的信息通知;大數據日誌收集處理

2)削峰

以防突發劇增流量瞬間沖垮系統,使用消息中間件可以支撐突發訪問壓力

3)業務系統解耦

系統間的耦合關係太強,會對系統的設計產生束縛,也會增加系統的複雜性,通過消息中間件可以更好的設計系統,是一個系統完成指定的功能,而不是將所有的功能融合在同一個系統中。

3、回顧下上節課的內容

RabbitMQ實戰進階秘籍,應用場景分析,RabbitMQ如何提升系統性能

關於上節課說的binding key、routing key,我這裡再解釋下,有點難以理解

其實兩者是一樣的,只不過是不同的叫法而已,我們可以統一叫它routing key

4、詳解RabbitMQ運作流程

RabbitMQ實戰進階秘籍,應用場景分析,RabbitMQ如何提升系統性能

相對於上節課講的概念,又增加了virtual host、connection、channel

Virtual Host: 每個virtual host本質上都是一個RabbitMQ Server(但是一個server中可以有多個virtual host),擁有它自己若干的個Exchange、Queue和bings rule等等。其實這是一個虛擬概念,類似於權限控制組。Virtual Host是權限控制的最小粒度。

Connection: 就是一個TCP的連接。Producer和Consumer都是通過TCP連接到RabbitMQ Server的。接下來的實踐案例中我們就可以看到,producer和consumer與exchange的通信的前提是先建立TCP連接。

僅僅創建了TCP連接,producer和consumer與exchange還是不能通信的。我們還需要為每一個Connection創建Channel。

Channel: 它是建立在上述TCP連接之上的虛擬連接。數據傳輸都是在Channel中進行的。AMQP協議規定只有通過Channel才能執行AMQP的命令。一個Connection可以包含多個Channel。

有人要問了,為什麼要使用Channel呢,直接用TCP連接不就好了麼?

對於一個消息服務器來說,它的任務是處理海量的消息,當有很多線程需要從RabbitMQ中消費消息或者生產消息,那麼必須建立很多個connection,也就是許多個TCP連接。然而對於操作系統而言,建立和關閉TCP連接是非常昂貴的開銷,而且TCP的連接數也有限制,頻繁的建立關閉TCP連接對於系統的性能有很大的影響,如果遇到高峰,性能瓶頸也隨之顯現。RabbitMQ採用類似NIO的做法,選擇TCP連接服用,不僅可以減少性能開銷,同時也便於管理。在TCP連接中建立Channel是沒有上述代價的,可以複用TCP連接。對於Producer或者Consumer來說,可以併發的使用多個Channel進行Publish或者Receive。

有實驗表明,在Channel中,1秒可以Publish10K的數據包。對於普通的Consumer或者Producer來說,這已經足夠了。除非有非常大的流量時,一個connection可能會產生性能瓶頸,此時就需要開闢多個connection。

5、實戰演練生產、消費,串講整個運作流程

代碼倉庫:https://gitee.com/jikeh/JiKeHCN-RELEASE.git

項目名:rabbitmqdemo

項目代碼中有詳細的運轉流程註釋,這裡就不細講了……

查看Tcp連接:

RabbitMQ實戰進階秘籍,應用場景分析,RabbitMQ如何提升系統性能

查看信道連接:

RabbitMQ實戰進階秘籍,應用場景分析,RabbitMQ如何提升系統性能

6、拓展:使用默認交換器完成最簡單的案例

代碼倉庫:https://gitee.com/jikeh/JiKeHCN-RELEASE.git

項目名:rabbitmq-base-demo

RabbitMQ實戰進階秘籍,應用場景分析,RabbitMQ如何提升系統性能

生產者(producer)把消息發送到一個名為“hello”的隊列中。

消費者(consumer)從這個隊列中獲取消息。

(1)引入jar包

com.rabbitmq

amqp-client

LATEST

log4j

log4j

1.2.17

org.slf4j

slf4j-log4j12

1.6.6

(2)生產者發送消息

private static final String QUUE_NAME = "hello";

private static final String IP_ADDRESS = "localhost";

private static final int PORT = 5672;//默認端口號為5672

private static final String USER_NAME = "guest";

private static final String USER_PWD = "guest";

1)創建和RabbitMQ的連接

//初始化TCP連接參數
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername(USER_NAME);
factory.setPassword(USER_PWD);
factory.setVirtualHost("/");
//創建連接
Connection connection = factory.newConnection();
//創建信道
Channel channel = connection.createChannel();
/**
* 語法格式:queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
* 創建一個持久化、非排他的、非自動刪除的隊列
* exclusive:排他的 由該客戶端獨佔,一般設為false
* autoDelete:消息消費完,該隊列就會被刪除,一般為false
*/
channel.queueDeclare(QUUE_NAME, true, false, false, null);

3)發送消息

//發送一條持久化的消息
//MessageProperties.PERSISTENT_TEXT_PLAIN:是以純文本格式發送
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

4)關閉連接,信道

//關閉信道
channel.close();
//關閉連接
connection.close();

(3)消費者獲取消息

private static final String QUUE_NAME = "hello";
private static final String IP_ADDRESS = "localhost";
private static final int PORT = 5672;//默認端口號為5672
private static final String USER_NAME = "guest";
private static final String USER_PWD = "guest";

1)創建和RabbitMQ的連接,並且聲明隊列

//初始化TCP連接參數
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(USER_NAME);
factory.setPassword(USER_PWD);
Address[] addresses = new Address[]{
new Address(IP_ADDRESS, PORT)
};
//創建連接
Connection connection = factory.newConnection(addresses);
//創建信道
final Channel channel = connection.createChannel();

2)需要為隊列定義一個回調(callback)函數。當我們獲取到消息的時候,就會調用此回調函數

Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.printf("receive message:" + new String(body));
/**
* true to acknowledge all messages up to and including the supplied delivery tag; false to acknowledge just the supplied delivery tag.

* true:確認包括傳輸標記之前的所有信息;
* false:僅確認該傳輸標記的消息
*/
channel.basicAck(envelope.getDeliveryTag(), false);
}
};

3)我們需要告訴RabbitMQ這個回調函數將會從名為"hello"的隊列中接收消息

//channel.basicConsume(QUEUE_NAME, true, consumer);//自動確認消息
channel.basicConsume(QUUE_NAME, consumer);

4)關閉連接,信道

channel.close();
connection.close();

默認交換器,默認綁定關係:

RabbitMQ實戰進階秘籍,應用場景分析,RabbitMQ如何提升系統性能

RabbitMQ實戰進階秘籍,應用場景分析,RabbitMQ如何提升系統性能


分享到:


相關文章: