作者:田小波
來源:http://www.tianxiaobo.com/2018/05/04/AbstractQueuedSynchronizer-%E5%8E%9F%E7%90%86%E5%88%86%E6%9E%90-Condition-%E5%AE%9E%E7%8E%B0%E5%8E%9F%E7%90%86/
1. 簡介
Condition是一個接口,AbstractQueuedSynchronizer 中的ConditionObject內部類實現了這個接口。Condition聲明瞭一組等待/通知的方法,這些方法的功能與Object中的wait/notify/notifyAll等方法相似。這兩者相同的地方在於,它們所提供的等待/通知方法均是為了協同線程的運行秩序。只不過,Object 中的方法需要配合 synchronized 關鍵字使用,而 Condition 中的方法則要配合鎖對象使用,並通過newCondition方法獲取實現類對象。除此之外,Condition 接口中聲明的方法功能上更為豐富一些。比如,Condition 聲明瞭具有不響應中斷和超時功能的等待接口,這些都是 Object wait 方法所不具備的。
本篇文章是上一篇文章AbstractQueuedSynchronizer 原理分析 - 獨佔/共享模式的續篇,在學習 Condition 的原理前,建議大家先去了解 AbstractQueuedSynchronizer 同步隊列相關原理。本篇文章會涉及到同步隊列相關知識,這些知識在上一篇文章分析過。
關於Condition的簡介這裡先說到這,接下來分析一下Condition實現類ConditionObject的原理。
2. 實現原理
ConditionObject是通過基於單鏈表的條件隊列來管理等待線程的。線程在調用await方法進行等待時,會釋放同步狀態。同時線程將會被封裝到一個等待節點中,並將節點置入條件隊列尾部進行等待。當有線程在獲取獨佔鎖的情況下調用signal或singalAll方法時,隊列中的等待線程將會被喚醒,重新競爭鎖。另外,需要說明的是,一個鎖對象可同時創建多個 ConditionObject 對象,這意味著多個競爭同一獨佔鎖的線程可在不同的條件隊列中進行等待。在喚醒時,可喚醒指定條件隊列中的線程。其大致示意圖如下:
以上就是 ConditionObject 所實現的等待/通知機制的大致原理,並不是很難理解。當然,在具體的實現中,則考慮的更為細緻一些。相關細節將會在接下來一章中進行說明,繼續往下看吧。
3. 源碼解析
3.1 等待
ConditionObject 中實現了幾種不同的等待方法,每種方法均有它自己的特點。比如await()會響應中斷,而awaitUninterruptibly()則不響應中斷。await(long, TimeUnit)則會在響應中斷的基礎上,新增了超時功能。除此之外,還有一些等待方法,這裡就不一一列舉了。
在本節中,我將主要分析await()的方法實現。其他的等待方法大同小異,就不一一分析了,有興趣的朋友可以自己看一下。好了,接下來進入源碼分析階段。
<code>public
finalvoid
await
() throws InterruptedException {if
(Thread.interrupted())throw
new
InterruptedException(); Node node = addConditionWaiter();int
savedState = fullyRelease(node);int
interruptMode =0
;while
(!isOnSyncQueue(node)) { LockSupport.park(this
);if
((interruptMode = checkInterruptWhileWaiting(node)) !=0
)break
; }if
(acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT;if
(node.nextWaiter !=null
) unlinkCancelledWaiters();if
(interruptMode !=0
) reportInterruptAfterWait(interruptMode); }private
NodeaddConditionWaiter
() { Node t = lastWaiter;if
(t !=null
&& t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node =new
Node(Thread.currentThread(), Node.CONDITION);if
(t ==null
) firstWaiter = node;else
t.nextWaiter = node; lastWaiter = node;return
node; }private
void
unlinkCancelledWaiters
() { Node t = firstWaiter; Node trail =null
;while
(t !=null
) { Node next = t.nextWaiter;if
(t.waitStatus != Node.CONDITION) { t.nextWaiter =null
;if
(trail ==null
) firstWaiter = next;else
trail.nextWaiter = next;if
(next ==null
) lastWaiter = trail; }else
trail = t; t = next; } }final
int
fullyRelease
(Node node
) { boolean failed =true
;try
{int
savedState = getState();if
(release(savedState)) { failed =false
;return
savedState; }else
{throw
new
IllegalMonitorStateException(); } }finally
{if
(failed) node.waitStatus = Node.CANCELLED; } }final boolean
isOnSyncQueue
(Node node
) {if
(node.waitStatus == Node.CONDITION || node.prev ==null
)return
false
;if
(node.next !=null
)return
true
;return
findNodeFromTail(node); }private
booleanfindNodeFromTail
(Node node
) { Node t = tail;for
(;;) {if
(t == node)return
true
;if
(t ==null
)return
false
; t = t.prev; } }private
int
checkInterruptWhileWaiting
(Node node
) {return
Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :0
; }final boolean
transferAfterCancelledWait
(Node node
) {if
(compareAndSetWaitStatus(node, Node.CONDITION,0
)) { enq(node);return
true
; }while
(!isOnSyncQueue(node)) Thread.yield
(); }return
false
; }private
void
reportInterruptAfterWait
(int
interruptMode) throws InterruptedException {if
(interruptMode == THROW_IE)throw
new
InterruptedException();else
if
(interruptMode == REINTERRUPT) selfInterrupt(); }static
void
selfInterrupt
() { Thread.currentThread().interrupt(); }/<code>
3.2 通知
<code>public
final
void
signal
()
{if
(!isHeldExclusively())throw
new
IllegalMonitorStateException(); Node first = firstWaiter;if
(first !=null
) doSignal(first); }private
void
doSignal
(Node first)
{do
{if
( (firstWaiter = first.nextWaiter) ==null
) lastWaiter =null
; first.nextWaiter =null
; }while
(!transferForSignal(first) && (first = firstWaiter) !=null
); }final
boolean
transferForSignal
(Node node)
{if
(!compareAndSetWaitStatus(node, Node.CONDITION,0
))return
false
; Node p = enq(node);int
ws = p.waitStatus;if
(ws >0
|| !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread);return
true
; }/<code>
看完了 signal 方法的分析,下面再來看看 signalAll 的源碼分析,如下:
<code>public
final
void
signalAll
()
{if
(!isHeldExclusively())throw
new
IllegalMonitorStateException(); Node first = firstWaiter;if
(first !=null
) doSignalAll(first); }private
void
doSignalAll
(Node first)
{ lastWaiter = firstWaiter =null
;do
{ Node next = first.nextWaiter; first.nextWaiter =null
; transferForSignal(first); first = next; }while
(first !=null
); }/<code>
4. 其他
在我閱讀 ConditionObject 源碼時發現了一個問題 - await 方法竟然沒有做同步控制。而在 signal 和 signalAll 方法開頭都會調用 isHeldExclusively 檢測線程是否已經獲取了獨佔鎖,未獲取獨佔鎖調用這兩個方法會拋出異常。但在 await 方法中,卻沒有進行相關的檢測。如果在正確的使用方式下調用 await 方法是不會出現問題的,所謂正確的使用方式指的是在獲取鎖的情況下調用 await 方法。但如果沒獲取鎖就調用該方法,就會產生線程競爭的情況,這將會對條件隊列的結構造成破壞。這裡再來看一下新增節點的方法源碼,如下:
<code>private
Node addConditionWaiter() {
Node
t = lastWaiter;
if
(t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t
=lastWaiter;
}
Node
node = new Node(Thread.currentThread(), Node.CONDITION);
存在競爭時將會導致節點入隊出錯
if
(t == null)
firstWaiter
=node;
else
=node;
lastWaiter
=node;
return
node;
}
/<code>
假如現在有線程 t1 和 t2,對應節點 node1 和 node2。線程 t1 獲取了鎖,而 t2 未獲取鎖,此時條件隊列為空,即 firstWaiter = lastWaiter = null。演繹一下會導致條件隊列被破壞的場景,如下:
- 時刻1:線程 t1 和 t2 同時執行到 if (t == null),兩個線程都認為 if 條件滿足
- 時刻2:線程 t1 初始化 firstWaiter,即將 firstWaiter 指向 node1
- 時刻3:線程 t2 再次修改 firstWaiter 的指向,此時 firstWaiter 指向 node2
如上,如果線程是按照上面的順序執行,這會導致隊列被破壞。firstWaiter 本應該指向 node1,但結果卻指向了 node2,node1 被排擠出了隊列。這樣會導致什麼問題呢?這樣可能會導致線程 t1 一直阻塞下去。因為 signal/signalAll 是從條件隊列頭部轉移節點的,但 node1 不在隊列中,所以 node1 無法被轉移到同步隊列上。在不出現中斷的情況下,node1 對應的線程 t1 會被永久阻塞住。
這裡未對 await 方法進行同步控制,導致條件隊列出現問題,應該算 ConditionObject 實現上的一個缺陷了。關於這個缺陷,博客園博主 活在夢裡 在他的文章 AbstractQueuedSynchronizer源碼解讀–續篇之Condition 中也提到了。並向 JDK 開發者提了一個 BUG,BUG 鏈接為 JDK-8187408,有興趣的同學可以去看看。
5. 總結
到這裡,Condition 的原理就分析完了。分析完 Condition 原理,關於 AbstractQueuedSynchronizer 的分析也就結束了。總體來說,通過分析 AQS 並寫成博客,使我對 AQS 的原理有了更深刻的認識。AQS 是 JDK 中鎖和其他併發組件實現的基礎,弄懂 AQS 原理對後續在分析各種鎖和其他同步組件大有裨益。
AQS 本身實現比較複雜,要處理各種各樣的情況。作為類庫,AQS 要考慮和處理各種可能的情況,實現起來可謂非常複雜。不僅如此,AQS 還很好的封裝了同步隊列的管理,線程的阻塞與喚醒等基礎操作,大大降低了繼承類實現同步控制功能的複雜度。所以,在本文的最後,再次向 AQS 的作者,Java 大師Doug Lea致敬。
好了,本文到此結束,謝謝大家閱讀。
參考
- AbstractQueuedSynchronizer源碼解讀–續篇之Condition