併發編程技術六之共享鎖CountDownLatch源碼分析

我們上一節學了 ,今天我們看一下共享鎖CountDownLatch的源碼。

CountDownLatch有兩個主要方法

  • countDown()
  • await()

使用這兩個方法很簡單,這裡就不演示了。


併發編程技術六之共享鎖CountDownLatch源碼分析


這個類其實很簡單有兩個主要方法和一個同步器來完成,內部類Syn 繼承AbstractQueuedSynchronizer來完成同步器

private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

我們先不分析這裡面的代碼,先了解下這是個同步器,我們先分析類中的兩個主要方法

countDown()方法

public void countDown() {
sync.releaseShared(1);
}

sync是類中定義的內部類private final Sync sync; 而方法是AQS中的釋放共享鎖方法,

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

這裡的tryReleaseShared()這個方法是使用上面的內部類實現

protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

使用自旋鎖來判斷是否釋放共享鎖,當state大於0時才可以釋放共享鎖

await()方法

public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

這裡也是使用到AQS中的acquireSharedInterruptibly()方法,獲取一個共享鎖。因為需要多個線程同時使用同一把鎖。

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);

}
  1. 判斷當前線程是否中斷,若中斷,則拋異常
  2. 嘗試獲取共享鎖
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

被volatile修飾的state,如果為0,則不會獲取共享鎖(不會阻塞),只有大於0也就是有線程在運行時,則去獲取共享鎖。於是到我們熟悉的代碼

private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);//上節已經講過,回顧上節代碼
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {//上個節點是head節點
int r = tryAcquireShared(arg);//再次嘗試獲取共享鎖
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);

}
}

r >= 0時說明計數器為0。

private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);//head節點指向當前節點
//propagate是上個方法的 r ,因為r=1所以進if語句
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;//當前節點的next=null
if (s == null || s.isShared())
doReleaseShared();//釋放共享鎖的過程
}
}

釋放共享鎖的過程

private void doReleaseShared() {

for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;//
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);//釋放鎖,喚醒節點的線程
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

shouldParkAfterFailedAcquire() 方法也是我們上節講的代碼。

判斷前節點是否需要掛起,當狀態為SIGNAL時可以放心掛起,後面線程會喚醒SIGNAL狀態的線程。

以上是根據個人理解做了分析,如有不正確請留言討論。

----------


分享到:


相關文章: