從源碼角度徹底理解ReentrantLock(重入鎖)

目錄

1.前言

2.AbstractQueuedSynchronizer介紹2.1 AQS是構建同步組件的基礎

2.2 AQS的內部結構(ReentrantLock的語境下)

3 非公平模式加鎖流程3.1加鎖流程真正意義上的入口

3.2 嘗試獲取鎖的通用方法:tryAcquire()

3.3 獲取鎖失敗的線程如何安全的加入同步隊列:addWaiter()

3.4 線程加入同步隊列後會做什麼:acquireQueued()

3.5 加鎖流程源碼總結

4.非公平模式解鎖流程4.1 解鎖流程源碼解讀

4.2 解鎖流程源碼總結

5. 公平鎖相比非公平鎖的不同

6. 一些疑問的解答6.1 為什麼基於FIFO的同步隊列可以實現非公平鎖?

6.2 為什麼非公平鎖性能好

7. 閱讀源碼的收穫

1.前言

在ReentrantLock(重入鎖)功能詳解和應用演示這篇文章裡我們講解並演示了ReentrantLock(重入鎖)的各種功能,其中就談到ReentrantLock可以有公平鎖和非公平鎖的不同實現,只要在構造它的時候傳入不同的布爾值,繼續跟進下源碼我們就能發現,關鍵在於實例化內部變量sync的方式不同,如下所示


<code>/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}/<code>

公平鎖內部是FairSync,非公平鎖內部是NonfairSync。而不管是FairSync還是NonfariSync,都間接繼承自AbstractQueuedSynchronizer這個抽象類,如下圖所示

  • NonfairSync的類繼承關係


從源碼角度徹底理解ReentrantLock(重入鎖)

  • FairSync的類繼承關係


從源碼角度徹底理解ReentrantLock(重入鎖)

該抽象類為我們的加鎖和解鎖過程提供了統一的模板方法,只是一些細節的處理由該抽象類的實現類自己決定。所以在解讀ReentrantLock(重入鎖)的源碼之前,有必要了解下AbstractQueuedSynchronizer。

2.AbstractQueuedSynchronizer介紹

2.1 AQS是構建同步組件的基礎

AbstractQueuedSynchronizer,簡稱AQS,為構建不同的同步組件(重入鎖,讀寫鎖,CountDownLatch等)提供了可擴展的基礎框架,如下圖所示。


從源碼角度徹底理解ReentrantLock(重入鎖)

AQS以模板方法模式在內部定義了獲取和釋放同步狀態的模板方法,並留下鉤子函數供子類繼承時進行擴展,由子類決定在獲取和釋放同步狀態時的細節,從而實現滿足自身功能特性的需求。除此之外,AQS通過內部的同步隊列管理獲取同步狀態失敗的線程,向實現者屏蔽了線程阻塞和喚醒的細節。

2.2 AQS的內部結構(ReentrantLock的語境下)

AQS的內部結構主要由同步等待隊列構成

2.2.1 同步等待隊列

AQS中同步等待隊列的實現是一個帶頭尾指針(這裡用指針表示引用是為了後面講解源碼時可以更直觀形象,況且引用本身是一種受限的指針)且不帶哨兵結點(後文中的頭結點表示隊列首元素結點,不是指哨兵結點)的雙向鏈表。

<code>/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;//指向隊列首元素的頭指針

/**

* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;//指向隊列尾元素的尾指針/<code>

head是頭指針,指向隊列的首元素;tail是尾指針,指向隊列的尾元素。而隊列的元素結點Node定義在AQS內部,主要有如下幾個成員變量

<code>volatile Node prev; //指向前一個結點的指針

volatile Node next; //指向後一個結點的指針
volatile Thread thread; //當前結點代表的線程
volatile int waitStatus; //等待狀態
/<code>
  • prev:指向前一個結點的指針
  • next:指向後一個結點的指針
  • thread:當前結點表示的線程,因為同步隊列中的結點內部封裝了之前競爭鎖失敗的線程,故而結點內部必然有一個對應線程實例的引用

waitStatus:對於重入鎖而言,主要有3個值。0:初始化狀態;-1(SIGNAL):當前結點表示的線程在釋放鎖後需要喚醒後續節點的線程;1(CANCELLED):在同步隊列中等待的線程等待超時或者被中斷,取消繼續等待。

同步隊列的結構如下圖所示


從源碼角度徹底理解ReentrantLock(重入鎖)

為了接下來能夠更好的理解加鎖和解鎖過程的源碼,對該同步隊列的特性進行簡單的講解:

  • 1.同步隊列是個先進先出(FIFO)隊列,獲取鎖失敗的線程將構造結點並加入隊列的尾部,並阻塞自己。如何才能線程安全的實現入隊是後面講解的重點,畢竟我們在講鎖的實現,這部分代碼肯定是不能用鎖的。
  • 2.隊列首結點可以用來表示當前正獲取鎖的線程。
  • 3.當前線程釋放鎖後將嘗試喚醒後續處結點中處於阻塞狀態的線程。

為了加深理解,還可以在閱讀源碼的過程中思考下這個問題:

這個同步隊列是FIFO隊列,也就是說先在隊列中等待的線程將比後面的線程更早的得到鎖,那ReentrantLock是如何基於這個FIFO隊列實現非公平鎖的?

2.2.2 AQS中的其他數據結構(ReentrantLock的語境下)

  • 同步狀態變量
<code>/**
* The synchronization state.
*/
private volatile int state;/<code>

這是一個帶volatile前綴的int值,是一個類似計數器的東西。在不同的同步組件中有不同的含義。以ReentrantLock為例,state可以用來表示該鎖被線程重入的次數。當state為0表示該鎖不被任何線程持有;當state為1表示線程恰好持有該鎖1次(未重入);當state大於1則表示鎖被線程重入state次。因為這是一個會被併發訪問的量,為了防止出現可見性問題要用volatile進行修飾。

  • 持有同步狀態的線程標誌
<code>/**
* The current owner of exclusive mode synchronization.
*/
private transient Thread exclusiveOwnerThread;/<code>

如註釋所言,這是在獨佔同步模式下標記持有同步狀態線程的。ReentrantLock就是典型的獨佔同步模式,該變量用來標識鎖被哪個線程持有。

瞭解AQS的主要結構後,就可以開始進行ReentrantLock的源碼解讀了。由於非公平鎖在實際開發中用的比較多,故以講解非公平鎖的源碼為主。以下面這段對非公平鎖使用的代碼為例

<code>/**
* @author:machao
* @create: 2020-01-09
**/
public class NoFairLockTest {


public static void main(String[] args) {

//創建非公平鎖
ReentrantLock lock = new ReentrantLock(false);

try {

//加鎖
lock.lock();

//模擬業務處理用時
TimeUnit.SECONDS.sleep(1);

} catch (InterruptedException e) {

e.printStackTrace();

} finally {
//釋放鎖
lock.unlock();
}

}
}/<code>

3 非公平模式加鎖流程

加鎖流程從lock.lock()開始

<code>public void lock() {
sync.lock();
}/<code>

進入該源碼,正確找到sycn的實現類後可以看到真正有內容的入口方法

3.1加鎖流程真正意義上的入口

<code>/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
//加鎖流程真正意義上的入口
final void lock() {
//以cas方式嘗試將AQS中的state從0更新為1
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());//獲取鎖成功則將當前線程標記為持有鎖的線程,然後直接返回
else
acquire(1);//獲取鎖失敗則執行該方法
}

/<code>

首先嚐試快速獲取鎖,以cas的方式將state的值更新為1,只有當state的原值為0時更新才能成功,因為state在ReentrantLock的語境下等同於鎖被線程重入的次數,這意味著只有當前鎖未被任何線程持有時該動作才會返回成功。若獲取鎖成功,則將當前線程標記為持有鎖的線程,然後整個加鎖流程就結束了。若獲取鎖失敗,則執行acquire方法

<code>/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}/<code>

該方法主要的邏輯都在if判斷條件中,這裡面有3個重要的方法tryAcquire(),addWaiter()和acquireQueued(),這三個方法中分別封裝了加鎖流程中的主要處理邏輯,理解了這三個方法到底做了哪些事情,整個加鎖流程就清晰了。

3.2 嘗試獲取鎖的通用方法:tryAcquire()

tryAcquire是AQS中定義的鉤子方法,如下所示

<code>protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}/<code>

該方法默認會拋出異常,強制同步組件通過擴展AQS來實現同步功能的時候必須重寫該方法,ReentrantLock在公平和非公平模式下對此有不同實現,非公平模式的實現如下:

<code>protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}/<code>

底層調用了nonfairTryAcquire()

從方法名上我們就可以知道這是非公平模式下嘗試獲取鎖的方法,具體方法實現如下

<code>/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();//獲取當前線程實例
int c = getState();//獲取state變量的值,即當前鎖被重入的次數
if (c == 0) { //state為0,說明當前鎖未被任何線程持有
if (compareAndSetState(0, acquires)) { //以cas方式獲取鎖
setExclusiveOwnerThread(current); //將當前線程標記為持有鎖的線程
return true;//獲取鎖成功,非重入
}
}
else if (current == getExclusiveOwnerThread()) { //當前線程就是持有鎖的線程,說明該鎖被重入了
int nextc = c + acquires;//計算state變量要更新的值

if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);//非同步方式更新state值
return true; //獲取鎖成功,重入
}
return false; //走到這裡說明嘗試獲取鎖失敗
}/<code>

這是非公平模式下獲取鎖的通用方法。它囊括了當前線程在嘗試獲取鎖時的所有可能情況:

  • 1.當前鎖未被任何線程持有(state=0),則以cas方式獲取鎖,若獲取成功則設置exclusiveOwnerThread為當前線程,然後返回成功的結果;若cas失敗,說明在得到state=0和cas獲取鎖之間有其他線程已經獲取了鎖,返回失敗結果。
  • 2.若鎖已經被當前線程獲取(state>0,exclusiveOwnerThread為當前線程),則將鎖的重入次數加1(state+1),然後返回成功結果。因為該線程之前已經獲得了鎖,所以這個累加操作不用同步。
  • 3.若當前鎖已經被其他線程持有(state>0,exclusiveOwnerThread不為當前線程),則直接返回失敗結果

因為我們用state來統計鎖被線程重入的次數,所以當前線程嘗試獲取鎖的操作是否成功可以簡化為:state值是否成功累加1,是則嘗試獲取鎖成功,否則嘗試獲取鎖失敗。

其實這裡還可以思考一個問題:nonfairTryAcquire已經實現了一個囊括所有可能情況的嘗試獲取鎖的方式,為何在剛進入lock方法時還要通過compareAndSetState(0, 1)去獲取鎖,畢竟後者只有在鎖未被任何線程持有時才能執行成功,我們完全可以把compareAndSetState(0, 1)去掉,對最後的結果不會有任何影響。這種在進行通用邏輯處理之前針對某些特殊情況提前進行處理的方式在後面還會看到,一個直觀的想法就是它能提升性能,而代價是犧牲一定的代碼簡潔性。

退回到上層的acquire方法,

<code>public final void acquire(int arg) {
if (!tryAcquire(arg) && //當前線程嘗試獲取鎖,若獲取成功返回true,否則false
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //只有當前線程獲取鎖失敗才會執行者這部分代碼
selfInterrupt();
}/<code>

tryAcquire(arg)返回成功,則說明當前線程成功獲取了鎖(第一次獲取或者重入),由取反和&&可知,整個流程到這結束,只有當前線程獲取鎖失敗才會執行後面的判斷。先來看addWaiter(Node.EXCLUSIVE) 部分,這部分代碼描述了當線程獲取鎖失敗時如何安全的加入同步等待隊列。這部分代碼可以說是整個加鎖流程源碼的精華,充分體現了併發編程的藝術性。

3.3 獲取鎖失敗的線程如何安全的加入同步隊列:addWaiter()

這部分邏輯在addWaiter()方法中

<code>/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);//首先創建一個新節點,並將當前線程實例封裝在內部,mode這裡為null
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;

if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);//入隊的邏輯這裡都有
return node;
}/<code>

首先創建了一個新節點,並將當前線程實例封裝在其內部,之後我們直接看enq(node)方法就可以了,中間這部分邏輯在enq(node)中都有,之所以加上這部分“重複代碼”和嘗試獲取鎖時的“重複代碼”一樣,對某些特殊情況

進行提前處理,犧牲一定的代碼可讀性換取性能提升。

<code>/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;//t指向當前隊列的最後一個節點,隊列為空則為null
if (t == null) { // Must initialize //隊列為空
if (compareAndSetHead(new Node())) //構造新結點,CAS方式設置為隊列首元素,當head==null時更新成功
tail = head;//尾指針指向首結點
} else { //隊列不為空
node.prev = t;
if (compareAndSetTail(t, node)) { //CAS將尾指針指向當前結點,當t(原來的尾指針)==tail(當前真實的尾指針)時執行成功
t.next = node; //原尾結點的next指針指向當前結點

return t;
}
}
}
}/<code>

這裡有兩個CAS操作:

  • compareAndSetHead(new Node()),CAS方式更新head指針,僅當原值為null時更新成功
<code>/**
* CAS head field. Used only by enq.
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/<code>
  • compareAndSetTail(t, node),CAS方式更新tial指針,僅當原值為t時更新成功
<code>/**
* CAS tail field. Used only by enq.
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}/<code>

外層的for循環保證了所有獲取鎖失敗的線程經過失敗重試後最後都能加入同步隊列。因為AQS的同步隊列是不帶哨兵結點的,故當隊列為空時要進行特殊處理,這部分在if分句中。注意當前線程所在的結點不能直接插入 空隊列,因為阻塞的線程是由前驅結點進行喚醒的。故先要插入一個結點作為隊列首元素,當鎖釋放時由它來喚醒後面被阻塞的線程,從邏輯上這個隊列首元素也可以表示當前正獲取鎖的線程,雖然並不一定真實持有其線程實例。

首先通過new Node()創建一個空結點,然後以CAS方式讓頭指針指向該結點(該結點並非當前線程所在的結點),若該操作成功,則將尾指針也指向該結點。這部分的操作流程可以用下圖表示


從源碼角度徹底理解ReentrantLock(重入鎖)

當隊列不為空,則執行通用的入隊邏輯,這部分在else分句中

<code>else {
node.prev = t;//step1:待插入結點pre指針指向原尾結點

if (compareAndSetTail(t, node)) { step2:CAS方式更改尾指針
t.next = node; //原尾結點next指針指向新的結點
return t;
}
}/<code>

首先當前線程所在的結點的前向指針pre指向當前線程認為的尾結點,源碼中用t表示。然後以CAS的方式將尾指針指向當前結點,該操作僅當tail=t,即尾指針在進行CAS前未改變時成功。若CAS執行成功,則將原尾結點的後向指針next指向新的尾結點。整個過程如下圖所示


從源碼角度徹底理解ReentrantLock(重入鎖)

整個入隊的過程並不複雜,是典型的CAS加失敗重試的樂觀鎖策略。其中只有更新頭指針和更新尾指針這兩步進行了CAS同步,可以預見高併發場景下性能是非常好的。但是本著質疑精神我們不禁會思考下這麼做真的線程安全嗎?

  • 1.隊列為空的情況:
    因為隊列為空,故head=tail=null,假設線程執行2成功,則在其執行3之前,因為tail=null,其他進入該方法的線程因為head不為null將在2處不停的失敗,所以3即使沒有同步也不會有線程安全問題。
  • 2.隊列不為空的情況:
    假設線程執行5成功,則此時4的操作必然也是正確的(當前結點的prev指針確實指向了隊列尾結點,換句話說tail指針沒有改變,如若不然5必然執行失敗),又因為4執行成功,當前節點在隊列中的次序已經確定了,所以6何時執行對線程安全不會有任何影響,比如下面這種情況

  • 從源碼角度徹底理解ReentrantLock(重入鎖)

    為了確保真的理解了它,可以思考這個問題:把enq方法圖中的4放到5之後,整個入隊的過程還線程安全嗎?

    到這為止,獲取鎖失敗的線程加入同步隊列的邏輯就結束了。但是線程加入同步隊列後會做什麼我們並不清楚,這部分在acquireQueued方法中

    3.4 線程加入同步隊列後會做什麼:acquireQueued()

    先看acquireQueued方法的源碼

    <code>/**
    * Acquires in exclusive uninterruptible mode for thread already in
    * queue. Used by condition wait methods as well as acquire.
    *
    * @param node the node
    * @param arg the acquire argument
    * @return {@code true} if interrupted while waiting
    */
    final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
    boolean interrupted = false;
    //死循環,正常情況下線程只有獲得鎖才能跳出循環
    for (;;) {
    final Node p = node.predecessor();//獲得當前線程所在結點的前驅結點
    //第一個if分句
    if (p == head && tryAcquire(arg)) {
    setHead(node); //將當前結點設置為隊列頭結點
    p.next = null; // help GC
    failed = false;
    return interrupted;//正常情況下死循環唯一的出口

    }
    //第二個if分句
    if (shouldParkAfterFailedAcquire(p, node) && //判斷是否要阻塞當前線程
    parkAndCheckInterrupt()) //阻塞當前線程
    interrupted = true;
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }/<code>

    這段代碼主要的內容都在for循環中,這是一個死循環,主要有兩個if分句構成。第一個if分句中,當前線程首先會判斷前驅結點是否是頭結點,如果是則嘗試獲取鎖,獲取鎖成功則會設置當前結點為頭結點(更新頭指針)。為什麼必須前驅結點為頭結點才嘗試去獲取鎖?因為頭結點表示當前正佔有鎖的線程,正常情況下該線程釋放鎖後會通知後面結點中阻塞的線程,阻塞線程被喚醒後去獲取鎖,這是我們希望看到的。然而還有一種情況,就是前驅結點取消了等待,此時當前線程也會被喚醒,這時候就不應該去獲取鎖,而是往前回溯一直找到一個沒有取消等待的結點,然後將自身連接在它後面。一旦我們成功獲取了鎖併成功將自身設置為頭結點,就會跳出for循環。否則就會執行第二個if分句:確保前驅結點的狀態為SIGNAL,然後阻塞當前線程。

    先來看shouldParkAfterFailedAcquire(p, node),從方法名上我們可以大概猜出這是判斷是否要阻塞當前線程的,方法內容如下

    <code>/**
    * Checks and updates status for a node that failed to acquire.
    * Returns true if thread should block. This is the main signal
    * control in all acquire loops. Requires that pred == node.prev.
    *
    * @param pred node's predecessor holding status
    * @param node the node
    * @return {@code true} if thread should block
    */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) //狀態為SIGNAL

    /*
    * This node has already set status asking a release
    * to signal it, so it can safely park.
    */
    return true;
    if (ws > 0) { //狀態為CANCELLED,
    /*
    * Predecessor was cancelled. Skip over predecessors and
    * indicate retry.
    */
    do {
    node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
    } else { //狀態為初始化狀態(ReentrentLock語境下)
    /*
    * waitStatus must be 0 or PROPAGATE. Indicate that we
    * need a signal, but don't park yet. Caller will need to
    * retry to make sure it cannot acquire before parking.
    */
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
    }/<code>

    可以看到針對前驅結點pred的狀態會進行不同的處理

    • 1.pred狀態為SIGNAL,則返回true,表示要阻塞當前線程。
    • 2.pred狀態為CANCELLED,則一直往隊列頭部回溯直到找到一個狀態不為CANCELLED的結點,將當前節點node掛在這個結點的後面。
    • 3.pred的狀態為初始化狀態,此時通過compareAndSetWaitStatus(pred, ws, Node.SIGNAL)方法將pred的狀態改為SIGNAL。

    其實這個方法的含義很簡單,就是確保當前結點的前驅結點的狀態為SIGNAL,SIGNAL意味著線程釋放鎖後會喚醒後面阻塞的線程。畢竟,只有確保能夠被喚醒,當前線程才能放心的阻塞。

    但是要注意只有在前驅結點已經是SIGNAL狀態後才會執行後面的方法立即阻塞,對應上面的第一種情況。其他兩種情況則因為返回false而重新執行一遍 for循環。這種延遲阻塞其實也是一種高併發場景下的優化,試想我如果在重新執行循環的時候成功獲取了鎖,是不是線程阻塞喚醒的開銷就省了呢?

    最後我們來看看阻塞線程的方法parkAndCheckInterrupt

    shouldParkAfterFailedAcquire返回true表示應該阻塞當前線程,則會執行parkAndCheckInterrupt方法,這個方法比較簡單,底層調用了LockSupport來阻塞當前線程,源碼如下:

    <code>/**
    * Convenience method to park and then check if interrupted
    *
    * @return {@code true} if interrupted
    */
    private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
    }/<code>

    該方法內部通過調用LockSupport的park方法來阻塞當前線程,不清楚LockSupport的可以看看這裡。LockSupport功能簡介及原理淺析

    下面通過一張流程圖來說明線程從加入同步隊列到成功獲取鎖的過程


    從源碼角度徹底理解ReentrantLock(重入鎖)

    概括的說,線程在同步隊列中會嘗試獲取鎖,失敗則被阻塞,被喚醒後會不停的重複這個過程,直到線程真正持有了鎖,並將自身結點置於隊列頭部。

    3.5 加鎖流程源碼總結

    ReentrantLock非公平模式下的加鎖流程如下


    從源碼角度徹底理解ReentrantLock(重入鎖)

    4.非公平模式解鎖流程

    4.1 解鎖流程源碼解讀

    解鎖的源碼相對簡單,源碼如下:

    <code>public void unlock() {
    sync.release(1);
    }/<code>


    <code>public final boolean release(int arg) {
    if (tryRelease(arg)) { //釋放鎖(state-1),若釋放後鎖可被其他線程獲取(state=0),返回true
    Node h = head;
    //當前隊列不為空且頭結點狀態不為初始化狀態(0)
    if (h != null && h.waitStatus != 0)
    unparkSuccessor(h); //喚醒同步隊列中被阻塞的線程
    return true;
    }
    return false;
    }/<code>

    正確找到sync的實現類,找到真正的入口方法,主要內容都在一個if語句中,先看下判斷條件tryRelease方法

    <code>protected final boolean tryRelease(int releases) {
    int c = getState() - releases; //計算待更新的state值
    if (Thread.currentThread() != getExclusiveOwnerThread())
    throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) { //待更新的state值為0,說明持有鎖的線程未重入,一旦釋放鎖其他線程將能獲取
    free = true;
    setExclusiveOwnerThread(null);//清除鎖的持有線程標記
    }
    setState(c);//更新state值
    return free;
    }/<code>

    tryRelease其實只是將線程持有鎖的次數減1,即將state值減1,若減少後線程將完全釋放鎖(state值為0),則該方法將返回true,否則返回false。由於執行該方法的線程必然持有鎖,故該方法不需要任何同步操作。 若當前線程已經完全釋放鎖,即鎖可被其他線程使用,則還應該喚醒後續等待線程。不過在此之前需要進行兩個條件的判斷:

    • h!=null是為了防止隊列為空,即沒有任何線程處於等待隊列中,那麼也就不需要進行喚醒的操作
    • h.waitStatus != 0是為了防止隊列中雖有線程,但該線程還未阻塞,由前面的分析知,線程在阻塞自己前必須設置前驅結點的狀態為SIGNAL,否則它不會阻塞自己。

    接下來就是喚醒線程的操作,unparkSuccessor(h)源碼如下

    <code>private void unparkSuccessor(Node node) {
    /*
    * If status is negative (i.e., possibly needing signal) try
    * to clear in anticipation of signalling. It is OK if this
    * fails or if status is changed by waiting thread.
    */
    int ws = node.waitStatus;
    if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);

    /*
    * Thread to unpark is held in successor, which is normally
    * just the next node. But if cancelled or apparently null,
    * traverse backwards from tail to find the actual
    * non-cancelled successor.
    */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
    if (t.waitStatus <= 0)
    s = t;
    }
    if (s != null)
    LockSupport.unpark(s.thread);
    }/<code>

    一般情況下只要喚醒後繼結點的線程就行了,但是後繼結點可能已經取消等待,所以從隊列尾部往前回溯,找到離頭結點最近的正常結點,並喚醒其線程。

    4.2 解鎖流程源碼總結


    從源碼角度徹底理解ReentrantLock(重入鎖)

    5. 公平鎖相比非公平鎖的不同

    公平鎖模式下,對鎖的獲取有嚴格的條件限制。在同步隊列有線程等待的情況下,所有線程在獲取鎖前必須先加入同步隊列。隊列中的線程按加入隊列的先後次序獲得鎖。 從公平鎖加鎖的入口開始,


    從源碼角度徹底理解ReentrantLock(重入鎖)

    對比非公平鎖,少了非重入式獲取鎖的方法,這是第一個不同點

    接著看獲取鎖的通用方法tryAcquire(),該方法在線程未進入隊列,加入隊列阻塞前和阻塞後被喚醒時都會執行


    從源碼角度徹底理解ReentrantLock(重入鎖)

    在真正CAS獲取鎖之前加了判斷,內容如下

    <code>public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
    ((s = h.next) == null || s.thread != Thread.currentThread());
    }/<code>

    從方法名我們就可知道這是判斷隊列中是否有優先級更高的等待線程,隊列中哪個線程優先級最高?由於頭結點是當前獲取鎖的線程,隊列中的第二個結點代表的線程優先級最高。 那麼我們只要判斷隊列中第二個結點是否存在以及這個結點是否代表當前線程就行了。這裡分了兩種情況進行探討:

    1. 第二個結點已經完全插入,但是這個結點是否就是當前線程所在結點還未知,所以通過s.thread != Thread.currentThread()進行判斷,如果為true,說明第二個結點代表其他線程。
    2. 第二個結點並未完全插入,我們知道結點入隊一共分三步:
    • 1.待插入結點的pre指針指向原尾結點
    • 2.CAS更新尾指針
    • 3.原尾結點的next指針指向新插入結點

    所以(s = h.next) == null 就是用來判斷2剛執行成功但還未執行3這種情況的。這種情況第二個結點必然屬於其他線程。 以上兩種情況都會使該方法返回true,即當前有優先級更高的線程在隊列中等待,那麼當前線程將不會執行CAS操作去獲取鎖,保證了線程獲取鎖的順序與加入同步隊列的順序一致,很好的保證了公平性,但也增加了獲取鎖的成本。

    6. 一些疑問的解答

    6.1 為什麼基於FIFO的同步隊列可以實現非公平鎖?

    由FIFO隊列的特性知,先加入同步隊列等待的線程會比後加入的線程更靠近隊列的頭部,那麼它將比後者更早的被喚醒,它也就能更早的得到鎖。從這個意義上,對於在同步隊列中等待的線程而言,它們獲得鎖的順序和加入同步隊列的順序一致,這顯然是一種公平模式。然而,線程並非只有在加入隊列後才有機會獲得鎖,哪怕同步隊列中已有線程在等待,非公平鎖的不公平之處就在於此。回看下非公平鎖的加鎖流程,線程在進入同步隊列等待之前有兩次搶佔鎖的機會:

    • 第一次是非重入式的獲取鎖,只有在當前鎖未被任何線程佔有(包括自身)時才能成功;
    • 第二次是在進入同步隊列前,包含所有情況的獲取鎖的方式。

    只有這兩次獲取鎖都失敗後,線程才會構造結點並加入同步隊列等待。而線程釋放鎖時是先釋放鎖(修改state值),然後才喚醒後繼結點的線程的。試想下這種情況,線程A已經釋放鎖,但還沒來得及喚醒後繼線程C,而這時另一個線程B剛好嘗試獲取鎖,此時鎖恰好不被任何線程持有,它將成功獲取鎖而不用加入隊列等待。線程C被喚醒嘗試獲取鎖,而此時鎖已經被線程B搶佔,故而其獲取失敗並繼續在隊列中等待。整個過程如下圖所示


    從源碼角度徹底理解ReentrantLock(重入鎖)

    如果以線程第一次嘗試獲取鎖到最後成功獲取鎖的次序來看,非公平鎖確實很不公平。因為在隊列中等待很久的線程相比還未進入隊列等待的線程並沒有優先權,甚至競爭也處於劣勢:在隊列中的線程要等待其他線程喚醒,在獲取鎖之前還要檢查前驅結點是否為頭結點。在鎖競爭激烈的情況下,在隊列中等待的線程可能遲遲競爭不到鎖。這也就非公平在高併發情況下會出現的飢餓問題。那我們在開發中為什麼大多使用會導致飢餓的非公平鎖?很簡單,因為它性能好啊。

    6.2 為什麼非公平鎖性能好

    非公平鎖對鎖的競爭是搶佔式的(隊列中線程除外),線程在進入等待隊列前可以進行兩次嘗試,這大大增加了獲取鎖的機會。這種好處體現在兩個方面:

    • 1.線程不必加入等待隊列就可以獲得鎖,不僅免去了構造結點並加入隊列的繁瑣操作,同時也節省了線程阻塞喚醒的開銷,線程阻塞和喚醒涉及到線程上下文的切換和操作系統的系統調用,是非常耗時的。在高併發情況下,如果線程持有鎖的時間非常短,短到線程入隊阻塞的過程超過線程持有並釋放鎖的時間開銷,那麼這種搶佔式特性對併發性能的提升會更加明顯。
    • 2.減少CAS競爭。如果線程必須要加入阻塞隊列才能獲取鎖,那入隊時CAS競爭將變得異常激烈,CAS操作雖然不會導致失敗線程掛起,但不斷失敗重試導致的對CPU的浪費也不能忽視。除此之外,加鎖流程中至少有兩處通過將某些特殊情況提前來減少CAS操作的競爭,增加併發情況下的性能。一處就是獲取鎖時將非重入的情況提前,如下圖所示


    從源碼角度徹底理解ReentrantLock(重入鎖)

    另一處就是入隊的操作,將同步隊列非空的情況提前處理


    從源碼角度徹底理解ReentrantLock(重入鎖)

    這兩部分的代碼在之後的通用邏輯處理中都有,很顯然屬於重複代碼,但因為避免了執行無意義的流程代碼,比如for循環,獲取同步狀態等,高併發場景下也能減少CAS競爭失敗的可能。

    7. 閱讀源碼的收穫

    • 1.熟悉了ReentrantLock的內部構造以及加鎖和解鎖的流程,理解了非公平鎖和公平鎖實現的本質區別以及為何前者相比後者有更好的性能。以此為基礎,我們可以更好的使用ReentrantLock。
    • 2.通過對部分實現細節的學習,瞭解瞭如何以CAS算法構建無鎖的同步隊列,我們可以借鑑並以此來構建自己的無鎖的併發容器。

    JAVA進階架構程序員福利:我這裡還總結整理了比較全面的JAVA相關的面試資料,都已經整理成了

    PDF版,這些都可以分享給大家,關注私信我:【806】,免費領取!


    分享到:


    相關文章: