淺談Java中的Condition條件隊列,手摸手帶你實現一個阻塞隊列

條件隊列是什麼?可能很多人和我一樣答不出來,不過今天終於搞清楚了!

什麼是條件隊列

條件隊列:當某個線程調用了wait方法,或者通過Condition對象調用了await相關方法,線程就會進入阻塞狀態,並加入到對應條件隊列中。

在 中我們提到了條件隊列,即當對象獲取到同步鎖之後,如果調用了wait方法,當前線程會進入到條件隊列中,並釋放鎖。

<code>synchronized(對象){ // 獲取鎖失敗,線程會加入到同步隊列中 
while(條件不滿足){
對象.wait();// 調用wait方法當前線程加入到條件隊列中
}
}/<code>

基於synchcronized的內置條件隊列存在一些缺陷。每個內置鎖都只能有一個相關聯的條件隊列,因而存在多個線程可能在同一個條件隊列上等待不同的條件謂詞,並且在最常見的加鎖模式下公開條件隊列對象。

Java中的鎖的實現可以分為兩種,一種是基於synchronized的隱式鎖,它是基於JVM層面實現的;而另一種則是基於AQS框架在代碼層面實現的鎖,如ReentrantLock等,在進行併發控制過程中,很多情況下他們都可以相互替代。

其中同步隊列和條件隊列是AQS中兩個比較核心的概念,它們是代碼層面實現鎖的關鍵。關於同步隊列的內容,我們已經在 中進行了詳細的介紹。

與Object配合synchronized相比,基於AQS的Lock&Condition實現的等待喚醒模式更加靈活,支持多個條件隊列,支持等待狀態中不響應中斷以及超時等待功能; 其次就是基於AQS實現的條件隊列是"肉眼可見"的,我們可以通過源代碼進行debug,而synchronized則是完全隱式的。

同步隊列和條件隊列

與條件隊列密不可分的類則是ConditionObject, 是AQS中實現了Condition接口的內部類,通常配合基於AQS實現的鎖一同使用。當線程獲取到鎖之後,可以調用await方法進入條件隊列並釋放鎖,或者調用singinal方法喚醒對應條件隊列中等待時間最久的線程並加入到等待隊列中。

在AQS中,線程會被封裝成Node對象加入隊列中,而條件隊列中則複用了同步隊列中的Node對象

淺談Java中的Condition條件隊列,手摸手帶你實現一個阻塞隊列

Condition相關方法和描述

Condition接口一共定義了以下幾個方法:

await(): 當前線程進入等待狀態,直到被通知(siginal)或中斷【和wait方法語義相同】。

awaitUninterruptibly(): 當前線程進入等待狀態,直到被通知,對中斷不敏感。

awaitNanos(long timeout): 當前線程進入等待狀態直到被通知(siginal),中斷或超時。

awaitUnitil(Date deadTime): 當前線程進入等待狀態直到被通知(siginal),中斷或到達某個時間。

signal(): 喚醒一個等待在Condition上的線程,該線程從等待方法返回前必須獲得與Condition關聯的鎖【和notify方法語義相同】

signalAll(): 喚醒所有等待在Condition上的線程,能夠從等待方法返回的線程必須獲得與Condition關聯的鎖【和notifyAll方法語義相同】。

條件隊列入隊操作

當線程獲取到鎖之後,Condition對象調用await相關的方法,線程會進入到對應的條件隊列中。

<code>/**
* 如果當前線程被終端,拋出 InterruptedException 異常
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 添加當前線程到【條件隊列】
Node node = addConditionWaiter();
// 釋放已經獲取的鎖資源,並返回釋放前的同步狀態
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果當前節點不在【同步隊列】中, 線程進入阻塞狀態,等待被喚醒
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}/<code>
淺談Java中的Condition條件隊列,手摸手帶你實現一個阻塞隊列

條件隊出隊操作

Condition對象調用signal或者signalAll方法時,

<code>/**
* 將【條件隊列】中第一個有效的元素移除並且添加到【同步隊列】中
* 所謂有效指的是非null,並且狀態嗎
* @param first 條件隊列中第一個非空的元素
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 將條件隊列中等待最久的那個有效元素添加到同步隊列中
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

/**
* 將條件隊列中的節點轉換到同步隊列中
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
* 如果節點的等待狀態不能被修改,說明當前線程已經被取消等待【多個線程執行siginal時會出現的情況】
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* 加入到【同步隊列】中,並且嘗試將前驅節點設置為可喚醒狀態

*/
Node p = enq(node); // 將node添加到同步隊列中,並返回它的前驅節點
int ws = p.waitStatus;
// 如果前驅節點不需要喚醒,或者設置狀態為‘喚醒’失敗,則喚醒線程時期重新爭奪同步狀態
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}/<code>
淺談Java中的Condition條件隊列,手摸手帶你實現一個阻塞隊列


實現阻塞隊列

  1. 自定義互斥鎖升級,增加獲取Condition對象接口。
<code>/**
* 自定義互斥鎖
*
* @author cruder
* @time 2019/11/29 9:43
*/
public class MutexLock {

private static final Sync STATE_HOLDER = new Sync();

/**
* 通過Sync內部類來持有同步狀態, 當狀態為1表示鎖被持有,0表示鎖處於空閒狀態
*/
private static class Sync extends AbstractQueuedSynchronizer {

/**
* 是否被獨佔, 有兩種表示方式
* 1. 可以根據狀態,state=1表示鎖被佔用,0表示空閒
* 2. 可以根據當前獨佔鎖的線程來判斷,即getExclusiveOwnerThread()!=null 表示被獨佔
*/
@Override
protected boolean isHeldExclusively() {
return getExclusiveOwnerThread() != null;
}

/**
* 嘗試獲取鎖,將狀態從0修改為1,操作成功則將當前線程設置為當前獨佔鎖的線程
*/
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {

setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

/**
* 釋放鎖,將狀態修改為0
*/
@Override
protected boolean tryRelease(int arg) {
if (getState() == 0) {
throw new UnsupportedOperationException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}

//【新增代碼】
final ConditionObject newCondition() {
return new ConditionObject();
}
}

/**
* 下面的實現Lock接口需要重寫的方法,基本是就是調用內部內Sync的方法
*/
public void lock() {
STATE_HOLDER.acquire(1);
}

public void unlock() {
STATE_HOLDER.release(1);
}

// 【新增代碼】 獲取條件隊列
public Condition newCondition(){
return STATE_HOLDER.newCondition();
}

}
/<code>


  1. 基於自定義互斥鎖,實現阻塞隊列。阻塞隊列具有兩個特點:
  • 添加元素到隊列中, 如果隊列已滿會使得當前線程阻塞【加入到條件隊列-隊列不滿】,直到隊列不滿為止
  • 移除隊列中的元素,當隊列為空時會使當前線程阻塞【加入到條件隊列-隊列不空】,直到隊列不為空為止
<code>/**
* 有界阻塞阻塞隊列
*
* @author Jann Lee
* @date 2019-12-11 22:20
**/
public class BoundedBlockingQueue {
/**
* list作為底層存儲結構
*/
private List dataList;
/**
* 隊列的大小
*/
private int size;

/**
* 鎖,和條件變量
*/
private MutexLock lock;
/**
* 隊列非空 條件變量

*/
private Condition notEmpty;
/**
* 隊列未滿 條件變量
*/
private Condition notFull;

public BoundedBlockingQueue(int size) {
dataList = new ArrayList<>();
lock = new MutexLock();
notEmpty = lock.newCondition();
notFull = lock.newCondition();
this.size = size;
}


/**
* 隊列中添加元素 [只有隊列未滿時才可以添加,否則需要等待隊列變成未滿狀態]
*/
public void add(T data) throws InterruptedException {
lock.lock();
try {
// 如果隊列已經滿了, 需要在等待“隊列未滿”條件滿足
while (dataList.size() == size) {
notFull.await();
}
dataList.add(data);
Thread.sleep(2000);
notEmpty.signal();
} finally {
lock.unlock();
}
}

/**
* 移除隊列的第一個元素[只有隊列非空才可以移除,否則需要等待變成隊列非空狀態]
*/
public T remove() throws InterruptedException {
lock.lock();
try {

// 如果為空, 需要在等待“隊列非空”條件滿足
while (dataList.isEmpty()) {
notEmpty.await();
}
T result = dataList.remove(0);
notFull.signal();
return result;
} finally {
lock.unlock();
}
}

}
/<code>


總結

  1. 條件隊列和同步隊列在Java中有兩種實現,synchronized關鍵字以及基於AQS
  2. 每個(基於synchronized的)內置鎖都只能有一個相關聯的條件隊列,會存在多個線程可能在同一個條件隊列上等待不同的條件謂詞;而(基於AQS實現的)顯式鎖支持多個條件隊列
  3. 與wait,notify,notifyAll 對應的方法時Conditoin接口中的await,signal,signalAll,他們具有相同的語義


如果你是一個初中級java開發工程師,在技術上有所追求,想要結識志同道合的朋友,可以關注筆者的公眾號cruder,大家一起進步


分享到:


相關文章: