AQS 不懂怎能說懂併發編程?考官的拷問我沒說,卻認真的寫出來

AQS

AQS :


AbstractQueuedSynchronizer的簡稱。

AQS提供了一種手動實現鎖功能,使用狀態管理、FIFO(先入先出隊列)等待隊列等實現一個同步器功能。 如果 Synchronized 關鍵字比喻成是一個自動擋汽車,提供自動加鎖一系列平順感, 那麼
AbstractQueuedSynchronizer 就是一個手動擋的變速箱,提供一些手動加鎖的靈活性和可控性。 樂趣包含:獨佔、共享、可重入、可自旋、可中斷。


AQS 不懂怎能說懂併發編程?考官的拷問我沒說,卻認真的寫出來


ReentrantLock 可重入鎖的實現來分析 AQS

一個對象可以重複遞歸的獲取本對象的鎖,而不會出現死鎖,這樣的鎖稱為可重入鎖。

ReentrantLock 就是這樣的一個實現,他通過 AQS 實現狀態管理和隊列控制。 具體實現的方式就是通過狀態位標識,當本線程佔用時 state = 1,重入後 state + 1,每以此重入都會執行 +1 操作。

當釋放鎖的時候,每執行 release 釋放操作,則 state -1 。直到 state = 0 結束,代表鎖已經完全釋放。 volatile 保證這個屬性的可見性,可以被其他線程讀取修改。

<code>     
    

private

volatile

int

state;/<code>

先來看看該類下的概況:


AQS 不懂怎能說懂併發編程?考官的拷問我沒說,卻認真的寫出來


構造函數

<code>     
    

public

ReentrantLock

()

{ sync =

new

NonfairSync(); }

public

ReentrantLock

(

boolean

fair)

{ sync = fair ?

new

FairSync() :

new

NonfairSync(); } /<code>

從代碼分析可以看到,ReentrantLock 的構造函數有兩種:

  • 第一種默認的,使用靜態內部類屬性 sync = new NonfairSync();, 默認使用非公平鎖方式。
  • 也可以通過 boolean 類型參數來指定,fair = true , sync = 公平鎖 .fair = false, sync = 非公平鎖。

類關係:ReentrantLock 三個內部類 與 AQS


  • Sync : 繼承自 AbstractQueuedSynchronizer,大名鼎鼎 AQS
  • NonfairSync : 非公平同步器實現,繼承 Sync
  • FairSync : 公平同步器實現,繼承 Sync

NonfairSync 非公平方式獲取鎖

<code>     
    

static

final

class

NonfairSync

extends

Sync

{

private

static

final

long

serialVersionUID =

7316153563782823691L

;

final

void

lock

()

{

if

(compareAndSetState(

0

,

1

)) setExclusiveOwnerThread(Thread.currentThread());

else

acquire(

1

); }

protected

final

boolean

tryAcquire

(

int

acquires)

{

return

nonfairTryAcquire(acquires); } }/<code>

NonfairSync.lock()

NonfairSync.lock() 方式獲取,以非公平的方式搶佔。

當調用 lock 方法時,會進行判斷,當前是否有線程佔用情況, 利用 unsafe 類的 CAS 比較交換 ,如果當前值是 0 ,符合預期的 expect 參數 0 ,那麼將 0 替換為 1 。 如果設置值成功,說明當期沒有線程佔用,當前線程可以直接獲取鎖,並設置當前線程為排他鎖模式 setExclusiveOwnerThread, 這個過程就是搶佔,如果CAS操作失敗,調用 acquire(1),表示沒有搶佔成功,會插入 queue 等待,以公平鎖方式加入FIFO。

下面看下 AQS 的 acquire 具體實現方式:

<code>     
    

public

final

void

acquire

(

int

arg)

{

if

(!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } /<code>

在 acquire(1)函數中會有兩個判斷條件來決定是否調用 selfInterrupt() 來阻塞當前線程。

  1. tryAcquire(1) 這個方法由子類實現,也就是 NonfairSync 裡面的實現。
  2. acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 這一行代碼博大精深,還是待會結合源碼具體分析。 也就是說,如果競爭獲取 state 失敗,並且已經將本線程封裝為 Node 加入 FIFO 隊列,那麼就說明在等待其他線程先執行,需要掛起本線程等待。

NonfairSync.tryAcquire()

tryAcquire() 方式嘗試獲取,拿到或者拿不到鎖都會直接返回結果。 這個方法在 lock() 裡面也會調用,就是在搶佔鎖失敗後走的流程。

從代碼中可以看出,tryAcquire() 調用的是父類 Sync nonfairTryAcquire 方法,而 sync 實現的是 AQS 中的 tryAcquire()模板方法.

<code>         
        

final

boolean

nonfairTryAcquire

(

int

acquires)

{

final

Thread current = Thread.currentThread();

int

c = getState();

if

(c ==

0

) {

if

(compareAndSetState(

0

, acquires)) { setExclusiveOwnerThread(current);

return

true

; } }

else

if

(current == getExclusiveOwnerThread()) {

int

nextc = c + acquires;

if

(nextc

0

)

throw

new

Error(

"Maximum lock count exceeded"

); setState(nextc);

return

true

; }

return

false

; } /<code>

AQS addWaiter

當tryAcquire方法獲取鎖資源(state)失敗以後,則會先調用addWaiter將當前線程封裝成Node,然後添加到 FIFO 隊列

<code>    

private

Node

addWaiter

(

Node mode

)

{ Node node =

new

Node(Thread.currentThread(), mode); Node pred = tail;

if

(pred !=

null

) { node.prev = pred;

if

(compareAndSetTail(pred, node)) { pred.next = node;

return

node; } } enq(node);

return

node; } /<code>
  • 第一步:將當前線程封裝成 Node,並且為獨佔模式
  • 判斷當前鏈表中的 tail 尾部節點是否為空,如果不為空,則通過 CAS 操作把當前線程的 Node 添加到 FIFO 隊列尾部
  • 如果為空 或者 CAS 失敗,調用 enq (node) 方法自旋來繼續加入隊列

AQS.enq()

<code>     
    

private

Node

enq

(

final

Node node)

{

for

(;;) { Node t = tail;

if

(t ==

null

) {

if

(compareAndSetHead(

new

Node())) tail = head; }

else

{ node.prev = t;

if

(compareAndSetTail(t, node)) { t.next = node;

return

t; } } } 複製代碼/<code>

AQS.acquireQueued()

使線程在等待隊列中獲取資源,一直獲取到資源後才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false

<code>     
    

final

boolean

acquireQueued

(

final

Node node,

int

arg)

{

boolean

failed =

true

;

try

{

boolean

interrupted =

false

;

for

(;;) {

final

Node p = node.predecessor();

if

(p == head && tryAcquire(arg)) { setHead(node); p.next =

null

; failed =

false

;

return

interrupted; }

if

(shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted =

true

; } }

finally

{

if

(failed) cancelAcquire(node); } 複製代碼/<code>
  • 自旋判斷當前節點是不是 FIFO 隊列中的第二個節點,這裡默認 head 是 FIFO 的第一個節點
  • 如果是,也就是 head 節點指向 node.predecessor() ,表示可以爭取鎖
  • 獲取鎖成功,那麼設置頭節點為當前隊列中的第二個節點
  • 如果獲取鎖失敗,則根據 waitStatus 決定是否需要掛起線程
  • 最後通過 cancelAcquire 取消獲得鎖的操作

AQS.shouldParkAfterFailedAcquire

從上面的分析可以看出,如果不是隊列的第二個節點,也就是 FIFO 其他的節點, 會進行
shouldParkAfterFailedAcquire(p, node) 來進行判斷,當前爭取鎖的線程是否需要在獲取失敗後阻塞。 只有當 waitStatus == Node.SIGNAL 是才返回 true

<code>         
        

private

static

boolean

shouldParkAfterFailedAcquire

(Node pred, Node node)

{

int

ws = pred.waitStatus;

if

(ws == Node.SIGNAL)

return

true

;

if

(ws >

0

) {

do

{ node.prev = pred = pred.prev; }

while

(pred.waitStatus >

0

); pred.next = node; }

else

{ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); }

return

false

; }/<code>
  • 它首先判斷一個節點的前置節點的狀態是否為 Node.SIGNAL = -1 ,是,說明此節點的後續節點可以不用 park 了
  • 前置節點是取消狀態,將前置節點從鏈表上移除,使用前置節點的前一個節點賦值到當前節點的 prev 上面
  • 重複第二步判斷,直到鏈表中的前一個節點不為 1 ,並設置 prev.next ,鏈表的下一個節點為當前節點
  • CAS 比較替換前置節點為 Node.SIGNAL = -1
<code>         
        

static

final

int

CANCELLED =

1

;

static

final

int

SIGNAL =

-1

;

static

final

int

CONDITION =

-2

; /<code>

AQS.parkAndCheckInterrupt

如果
shouldParkAfterFailedAcquire() 返回了true,則會執行:parkAndCheckInterrupt()方法, 它是通過LockSupport.park(this)將當前線程掛起到 WAITING 狀態,它需要等待一箇中斷、unpark方法來喚醒它, 通過這樣一種 FIFO 的機制的等待,來實現了 Lock 的操作。

<code>

private

final

boolean

parkAndCheckInterrupt

()

{ LockSupport.park(

this

);

return

Thread.interrupted(); }/<code>

LockSupport LockSupport類是Java6引入的一個類,提供了基本的線程同步原語。LockSupport實際上是調用了Unsafe類裡的函數,歸結到Unsafe裡,只有兩個函數:

<code>    

public

native

void

unpark

(Thread jthread)

;

public

native

void

park

(

boolean

isAbsolute,

long

time)

; /<code>

LockSupport是用來創建鎖和其他同步類的基本線程阻塞原語。

LockSupport 提供park()和unpark()方法實現阻塞線程和解除線程阻塞, LockSupport和每個使用它的線程都與一個許可(permit)關聯。permit相當於1,0的開關,默認是0,調用一次unpark就加1變成1, 調用一次park會消費permit, 也就是將1變成0,同時park立即返回。再次調用park會變成block(因為permit為0了,會阻塞在這裡, 直到permit變為1), 這時調用unpark會把permit置為1。

每個線程都有一個相關的permit, permit最多隻有一個,重複調用unpark也不會積累

ReentrantLock.tryRelease 釋放鎖

<code>        

protected

final

boolean

tryRelease

(

int

releases)

{

int

c = getState() - releases;

if

(Thread.currentThread() != getExclusiveOwnerThread())

throw

new

IllegalMonitorStateException(); boolean

free

=

false

;

if

(c ==

0

) {

free

=

true

; setExclusiveOwnerThread(null); } setState(c);

return

free

; } 複製代碼/<code>

上面看到release方法首先會調用ReentrantLock的內部類Sync的tryRelease方法。而通過下面代碼的分析,大概知道tryRelease做了這些事情。

  1. 獲取當前AQS的state,並減去1;
  2. 判斷當前線程是否等於 AQS 的exclusiveOwnerThread,如果不是,就拋IllegalMonitorStateException異常,這就保證了加鎖和釋放鎖必須是同一個線程;
  3. 如果(state-1)的結果不為0,說明鎖被重入了,需要多次unlock,這也是lock和unlock成對的原因;
  4. 如果(state-1)等於0,我們就將AQS的ExclusiveOwnerThread設置為null;
  5. 如果上述操作成功了,也就是tryRelase方法返回了true;返回false表示需要多次unlock。

模板方法模式應用

<code>     
    

protected

boolean

tryAcquire

(

int

arg)

{

throw

new

UnsupportedOperationException(); } 複製代碼/<code>

仔細看這個方法,雖然使用了模板方法模式,卻沒有使用 abstract 。這點確實有意思,可以先停下思考一下。

先來簡單介紹下模板方法模式: 規定好一些公用方法的主流程、算法而不具體實現,只是對流程進行控制,子類對具體的流程進行定義,這樣保證其子類實現也按照流程和算法架構進行實現。

舉個例子:抽象汽車公用方法 run() 包含以下功能

  • start() --- 啟動汽車
  • init() --- 初始化功能 : 可以自定義,打開空調,打開收音機,播放音樂,打開天窗...
  • go() --- 前進:可以自定義,加速前進,百公里 0.1 s
  • end() --- 熄火

BMW888 車子是抽象汽車的具體實現,但是 run() 方法一樣要遵守主流程。

再來回顧上面的問題,為啥 AQS 裡 tryAcquire 是模板方法卻不定義成 abstract? 因為獨佔模式下只用實現tryAcquire/tryRelease,而共享模式下只用實現
tryAcquireShared/tryReleaseShared。

如果都定義成abstract,那麼每個模式也要去實現另一模式下的接口。這樣設計不符合設計規範, 違反了接口隔離原則(Interface Segregpation Principle),仔細想想也是不合理的。

接口隔離原則:每個接口中不存在子類用不到卻必須實現的方法,如果不然,就要將接口拆分

程序領域(id:think-holy) 作者:holy 程序汪一隻

喜歡文章請關注我

程序領域


分享到:


相關文章: