AbstractQueuedSynchronizer 原理分析 - Condition 實現原理

作者:田小波

來源:http://www.tianxiaobo.com/2018/05/04/AbstractQueuedSynchronizer-%E5%8E%9F%E7%90%86%E5%88%86%E6%9E%90-Condition-%E5%AE%9E%E7%8E%B0%E5%8E%9F%E7%90%86/

1. 簡介

Condition是一個接口,AbstractQueuedSynchronizer 中的ConditionObject內部類實現了這個接口。Condition聲明瞭一組等待/通知的方法,這些方法的功能與Object中的wait/notify/notifyAll等方法相似。這兩者相同的地方在於,它們所提供的等待/通知方法均是為了協同線程的運行秩序。只不過,Object 中的方法需要配合 synchronized 關鍵字使用,而 Condition 中的方法則要配合鎖對象使用,並通過newCondition方法獲取實現類對象。除此之外,Condition 接口中聲明的方法功能上更為豐富一些。比如,Condition 聲明瞭具有不響應中斷和超時功能的等待接口,這些都是 Object wait 方法所不具備的。

本篇文章是上一篇文章AbstractQueuedSynchronizer 原理分析 - 獨佔/共享模式的續篇,在學習 Condition 的原理前,建議大家先去了解 AbstractQueuedSynchronizer 同步隊列相關原理。本篇文章會涉及到同步隊列相關知識,這些知識在上一篇文章分析過。

關於Condition的簡介這裡先說到這,接下來分析一下Condition實現類ConditionObject的原理。

2. 實現原理

ConditionObject是通過基於單鏈表的條件隊列來管理等待線程的。線程在調用await方法進行等待時,會釋放同步狀態。同時線程將會被封裝到一個等待節點中,並將節點置入條件隊列尾部進行等待。當有線程在獲取獨佔鎖的情況下調用signal或singalAll方法時,隊列中的等待線程將會被喚醒,重新競爭鎖。另外,需要說明的是,一個鎖對象可同時創建多個 ConditionObject 對象,這意味著多個競爭同一獨佔鎖的線程可在不同的條件隊列中進行等待。在喚醒時,可喚醒指定條件隊列中的線程。其大致示意圖如下:

AbstractQueuedSynchronizer 原理分析 - Condition 實現原理

以上就是 ConditionObject 所實現的等待/通知機制的大致原理,並不是很難理解。當然,在具體的實現中,則考慮的更為細緻一些。相關細節將會在接下來一章中進行說明,繼續往下看吧。

3. 源碼解析

3.1 等待

ConditionObject 中實現了幾種不同的等待方法,每種方法均有它自己的特點。比如await()會響應中斷,而awaitUninterruptibly()則不響應中斷。await(long, TimeUnit)則會在響應中斷的基礎上,新增了超時功能。除此之外,還有一些等待方法,這裡就不一一列舉了。

在本節中,我將主要分析await()的方法實現。其他的等待方法大同小異,就不一一分析了,有興趣的朋友可以自己看一下。好了,接下來進入源碼分析階段。

<code> 

public

final

void

await

(

) throws InterruptedException

{

if

(Thread.interrupted())

throw

new

InterruptedException(); Node node = addConditionWaiter();

int

savedState = fullyRelease(node);

int

interruptMode =

0

;

while

(!isOnSyncQueue(node)) { LockSupport.park(

this

);

if

((interruptMode = checkInterruptWhileWaiting(node)) !=

0

)

break

; }

if

(acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT;

if

(node.nextWaiter !=

null

) unlinkCancelledWaiters();

if

(interruptMode !=

0

) reportInterruptAfterWait(interruptMode); }

private

Node

addConditionWaiter

(

)

{ Node t = lastWaiter;

if

(t !=

null

&& t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node =

new

Node(Thread.currentThread(), Node.CONDITION);

if

(t ==

null

) firstWaiter = node;

else

t.nextWaiter = node; lastWaiter = node;

return

node; }

private

void

unlinkCancelledWaiters

(

)

{ Node t = firstWaiter; Node trail =

null

;

while

(t !=

null

) { Node next = t.nextWaiter;

if

(t.waitStatus != Node.CONDITION) { t.nextWaiter =

null

;

if

(trail ==

null

) firstWaiter = next;

else

trail.nextWaiter = next;

if

(next ==

null

) lastWaiter = trail; }

else

trail = t; t = next; } }

final

int

fullyRelease

(

Node node

)

{ boolean failed =

true

;

try

{

int

savedState = getState();

if

(release(savedState)) { failed =

false

;

return

savedState; }

else

{

throw

new

IllegalMonitorStateException(); } }

finally

{

if

(failed) node.waitStatus = Node.CANCELLED; } }

final boolean

isOnSyncQueue

(

Node node

)

{

if

(node.waitStatus == Node.CONDITION || node.prev ==

null

)

return

false

;

if

(node.next !=

null

)

return

true

;

return

findNodeFromTail(node); }

private

boolean

findNodeFromTail

(

Node node

)

{ Node t = tail;

for

(;;) {

if

(t == node)

return

true

;

if

(t ==

null

)

return

false

; t = t.prev; } }

private

int

checkInterruptWhileWaiting

(

Node node

)

{

return

Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :

0

; }

final boolean

transferAfterCancelledWait

(

Node node

)

{

if

(compareAndSetWaitStatus(node, Node.CONDITION,

0

)) { enq(node);

return

true

; }

while

(!isOnSyncQueue(node)) Thread.

yield

(); }

return

false

; }

private

void

reportInterruptAfterWait

(

int

interruptMode

) throws InterruptedException

{

if

(interruptMode == THROW_IE)

throw

new

InterruptedException();

else

if

(interruptMode == REINTERRUPT) selfInterrupt(); }

static

void

selfInterrupt

(

)

{ Thread.currentThread().interrupt(); }/<code>

3.2 通知

<code> 

public

final

void

signal

()

{

if

(!isHeldExclusively())

throw

new

IllegalMonitorStateException(); Node first = firstWaiter;

if

(first !=

null

) doSignal(first); }

private

void

doSignal

(Node first)

{

do

{

if

( (firstWaiter = first.nextWaiter) ==

null

) lastWaiter =

null

; first.nextWaiter =

null

; }

while

(!transferForSignal(first) && (first = firstWaiter) !=

null

); }

final

boolean

transferForSignal

(Node node)

{

if

(!compareAndSetWaitStatus(node, Node.CONDITION,

0

))

return

false

; Node p = enq(node);

int

ws = p.waitStatus;

if

(ws >

0

|| !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread);

return

true

; }/<code>

看完了 signal 方法的分析,下面再來看看 signalAll 的源碼分析,如下:

<code>

public

final

void

signalAll

()

{

if

(!isHeldExclusively())

throw

new

IllegalMonitorStateException(); Node first = firstWaiter;

if

(first !=

null

) doSignalAll(first); }

private

void

doSignalAll

(Node first)

{ lastWaiter = firstWaiter =

null

;

do

{ Node next = first.nextWaiter; first.nextWaiter =

null

; transferForSignal(first); first = next; }

while

(first !=

null

); }/<code>

4. 其他

在我閱讀 ConditionObject 源碼時發現了一個問題 - await 方法竟然沒有做同步控制。而在 signal 和 signalAll 方法開頭都會調用 isHeldExclusively 檢測線程是否已經獲取了獨佔鎖,未獲取獨佔鎖調用這兩個方法會拋出異常。但在 await 方法中,卻沒有進行相關的檢測。如果在正確的使用方式下調用 await 方法是不會出現問題的,所謂正確的使用方式指的是在獲取鎖的情況下調用 await 方法。但如果沒獲取鎖就調用該方法,就會產生線程競爭的情況,這將會對條件隊列的結構造成破壞。這裡再來看一下新增節點的方法源碼,如下:

<code>

private

Node addConditionWaiter() {

Node

t = lastWaiter;

if

(t != null && t.waitStatus != Node.CONDITION) {

unlinkCancelledWaiters();

t

=

lastWaiter;

}

Node

node = new Node(Thread.currentThread(), Node.CONDITION);

存在競爭時將會導致節點入隊出錯

if

(t == null)

firstWaiter

=

node;

else

=

node;

lastWaiter

=

node;

return

node;

}

/<code>

假如現在有線程 t1 和 t2,對應節點 node1 和 node2。線程 t1 獲取了鎖,而 t2 未獲取鎖,此時條件隊列為空,即 firstWaiter = lastWaiter = null。演繹一下會導致條件隊列被破壞的場景,如下:

  1. 時刻1:線程 t1 和 t2 同時執行到 if (t == null),兩個線程都認為 if 條件滿足
  2. 時刻2:線程 t1 初始化 firstWaiter,即將 firstWaiter 指向 node1
  3. 時刻3:線程 t2 再次修改 firstWaiter 的指向,此時 firstWaiter 指向 node2

如上,如果線程是按照上面的順序執行,這會導致隊列被破壞。firstWaiter 本應該指向 node1,但結果卻指向了 node2,node1 被排擠出了隊列。這樣會導致什麼問題呢?這樣可能會導致線程 t1 一直阻塞下去。因為 signal/signalAll 是從條件隊列頭部轉移節點的,但 node1 不在隊列中,所以 node1 無法被轉移到同步隊列上。在不出現中斷的情況下,node1 對應的線程 t1 會被永久阻塞住。

這裡未對 await 方法進行同步控制,導致條件隊列出現問題,應該算 ConditionObject 實現上的一個缺陷了。關於這個缺陷,博客園博主 活在夢裡 在他的文章 AbstractQueuedSynchronizer源碼解讀–續篇之Condition 中也提到了。並向 JDK 開發者提了一個 BUG,BUG 鏈接為 JDK-8187408,有興趣的同學可以去看看。

5. 總結

到這裡,Condition 的原理就分析完了。分析完 Condition 原理,關於 AbstractQueuedSynchronizer 的分析也就結束了。總體來說,通過分析 AQS 並寫成博客,使我對 AQS 的原理有了更深刻的認識。AQS 是 JDK 中鎖和其他併發組件實現的基礎,弄懂 AQS 原理對後續在分析各種鎖和其他同步組件大有裨益。

AQS 本身實現比較複雜,要處理各種各樣的情況。作為類庫,AQS 要考慮和處理各種可能的情況,實現起來可謂非常複雜。不僅如此,AQS 還很好的封裝了同步隊列的管理,線程的阻塞與喚醒等基礎操作,大大降低了繼承類實現同步控制功能的複雜度。所以,在本文的最後,再次向 AQS 的作者,Java 大師Doug Lea致敬。

好了,本文到此結束,謝謝大家閱讀。

參考

  • AbstractQueuedSynchronizer源碼解讀–續篇之Condition


分享到:


相關文章: