Java併發工具類主要有CyclicBarrier、CountDownLatch、Semaphore和Exchanger,日常開發中經常使用的是CountDownLatch和Semaphore。下面就簡單分析下這幾個併發工具類:
CyclicBarrier 內存屏障
CyclicBarrier底層藉助於一個count計數器和Lock/Condition實現內存內存屏障功能,在對count--時必須先獲取到lock,如果count不為0,則調用condition.wait進行阻塞操作;直到當count為0時,執行barrierCommand(如果配置的話,執行barrierCommand的線程是剛好將count減到0的那個線程),然後調用condition.signalAll喚醒所有等待的線程。
CyclicBarrier可用於多線程同步、多線程計算最後合併計算結果的場景,比如分片計算最後使用CyclicBarrier統計最後的結果等。
CyclicBarrier使用示例如下:
<code>public
static
void
main
(String[] args
) throws Exception { CyclicBarrier barrier =new
CyclicBarrier(2
, () -> System.out
.println(Thread.currentThread().getName() +": all is ok"
)); Runnable task = () -> {try
{ System.out
.println(Thread.currentThread().getName() +": start wait"
); barrier.await
(); System.out
.println(Thread.currentThread().getName() +": start ok"
); }catch
(Exception e) { e.printStackTrace(); } }; Thread t1 =new
Thread(task,"thread1"
); Thread t2 =new
Thread(task,"thread2"
); t2.start(); t1.start(); t1.join
(); t2.join
(); }/<code>
CountDownLatch 計數器
CountDownLatch允許一個或多個線程等待其他線程完成操作。CountDownLatch底層藉助於AQS來實現功能,初始化一個CountDownLatch(n)時,相當於創建了一個state為n的AQS,當調用countDown()時會對AQS進行減一操作,如果state為0,則會對阻塞隊列中所有線程進行喚醒操作。
CountDownLatch計數器必須大於等於0,等於0的時候調用await方法時不會阻塞當前線程,注意CountDownLatch不可能重新初始化或者修改CountDownLatch對象的內部計數的值。一個線程調用coundDown方法happen-before,另一個線程調用await方法。
<code>public
static
void
main
(String[] args
) throws Exception { CountDownLatch downLatch =new
CountDownLatch(2
); Runnable task = () -> {try
{ System.out
.println(Thread.currentThread().getName() +": start countDown"
); downLatch.countDown(); System.out
.println(Thread.currentThread().getName() +": start ok"
); }catch
(Exception e) { e.printStackTrace(); } }; Thread t1 =new
Thread(task,"thread1"
); Thread t2 =new
Thread(task,"thread2"
); t1.start(); t2.start(); downLatch.await
(); System.out
.println("main wait ok"
); t1.join
(); t2.join
(); }/<code>
Semaphore信號量
Semaphore用來控制同時訪問特定資源的線程數量,它通過協調各個線程,保證合理的使用公共資源。Semaphore可用作流量控制,特別是公共資源有限的應用場景,比如數據庫連接。
Semaphore底層也是基於AQS,初始化Semaphore(n)相當於初始化一個state為n的AQS,調用acquire()時會對進行state - 1操作,如果結果大於0則CAS設置state為state-1,相當於獲取到了信號量,否則進行阻塞操作(調用tryAcquire則不會阻塞線程)。調用release會對state進行++操作。
<code>public
static
void
main
(String[] args
) { Semaphore semaphore =new
Semaphore(2
); ExecutorService executor = Executors.newFixedThreadPool(10
); Runnable task = () -> {try
{ System.out
.println(Thread.currentThread().getName() +" acquire before"
); semaphore.acquire(); System.out
.println(Thread.currentThread().getName() +" acquire ok"
); semaphore.release(); }catch
(InterruptedException e) { e.printStackTrace(); } }; executor.execute(task); executor.execute(task); executor.execute(task); executor.execute(task); }/<code>
Exchanger 線程間交換數據
Exchanger是一個用戶線程間交換數據的工具類,它提供了一個同步點,在這個同步點上,兩個線程可以交換彼此的數據。這兩個線程通過exchange方法交換數據,如果第一個線程先執行exchange方法,他會一直等待第二個線程也執行exchange方法,當兩個線程都達到同步點時,這兩個線程交換數據,將本線程產生的數據傳遞給對方。
<code>public
static
void
main(String
[] args) { Exchanger<String
> exchanger =new
Exchanger<>(); Runnable task = () -> {try
{String
result = exchanger.exchange(Thread.currentThread().getName()); System.out.println(Thread.currentThread().getName() +": "
+ result); }catch
(InterruptedException e) { e.printStackTrace(); } }; ExecutorService executor = Executors.newFixedThreadPool(2
); executor.execute(task); executor.execute(task); }/<code>
Exchanger實現分析
Exchanger算法的核心是通過一個可交換數據的slot,以及一個可以帶有數據item的參與者,slot是Node類型,Node定義如下:
<code> .misc.Contendedstatic
final
class
Node
{int
index;int
bound;int
collides;int
hash; Object item;volatile
Object match;volatile
Thread parked; }static
final
class
Participant
extends
ThreadLocal
<Node
> {public
NodeinitialValue
()
{return
new
Node(); } }/<code>
每一個參與者都帶有一個Participant,當調用exchange時,如果slot為空,則將自己攜帶的數據CAS設置到slot上,然後park自己;如果slot不為空,則表示已經有線程在slot裡設置了數據,則讀取Node.item字段,並將自己攜帶的數據設置到Node.match字段,然後喚醒之前設置數據的線程(之前阻塞的線程在喚醒後讀取Node.match字段返回),然後返回數據即可。
小結
瞭解了這些Java併發工具類,小夥伴們在日常開發中,都用到哪種或者哪幾種呢?
歡迎分享給其他小夥伴進行交流,或者評論區留言一起討論哈 : )
關鍵字: CountDownLatch println new