線程併發——AQS抽象隊列同步器

AQS隊列同步器英文全稱AbstractQueuedSynchronizer,這是一個抽象類,為什麼我們今天需要學習這個抽象類呢?這個抽象類它的神奇之處到底是什麼呢?我們一起來掀開它的神奇面紗吧!

什麼是AQS(AbstractQueuedSynchronizer)?


線程併發——AQS抽象隊列同步器

AQS中文翻譯為同步器,Lock接口的實現類基本都是通過包含AQS子類對象來完成線程訪問控制的。比如說Lock中的獲取鎖和釋放鎖操作。說白了就是一把抽象的鎖。


線程併發——AQS抽象隊列同步器

Lock通過調用AQS子類的lock方法實現獲取鎖,unlock方法實現釋放鎖。


線程併發——AQS抽象隊列同步器

線程併發——AQS抽象隊列同步器

1.1 AQS源碼分析

1.1.1類字段分析:

 private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

1)我們可以看到字段中其實最主要的就是一個volatile 修飾的state(狀態),state用於標記鎖狀態。

2)compareAndSetState方法是使用CAS算法修改state保證state修改時的數據安全。

3)head和tail表示隊列的頭和尾,也就是鎖該類將線程和線程狀態封裝成一個節點Node存入到同步隊列中,結構如下:


線程併發——AQS抽象隊列同步器

AQS定義兩種資源共享方式:Exclusive(獨佔,只有一個線程能執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)。

1.1.2方法分析:


線程併發——AQS抽象隊列同步器

AQS定義兩種資源共享方式:Exclusive(獨佔,只有一個線程能執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)。

tryAcquire(int):獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。

tryRelease(int):獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。

tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。

tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放後允許喚醒後續等待結點返回true,否則返回false。

以ReentrantLock為例,state初始化為0,表示未鎖定狀態。A線程lock()時,會調用tryAcquire()獨佔該鎖並將state+1。此後,其他線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)為止,其它線程才有機會獲取該鎖。當然,釋放鎖之前,A線程自己是可以重複獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多麼次,這樣才能保證state是能回到零態的。

再以CountDownLatch以例,任務分為N個子線程去執行,state也初始化為N(注意N要與線程個數一致)。這N個子線程是並行執行的,每個子線程執行完後countDown()一次,state會CAS減1。等到所有子線程都執行完後(即state=0),會unpark()主調用線程,然後主調用線程就會從await()函數返回,繼續後餘動作。

1.1.3鎖類型分析:


線程併發——AQS抽象隊列同步器

1)公平鎖能保證:老的線程排隊使用鎖,新線程仍然排隊使用鎖。 2)非公平鎖保證:老的線程排隊使用鎖;但是無法保證新線程搶佔已經在排隊的線程的鎖。

注意:ReentrantLock初始化默認為非公平鎖,非公平鎖減少了線程的掛起和恢復運行效率高於公平鎖。公平鎖可以解決飢餓發生。

1.1.4獲取鎖操作分析:

final void lock() {//獲取鎖

if (compareAndSetState(0, 1))//獲取鎖成功

setExclusiveOwnerThread(Thread.currentThread());//獨佔鎖

else

acquire(1);//其他線程嘗試獲取鎖

}

acquire****方法:


線程併發——AQS抽象隊列同步器


1)tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;

2)addWaiter()將該線程加入等待隊列的尾部,並標記為獨佔模式;

3)acquireQueued()****方法;重新競爭同步鎖

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {//自旋
final Node p = node.predecessor();//獲取當前節點的上一個節點
if (p == head && tryAcquire(arg)) {//上一節點為頭節點和獲取嘗試獲取鎖成功
setHead(node);//上一個節點已經獲取到鎖,將當前階段設置為頭節點,下一次就由自己獲取鎖
p.next = null; // 頭節點獲取到鎖離開隊列
failed = false;
return interrupted;//返回中斷狀態
}
//不是頭節點或者獲取不到鎖
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

shouldParkAfterFailedAcquire****方法:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)//如果上一節點被喚醒,當前節點就進入等待
return true;
if (ws > 0) {//如果上一節點線程被取消了
do {
node.prev = pred = pred.prev;//當前階段嘗試跳過上一節點,插個隊
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//嘗試將上一個節點設置為喚醒狀態
}
return false;
}

parkAndCheckInterrupt****方法:

private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//線程進入等待狀態
return Thread.interrupted();//線程喚醒後,檢查自己是否真的處於中斷狀態,注意:interrupted會清除中斷狀態
}

1)selfInterrupt(),自我中斷,當前線程調用了interrupt方法

流程圖如下:


線程併發——AQS抽象隊列同步器

16.1.5 釋放鎖操作分析

release****方法:

public final boolean release(int arg) {
if (tryRelease(arg)) {//嘗試釋放鎖
Node h = head;
if (h != null && h.waitStatus != 0)//頭節點不為null並且等待狀態不是為沒有狀態
unparkSuccessor(h);//喚醒頭節點的後面一個節點
return true;
}
return false;

}

tryRelease****方法:

protected final boolean tryRelease(int releases) {
int c = getState() - releases;//重入鎖state大於1,需要調用兩次unlock方法,釋放兩次
if (Thread.currentThread() != getExclusiveOwnerThread())//如果獨佔鎖不是當前需要釋放的線程,拋出異常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);//獨佔鎖設置為null,釋放鎖
}
setState(c);
return free;
}

unparkSuccessor****方法:

private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)//等待狀態,不是被取消的狀態
compareAndSetWaitStatus(node, ws, 0);//CAS修改回到最初的狀態,沒有等待的狀態下
Node s = node.next;
if (s == null || s.waitStatus > 0) {//各種等待情況下
s = null;
//從隊尾開始找不為空和不是當前節點的節點,直到找到當前節點的後面一個節點位置
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)//如果t為等待狀態下
s = t;//s設置為t,拿到t這個節點

}
if (s != null)//喚醒當前節點的下一個節點
LockSupport.unpark(s.thread);
}


分享到:


相關文章: