我們上一節學了 ,今天我們看一下共享鎖CountDownLatch的源碼。
CountDownLatch有兩個主要方法
- countDown()
- await()
使用這兩個方法很簡單,這裡就不演示了。
這個類其實很簡單有兩個主要方法和一個同步器來完成,內部類Syn 繼承AbstractQueuedSynchronizer來完成同步器
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
我們先不分析這裡面的代碼,先了解下這是個同步器,我們先分析類中的兩個主要方法
countDown()方法
public void countDown() {
sync.releaseShared(1);
}
sync是類中定義的內部類private final Sync sync; 而方法是AQS中的釋放共享鎖方法,
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
這裡的tryReleaseShared()這個方法是使用上面的內部類實現
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
使用自旋鎖來判斷是否釋放共享鎖,當state大於0時才可以釋放共享鎖
await()方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
這裡也是使用到AQS中的acquireSharedInterruptibly()方法,獲取一個共享鎖。因為需要多個線程同時使用同一把鎖。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
- 判斷當前線程是否中斷,若中斷,則拋異常
- 嘗試獲取共享鎖
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
被volatile修飾的state,如果為0,則不會獲取共享鎖(不會阻塞),只有大於0也就是有線程在運行時,則去獲取共享鎖。於是到我們熟悉的代碼
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);//上節已經講過,回顧上節代碼
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {//上個節點是head節點
int r = tryAcquireShared(arg);//再次嘗試獲取共享鎖
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
r >= 0時說明計數器為0。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);//head節點指向當前節點
//propagate是上個方法的 r ,因為r=1所以進if語句
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;//當前節點的next=null
if (s == null || s.isShared())
doReleaseShared();//釋放共享鎖的過程
}
}
釋放共享鎖的過程
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;//
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);//釋放鎖,喚醒節點的線程
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
shouldParkAfterFailedAcquire() 方法也是我們上節講的代碼。
判斷前節點是否需要掛起,當狀態為SIGNAL時可以放心掛起,後面線程會喚醒SIGNAL狀態的線程。
以上是根據個人理解做了分析,如有不正確請留言討論。
----------
閱讀更多 零售雲技術內參 的文章