java多線程AbstractQueuedSynchronizer(AQS)源碼分析

簡介

AbstractQueuedSynchronizer:譯為:隊列同步器(以下簡稱AQS),可以看到這是一個抽象類。有大名鼎鼎的併發大師Doug Lea設計:

java多線程AbstractQueuedSynchronizer(AQS)源碼分析

併發包中很多Lock都是通過繼承AQS實現的(ReentrantLock、 ReentrantReadWriteLock和CountDownLatch等),AQS中封裝了實現鎖的具體操作,其子類繼承AQS後,可以輕鬆的調用AQS的相應方法來實現同步狀態的管理同步狀態,線程的排隊,等待以及喚醒等操作。 子類可以重寫的方法如下:

  • protected boolean tryAcquire(int arg)獨佔式的獲取同步狀態,使用CAS設置同步狀態
  • protected boolean tryRelease(int arg)獨佔式的釋放同步狀態
  • protected int tryAcquireShared(int arg)共享式的獲取同步狀態,返回大於等於0的值,表示獲取成功,否則失敗
  • protected boolean tryReleaseShared(int arg) 共享式的釋放同步狀態
  • protected boolean isHeldExclusively()判斷當前是否被當前線程鎖獨佔

構成

java多線程AbstractQueuedSynchronizer(AQS)源碼分析

如上圖,AQS中定義了一個volatile整數狀態信息,我們可以通過 getState(),setState(int newState),compareAndSetState(int expect,int update))等protected方法進行操作這一狀態信息。例如:ReentrantLock中用它來表示所有線程呢個已經重複獲取該鎖的次數,Semaphore用它來表示剩餘的許可數量,FutureTask用它來表示任務的狀態(未開始,正在運行,已結束,已取消等)。

AQS是由一個同步隊列(FIFO雙向隊列)來管理同步狀態的,如果線程獲取同步狀態失敗,AQS會將當前線程以及等待狀態信息構造成一個節點(Node)加入到同步隊列中,同時阻塞當前線程;當同步狀態狀態釋放時,會把首節點中的線程喚醒,使其再次嘗試獲取同步狀態。

java多線程AbstractQueuedSynchronizer(AQS)源碼分析

準備工作

在跟著源碼走流程之前,我們先了一下以下幾個需要用到的概念:

AQS.Node

隊列示意圖如下:

java多線程AbstractQueuedSynchronizer(AQS)源碼分析

每個Node節點都是一個自旋鎖:在阻塞時不斷循環讀取狀態變量,當前驅節點釋放同步對象使用權後,跳出循環,執行同步代碼。我們在接下來的代碼分析中,也能夠看到通過死循環來達到自旋這一目的。

我們看一下Node節點類的幾個關鍵屬性(不必記住,下面用到的時候,再回來看即可):

MODE(兩個)

java多線程AbstractQueuedSynchronizer(AQS)源碼分析

兩種Mode,用於創建Node時的構造函數使用。在private Node addWaiter(Node mode)這一方法調用的時候傳入,用於想等待隊列中添加節點。

volatile int waitStatus

手機是waitStatus,用來表示當前節點的狀態。其取值範圍如下:

  • static final int CANCELLED = 1;表示節點的線程是已被取消的。當前節點由於超時或者被中斷而被取消。一旦節點被取消後,那麼它的狀態值不在會被改變,且當前節點的線程不會再次被阻塞。
  • static final int SIGNAL= -1;表示當前節點的後繼節點的線程需要被喚醒。當前節點的後繼節點已經 (或即將)被阻塞(通過LockSupport.park()) , 所以當 當前節點釋放或則被取消時候,一定要unpark它的後繼節點。為了避免競爭,獲取方法一定要首先設置node為signal,然後再次重新調用獲取方法,如果失敗,則阻塞。
  • static final int CONDITION = -2;表示線程正在等待某個條件。表示當前節點正在條件隊列(AQS下的ConditionObject裡也維護了個隊列)中,在從conditionObject隊列轉移到同步隊列前,它不會在同步隊列(AQS下的隊列)中被使用。當成功轉移後,該節點的狀態值將由CONDITION設置為0。
  • static final int PROPAGATE = -3;表示下一個共享模式的節點應該無條件的傳播下去。共享模式下的釋放操作應該被傳播到其他節點。該狀態值在doReleaseShared方法中被設置的。
  • 0 以上都不是。

可以看到,非負數值(0和已經取消)意味著該節點不需要被喚醒。所以,大多數代碼中不需要檢查該狀態值的確定值,只需要根據正負值來判斷即可對於一個正常的Node,他的waitStatus初始化值時0。對於一個condition隊列中的Node,他的初始化值時CONDITION。如果想要修改這個值,可以使用AQS提供CAS進行修改。(方法: boolean compareAndSetWaitStatus(Node node, int expect,int update))

volatile Node prev

用於鏈接當前節點的前驅節點,當前節點依賴前驅節點來檢測waitStatus,前驅節點是在當前節點入隊時候被設置的。為了提高GC效率,在當前節點出隊時候會把前驅節點設置為null。而且,在取消前驅節點的時候,則會while循環直到找到一個非取消(cancelled)的節點,由於頭節點永遠不會是取消狀態,所以我們一定可以找到非取消狀態的前置節點。

volatile Node next;

用於鏈接當前節點的後繼節點,在當前節點釋放時候會喚醒後繼節點。在一個當前節點入隊的時候,會先設置當前節點的prev,而不會立即設置前置節點的next。而是用CAS替換了tail之後才設置前置節點的next。(方法Node addWaiter(Node mode))

Node nextWaiter

用來串聯條件隊列,連接到下一個在條件上等待的結點或是特殊的值SHARED。因為條件隊列只在獨佔模式下持有時訪問,我們只需要一個簡單的鏈表隊列來持有在條件上等待的結點。再然後它們會被轉移到同步隊列(AQS隊列)再次重新獲取。由於條件隊列只能在獨佔模式下使用,所以我們要表示共享模式的節點的話只要使用特殊值SHARED來標明即可。

輔助方法分析(供查閱)

shouldParkAfterFailedAcquire

這個方法是信號控制(waitStatus)的核心。在獲取同步狀態失敗,生成Node並加入隊列中後,用於檢查和更新結點的狀態。返回true表示當前節點應該被阻塞。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 前驅節點如果狀態如果為SIGNAL。表明當前節點應被阻塞,等待喚醒(參見上文的SIGNAL狀態)
* 則返回true,然後park掛起線程
*/
return true;
if (ws > 0) {
/*
* 前驅節點狀態值大於0(只有一個取值1),表示前驅節點已經取消
* 此時應該丟棄前驅節點,而繼續尋找前驅節點的前驅節點,(見下圖)
* 這裡使用while循環查找前驅節點,並將當前節點的prev屬性設置為找到的新的節點。(下圖步驟1)
* 並將新的前驅節點的後繼節點設置為當前節點(下圖步驟2)
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {

/*
* 排除以上SIGNAL(-1)和>0(1)兩種情況
* 現在是前驅節點的waitStatus為0或PROPAGATE(-3)的情況(不考慮CONDITION的情況)
* 這時候表明前驅節點需要重新設置waitStatus
* 這樣在下一輪循環中,就可以判斷前驅節點的SIGNAL而阻塞park當前節點,以便於等待前驅節點的unpark(比如:shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

如圖:

java多線程AbstractQueuedSynchronizer(AQS)源碼分析

parkAndCheckInterrupt

java多線程AbstractQueuedSynchronizer(AQS)源碼分析

與上面的shouldParkAfterFailedAcquire中聯合調用

(shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())

通過shouldParkAfterFailedAcquire方法獲取到可用的前驅節點,並設置前驅節點的WaitStatus值為SIGNAL,進而在此方法中將當前線程park(阻塞等待)。線程醒了之後,檢查線程是否被重點,並將結果返回。

cancelAcquire

上面講到,每一個NODE節點都是一個自旋鎖,都在不斷進行死循環自旋,當自旋過程中發生異常而無法獲得鎖,就需要取消節點。 需要做的是:

  • 清空node節點中的引用
  • node出隊:剔除當前節點,打斷next和prev引用。分為三種情況:1. node是tail 2. node既不是tail,也不是head的後繼節點 3. node是head的後繼節點 源碼分析如下:
 private void cancelAcquire(Node node) {
// 如果node為空,忽略,直接返回
if (node == null)
return;

//將thread引用置空
node.thread = null;

// 跳過取消的(cancelled)的前置節點,找到一個有效的前驅節點,如上面分析過的shouldParkAfterFailedAcquire
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

// 拿到前驅節點的後繼節點
Node predNext = pred.next;

// 將節點的狀態值設為已取消,這樣,其他節點就可以跳過本節點,而不受其他線程的干擾
node.waitStatus = Node.CANCELLED;

// 情況1:如果當前節點是尾節點,CAS替換tail字段的引用為為前驅節點
// 成功之後,CAS將前驅節點的後繼節點置空
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);

} else {
// 情況2:如果當前節點不是tail,而前驅節點又不是head
// 則嘗試CAS將前驅節點的waitStatus標記為SIGNAL(表示前驅節點的後繼節點需要喚醒)
// 設置成功之後,CAS將前驅節點的後繼節點設置為當前節點的後繼節點(將當前節點剔除)
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 情況3:如果node是head的後繼節點,則直接喚醒node的後繼節點
unparkSuccessor(node);
}

node.next = node; // help GC
}
}

如上: 情況1:

java多線程AbstractQueuedSynchronizer(AQS)源碼分析

  • 1:compareAndSetTail(node, pred) 替換tail的引用
  • 2:compareAndSetNext(pred, predNext, null); 將pred的next置空

情況2:

java多線程AbstractQueuedSynchronizer(AQS)源碼分析

  • compareAndSetNext(pred, predNext, next); 將前驅節點的next指向後繼節點。後繼節點的prev將在前面講過的shouldParkAfterFailedAcquire進行添加。

情況3 下面將分析unparkSuccessor方法

unparkSuccessor

用於喚醒當前節點的後繼節點。

private void unparkSuccessor(Node node) {
// 將當前節點的狀態重置
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

// 拿到後繼節點 ,如果後繼機節點是空或標記為取消(cancelled)
// 開是循環獲取後繼的可用節點
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// LockSupport喚醒下一個節點
if (s != null)
LockSupport.unpark(s.thread);
}

上文中尋找下一個可用節點的時候,可以看到不是head->tail尋找,而是tail->head倒序尋找,這是因為:通過上面代碼可以看到,只有在當前節點node的後繼節點為nul的時候,才會執行循環尋找後面的可用後繼節點。注意此處:後繼節點已經為null了,故只能從尾部向前遍歷,找到第一個可用節點。

差不多就這些了,下面我們進入正題,探討一下獲取同步化狀態的流程。

-----------------------------------------------------

源碼分析

獨佔式獲取同步狀態

上源碼:

java多線程AbstractQueuedSynchronizer(AQS)源碼分析

首先tryAcquire(arg),tryAcquire是由子類實現,通過操作state進行判定當前是否允許當前線程獲取執行權力,用來控制當前是否允許獲取同步狀態。true表示獲取同步狀態,不必加入同步隊列中。如果返回了false,沒有獲取同步狀態,則需要加入到同步隊列中。繼續往下執行:

addWaiter(Node mode)

首先將節點添加到等待隊列中:

private Node addWaiter(Node mode) {
// 構造一個Node,nextWaiter為null
Node node = new Node(Thread.currentThread(), mode);
// 獲取到tail節點(也就是接下來,當前節點的前驅節點)
Node pred = tail;
if (pred != null) {
node.prev = pred;
// CAS嘗試替換tail引用,如果成功,則返回
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 上述不成功,存在多線程競爭,則自旋
enq(node);
return node;
}

enq(final Node node)

private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 如果隊列為空,先CAS設置一下head空節點,完事之後進行下一次循環
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 設置當前節點的prev,然後CAS設置設置tail,和前驅節點的next
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;

return t;
}
}
}
}

添加隊列成功之後,我們繼續往下看,還是那張圖

java多線程AbstractQueuedSynchronizer(AQS)源碼分析

acquireQueued(final Node node, int arg)

acquireQueued主要是處理正在排隊等待的線程。自旋、阻塞重試獲取。如果獲取成功則替換當前節點為鏈表頭,然後返回。在獲取過程中,忽略了中斷,但將是否中斷的返回了。

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 死循環自旋,不斷嘗試獲取同步狀態
for (;;) {
//獲取當前節點的前驅節點
final Node p = node.predecessor();
// 只有前驅節點是head,也就是說排隊排到當前借錢,才有可能獲取同步狀態
// 如果允許獲取同步狀態,則將當前節點設置為head,設置其他標記,並返回,終止自旋
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 在上面同步獲取失敗後,有可能不是頭節點的後繼節點,這時沒有資格獲取同步狀態,就需要休眠
// 下面代碼上面講過,進一步檢查和更新節點狀態,判斷當前節點是否需要park,減少佔用CPU,等待前驅節點釋放同步狀態將它喚醒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果失敗,取消獲取同步狀態,移除節點,上文已講
if (failed)
cancelAcquire(node);
}
}

selfInterrupt

獲取鎖過程中,忽略了中斷,在這裡處理中斷

static void selfInterrupt() {
Thread.currentThread().interrupt();
}

獲取分析完了,我們看一下,同步代碼執行完畢,同步狀態是如何釋放的。

獨佔式釋放同步狀態

 public final boolean release(int arg) {
//首先調用子類重寫方法tryRelease,返回true標識標識允許釋放同步狀態
if (tryRelease(arg)) {
//如果允許釋放,則當前head即為要釋放的node,只需要喚醒後繼node即可, unparkSuccessor上文講過
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

到此,我們走完了獨佔式鎖的獲取與釋放。簡要概述一下步驟:

  • 嘗試獲取鎖,如果不能獲取,添加進隊列
  • 隊列中該node進行自旋排隊,嘗試獲取同步狀態
  • 如果當前節點不是head的下個節點,休眠,等待喚醒
  • 喚醒後,檢查自身是否已被interrupted,繼續嘗試獲取鎖
  • 獲取後,執行同步代碼,
  • 執行完畢後,release鎖,喚醒下個節點

-----------------------------------------------------

共享式獲取同步狀態

上源碼:

java多線程AbstractQueuedSynchronizer(AQS)源碼分析

首先還是調用子類實現的tryAcquireShared,查看是否允許獲取同步狀態。如果首次獲取結果大於等於0.則完成獲取 。如果小於0,則表示不允許獲取同步狀態,進入隊列。

doAcquireShared(int arg)

死循環自旋嘗試獲取鎖

 private void doAcquireShared(int arg) {
// 構造Node,添加到隊列中,模式為Node.SHARED,查看Node構造函數
// 可以看到,當前Node的nextWaiter(不是next,詳看上文)為一個空node對象
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 拿到前驅node
final Node p = node.predecessor();
// 前驅node是head才有可能獲取鎖
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) { // tryAcquireShared大於等於0,允許獲取鎖
// 獲取成功,需要將當前節點設置為AQS隊列中的第一個節點
// 這是AQS的規則,隊列的頭節點表示正在獲取鎖的節點
// 下面講解
setHeadAndPropagate(node, r);
p.next = null; // help GC
// 同獨佔式
if (interrupted)
selfInterrupt();
failed = false;

return;
}
}
// 不解釋,見上文
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 不解釋,見上文
if (failed)
cancelAcquire(node);
}
}

setHeadAndPropagate

這是

private void setHeadAndPropagate(Node node, int propagate) {
// 取到head做緩存
Node h = head;
//將當前節點設置為head
setHead(node);
// propagate是tryAcquireShared返回的值 ,可以理解為Semaphore,是否還允許其他併發
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 並檢查當前節點的後繼節點為空或者後繼節點的nextWaiter是否為SHARED,表明後繼節點需要共享傳遞
if (s == null || s.isShared())
doReleaseShared(); // 進行share傳遞, doReleaseShared
}
}

可以看到這裡與獨佔式的做了相似的事情,都進行了設置head之後,區別是共享式獲取同步狀態又進行了share傳遞,傳遞給下一個nextWaiter屬性同樣為SHAREED的節點,我們看一下doReleaseShared方法

doReleaseShared

private void doReleaseShared() {
/*
* 即使在併發,多個線程在獲取、釋放的情況下,確保釋放的傳播性,
* 如果當前節點標記為SIGNAL(表示後繼節點需要喚醒,按理說應該在當前節點釋放的時候喚醒,但是此處是共享模式,故立即喚醒),則通常嘗試頭節點的unparkSuccessor 動作。
* 但是如果他不符合喚醒的條件,為了確保能正確release,那麼則把head的waitState設置為為PROPAGATE
* 此外,在執行該代碼時,為了以防萬一有新
* 節點的加入,或者我們CAS修改失敗,所以我們的更新需要在循環中,不斷嘗試。
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // 失敗了就繼續loop
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // 失敗了就繼續loop
}
if (h == head) // loop if head changed
break;
}
}

這裡最重要的是要多線程環境中理解doReleaseShared,一個線程A執行doReleaseShared,然後unparkSuccessor,線程B喚醒執行,這時候被喚醒的線程B運行,重新請求獲取同步狀態,修改head節點,喚醒線程C,然後依次喚醒D、E、F……每個節點在自己喚醒的同時,也喚醒了後面的節點,設置為head,這樣就達到了共享模式。

注意h == head,我們看到上面有註釋說Additionally, we must loop in case a new node is added while we are doing this.為了避免在執行到這裡的時候。如果有兩個新的節點添加到隊列中來,一個節點A喚醒B之後,B恰好setHead了,此時head是B節點。此時A之前獲得的head並不是新的head了,故需要繼續循環,以儘可能保證成功性。

可以看到 獨佔式與共享式的差別就是共享的傳遞: 獨佔模式喚醒頭節點,頭節點釋放之後,後繼節點喚醒 共享模式喚醒全部節點。

共享式釋放同步狀態

源碼不貼了,調用的是上述的doReleaseShared()

響應中斷獲取鎖

acquireInterruptibly和acquire差不多,acquireSharedInterruptibly和acquireShared差不多,區別就是拋出了InterruptedException。


分享到:


相關文章: