靈感來襲,基於Redis的分佈式延遲隊列

延遲隊列,也就是一定時間之後將消息體放入隊列,然後消費者才能正常消費。比如1分鐘之後發送短信,發送郵件,檢測數據狀態等。

2|0Redisson Delayed Queue

如果你項目中使用了redisson,那麼恭喜你,使用延遲隊列將非常的簡單。

靈感來襲,基於Redis的分佈式延遲隊列

基於Redis的Redisson分佈式延遲隊列(Delayed Queue)結構的RDelayedQueue Java對象在實現了RQueue接口的基礎上提供了向隊列按要求延遲添加項目的功能。該功能可以用來實現消息傳送延遲按幾何增長或幾何衰減的發送策略。

<code>RQueue<string> distinationQueue = ...
RDelayedQueue<string> delayedQueue = getDelayedQueue(distinationQueue);
// 10秒鐘以後將消息發送到指定隊列
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
// 一分鐘以後將消息發送到指定隊列
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);/<string>/<string>/<code>

在該對象不再需要的情況下,應該主動銷燬。僅在相關的Redisson對象也需要關閉的時候可以不用主動銷燬。

3|0Java DelayQueue

DelayQueue它本質上是一個隊列,而這個隊列裡也只有存放Delayed的子類才有意義。

靈感來襲,基於Redis的分佈式延遲隊列

3|1延遲隊列demo

<code>public class DelayTask implements Delayed {
private long startDate;
public DelayTask(Long delayMillions) {
this.startDate = System.currentTimeMillis() + delayMillions;
}


@Override
public int compareTo(Delayed o) {
Long.compare(this.getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
}


@Override
public long getDelay(TimeUnit unit) {
return this.startDate - System.currentTimeMillis();
}

public static void main(String[] args) throws Exception {
BlockingQueue<delaytask> queue = new DelayQueue<>();
DelayTask delayTask = new DelayTask(1000 * 5L);
queue.put(delayTask);
while (queue.size()>0){
queue.take();
}
}
}/<delaytask>/<code>

3|2延遲隊列消費原理

靈感來襲,基於Redis的分佈式延遲隊列

源碼中出現了三次await字眼:

  • 第一次是當隊列為空時,等待;
  • 第二次等待是因為,發現有任務,沒有到執行時間,並且有準備執行的線程(leader),那不好意思,還得接續等待直到下一個可執行的任務。
  • 第三次是真正延時的地方了,available.awaitNanos(delay),此時也沒有別的線程要執行,也就是我將要執行,等待剩下的延遲時間即可。

3|3延遲隊列生產原理

靈感來襲,基於Redis的分佈式延遲隊列

為保證消費者正常消費,如果優先隊列頭元素和當前放入元素相等,則說明當前元素消費的優先級高,重置準備消費的線程(leader)為null,喚醒消費者線程重新執行take方法邏輯。

4|0手寫一個Redis延遲隊列

4|1Redis延遲隊列設計

靈感來襲,基於Redis的分佈式延遲隊列

4|2延遲消息體設計

靈感來襲,基於Redis的分佈式延遲隊列

延遲消息體Message實現了Delayed接口,這樣Java DelayQueue就知道什麼時候取出消息體。

4|3Redis延遲隊列實現

靈感來襲,基於Redis的分佈式延遲隊列

RedisDelayQueue構造函數依賴redis操作緩存服務對象目標隊列名稱(redis key)。

offer方法傳入member(具體消息),delay(延遲時間),timeUnit(時間單位),然後封裝成延遲消息體Message對象,放入Java DelayQueue中。

run方法是一個循環體,不斷的從Java DelayQueue對象中獲取消息體,然後放入redis對應的目標隊列裡。

4|4延遲隊列測試demo

靈感來襲,基於Redis的分佈式延遲隊列

控制檯打印效果

靈感來襲,基於Redis的分佈式延遲隊列

5|0思考

這種方案實現比較簡單,使用的時候一定要謹慎,應用於延遲小,消息量不大的場景是沒問題的,畢竟Java DelayQueue是佔用內存的。另外也可以考慮利用Redis的sorted set 結構實現延遲隊列,使用TimeStamp作為score,比如你的任務是要延遲5分鐘,那麼就在當前時間上加5分鐘作為 score ,輪詢任務每秒只輪詢 score 大於當前時間的 key即可,如果任務支持有誤差,那麼當沒有掃描到有效數據的時候可以休眠對應時間再繼續輪詢。

原文鏈接:https://www.cnblogs.com/


分享到:


相關文章: