Java中間件實戰系列(1)- RabbitMQ死信與延遲隊列的區別與實現

對於消息中間件RabbitMQ,想必各位小夥伴並不陌生,其廣泛應用程度不言而喻,此前我們也在許多課程以及諸多專欄文章中介紹了它的應用,其應用場景也是相當廣泛的,像什麼消息異步通信、服務模塊解耦、高併發流量削峰、訂單超時未支付自動失效等等都是實際項目中最為常見的場景。本文我們將重點介紹並實現RabbitMQ的死信與延時隊列,並將兩者做一個簡單的對比!

內容

對於RabbitMQ的死信隊列,此前我們在"Java秒殺系統"這一技術專欄中已經有重點介紹過了,在那裡我們是將其應用於 "訂單超時未支付自動失效"這一業務場景中,簡而言之,"死信隊列"是一種特殊的"隊列",跟普通的隊列相比,具有"延遲處理任務"的特性。

而在消息中間件RabbitMQ的架構組件中,也存在著跟"死信隊列"在功能特性方面幾乎相同的組件,那就是"延遲隊列/延時隊列",同樣也具有"延遲、延時處理任務"的功效!

當然啦,這兩者還是有一丟丟區別的,最直觀的當然是名字上啦,從名字上你就可以看出來兩者的"處事風格"是不一樣的,具體體現在:

一、創建上的差異:

(1)RabbitMQ的死信隊列DeadQueue是由"死信交換機DLX"+"死信路由DLK"組成的,當然,可能還會有"TTL",而DLX和DLK又可以綁定指向真正的隊列RealQueue,這個隊列RealQueue便是"消費者"真正監聽的對象.

(2)而RabbitMQ的延遲/延時隊列DelayedQueue 則是由普通的隊列來創建即可,唯一不同的地方在於其綁定的交換機為自定義的交換機,即"CustomExchange",在創建該交換機時只需要指定其消息的類型為 "x-delayed-message"即可."消費者"真正監聽的隊列也是它本人,即DelayedQueue

畫外音:從這一點上看,延遲/延時隊列的創建相對而言簡單一些!


二、功能特性上的差異:

(1)死信隊列在實際應用時雖然可以實現"延時、延遲處理任務"的功效,但進入死信中的消息卻依然保留了隊列的特性,即"FIFO" ~ 先進先出,而不管先後進入隊列中消息的TTL的值. 即假設先後進入死信的消息為A、B、C,各自的TTL分別為:10s、3s、5s,理論上TTL先後到達的順序是:B、C、A,然後從死信出來,最終被路由到真正的隊列中,即消息被消費的先後順序應該為:B、C、A,然而現實卻是殘酷的,其最終消費的消息的順序為:A、B、C,即"消息是怎麼進去的,就怎麼出來",保留了所謂的FIFO特性.

(2)或許是因為死信有這種缺陷,所以RabbitMQ提供了另一種組件,即"延遲隊列",它可以很完美的解決上面死信出現的問題,即最終消費的消息的順序為:B、C、A,我們將在下面用實際的代碼進行實戰實現與演練.


三、插件安裝上的差異:

(1)死信不需要額外的插件

(2)但是延遲隊列在實際項目使用時卻需要在Mq Server中安裝一個插件,它的名字叫做:"rabbitmq_delayed_message_exchange",其安裝過程可以參考鏈接: 裡面就提供了Windows環境和Linux環境下的插件的安裝過程(很簡單,只需要不到3步的步驟.)


四、代碼的實戰實現~RabbitMQ的死信隊列

說了這麼多,想必有些小夥伴有點不耐煩了,下面我將採用實際的代碼對上面所介紹的幾點區別進行實現與演練(代碼都是基於Spring Boot2.0搭建的項目環境實現與測試的)

(1)首先,我們需要創建死信隊列以及真正的隊列,並實現相關的綁定:

<code>//構建訂單超時未支付的死信隊列消息模型
@Bean
public Queue successKillDeadQueue(){
Map<string> argsMap= Maps.newHashMap();
argsMap.put("x-dead-letter-exchange",env.getProperty("mq.kill.item.success.kill.dead.exchange"));
argsMap.put("x-dead-letter-routing-key",env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
return new Queue(env.getProperty("mq.kill.item.success.kill.dead.queue"),true,false,false,argsMap);
}

//基本交換機
@Bean
public TopicExchange successKillDeadProdExchange(){
return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"),true,false);

}

//創建基本交換機+基本路由 -> 死信隊列 的綁定
@Bean
public Binding successKillDeadProdBinding(){
return BindingBuilder.bind(successKillDeadQueue()).to(successKillDeadProdExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"));
}

//真正的隊列
@Bean
public Queue successKillRealQueue(){
return new Queue(env.getProperty("mq.kill.item.success.kill.dead.real.queue"),true);
}

//死信交換機
@Bean
public TopicExchange successKillDeadExchange(){
return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.exchange"),true,false);
}

//死信交換機+死信路由->真正隊列 的綁定
@Bean
public Binding successKillDeadBinding(){
return BindingBuilder.bind(successKillRealQueue()).to(successKillDeadExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
}/<string>/<code>


(2)將項目運行起來,登錄RabbitMQ的後端控制檯,可以看到成功創建了相應的死信隊列和真正的隊列等組件,如下圖所示:


Java中間件實戰系列(1)- RabbitMQ死信與延遲隊列的區別與實現

(3)緊接著,我們在Controller中建立一個請求方法,用於接收前端請求過來的消息,並將該消息附以TTL值,塞入死信隊列中,如下所示:

<code>//死信隊列-生產者
@RequestMapping(value = "dead/msg/send",method = RequestMethod.GET)
@ResponseBody
public BaseResponse sendDQMsg(@RequestParam String msg,@RequestParam Long ttl){
BaseResponse response=new BaseResponse(StatusCode.Success);
try {
Message realMsg=MessageBuilder.withBody(msg.getBytes("UTF-8")).build();

rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.convertAndSend(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"), env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"), realMsg, message -> {
MessageProperties mp=message.getMessageProperties();
mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);

//TODO:動態設置TTL
mp.setExpiration(String.valueOf(ttl));

log.info("死信隊列生產者-發出消息:{} TTL:{}",msg,ttl);
return message;
});
}catch (Exception e){
response=new BaseResponse(StatusCode.Fail.getCode(),e.getMessage());
}
return response;
}/<code>

(4)最後是寫一個Spring Bean類充當消費者,在其中監聽"實際隊列"的消息:

<code>@RabbitListener(queues = {"${mq.kill.item.success.kill.dead.real.queue}"},containerFactory = "singleListenerContainer")
public void consumeExpireOrder(@Payload byte[] msg){
try {
log.info("死信隊列-監聽者-接收消息:{}",new String(msg,"UTF-8"));

}catch (Exception e){
log.error("死信隊列-監聽者-發生異常:",e.fillInStackTrace());

}
}/<code>

最後,我們進入測試環節,打開Postman,前後輸入3次不同的請求信息,其中各自的TTL也不盡相同,即消息A的TTL=50s,消息B的TTL=20s,消息C的TTL=10s,最終在Console控制檯等待,你會發現消費者監聽的消息的順序為:A、B、C,而不是C、B、A,如下圖所示:


Java中間件實戰系列(1)- RabbitMQ死信與延遲隊列的區別與實現


Java中間件實戰系列(1)- RabbitMQ死信與延遲隊列的區別與實現

五、代碼的實戰實現~RabbitMQ的延遲/延時隊列

很明顯,由於死信存在的這個缺陷,故而其在上面的應用場景中是不太適用的!即死信隊列在 消息的TTL不一致,且後入死信的消息TTL小於前入的消息TTL的應用場景中是不適用的,而像"訂單超時未支付"的應用場景,因為大家都一樣,都是固定的30min或者 1h,故而這種場景,死信是相當適合的。

因此,為了解決實際項目中"TTL不一致且不固定"的應用場景,我們需要搬上"延遲/延時隊列"(當然啦,Redisson的延遲/延遲隊列也是可以實現的!),下面我們用代碼加以實現!

(1)首先是創建"延遲/延時隊列"等相關的組件,如下所示;

<code>\t//TODO:RabbitMQ延遲隊列
@Bean
public Queue delayQueue(){
return QueueBuilder.durable(env.getProperty("mq.kill.delay.queue")).build();
}

@Bean
public CustomExchange delayExchange(){
Map<string> map=Maps.newHashMap();
map.put("x-delayed-type","direct");
return new CustomExchange(env.getProperty("mq.kill.delay.exchange"),"x-delayed-message",true,false,map);
}

@Bean
public Binding delayBinding(){
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(env.getProperty("mq.kill.delay.routingKey")).noargs();
}
/<string>/<code>

(2)其生產者發送消息的代碼我們仍然是放在一個Controller的請求方法中,如下所示:

<code>//延遲隊列-生產者
@RequestMapping(value = "delay/msg/send",method = RequestMethod.GET)
@ResponseBody
public BaseResponse sendDelayMsg(@RequestParam String msg,@RequestParam Long ttl){
BaseResponse response=new BaseResponse(StatusCode.Success);
try {
String info=msg;

Message realMsg=MessageBuilder.withBody(info.getBytes("UTF-8")).build();
rabbitTemplate.convertAndSend(env.getProperty("mq.kill.delay.exchange"),env.getProperty("mq.kill.delay.routingKey"),

realMsg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties mp=message.getMessageProperties();
mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
mp.setHeader("x-delay",ttl);

log.info("延遲隊列生產者-發出消息:{} TTL:{}",msg,ttl);
return message;
}
});

}catch (Exception e){
response=new BaseResponse(StatusCode.Fail.getCode(),e.getMessage());
}
return response;
}/<code>

(3)最後是用於監聽延遲隊列中消息的消費者的代碼,如下所示:

<code>/**
* 延時隊列-消息監聽器-消費者
* @Author:debug (SteadyJack)
* @Link: weixin-> debug0868 qq-> 1948831260
**/
@Component
public class DelayQueueMqListener {
private static final Logger log= LoggerFactory.getLogger(DelayQueueMqListener.class);

//消息監聽
@RabbitListener(queues = {"${mq.kill.delay.queue}"})
public void consumeMsg(@Payload byte[] msg){
try {
String info=new String(msg,"UTF-8");

log.info("延時隊列監聽到消息:{} ",info);
}catch (Exception e){
log.error("延時隊列-消息監聽器-消費者-消息監聽-發生異常:",e.fillInStackTrace());
}
}
}/<code>

(4)將項目跑起來,可以看到RabbitMQ的後端控制檯已經建立了該隊列,如下圖所示:


Java中間件實戰系列(1)- RabbitMQ死信與延遲隊列的區別與實現

(5)最後,我們打開postman,前後輸入3次不同的請求信息,其中各自的TTL也不盡相同,即消息A的TTL=50s,消息B的TTL=20s,消息C的TTL=10s,最終在Console控制檯等待,你會發現消費者監聽的消息的順序為:A、B、C,而不是C、B、A,如下圖所示:

Java中間件實戰系列(1)- RabbitMQ死信與延遲隊列的區別與實現

從該運行結果上看,會發現這才是我們真正想要的結果,即按照時間TTL的大小來決定消息被消費的先後順序,而且,你可以看出消費時的時間跟發出的時間剛好差 TTL !


在文章的最後的,我們簡單總結一下本文所講的內容,即主要介紹、對比並實戰了RabbitMQ中兩款具有"延時、延遲處理任務"功效的組件,即"死信隊列"和"延遲隊列",其差異性主要體現在:創建上的不同、功能特性的不同、插件安裝上的不同等方面。

總體來說,如果是想追求消息傳輸的穩定性、可靠性且TTL是固定的話,那麼建議選擇"死信隊列",因為消息從一開始就在隊列中待著,等到TTL一到才被路由到真正的隊列!而"延遲隊列"則不同,即發送出去的消息需要等待 TTL 的時間才進入"延遲隊列",如果在等待的期間,Mq Server 宕機了,那很可能消息就丟失了…..

課程觀看: https://www.ixigua.com/i6806173584910713356/


分享到:


相關文章: