深入解析ReentrantLock源碼

前言

前段時間群裡很多人在群裡討論ReentrantLock,讓小編也有了發一篇ReentrantLock鎖文章的念頭,所以今天給大家分享一篇Lock鎖的文章,幫助大家更清晰的理解ReentrantLock

概述

ReentrantLock是一個可重入的互斥鎖,也被稱為獨佔鎖。它支持公平鎖和非公平鎖兩種模式。

ReentrantLock的使用方法

下面看一個最初級的例子:

<code>public class Test {

//默認內部採用非公平實現
ReentrantLock lock=new ReentrantLock();

public void myMethor(){
lock.lock();

//需要加鎖的一些操作

//一定要確保unlock能被執行到,尤其是在存在異常的情況下
lock.unlock();
}
}/<code>

在進入方法後,在需要加鎖的一些操作執行之前需要調用lock方法,在jdk文檔中對lock方法詳細解釋如下:

獲得鎖。如果鎖沒有被另一個線程佔用並且立即返回,則將鎖定計數設置為1。 如果當前線程已經保持鎖定,則保持計數增加1,該方法立即返回。 如果鎖被另一個線程保持,則當前線程將被禁用以進行線程調度,並且在鎖定已被獲取之前處於休眠狀態,此時鎖定保持計數被設置為1。

這裡也很好的解釋了什麼是可重入鎖,如果一個線程已經持有了鎖,它再次請求獲取自己已經拿到的鎖,是能夠獲取成功的,這就是可重入鎖。

在需要加鎖的代碼執行完畢之後,就會調用unlock釋放掉鎖。在jdk文檔之中對,unlock的解釋如下:

嘗試釋放此鎖。如果當前線程是該鎖的持有者,則保持計數遞減。 如果保持計數現在為零,則鎖定被釋放。 如果當前線程不是該鎖的持有者,則拋出IllegalMonitorStateException 。

在這裡有一個需要注意的地點,lock和unlock都反覆提到了一個計數,這主要是因為ReentrantLock是可重入的。每次獲取鎖(重入)就將計數器加一,每次釋放的時候的計數器減一,直到計數器為0,就將鎖釋放掉了。

以上就是最基礎,最簡單的使用方法。其餘的一些方法,都是一些拓展的功能,查看jdk文檔即可知道如何使用。

源碼分析

繼承體系

深入解析ReentrantLock源碼

可以看出ReentrantLock繼承自AQS並實現了Lock接口。它內部有公平鎖和非公平鎖兩種實現,這兩種實現都是繼承自Sync。根據ReentrantLock決定到底採用公平鎖還是非公平鎖實現。

<code> public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}/<code>

核心方法源碼分析

Lock方法

  • 首先調用具體的Lock實現.sync可能是非公平鎖實現也可能是公平鎖實現,這取決於你new對象時的參數。
<code>public void lock() {
sync.lock();
}/<code>

我們以非公平鎖實現來看下面的下面的代碼。

  • 非公平鎖的lock方法的具體實現如下
<code>final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}/<code>

首先進來就是一個判斷,其中判斷的條件就是compareAndSetState(0, 1)

.毫無疑問這是一個CAS。它的意思是如果當前的state的值的為0就將1與其交換(可以理解為將1賦值給0)並返回true。其實在這一步如果state的值修改成功了,那麼鎖就獲取成功了。setExclusiveOwnerThread(Thread.currentThread())這行代碼就是將當前線程設置為該排他鎖的擁有者。

如果CAS失敗了,那麼就調用acquire(1);

  • 如果初次獲得鎖失敗就調用qcquire(1)這個方法的具體實現如下;
<code>public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}/<code>

這個方法進來首先第一步就是調用tryAcquire(arg).那麼該方法是幹什麼的呢?非公平鎖實際是調用了這個實現:

<code>protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}/<code>

它具體的實現是在nonfairTryAcquire(acquires)中。

<code> final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); //獲取鎖的狀態state,這就是前面我們CAS的操作對象
if (c == 0) {
//c==0說明沒被其它獲取

if (compareAndSetState(0, acquires)) { //CAS修改state
//CAS修改成功,說明獲取鎖成功,將當前線程設置為該排他鎖的擁有者
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
//如果鎖已經被佔有,但是是被當前鎖佔有的(可重入的具體體現)
//計數器加一
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//鎖被其它線程佔有,就返回false
return false;
}/<code>
  • 第二次嘗試獲取鎖失敗後,就進行下一步操作

我們再會過頭看void acquire(int arg)首先嚐試獲取鎖,獲取成功就直接返回了,獲取失敗就會執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)進行排隊。這一行代碼可以分為兩部分看,一部分是addWaiter(Node.EXCLUSIVE)一部分是acquireQueued.我們先看addWaiter(Node.EXCLUSIVE)

<code> private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;

if (pred != null) {
//隊列已經初始化了,就直接入隊即可
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node; //返回
}
}
//隊列沒有初始化,初始化隊列併入隊
enq(node);
return node;
}/<code>

初始化對立對入隊的具體實現如下:

<code>    private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//初始化隊列
if (compareAndSetHead(new Node()))
tail = head;
} else {
//隊列初始化成功,進行入隊
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}/<code>

這裡稍微補充一下這個AQS中的這個等待隊列。

深入解析ReentrantLock源碼

  1. 節點也創建了,等待隊列也入了現在該看boolean acquireQueued(final Node node, int arg)方法了。這個方法的具體實現如下:
<code>final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); //獲取當前節點的先驅節點
if (p == head && tryAcquire(arg)) {
//如果當前節點的前一個節點是頭節點,就會執行tryAcquire(arg)再次嘗試獲取鎖
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//根據情況進入park狀態
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}/<code>

unlock方法

  • 和加鎖類似,調用具體的實現
<code>    public void unlock() {
sync.release(1);
}/<code>
  • 具體的release實現
<code>    public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//喚醒等待的線程,可以拿鎖了
unparkSuccessor(h);
return true;
}
return false;
}/<code>

該方法首先就調用了tryRelease(arg)方法,這個方法就是實現釋放資源的關鍵。釋放的具體操作,也印證了在jdk文檔之中的關於unlock和lock的說明。

<code>protected final boolean tryRelease(int releases) {
int c = getState() - releases; //計算釋放後的state的值
if (Thread.currentThread() != getExclusiveOwnerThread())
//如果當前線程沒有持有鎖,就拋異常
throw new IllegalMonitorStateException();
boolean free = false; //標記為釋放失敗
if (c == 0) {
//如果state為0了,說沒沒有線程佔有該鎖了
//進行重置所有者
free = true;
setExclusiveOwnerThread(null);
}
//重置state的值
setState(c);
return free;
}/<code>
  • 如果還有線程在等待鎖資源,那麼就可以喚醒它們了回到boolean release(int arg)看
<code> if (h != null && h.waitStatus != 0)
//喚醒等待的線程,可以拿鎖了
unparkSuccessor(h);/<code>

ReentrantLock的高階使用方法

我們使用synchronized的時候,可以通過wait和notify來讓線程等待,和喚醒線程。在ReentrantLock中,我們也可以使用Condition中的await和signal來使線程等待和喚醒。以下面這段代碼來解釋:

<code>public class Test {

static ReentrantLock lock=new ReentrantLock();
//獲取到condition
static Condition condition=lock.newCondition();

public static class TaskA implements Runnable{

@Override
public void run() {
lock.lock();
System.out.println(Thread.currentThread().getName() + "開始執行");
try {
System.out.println(Thread.currentThread().getName() + "準備釋放掉鎖並等待");
//在此等待,直到其它線程喚醒
condition.await();
System.out.println(Thread.currentThread().getName() + "重新拿到鎖並執行");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}

}
}


public static class TaskB implements Runnable{

@Override
public void run() {
lock.lock();
System.out.println(Thread.currentThread().getName() + "開始執行");

System.out.println(Thread.currentThread().getName() + "開始喚醒等待的線程");
//喚醒等待的線程
condition.signal();
try {
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + "任務執行完畢");
}catch (InterruptedException e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}

public static void main(String[] args) {

Thread taskA=new Thread(new TaskA(),"taskA");
Thread taskB=new Thread(new TaskB(),"taskB");
taskA.start();
taskB.start();
}

}/<code>

輸出結果:

<code>taskA開始執行
taskA準備釋放掉鎖並等待
taskB開始執行
taskB開始喚醒等待的線程
taskB任務執行完畢
taskA重新拿到鎖並執行/<code>

現象解釋:首先taskA拿到鎖,並執行,到condition.await();釋放鎖,並進入阻塞。taskB因此拿到剛才taskA釋放掉的鎖,taskB開始執行。taskB執行到condition.signal();喚醒了taskA,taskB繼續執行,taskA因為拿不到鎖,因此雖然已經被喚醒了,但是還是要等到taskB執行完畢,釋放鎖後,才有機會拿到鎖,執行自己的代碼。

那麼這個過程,源碼到底是如何實現的呢?

Condition源碼分析

await()的源碼分析

具體的實現如下:

<code>  public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); //添加一個條件節點
int savedState = fullyRelease(node); //釋放掉所有的資源
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//如果當前線程不在等待隊列中,park阻塞
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break; //線程被中斷就跳出循環
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
//取消條件隊列中已經取消的等待節點的鏈接
unlinkCancelledWaiters();
if (interruptMode != 0)
//等待結束後處理中斷
reportInterruptAfterWait(interruptMode);
}/<code>

基本的步驟如下:

  1. 首先判斷線程是否被中斷,如果中斷則拋出InterruptedException()異常
  2. 添加當前線程到條件隊列中去,然後釋放掉所有的資源
  3. 如果當前線程不在等待隊列中,就直接park阻塞當前線程

signal()方法源碼分析

具體的實現代碼如下:

<code>        public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}/<code>

這個方法中最重要的也就是doSignal(first).它的實現如下:

<code>        private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null; //解除等待隊列中首節點的鏈接
} while (!transferForSignal(first) && //轉移入等待隊列
(first = firstWaiter) != null);
}/<code>

該方法所做的事情就是從等待隊列中移除指定節點,並將其加入等待隊列中去。轉移節點的方法實現如下:

<code>    final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
//CAS修改狀態失敗,說明節點被取消了,直接返回false
return false;


/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node); //加入節點到等待隊列
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//如果前節點被取消,說明當前為最後一個等待線程,直接unpark喚醒,
LockSupport.unpark(node.thread);
return true;
}/<code>

至此ReentrantLock的源碼分析就結束了!

感謝大家的閱讀


原文鏈接:https://www.cnblogs.com/zofun/p/12215848.html


分享到:


相關文章: