09.14 Java併發編程-ReentrantReadWriteLock

基於AQS的前世今生,來學習併發工具類ReentrantReadWriteLock。本文將從ReentrantReadWriteLock的產生背景、源碼原理解析和應用來學習這個併發工具類。

1、 產生背景

前面我們學習的重入鎖ReentrantLock本質上還是互斥鎖,每次最多隻能有一個線程持有ReentrantLock。對於維護數據完整性來說,互斥通常是一種過於強硬的規則,因此也就不必要的限制了併發性。互斥是一種保守的加鎖策略,雖然可以避免“寫/寫”衝突和“寫/讀”衝突,但也同樣避免了“讀/讀”衝突。和互聯網的“二八法則”一樣,大部分數據都是讀數據,可以存放在緩存中,數據結構的操作其實很多也是讀操作,可以考慮適當的放寬加鎖需求,允許多個讀操作線程同時訪問數據結構以提升程序的性能。在這樣的需求背景下,就產生了讀寫鎖ReadWriteLock,一個資源可以同時被多個讀操作訪問,或者被一個寫操作訪問,但是不能讀寫操作同時訪問。ReadWriteLock定義了接口規範,實際實現讀寫鎖控制的類是ReentrantReadWriteLock,該類為讀寫鎖提供了可重入的加鎖語義。

2、 源碼原理解析

2.1 讀寫鎖原理

既然是讀寫鎖,那就是有兩把鎖,可以用AQS的同步狀態表示其中的一把鎖,再引入一個新的屬性表示另外一把鎖,但是這麼做就變成了二元併發安全問題,使問題變得更加複雜。ReentrantReadWriteLock選擇了用一個屬性,即AQS的同步狀態來表示讀寫鎖,怎樣用一個屬性來表示讀寫鎖呢?那就是位運算,對位運算不熟悉的可以先看下此文。

ReentantReadWriteLock採用“按位切割”的方式,就是將這個32位的int型state變量分為高16位和低16位來使用,高16位代表讀狀態,低16位代表寫狀態讀鎖是可以共享的,而寫鎖是互斥的,對於寫鎖而言,用低16位表示線程的重入次數,但是讀鎖因為可以同時有多個線程,所以重入次數需要通過其他的方式來記錄,那就是ThreadLocal變量。從這也可以總結出來和ReentrantLock相比,寫鎖的重入次數會減少,最多不能超過65535次。讀鎖的線程數也有限制,最對不能超過65535個。

假設狀態變量是c,則讀狀態就是c>>>16(無符號右移16位),其實就是通過無符號右移運算抹掉低的16位,剩下的就是c的高16位。寫狀態是c&((1 << 16) - 1),其實就是c&00000000000000001111111111111111,與運算之後,高的16位被抹掉,剩下的就是c的低16位。如果讀線程申請讀鎖,當前寫鎖重入次數不為 0 時,則等待,否則可以馬上分配;如果是寫線程申請寫鎖,當前狀態為 0 則可以馬上分配,否則等待。

2.2 讀鎖的獲取和釋放

讀鎖的獲取方法如下:

Java併發編程-ReentrantReadWriteLock

protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();//當前線程
int c = getState();
//持有寫鎖的線程可以獲取讀鎖
if (exclusiveCount(c) != 0 && //已經分配了寫鎖
getExclusiveOwnerThread() != current) //當前線程不是持有寫鎖的線程
return -1;
int r = sharedCount(c); //讀鎖獲取次數
if (!readerShouldBlock() && //由子類根據公平策略實現決定是否可獲取讀鎖
r < MAX_COUNT && //讀鎖獲取次數小於最大值
compareAndSetState(c, c + SHARED_UNIT)) {//更新讀鎖狀態
if (r == 0) {//讀鎖的第一個線程 此時可以不用記錄到ThreadLocal
firstReader = current;
firstReaderHoldCount = 1; //避免查找ThreadLocal 提升效率
} else if (firstReader == current) {//讀鎖的第一個線程重入
firstReaderHoldCount++;
} else {//非讀鎖的第一個線程
HoldCounter rh = cachedHoldCounter; //下面為重入次數更新
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current); //獲取讀鎖失敗 循環重試
}
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {//獲取到寫鎖
if (getExclusiveOwnerThread() != current)

return -1; //非寫鎖線程獲取失敗
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT) //讀鎖數量達到最大
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {//讀鎖獲取成功 處理方式和之前類似
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
Java併發編程-ReentrantReadWriteLock

讀鎖的釋放方法如下:

Java併發編程-ReentrantReadWriteLock

protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {//當前線程是讀鎖的第一個線程
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1) //第一次佔有讀鎖 直接清除該線程
firstReader = null;
else
firstReaderHoldCount--;//讀鎖的第一個線程重入次數減少
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();

int count = rh.count;
if (count <= 1) {
readHolds.remove();//讀鎖釋放
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count; //重入次數減少
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
//減少讀鎖的線程數量
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
Java併發編程-ReentrantReadWriteLock

2.3 寫鎖的獲取和釋放

寫鎖的獲取方法如下:

Java併發編程-ReentrantReadWriteLock

protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);//寫鎖狀態
if (c != 0) {//表示鎖已經被分配出去了 if c != 0 and w == 0表示獲取讀鎖
// (Note: if c != 0 and w == 0 then shared count != 0)
//其他線程獲取到了寫鎖
if (w == 0 || current != getExclusiveOwnerThread())

return false;
//寫鎖重入次數超過最大值
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire 更新寫鎖重入次數
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||//子類實現寫鎖是否公平獲取
!compareAndSetState(c, c + acquires))
return false;//cas獲取寫鎖失敗
setExclusiveOwnerThread(current);//獲取寫鎖成功 獨佔
return true;
}
Java併發編程-ReentrantReadWriteLock

寫鎖的釋放方法如下:

Java併發編程-ReentrantReadWriteLock

protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())//當前線程不持有寫鎖
throw new IllegalMonitorStateException();
int nextc = getState() - releases; //重入次數減少
boolean free = exclusiveCount(nextc) == 0; //減少到0寫鎖釋放
if (free)
setExclusiveOwnerThread(null); //寫鎖釋放
setState(nextc);
return free;
}
Java併發編程-ReentrantReadWriteLock

2.4 鎖降級

鎖降級指的是寫鎖降級為讀鎖,首先持有當前寫鎖,然後獲取到讀鎖,在tryAcquireShared方法中已經體現了該過程,隨後再釋放該寫鎖的過程。鎖降級主要是為了保持數據的可見性,如果當前線程不獲取讀鎖而是直接釋放寫鎖,假設此時有另外的線程獲取到了寫鎖並修改了數據,那麼當前線程是無法知曉數據已經更新了。如果當前線程遵循鎖降級的過程,則其他線程會被阻塞,直到當前線程操作完成其他線程才可以獲取寫鎖進行數據更新。RentrantReadWriteLock不支持鎖升級(把持讀鎖、獲取寫鎖,最後釋放讀鎖的過程)。目的也是保證數據可見性,如果讀鎖已被多個線程獲取,其中任意線程成功獲取了寫鎖並更新了數據,則其更新對其他獲取到讀鎖的線程是不可見的。

3、 應用

概況性的總結RentrantReadWriteLock的應用,就是ReentrantLock能使用的地方,RentrantReadWriteLock都能使用,而且能提供更好的吞吐率。


分享到:


相關文章: