java之AQS和顯式鎖

本次內容主要介紹AQS、AQS的設計及使用、 ReentrantLock、 ReentrantReadWriteLock以及手寫一個可重入獨佔鎖

1、什麼是AQS ?

A QS,隊列同步器AbstractQueuedSynchronizer的簡寫,JDK1.5引入的, 是用來構建鎖或者其他同步組件的基礎框架,它使用了一個int成員變量表示同步狀態,通過內置的FIFO隊列來完成資源獲取線程的排隊工作。AQS的作者Doug Lea大師期望它能夠成為實現大部分同步需求的基礎。

2、AQS的設計及其作用

AbstractQueuedSynchronizer是一個抽象類,先看一下其類圖。

java之AQS和顯式鎖

AQS中裡有一個volatile修飾int型的state來代表同步狀態,使用同步器提供的3個方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))來改變狀態,因為它們能夠保證狀態的改變是安全的。

AQS使用的是模板方法模式,主要使用方式是繼承,且通常將子類推薦定義為靜態內部類,子類通過繼承AQS並實現它的抽象方法來管理同步狀態。AQS自身沒有實現任何同步接口,它僅僅是定義了若干同步狀態獲取和釋放的方法來供自定義同步組件使用,同步器既可以支持獨佔式地獲取同步狀態,也可以支持共享式地獲取同步狀態,這樣就可以方便實現不同類型的同步組件(ReentrantLock、ReentrantReadWriteLock和CountDownLatch等)。 AQS是實現鎖(也可以是任意同步組件)的關鍵,在鎖的實現中聚合同步器。可以這樣理解二者之間的關係:

  • 鎖是面向使用者的,它定義了使用者與鎖交互的接口(比如可以允許兩個線程並行訪問),隱藏了實現細節
  • 同步器面向的是鎖的實現者,它簡化了鎖的實現方式,屏蔽了同步狀態管理、線程的排隊、等待與喚醒等底層操作。鎖和同步器很好地隔離了使用者和實現者所需關注的領域。實現者需要繼承同步器並重寫指定的方法,隨後將同步器組合在自定義同步組件的實現中,並調用同步器提供的模板方法,而這些模板方法將會調用使用者重寫的方法。

實現自定義同步組件時,將會調用AQS提供的模板方法,AQS的模板方法如下:

java之AQS和顯式鎖

AQS提供的模板方法基本上分為3類:獨佔式獲取與釋放同步狀態、共享式獲取與釋放同步狀態和查詢同步隊列中的等待線程情況。AQS中可重寫的方法如下:

java之AQS和顯式鎖

AQS中有一個內部類Node,用於構造一個隊列來保存排隊等待獲取鎖的線程。看一下Node的源碼及其簡單說明:

<code>static final class Node {
/**標記線程是因為獲取共享資源失敗被阻塞添加到隊列中的*/

static final Node SHARED = new Node();

/**表示線程因為獲取獨佔資源失敗被阻塞添加到隊列中的*/
static final Node EXCLUSIVE = null;

/**表示線程因為中斷或者等待超時,需要從等待隊列中取消等待*/
static final int CANCELLED = 1;

/**表示當前線程佔有鎖,隊列中沒有存放線程引用頭結點的後繼結點A處於等待狀態,
* 如果已佔有鎖的線程釋放鎖或被CANCEL之後就會通知結點A去獲取鎖。*/
static final int SIGNAL = -1;

/**當持有鎖的線程調用了Condition(下面會講到具體運用)的signal()方法之後,處於同一condition下的等待線程會去競爭鎖*/
static final int CONDITION = -2;

/**表示把waitStatus的值,指示下一個acquireShared應該無條件傳播*/
static final int PROPAGATE = -3;

/**表示當前線程的等待狀態*/
volatile int waitStatus;

volatile Node prev;

volatile Node next;

/**表示進入AQS隊列中的線程引用*/
volatile Thread thread;
Node nextWaiter;

final boolean isShared() {
return nextWaiter == SHARED;

}


final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}/<code>

AQS基礎內容先了解這麼多,後面會用AQS實現一個自己的可重入獨佔式鎖。

3、顯式鎖Lock

與使用關鍵字synchronized相比,顯式鎖Lock提供了更廣泛的加鎖操作。 Lock獲取鎖的方法更加靈活,並且支持多個關聯的Condition對象,先看一下Lock的常用API:

java之AQS和顯式鎖

與關鍵字synchronized相比,Lock有以下幾個優勢:(1)可以嘗試非阻塞地獲取鎖,如果這一時刻鎖沒有被其他線程獲取到,則成功獲取並持有鎖。

(2)獲取鎖過程中可以被中斷。

(3)超時獲取鎖,可以指定一個時間,在指定的時間範圍內獲取鎖,如果截止時間到了仍然無法獲取鎖,則返回,可以避免線程長時間阻塞。

Lock也有缺點,比如說必須手動的釋放鎖,所以在使用Lock時有一個範式,以 ReentrantLock為例:

<code>class X {
private final ReentrantLock lock = new ReentrantLock();
// ...

public void m() {
lock.lock(); // block until condition holds
try {
// ... method body
} finally {
lock.unlock()
}
}
}/<code>

還有一個要注意的地方是,不要將獲取鎖的過程寫在try塊中,因為如果在獲取鎖(自定義鎖的實現)時發生了異常,異常拋出的同時,也會導致鎖無故釋放。

4、 ReentrantLock

4.1 公平鎖和非公平鎖

ReentrantLock是可重入的互斥鎖,與使用synchronized修飾的方法和代碼塊具有相同的基本行為和語義,但具有擴展的功能。最明顯的一個擴展功能是ReentrantLock可以定義為公平鎖或非公平鎖,synchronized內部實現使用的是非公平鎖機制。從時間上來說,先對鎖進行獲取的請求一定先被滿足,那麼這個鎖是公平的,反之是不公平的。 ReentrantLock提供了一個構造函數,能夠控制鎖是否是公平的。事實上,公平的鎖機制往往沒有非公平的效率高,原因如下: 在激烈競爭的情況下,恢復一個被掛起的線程與該線程真正開始運行之間存在著嚴重的延遲。假設線程A持有一個鎖,並且線程B請求這個鎖。由於這個鎖已被線程A持有,因此B將被掛起。當A釋放鎖時,B將被喚醒,因此會再次嘗試獲取鎖。與此同時,如果C也請求這個鎖,那麼C很可能會在B被完全喚醒之前獲得、使用以及釋放這個鎖。這樣的情況是一種“雙贏”的局面,B獲得鎖的時刻並沒有推遲,C更早地獲得了鎖,並且吞吐量也獲得了提高,用一張圖來說明。

java之AQS和顯式鎖

我們可以去看下公平鎖和非公平鎖加鎖的源碼,區別其實非常小,先看非公平鎖:

<code> 1 final boolean nonfairTryAcquire(int acquires) {
2 final Thread current = Thread.currentThread();
3 int c = getState();
4 if (c == 0) {
5 if (compareAndSetState(0, acquires)) {
6 setExclusiveOwnerThread(current);
7 return true;
8 }
9 }
10 else if (current == getExclusiveOwnerThread()) {
11 int nextc = c + acquires;
12 if (nextc < 0) // overflow
13 throw new Error("Maximum lock count exceeded");
14 setState(nextc);
15 return true;
16 }
17 return false;
18 }/<code>

再看公平鎖:

<code> 1    protected final boolean tryAcquire(int acquires) {
2 final Thread current = Thread.currentThread();
3 int c = getState();
4 if (c == 0) {
5 if (!hasQueuedPredecessors() &&
6 compareAndSetState(0, acquires)) {
7 setExclusiveOwnerThread(current);
8 return true;
9 }
10 }
11 else if (current == getExclusiveOwnerThread()) {
12 int nextc = c + acquires;
13 if (nextc < 0)
14 throw new Error("Maximum lock count exceeded");
15 setState(nextc);
16 return true;
17 }
18 return false;
19 }
20 }/<code>

通過對比源碼可以發現,公平鎖在第5行的判斷條件裡多了一個!hasQueuedPredecessors(),這個的意思是查詢是否有線程在排隊等待獲取鎖,如果有線程在排隊,則不去搶鎖。而非公平鎖才不管你有沒有線程在排隊等待,直接去搶一次再說,不管搶不搶的到。

4.2 ReentrantLock使用

隔壁老王在某寶買了一個FBB版的娃娃,假設娃娃從廣東發出,目的是上海,距離大約1500公里。娃娃發出後,在離目的地小於100公里的時候給老王發短信說,你的娃娃快到了。在上海的快遞員接到娃娃後,會給老王打電話讓他來取娃娃。這是一個典型的等待/通知機制,在之前的篇幅中我們使用Object類中的wait()和notifyAll()等待通知機制實現了一個自己的數據庫連接池,現在使用 ReentrantLock來模擬剛剛老王買娃娃的場景。

業務實現代碼:

<code> 1 import java.util.concurrent.locks.Condition;
2 import java.util.concurrent.locks.Lock;
3 import java.util.concurrent.locks.ReentrantLock;
4
5 public class BuyFBBWawa {
6 public final static String DESTINATION = "Shanghai";
7 /**娃娃剩餘運輸里程數*/
8 private int km;
9 /**娃娃當前位置*/
10 private String site;
11 private Lock lock = new ReentrantLock();
12 /**距離Condition*/
13 private Condition kmCondition = lock.newCondition();

14 /**位置Condition*/
15 private Condition siteCondition = lock.newCondition();
16
17 public BuyFBBWawa() {
18 }
19
20 public BuyFBBWawa(int km, String site) {
21 this.km = km;
22 this.site = site;
23 }
24
25 /**
26 * 距離目的地小於100公里,通知處於wait狀態並需要給老王發送短信的線程工作
27 */
28 public void changeKm() {
29 lock.lock();
30 try {
31 this.km = 99;
32 kmCondition.signal();//通知其他在kmCondition上等待的線程
33 } finally {
34 lock.unlock();
35 }
36 }
37
38 /**
39 * 到達菜鳥驛站,通知處於wait狀態並需要給老王打電話的線程工作
40 */
41 public void changeSite() {
42 lock.lock();
43 try {
44 this.site = "Shanghai";
45 siteCondition.signal();//通知其他在siteCondition上等待的線程
46 } finally {
47 lock.unlock();
48 }
49 }
50
51 /**
52 * 當娃娃的剩餘里程數小於100時給老王發短信
53 */

54 public void waitKm() {
55 lock.lock();
56 try {
57 while (this.km >= 100) {
58 try {
59 kmCondition.await();//當前線程在kmCondition上進行等待
60 System.out.println("check km thread[" + Thread.currentThread().getName()
61 + "] is be notify");
62 } catch (InterruptedException e) {
63 e.printStackTrace();
64 }
65 }
66 } finally {
67 lock.unlock();
68 }
69
70 System.out.println("娃娃離老王已經不足100公里,我給他發個短信");
71 }
72
73 /**當娃娃到達目的地時給老王打電話*/
74 public void waitSite() {
75 lock.lock();
76 try {
77 while (!this.site.equals(DESTINATION)) {
78 try {
79 siteCondition.await();//當前線程在siteCondition上進行等待
80 System.out.println("check Site thread[" + Thread.currentThread().getName()
81 + "] is be notify");
82 } catch (InterruptedException e) {
83 e.printStackTrace();
84 }
85 }
86 } finally {
87 lock.unlock();
88 }
89 System.out.println("娃娃已經到達目的地,我給他打個電話讓他來取");
90 }
91 }/<code>

測試代碼:

<code>public class TestBuyWawa { 

private static BuyFBBWawa fbbWawa = new BuyFBBWawa(1500, "Guangdong");

/**檢查里程數變化的線程,不滿足條件,線程一直等待*/
private static class CheckKm extends Thread {
@Override
public void run() {
fbbWawa.waitKm();
}
}

/**檢查地點變化的線程,不滿足條件,線程一直等待*/
private static class CheckSite extends Thread {
@Override
public void run() {
fbbWawa.waitSite();
}
}

public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
new CheckSite().start();
}
for (int i = 0; i < 3; i++) {
new CheckKm().start();
}

Thread.sleep(1000);
fbbWawa.changeKm();//娃娃距離目的地小於100公里
Thread.sleep(2000);
fbbWawa.changeSite();//娃娃到達目的地
}
}/<code>

這段代碼使用ReentrantLock和Condition模擬了老王買的娃娃的運輸過程,從程序輸出可以看到,通過不同的Condition實現了點對點的通知,這是與使用synchronized+wait()/notifyAll()最大的區別,如果對wait()/notifyAll()使用方法不熟悉的同學,歡迎閱讀之前的《java線程間的協作》。使用synchronized+wait()/notifyAll()的時候,不能指定喚醒某類線程,只能喚醒等待在對象上的所有線程,故儘量使用notifyAll()而不是notify(),在使用Lock+Condition的時候,由於可以指定喚醒某類線程,所以儘量使用signal()而不是signalAll()。

java之AQS和顯式鎖

5、 ReentrantReadWriteLock

5.1 ReentrantReadWriteLock介紹

之前提到的synchroniezd和ReentrantLock都是排它鎖,這些鎖在同一時刻只允許一個線程訪問,而讀寫鎖ReentrantReadWriteLock在同一時刻可以允許多個讀線程訪問,但是在寫線程訪問時,所有的讀線程和其他寫線程均被阻塞。讀寫鎖維護了一對鎖,一個讀鎖和一個寫鎖,通過分離讀鎖和寫鎖,使得併發性相比一般的排他鎖有了很大提升。除了保證寫操作對讀操作的可見性以及併發性的提升之外,讀寫鎖能夠簡化讀寫交互場景的編程方式。假設在程序中定義一個共享的用作緩存數據結構,它大部分時間提供讀服務(例如查詢和搜索),而寫操作佔有的時間很少,但是寫操作完成之後的更新需要對後續的讀服務可見。如果不使用讀寫鎖,完成上述工作就要使用Java的等待通知機制,就是當寫操作開始時,所有晚於寫操作的讀操作均會進入等待狀態,只有寫操作完成並進行通知之後,所有等待的讀操作才能繼續執行(寫操作之間依靠synchronized關鍵進行同步),這樣做的目的是使讀操作能讀取到正確的數據,不會出現髒讀。改用讀寫鎖實現上述功能,只需要在讀操作時獲取讀鎖,寫操作時獲取寫鎖即可。當寫鎖被獲取到時,後續(非當前寫操作線程)的讀寫操作都會被阻塞,寫鎖釋放之後,所有操作繼續執行,編程方式相對於使用等待通知機制的實現方式而言,變得簡單明瞭。一般情況下,讀寫鎖的性能都會比排它鎖好,因為大多數場景讀是多於寫的。在讀多於寫的情況下,讀寫鎖能夠提供比排它鎖更好的併發性和吞吐量。

5.2 使用ReentrantReadWriteLock

我們來模擬一個讀多寫少的場景,分別使用synchroniezd和 ReentrantReadWriteLock,看看效率的差異。假設某種商品,讀寫比列為1:10,我們寫一段代碼來模擬。

商品類:

<code>public class GoodsInfo {
/**總銷售額*/
private double totalMoney;
/**庫存數*/
private int storeNumber;

public GoodsInfo( int totalMoney, int storeNumber) {
this.totalMoney = totalMoney;
this.storeNumber = storeNumber;
}

public double getTotalMoney() {
return totalMoney;
}

public int getStoreNumber() {
return storeNumber;
}

public void changeNumber(int sellNumber) {
this.totalMoney += sellNumber * 9.9;
this.storeNumber -= sellNumber;
}
}/<code>

商品接口:

<code>public interface GoodsService {
GoodsInfo getNumber();
void setNumber(int number);
}/<code>

使用讀寫鎖來實現商品接口:

<code>import java.util.concurrent.locks.Lock; 

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class UseRwLock implements GoodsService {
private GoodsInfo goodsInfo;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock getLock = lock.readLock();//讀鎖
private final Lock setLock = lock.writeLock();//寫鎖

public UseRwLock(GoodsInfo goodsInfo) {
this.goodsInfo = goodsInfo;
}

@Override
public GoodsInfo getNumber() {
getLock.lock();
try {
Thread.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
getLock.unlock();
}
return this.goodsInfo;
}

@Override
public void setNumber(int number) {
setLock.lock();
try {
Thread.sleep(5);
goodsInfo.changeNumber(number);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
setLock.unlock();
}
}
}/<code>

使用synchronized實現商品接口:

<code>public class UseSynchronized implements GoodsService {
private GoodsInfo goodsInfo;

public UseSynchronized(GoodsInfo goodsInfo) {
this.goodsInfo = goodsInfo;
}

@Override
public synchronized GoodsInfo getNumber() {
try {
Thread.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
return this.goodsInfo;
}

@Override
public synchronized void setNumber(int number) {
try {
Thread.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
goodsInfo.changeNumber(number);
}
}/<code>

測試類,先使用synchronized實現:

<code> 1 import java.util.Random;
2
3 public class GoodsInfoTest {
4 static final int readWriteRatio = 10;//讀寫線程的比例
5 static final int writeThreadCount = 1;//寫線程數量
6
7 /**
8 * 讀線程
9 */
10 private static class GetTask implements Runnable {
11 private GoodsService goodsService;
12
13 public GetTask(GoodsService goodsService) {
14 this.goodsService = goodsService;
15 }
16
17 @Override
18 public void run() {
19 long start = System.currentTimeMillis();
20 for (int i = 0; i < 100; i++) {//每個讀線程操作100次
21 goodsService.getNumber();
22 }
23 System.out.println(Thread.currentThread().getName() + "讀取商品數據耗時:"

24 + (System.currentTimeMillis() - start) + "ms");
25 }
26 }
27
28 /**
29 * 寫線程
30 */
31 private static class SetTask implements Runnable {
32 private GoodsService goodsService;
33
34 public SetTask(GoodsService goodsService) {
35 this.goodsService = goodsService;
36 }
37
38 @Override
39 public void run() {
40 long start = System.currentTimeMillis();
41 Random r = new Random();
42 for (int i = 0; i < 10; i++) {//每個寫線程操作10次
43 goodsService.setNumber(r.nextInt(10));
44 }
45 System.out.println(Thread.currentThread().getName()
46 + "寫商品數據耗時:" + (System.currentTimeMillis() - start) + "ms");
47 }
48 }
49
50 public static void main(String[] args) throws InterruptedException {
51 GoodsInfo goodsInfo = new GoodsInfo(100000, 10000);
52 GoodsService goodsService = new UseSynchronized(goodsInfo);
53
54 for (int i = 0; i < writeThreadCount; i++) { //啟動1個寫線程
55 new Thread(new SetTask(goodsService)).start();
56 for (int j = 0; j < readWriteRatio; j++) { //啟動10個讀線程
57 new Thread(new GetTask(goodsService)).start();
58 }
59 Thread.sleep(10);
60 }
61 }
62 }/<code>

程序輸出:

java之AQS和顯式鎖

再把剛剛測試類修改一下,只需要把第52行修改成讀寫鎖實現,即 GoodsService goodsService = new UseRwLock(goodsInfo);程序輸出:

java之AQS和顯式鎖

對比可以看出,對於讀多寫少的場景,使用讀寫鎖比使用獨佔鎖效率高很多。

6、手寫一個自己的可重入獨佔鎖

鎖的 重進入是指任意線程在獲取到鎖之後能夠再次獲取該鎖而不會被鎖所阻塞,synchronized關鍵字隱式的支持重進入,比如一個synchronized修飾的遞歸方法,在方法執行時,執行線程在獲取了鎖之後仍能連續多次地獲得該鎖,該特性的實現需要解決以下兩個問題:

(1)線程再次獲取鎖。鎖需要去識別獲取鎖的線程是否為當前佔據鎖的線程,如果是,則再次成功獲取。

(2)鎖的最終釋放。線程重複n次獲取了鎖,隨後在第n次釋放該鎖後,其他線程能夠獲取到該鎖。鎖的最終釋放要求鎖對於獲取進行計數自增,計數表示當前鎖被重複獲取的次數,而鎖被釋放時,計數自減,當計數等於0時表示鎖已經成功釋放。

從上面ReentrantLock的公平鎖和非公平鎖加鎖的源碼也可以看出,getState()返回的是一個累計獲取鎖的次數。我們基於以上2點,利用AQS手寫一個簡易版本的可重入獨佔鎖。

實現類:

<code>import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class MyReentrantLock implements Lock {
/***
* 內部類繼承AQS
*/
static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {//鎖被第一次獲取
setExclusiveOwnerThread(Thread.currentThread());//設置當前線程為鎖獨佔線程
return true;
} else if (Thread.currentThread() == getExclusiveOwnerThread()) {//鎖被多次獲取

setState(getState() + 1);//對獲取鎖的次數累加
return true;
}
return false;
}

@Override
protected boolean tryRelease(int arg) {
if (Thread.currentThread() != getExclusiveOwnerThread()) {
throw new IllegalMonitorStateException();
}
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setState(getState() - 1);
if (getState() == 0) {
setExclusiveOwnerThread(null);
}
return true;
}

@Override
protected boolean isHeldExclusively() {
return getState() > 0;
}

/**
* 返回一個Condition,每個condition都包含了一個condition隊列
* 這是能夠喚醒指定線程的關鍵
*/
Condition newCondition() {
return new ConditionObject();
}
}

/**僅需要將操作代理到Sync上,調用AQS模板方法*/
private final Sync sync = new Sync();

/***
* 調用AQS的模板方法acquire(int arg)
*/
public void lock() {
System.out.println(Thread.currentThread().getName() + " 準備獲取鎖");
sync.acquire(1);

System.out.println(Thread.currentThread().getName() + " 已經獲取到鎖");
}

public boolean tryLock() {
return sync.tryAcquire(1);
}

/***
* 調用AQS的模板方法release(int arg)
*/
public void unlock() {
System.out.println(Thread.currentThread().getName() + " 準備釋放鎖");
sync.release(1);
System.out.println(Thread.currentThread().getName() + " 已經釋放鎖");
}

public Condition newCondition() {
return sync.newCondition();
}

public boolean isLocked() {
return sync.isHeldExclusively();
}

public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}

public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}/<code>

測試類:

<code>import java.util.concurrent.locks.Lock;

public class Test {
static final Lock lock = new MyReentrantLock();

/**
* 遞歸獲取鎖
*

* @param deep 遞歸深度
*/
public static void reenter(int deep) {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + ":遞歸深度:" + deep);
int currentDeep = deep - 1;
if (currentDeep == 0) {
return;
} else {
reenter(currentDeep);
}
} finally {
lock.unlock();
}
}

static class WorkerThread extends Thread {
public void run() {
reenter(3);
}
}

public static void main(String[] args) {
// 啟動2個子線程去爭搶鎖
for (int i = 0; i < 2; i++) {
Thread thread = new WorkerThread();
thread.start();
}
}
}/<code>

從程序輸出可以看到,利用AQS,我們自定義的MyReentrantLock實現了可重入獨佔鎖的功能。

java之AQS和顯式鎖

7、結語

本次就分享這麼多內容,希望大家看了有收穫。


分享到:


相關文章: