關於Java併發工具,90%的程序員需要了解的那些技術棧

關於Java併發工具,90%的程序員需要了解的那些技術棧

Java併發工具類主要有CyclicBarrier、CountDownLatch、Semaphore和Exchanger,日常開發中經常使用的是CountDownLatch和Semaphore。下面就簡單分析下這幾個併發工具類:

CyclicBarrier 內存屏障

CyclicBarrier底層藉助於一個count計數器和Lock/Condition實現內存內存屏障功能,在對count--時必須先獲取到lock,如果count不為0,則調用condition.wait進行阻塞操作;直到當count為0時,執行barrierCommand(如果配置的話,執行barrierCommand的線程是剛好將count減到0的那個線程),然後調用condition.signalAll喚醒所有等待的線程。

CyclicBarrier可用於多線程同步、多線程計算最後合併計算結果的場景,比如分片計算最後使用CyclicBarrier統計最後的結果等。

CyclicBarrier使用示例如下:

<code>

public

 

static

 

void

 

main

(

String[] args

) throws Exception

 {     CyclicBarrier barrier = 

new

 CyclicBarrier(

2

,              () -> System.

out

.println(Thread.currentThread().getName() + 

": all is ok"

));     Runnable task = () -> {         

try

 {             System.

out

.println(Thread.currentThread().getName() + 

": start wait"

);             barrier.

await

();             System.

out

.println(Thread.currentThread().getName() + 

": start ok"

);         } 

catch

 (Exception e) {             e.printStackTrace();         }     };     Thread t1 = 

new

 Thread(task, 

"thread1"

);     Thread t2 = 

new

 Thread(task, 

"thread2"

);     t2.start();     t1.start();     t1.

join

();     t2.

join

(); }/<code>

CountDownLatch 計數器

CountDownLatch允許一個或多個線程等待其他線程完成操作。CountDownLatch底層藉助於AQS來實現功能,初始化一個CountDownLatch(n)時,相當於創建了一個state為n的AQS,當調用countDown()時會對AQS進行減一操作,如果state為0,則會對阻塞隊列中所有線程進行喚醒操作。

CountDownLatch計數器必須大於等於0,等於0的時候調用await方法時不會阻塞當前線程,注意CountDownLatch不可能重新初始化或者修改CountDownLatch對象的內部計數的值。一個線程調用coundDown方法happen-before,另一個線程調用await方法。

<code>

public

 

static

 

void

 

main

(

String[] args

) throws Exception

 {     CountDownLatch downLatch = 

new

 CountDownLatch(

2

);     Runnable task = () -> {         

try

 {             System.

out

.println(Thread.currentThread().getName() + 

": start countDown"

);             downLatch.countDown();             System.

out

.println(Thread.currentThread().getName() + 

": start ok"

);         } 

catch

 (Exception e) {             e.printStackTrace();         }     };     Thread t1 = 

new

 Thread(task, 

"thread1"

);     Thread t2 = 

new

 Thread(task, 

"thread2"

);     t1.start();     t2.start();     downLatch.

await

();     System.

out

.println(

"main wait ok"

);     t1.

join

();     t2.

join

(); }/<code>

Semaphore信號量

Semaphore用來控制同時訪問特定資源的線程數量,它通過協調各個線程,保證合理的使用公共資源。Semaphore可用作流量控制,特別是公共資源有限的應用場景,比如數據庫連接。

Semaphore底層也是基於AQS,初始化Semaphore(n)相當於初始化一個state為n的AQS,調用acquire()時會對進行state - 1操作,如果結果大於0則CAS設置state為state-1,相當於獲取到了信號量,否則進行阻塞操作(調用tryAcquire則不會阻塞線程)。調用release會對state進行++操作。

<code>

public

 

static

 

void

 

main

(

String[] args

)

 {     Semaphore semaphore = 

new

 Semaphore(

2

);     ExecutorService executor = Executors.newFixedThreadPool(

10

);     Runnable task = () -> {         

try

 {             System.

out

.println(Thread.currentThread().getName() + 

" acquire before"

);             semaphore.acquire();             System.

out

.println(Thread.currentThread().getName() + 

" acquire ok"

);             semaphore.release();         } 

catch

 (InterruptedException e) {             e.printStackTrace();         }     };     executor.execute(task);     executor.execute(task);     executor.execute(task);     executor.execute(task); }/<code>

Exchanger 線程間交換數據

Exchanger是一個用戶線程間交換數據的工具類,它提供了一個同步點,在這個同步點上,兩個線程可以交換彼此的數據。這兩個線程通過exchange方法交換數據,如果第一個線程先執行exchange方法,他會一直等待第二個線程也執行exchange方法,當兩個線程都達到同步點時,這兩個線程交換數據,將本線程產生的數據傳遞給對方。

<code>

public

 

static

 

void

 main(

String

[] args) {     Exchanger<

String

> exchanger = 

new

 Exchanger<>();     Runnable task = () -> {         

try

 {             

String

 result = exchanger.exchange(Thread.currentThread().getName());             System.out.println(Thread.currentThread().getName() + 

": "

 + result);         } 

catch

 (InterruptedException e) {             e.printStackTrace();         }     };     ExecutorService executor = Executors.newFixedThreadPool(

2

);     executor.execute(task);     executor.execute(task); }/<code>

Exchanger實現分析

Exchanger算法的核心是通過一個可交換數據的slot,以及一個可以帶有數據item的參與者,slot是Node類型,Node定義如下:

<code> .misc.Contended 

static

 

final

 

class

 

Node

 

{     

int

 index;                   

int

 bound;                   

int

 collides;                

int

 hash;                    Object item;                 

volatile

 Object match;       

volatile

 Thread parked;  }

static

 

final

 

class

 

Participant

 

extends

 

ThreadLocal

<

Node

{     

public

 Node 

initialValue

()

 

return

 

new

 Node(); } }/<code>

每一個參與者都帶有一個Participant,當調用exchange時,如果slot為空,則將自己攜帶的數據CAS設置到slot上,然後park自己;如果slot不為空,則表示已經有線程在slot裡設置了數據,則讀取Node.item字段,並將自己攜帶的數據設置到Node.match字段,然後喚醒之前設置數據的線程(之前阻塞的線程在喚醒後讀取Node.match字段返回),然後返回數據即可。

小結

瞭解了這些Java併發工具類,小夥伴們在日常開發中,都用到哪種或者哪幾種呢?

歡迎分享給其他小夥伴進行交流,或者評論區留言一起討論哈 : )


分享到:


相關文章: