java併發中CountDownLatch的使用

在java併發中,控制共享變量的訪問非常重要,有時候我們也想控制併發線程的執行順序,比如:等待所有線程都執行完畢之後再執行另外的線程,或者等所有線程都準備好了才開始所有線程的執行等。

這個時候我們就可以使用到CountDownLatch。

簡單點講,CountDownLatch存有一個放在QueuedSynchronizer中的計數器。當調用countdown() 方法時,該計數器將會減一。然後再調用await()來等待計數器歸零。

<code>private static final class Sync extends AbstractQueuedSynchronizer {
...
}

private final Sync sync;

public void countDown() {
sync.releaseShared(1);
}
/<code>
<code>    public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

/<code>

下面我們舉兩個使用的例子:

主線程等待子線程全都結束之後再開始運行

這裡我們定義子線程類,在子線程類裡面,我們傳入一個CountDownLatch用來計數,然後在子線程結束之前,調用該CountDownLatch的countDown方法。最後在主線程中調用await()方法來等待子線程結束執行。

<code>@Slf4j 

public class MainThreadWaitUsage implements Runnable {

private List<string> outputScraper;
private CountDownLatch countDownLatch;

public MainThreadWaitUsage(List<string> outputScraper, CountDownLatch countDownLatch) {
this.outputScraper = outputScraper;
this.countDownLatch = countDownLatch;
}

@Override
public void run() {
outputScraper.add("Counted down");
countDownLatch.countDown();
}
}
/<string>/<string>/<code>

看下怎麼調用:

<code>    @Test
public void testCountDownLatch()
throws InterruptedException {

List<string> outputScraper = Collections.synchronizedList(new ArrayList<>());
CountDownLatch countDownLatch = new CountDownLatch(5);
List<thread> workers = Stream
.generate(() -> new Thread(new MainThreadWaitUsage(outputScraper, countDownLatch)))
.limit(5)
.collect(toList());

workers.forEach(Thread::start);
countDownLatch.await();
outputScraper.add("Latch released");
log.info(outputScraper.toString());

}
/<thread>/<string>/<code>

執行結果如下:

<code>07:37:27.388 [main] INFO MainThreadWaitUsageTest - [Counted down, Counted down, Counted down, Counted down, Counted down, Latch released]
/<code>

等待所有線程都準備好再一起執行

上面的例子中,我們是主線程等待子線程,那麼在這個例子中,我們將會看看怎麼子線程一起等待到準備好的狀態,再一起執行。

思路也很簡單,在子線程開始之後,將等待的子線程計數器減一,在主線程中await該計數器,等計數器歸零之後,主線程再通知子線程運行。

<code>public class ThreadWaitThreadUsage implements Runnable {

private List<string> outputScraper;
private CountDownLatch readyThreadCounter;
private CountDownLatch callingThreadBlocker;
private CountDownLatch completedThreadCounter;

public ThreadWaitThreadUsage(
List<string> outputScraper,
CountDownLatch readyThreadCounter,
CountDownLatch callingThreadBlocker,
CountDownLatch completedThreadCounter) {

this.outputScraper = outputScraper;
this.readyThreadCounter = readyThreadCounter;
this.callingThreadBlocker = callingThreadBlocker;
this.completedThreadCounter = completedThreadCounter;
}

@Override
public void run() {
readyThreadCounter.countDown();
try {
callingThreadBlocker.await();
outputScraper.add("Counted down");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
completedThreadCounter.countDown();
}
}
}
/<string>/<string>/<code>

看下怎麼調用:

<code>    @Test
public void testCountDownLatch()
throws InterruptedException {

List<string> outputScraper = Collections.synchronizedList(new ArrayList<>());

CountDownLatch readyThreadCounter = new CountDownLatch(5);
CountDownLatch callingThreadBlocker = new CountDownLatch(1);
CountDownLatch completedThreadCounter = new CountDownLatch(5);
List<thread> workers = Stream
.generate(() -> new Thread(new ThreadWaitThreadUsage(
outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter)))
.limit(5)
.collect(toList());

workers.forEach(Thread::start);
readyThreadCounter.await();
outputScraper.add("Workers ready");
callingThreadBlocker.countDown();
completedThreadCounter.await();
outputScraper.add("Workers complete");

log.info(outputScraper.toString());

}
/<thread>/<string>/<code>

輸出結果如下:

<code>07:41:47.861 [main] INFO ThreadWaitThreadUsageTest - [Workers ready, Counted down, Counted down, Counted down, Counted down, Counted down, Workers complete]
/<code>

停止CountdownLatch的await

如果我們調用await()方法,該方法將會等待一直到count=0才結束。但是如果在線程執行過程中出現了異常,可能導致countdown方法執行不了。那麼await()方法可能會出現無限等待的情況。

這個時候我們可以使用:

<code>    public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/<code>

更多內容請訪問:flydean的博客 flydean.com


分享到:


相關文章: