用java實現rabbitmq消息隊列的實例

前言:在這裡我將用java來簡單的實現rabbitMQ。下面我們帶著下面問題來一步步的瞭解和學習rabbitMQ。

1:如果消費者連接中斷,這期間我們應該怎麼辦

2:如何做到負載均衡

3:如何有效的將數據發送到相關的接收者?就是怎麼樣過濾

4:如何保證消費者收到完整正確的數據

5:如何讓優先級高的接收者先收到數據

一:"Hello RabbitMQ"

下面有一幅圖,其中P表示生產者,C表示消費者,紅色部分為消息隊列

用java實現rabbitmq消息隊列的實例

二:項目開始

2.1:首先引入rabbitMQ jar包

 
com.rabbitmq
amqp-client
3.6.5

2.2:創建消費者Producer

/**
* 消息生成者
*/
public class Producer {
public final static String QUEUE_NAME="rabbitMQ.test";
public static void main(String[] args) throws IOException, TimeoutException {
//創建連接工廠
ConnectionFactory factory = new ConnectionFactory();
//設置RabbitMQ相關信息
factory.setHost("localhost");
//factory.setUsername("lp");
//factory.setPassword("");
// factory.setPort(2088);
//創建一個新的連接
Connection connection = factory.newConnection();
//創建一個通道
Channel channel = connection.createChannel();
// 聲明一個隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello RabbitMQ";
//發送消息到隊列中

channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("Producer Send +'" + message + "'");
//關閉通道和連接
channel.close();
connection.close();
}
}

注1:queueDeclare第一個參數表示隊列名稱、第二個參數為是否持久化(true表示是,隊列將在服務器重啟時生存)、第三個參數為是否是獨佔隊列(創建者可以使用的私有隊列,斷開後自動刪除)、第四個參數為當所有消費者客戶端連接斷開時是否自動刪除隊列、第五個參數為隊列的其他參數

注2:basicPublish第一個參數為交換機名稱、第二個參數為隊列映射的路由key、第三個參數為消息的其他屬性、第四個參數為發送信息的主體

2.3:創建消費者

public class Customer {
private final static String QUEUE_NAME = "rabbitMQ.test";
public static void main(String[] args) throws IOException, TimeoutException {
// 創建連接工廠
ConnectionFactory factory = new ConnectionFactory();
//設置RabbitMQ地址
factory.setHost("localhost");
//創建一個新的連接
Connection connection = factory.newConnection();
//創建一個通道
Channel channel = connection.createChannel();
//聲明要關注的隊列
channel.queueDeclare(QUEUE_NAME, false, false, true, null);

System.out.println("Customer Waiting Received messages");
//DefaultConsumer類實現了Consumer接口,通過傳入一個頻道,
// 告訴服務器我們需要那個頻道的消息,如果頻道中有消息,就會執行回調函數handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Customer Received '" + message + "'");
}
};
//自動回覆隊列應答 -- RabbitMQ中的消息確認機制
channel.basicConsume(QUEUE_NAME, true, consumer);
}

前面代碼我們可以看出和生成者一樣的,後面的是獲取生產者發送的信息,其中envelope主要存放生產者相關信息(比如交換機、路由key等)body是消息實體。

2.4:運行結果

生產者:

用java實現rabbitmq消息隊列的實例

消費者:

用java實現rabbitmq消息隊列的實例

三:實現任務分發

工作隊列

用java實現rabbitmq消息隊列的實例

一個隊列的優點就是很容易處理並行化的工作能力,但是如果我們積累了大量的工作,我們就需要更多的工作者來處理,這裡就要採用分佈機制了。

我們新創建一個生產者NewTask

public class NewTask {
private static final String TASK_QUEUE_NAME="task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("localhost");
Connection connection=factory.newConnection();
Channel channel=connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
//分發信息
for (int i=0;i<10;i++){
String message="Hello RabbitMQ"+i;
channel.basicPublish("",TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
System.out.println("NewTask send '"+message+"'");
}
channel.close();
connection.close();
}
}

然後創建2個工作者Work1和Work2代碼一樣

public class Work1 {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
final ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println("Worker1 Waiting for messages");
//每次從隊列獲取的數量
channel.basicQos(1);

final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Worker1 Received '" + message + "'");
try {
throw new Exception();
//doWork(message);
}catch (Exception e){
channel.abort();
}finally {
System.out.println("Worker1 Done");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
boolean autoAck=false;
//消息消費完成確認
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
}
private static void doWork(String task) {
try {
Thread.sleep(1000); // 暫停1秒鐘
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}

注:channel.basicQos(1);保證一次只分發一個 。autoAck是否自動回覆,如果為true的話,每次生產者只要發送信息就會從內存中刪除,那麼如果消費者程序異常退出,那麼就無法獲取數據,我們當然是不希望出現這樣的情況,所以才去手動回覆,每當消費者收到並處理信息然後在通知生成者。最後從隊列中刪除這條信息。如果消費者異常退出,如果還有其他消費者,那麼就會把隊列中的消息發送給其他消費者,如果沒有,等消費者啟動時候再次發送。

用java實現rabbitmq消息隊列的實例

用java實現rabbitmq消息隊列的實例

用java實現rabbitmq消息隊列的實例


分享到:


相關文章: