「轉」 Java 無界阻塞隊列 DelayQueue 入門實戰

原文出處:http://cmsblogs.com/ 『chenssy

DelayQueue是一個支持延時獲取元素的無界阻塞隊列。裡面的元素全部都是“可延期”的元素,列頭的元素是最先“到期”的元素,如果隊列裡面沒有元素到期,是不能從列頭獲取元素的,哪怕有元素也不行。也就是說只有在延遲期到時才能夠從隊列中取元素。

DelayQueue主要用於兩個方面:

  • 緩存:清掉緩存中超時的緩存數據
  • 任務超時處理

DelayQueue

DelayQueue實現的關鍵主要有如下幾個:

  1. 可重入鎖ReentrantLock
  2. 用於阻塞和通知的Condition對象
  3. 根據Delay時間排序的優先級隊列:PriorityQueue
  4. 用於優化阻塞通知的線程元素leader

ReentrantLock、Condition這兩個對象就不需要闡述了,他是實現整個BlockingQueue的核心。PriorityQueue是一個支持優先級線程排序的隊列(參考http://cmsblogs.com/?p=2407),leader後面闡述。這裡我們先來了解Delay,他是實現延時操作的關鍵。

Delayed

Delayed接口是用來標記那些應該在給定延遲時間之後執行的對象,它定義了一個long getDelay(TimeUnit unit)方法,該方法返回與此對象相關的的剩餘時間。同時實現該接口的對象必須定義一個compareTo 方法,該方法提供與此接口的 getDelay 方法一致的排序。

public interface Delayed extends Comparable<delayed> {
long getDelay(TimeUnit unit);
}
/<delayed>

如何使用該接口呢?上面說的非常清楚了,實現該接口的getDelay()方法,同時定義compareTo()方法即可。

內部結構

先看DelayQueue的定義:

 public class DelayQueue extends AbstractQueue
implements BlockingQueue {
/** 可重入鎖 */
private final transient ReentrantLock lock = new ReentrantLock();
/** 支持優先級的BlockingQueue */
private final PriorityQueue q = new PriorityQueue();
/** 用於優化阻塞 */
private Thread leader = null;
/** Condition */
private final Condition available = lock.newCondition();
/**
* 省略很多代碼
*/
}

看了DelayQueue的內部結構就對上面幾個關鍵點一目瞭然了,但是這裡有一點需要注意,DelayQueue的元素都必須繼承Delayed接口。同時也可以從這裡初步理清楚DelayQueue內部實現的機制了:以支持優先級無界隊列的PriorityQueue作為一個容器,容器裡面的元素都應該實現Delayed接口,在每次往優先級隊列中添加元素時以元素的過期時間作為排序條件,最先過期的元素放在優先級最高。

offer()

 public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 向 PriorityQueue中插入元素
q.offer(e);
// 如果當前元素的對首元素(優先級最高),leader設置為空,喚醒所有等待線程
if (q.peek() == e) {
leader = null;
available.signal();
}
// 無界隊列,永遠返回true
return true;
} finally {
lock.unlock();
}
}

offer(E e)就是往PriorityQueue中添加元素,具體可以參考(http://cmsblogs.com/?p=2407)。整個過程還是比較簡單,但是在判斷當前元素是否為對首元素,如果是的話則設置leader=null,這是非常關鍵的一個步驟,後面闡述。

take()

 public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 對首元素
E first = q.peek();
// 對首為空,阻塞,等待off()操作喚醒
if (first == null)
available.await();
else {
// 獲取對首元素的超時時間
long delay = first.getDelay(NANOSECONDS);
// <=0 表示已過期,出對,return
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
// leader != null 證明有其他線程在操作,阻塞
if (leader != null)
available.await();
else {
// 否則將leader 設置為當前線程,獨佔
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 超時阻塞
available.awaitNanos(delay);
} finally {
// 釋放leader
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 喚醒阻塞線程
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();

}
}

首先是獲取對首元素,如果對首元素的延時時間 delay <= 0 ,則可以出對了,直接return即可。否則設置first = null,這裡設置為null的主要目的是為了避免內存洩漏。如果 leader != null 則表示當前有線程佔用,則阻塞,否則設置leader為當前線程,然後調用awaitNanos()方法超時等待。

first = null

這裡為什麼如果不設置first = null,則會引起內存洩漏呢?線程A到達,列首元素沒有到期,設置leader = 線程A,這是線程B來了因為leader != null,則會阻塞,線程C一樣。假如線程阻塞完畢了,獲取列首元素成功,出列。這個時候列首元素應該會被回收掉,但是問題是它還被線程B、線程C持有著,所以不會回收,這裡只有兩個線程,如果有線程D、線程E...呢?這樣會無限期的不能回收,就會造成內存洩漏。

這個入隊、出對過程和其他的阻塞隊列沒有很大區別,無非是在出對的時候增加了一個到期時間的判斷。同時通過leader來減少不必要阻塞。

本文由博客一文多發平臺 https://openwrite.cn?from=article_bottom 發佈!


分享到:


相關文章: