AQS
AQS :
AbstractQueuedSynchronizer的簡稱。
AQS提供了一種手動實現鎖功能,使用狀態管理、FIFO(先入先出隊列)等待隊列等實現一個同步器功能。 如果 Synchronized 關鍵字比喻成是一個自動擋汽車,提供自動加鎖一系列平順感, 那麼
AbstractQueuedSynchronizer 就是一個手動擋的變速箱,提供一些手動加鎖的靈活性和可控性。 樂趣包含:獨佔、共享、可重入、可自旋、可中斷。
ReentrantLock 可重入鎖的實現來分析 AQS
一個對象可以重複遞歸的獲取本對象的鎖,而不會出現死鎖,這樣的鎖稱為可重入鎖。
ReentrantLock 就是這樣的一個實現,他通過 AQS 實現狀態管理和隊列控制。 具體實現的方式就是通過狀態位標識,當本線程佔用時 state = 1,重入後 state + 1,每以此重入都會執行 +1 操作。
當釋放鎖的時候,每執行 release 釋放操作,則 state -1 。直到 state = 0 結束,代表鎖已經完全釋放。 volatile 保證這個屬性的可見性,可以被其他線程讀取修改。
<code>private
volatile
int
state;/<code>
先來看看該類下的概況:
構造函數
<code>public
ReentrantLock
()
{ sync =new
NonfairSync(); }public
ReentrantLock
(
boolean
fair) { sync = fair ?new
FairSync() :new
NonfairSync(); } /<code>
從代碼分析可以看到,ReentrantLock 的構造函數有兩種:
- 第一種默認的,使用靜態內部類屬性 sync = new NonfairSync();, 默認使用非公平鎖方式。
- 也可以通過 boolean 類型參數來指定,fair = true , sync = 公平鎖 .fair = false, sync = 非公平鎖。
類關係:ReentrantLock 三個內部類 與 AQS
- Sync : 繼承自 AbstractQueuedSynchronizer,大名鼎鼎 AQS
- NonfairSync : 非公平同步器實現,繼承 Sync
- FairSync : 公平同步器實現,繼承 Sync
NonfairSync 非公平方式獲取鎖
<code>static
final
class
NonfairSync
extends
Sync
{private
static
final
long
serialVersionUID =7316153563782823691L
;final
void
lock
()
{if
(compareAndSetState(0
,1
)) setExclusiveOwnerThread(Thread.currentThread());else
acquire(1
); }protected
final
boolean
tryAcquire
(
int
acquires) {return
nonfairTryAcquire(acquires); } }/<code>
NonfairSync.lock()
NonfairSync.lock() 方式獲取,以非公平的方式搶佔。
當調用 lock 方法時,會進行判斷,當前是否有線程佔用情況, 利用 unsafe 類的 CAS 比較交換 ,如果當前值是 0 ,符合預期的 expect 參數 0 ,那麼將 0 替換為 1 。 如果設置值成功,說明當期沒有線程佔用,當前線程可以直接獲取鎖,並設置當前線程為排他鎖模式 setExclusiveOwnerThread, 這個過程就是搶佔,如果CAS操作失敗,調用 acquire(1),表示沒有搶佔成功,會插入 queue 等待,以公平鎖方式加入FIFO。
下面看下 AQS 的 acquire 具體實現方式:
<code>public
final
void
acquire
(
int
arg) {if
(!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } /<code>
在 acquire(1)函數中會有兩個判斷條件來決定是否調用 selfInterrupt() 來阻塞當前線程。
- tryAcquire(1) 這個方法由子類實現,也就是 NonfairSync 裡面的實現。
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 這一行代碼博大精深,還是待會結合源碼具體分析。 也就是說,如果競爭獲取 state 失敗,並且已經將本線程封裝為 Node 加入 FIFO 隊列,那麼就說明在等待其他線程先執行,需要掛起本線程等待。
NonfairSync.tryAcquire()
tryAcquire() 方式嘗試獲取,拿到或者拿不到鎖都會直接返回結果。 這個方法在 lock() 裡面也會調用,就是在搶佔鎖失敗後走的流程。
從代碼中可以看出,tryAcquire() 調用的是父類 Sync nonfairTryAcquire 方法,而 sync 實現的是 AQS 中的 tryAcquire()模板方法.
<code>final
booleannonfairTryAcquire
(
int
acquires) {final
Thread current = Thread.currentThread();int
c = getState();if
(c ==0
) {if
(compareAndSetState(0
, acquires)) { setExclusiveOwnerThread(current);return
true
; } }else
if
(current == getExclusiveOwnerThread()) {int
nextc = c + acquires;if
(nextc0
)throw
new
Error("Maximum lock count exceeded"
); setState(nextc);return
true
; }return
false
; } /<code>
AQS addWaiter
當tryAcquire方法獲取鎖資源(state)失敗以後,則會先調用addWaiter將當前線程封裝成Node,然後添加到 FIFO 隊列
<code>private
NodeaddWaiter
(Node mode
) { Node node =new
Node(Thread.currentThread(), mode); Node pred = tail;if
(pred !=null
) { node.prev = pred;if
(compareAndSetTail(pred, node)) { pred.next = node;return
node; } } enq(node);return
node; } /<code>
- 第一步:將當前線程封裝成 Node,並且為獨佔模式
- 判斷當前鏈表中的 tail 尾部節點是否為空,如果不為空,則通過 CAS 操作把當前線程的 Node 添加到 FIFO 隊列尾部
- 如果為空 或者 CAS 失敗,調用 enq (node) 方法自旋來繼續加入隊列
AQS.enq()
<code>private
Nodeenq
(
final
Node node) {for
(;;) { Node t = tail;if
(t ==null
) {if
(compareAndSetHead(new
Node())) tail = head; }else
{ node.prev = t;if
(compareAndSetTail(t, node)) { t.next = node;return
t; } } } 複製代碼/<code>
AQS.acquireQueued()
使線程在等待隊列中獲取資源,一直獲取到資源後才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false
<code>final
boolean
acquireQueued
(
final
Node node,int
arg) {boolean
failed =true
;try
{boolean
interrupted =false
;for
(;;) {final
Node p = node.predecessor();if
(p == head && tryAcquire(arg)) { setHead(node); p.next =null
; failed =false
;return
interrupted; }if
(shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted =true
; } }finally
{if
(failed) cancelAcquire(node); } 複製代碼/<code>
- 自旋判斷當前節點是不是 FIFO 隊列中的第二個節點,這裡默認 head 是 FIFO 的第一個節點
- 如果是,也就是 head 節點指向 node.predecessor() ,表示可以爭取鎖
- 獲取鎖成功,那麼設置頭節點為當前隊列中的第二個節點
- 如果獲取鎖失敗,則根據 waitStatus 決定是否需要掛起線程
- 最後通過 cancelAcquire 取消獲得鎖的操作
AQS.shouldParkAfterFailedAcquire
從上面的分析可以看出,如果不是隊列的第二個節點,也就是 FIFO 其他的節點, 會進行
shouldParkAfterFailedAcquire(p, node) 來進行判斷,當前爭取鎖的線程是否需要在獲取失敗後阻塞。 只有當 waitStatus == Node.SIGNAL 是才返回 true
<code>private
static
boolean
shouldParkAfterFailedAcquire
(Node pred, Node node)
{int
ws = pred.waitStatus;if
(ws == Node.SIGNAL)return
true
;if
(ws >0
) {do
{ node.prev = pred = pred.prev; }while
(pred.waitStatus >0
); pred.next = node; }else
{ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); }return
false
; }/<code>
- 它首先判斷一個節點的前置節點的狀態是否為 Node.SIGNAL = -1 ,是,說明此節點的後續節點可以不用 park 了
- 前置節點是取消狀態,將前置節點從鏈表上移除,使用前置節點的前一個節點賦值到當前節點的 prev 上面
- 重複第二步判斷,直到鏈表中的前一個節點不為 1 ,並設置 prev.next ,鏈表的下一個節點為當前節點
- CAS 比較替換前置節點為 Node.SIGNAL = -1
<code>static
final
int
CANCELLED =1
;static
final
int
SIGNAL =-1
;static
final
int
CONDITION =-2
; /<code>
AQS.parkAndCheckInterrupt
如果
shouldParkAfterFailedAcquire() 返回了true,則會執行:parkAndCheckInterrupt()方法, 它是通過LockSupport.park(this)將當前線程掛起到 WAITING 狀態,它需要等待一箇中斷、unpark方法來喚醒它, 通過這樣一種 FIFO 的機制的等待,來實現了 Lock 的操作。
<code>private
final
boolean
parkAndCheckInterrupt
()
{ LockSupport.park(this
);return
Thread.interrupted(); }/<code>
LockSupport LockSupport類是Java6引入的一個類,提供了基本的線程同步原語。LockSupport實際上是調用了Unsafe類裡的函數,歸結到Unsafe裡,只有兩個函數:
<code>public
native
void
unpark
(Thread jthread)
;public
native
void
park
(
boolean
isAbsolute,long
time); /<code>
LockSupport是用來創建鎖和其他同步類的基本線程阻塞原語。
LockSupport 提供park()和unpark()方法實現阻塞線程和解除線程阻塞, LockSupport和每個使用它的線程都與一個許可(permit)關聯。permit相當於1,0的開關,默認是0,調用一次unpark就加1變成1, 調用一次park會消費permit, 也就是將1變成0,同時park立即返回。再次調用park會變成block(因為permit為0了,會阻塞在這裡, 直到permit變為1), 這時調用unpark會把permit置為1。
每個線程都有一個相關的permit, permit最多隻有一個,重複調用unpark也不會積累
ReentrantLock.tryRelease 釋放鎖
<code>
protected
final
booleantryRelease
(
int
releases) {int
c = getState() - releases;if
(Thread.currentThread() != getExclusiveOwnerThread())throw
new
IllegalMonitorStateException(); booleanfree
=false
;if
(c ==0
) {free
=true
; setExclusiveOwnerThread(null); } setState(c);return
free
; } 複製代碼/<code>
上面看到release方法首先會調用ReentrantLock的內部類Sync的tryRelease方法。而通過下面代碼的分析,大概知道tryRelease做了這些事情。
- 獲取當前AQS的state,並減去1;
- 判斷當前線程是否等於 AQS 的exclusiveOwnerThread,如果不是,就拋IllegalMonitorStateException異常,這就保證了加鎖和釋放鎖必須是同一個線程;
- 如果(state-1)的結果不為0,說明鎖被重入了,需要多次unlock,這也是lock和unlock成對的原因;
- 如果(state-1)等於0,我們就將AQS的ExclusiveOwnerThread設置為null;
- 如果上述操作成功了,也就是tryRelase方法返回了true;返回false表示需要多次unlock。
模板方法模式應用
<code>protected
booleantryAcquire
(
int
arg) {throw
new
UnsupportedOperationException(); } 複製代碼/<code>
仔細看這個方法,雖然使用了模板方法模式,卻沒有使用 abstract 。這點確實有意思,可以先停下思考一下。
先來簡單介紹下模板方法模式: 規定好一些公用方法的主流程、算法而不具體實現,只是對流程進行控制,子類對具體的流程進行定義,這樣保證其子類實現也按照流程和算法架構進行實現。
舉個例子:抽象汽車公用方法 run() 包含以下功能
- start() --- 啟動汽車
- init() --- 初始化功能 : 可以自定義,打開空調,打開收音機,播放音樂,打開天窗...
- go() --- 前進:可以自定義,加速前進,百公里 0.1 s
- end() --- 熄火
BMW888 車子是抽象汽車的具體實現,但是 run() 方法一樣要遵守主流程。
再來回顧上面的問題,為啥 AQS 裡 tryAcquire 是模板方法卻不定義成 abstract? 因為獨佔模式下只用實現tryAcquire/tryRelease,而共享模式下只用實現
tryAcquireShared/tryReleaseShared。
如果都定義成abstract,那麼每個模式也要去實現另一模式下的接口。這樣設計不符合設計規範, 違反了接口隔離原則(Interface Segregpation Principle),仔細想想也是不合理的。
接口隔離原則:每個接口中不存在子類用不到卻必須實現的方法,如果不然,就要將接口拆分
程序領域(id:think-holy) 作者:holy 程序汪一隻
喜歡文章請關注我
程序領域
關鍵字: return tryAcquire 節點