阿里P7告訴你什麼是java並發包、線程池、鎖

併發包

java.util.concurrent從jdk1.5開始新加入的一個包,致力於解決併發編程的線程安全問題,使用戶能夠更為快捷方便的編寫多線程情況下的併發程序。

同步容器

同步容器只有包括Vector和HashTable,相比其他容器類只是多用了Synchronize的技術

Vector與ArrayList區別

1.ArrayList是最常用的List實現類,內部是通過數組實現的,它允許對元素進行快速隨機訪問。數組的缺點是每個元素之間不能有間隔,當數組大小不滿足時需要增加存儲能力,就要講已經有數組的數據複製到新的存儲空間中。當從ArrayList的中間位置插入或者刪除元素時,需要對數組進行復制、移動、代價比較高。因此,它適合隨機查找和遍歷,不適合插入和刪除。

2.Vector與ArrayList一樣,也是通過數組實現的,不同的是它支持線程的同步,即某一時刻只有一個線程能夠寫Vector,避免多線程同時寫而引起的不一致性,但實現同步需要很高的花費,因此,訪問它比訪問ArrayList慢

注意: Vector線程安全、ArrayList線程不安全

HasTable與HasMap區別

1.HashMap不是線程安全的

HastMap是一個接口 是map接口的子接口,是將鍵映射到值的對象,其中鍵和值都是對象,並且不能包含重複鍵,但可以包含重複值。HashMap允許null key和null value,而hashtable不允許。

2.HashTable是線程安全的一個Collection。

3.HashMap是Hashtable的輕量級實現(非線程安全的實現),他們都完成了Map接口,主要區別在於HashMap允許空(null)鍵值(key),由於非線程安全,效率上可能高於Hashtable。

HashMap允許將null作為一個entry的key或者value,而Hashtable不允許。

HashMap把Hashtable的contains方法去掉了,改成containsvalue和containsKey。

注意: HashTable線程安全,HashMap線程不安全。

synchronizedMap synchronizedList

Collections.synchronized*(m) 將線程不安全集合變為線程安全集合

Map m = Collections.synchronizedMap(new HashMap());

List l = Collections.synchronizedList(new ArrayList());

ConcurrentHashMap

ConcurrentHashMap內部使用段(Segment)來表示這些不同的部分,每個段其實就是一個小的HashTable,它們有自己的鎖。只要多個修改操作發生在不同的段上,它們就可以併發進行。把一個整體分成了16個段(Segment)也就是最高支持16個線程的併發修改操作。

這也是在重線程場景時減小鎖的粒度從而降低鎖競爭的一種方案。並且代碼中大多共享變量使用volatile關鍵字聲明,目的是第一時間獲取修改的內容,性能非常好。

併發容器

CountDownLatch

CountDownLatch是JAVA提供在java.util.concurrent包下的一個輔助類,可以把它看成是一個計數器,其內部維護著一個count計數,只不過對這個計數器的操作都是原子操作,同時只能有一個線程去操作這個計數器,CountDownLatch通過構造函數傳入一個初始計數值,調用者可以通過調用CounDownLatch對象的cutDown()方法,來使計數減1;如果調用對象上的await()方法,那麼調用者就會一直阻塞在這裡,直到別人通過cutDown方法,將計數減到0,才可以繼續執行。

public class Test002 {

public static void main(String[] args) throws InterruptedException {

System.out.println("等待子線程執行完畢...");

CountDownLatch countDownLatch = new CountDownLatch(2);

new Thread(new Runnable() {

@Override

public void run() {

System.out.println("子線程," +Thread.currentThread().getName() + "開始執行...");

countDownLatch.countDown();// 每次減去1

System.out.println("子線程," + Thread.currentThread().getName() + "結束執行...");

}

}).start();

new Thread(new Runnable() {

@Override

public void run() {

System.out.println("子線程," + Thread.currentThread().getName() + "開始執行...");

countDownLatch.countDown();

System.out.println("子線程," + Thread.currentThread().getName() + "結束執行...");

}

}).start();

countDownLatch.await();// 調用當前方法主線程阻塞 countDown結果為0, 阻塞變為運行狀態

System.out.println("兩個子線程執行完畢....");

System.out.println("繼續主線程執行..");

}

}

阿里P7告訴你什麼是java併發包、線程池、鎖

CyclicBarrier

一個同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 很有用。因為該 barrier 在釋放等待線程後可以重用,所以稱它為循環 的 barrier。

使用場景

需要所有的子任務都完成時,才執行主任務,這個時候就可以選擇使用CyclicBarrier。

public class CyclicBarrierTest {

public static void main(String[] args) throws IOException, InterruptedException {

//如果將參數改為4,但是下面只加入了3個選手,這永遠等待下去

//Waits until all parties have invoked await on this barrier.

CyclicBarrier barrier = new CyclicBarrier(3);

ExecutorService executor = Executors.newFixedThreadPool(3);

executor.submit(new Thread(new Runner(barrier, "1號選手")));

executor.submit(new Thread(new Runner(barrier, "2號選手")));

executor.submit(new Thread(new Runner(barrier, "3號選手")));

executor.shutdown();

}

}

class Runner implements Runnable {

// 一個同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)

private CyclicBarrier barrier;

private String name;

public Runner(CyclicBarrier barrier, String name) {

super();

this.barrier = barrier;

this.name = name;

}

@Override

public void run() {

try {

Thread.sleep(1000 * (new Random()).nextInt(8));

System.out.println(name + " 準備好了...");

// barrier的await方法,在所有參與者都已經在此 barrier 上調用 await 方法之前,將一直等待。

barrier.await();

} catch (InterruptedException e) {

e.printStackTrace();

} catch (BrokenBarrierException e) {

e.printStackTrace();

}

System.out.println(name + " 起跑!");

}

}

阿里P7告訴你什麼是java併發包、線程池、鎖

Semaphore

Semaphore是計數信號量。Semaphore管理一系列許可證。每個acquire方法阻塞,直到有一個許可證可以獲得然後拿走一個許可證;每個release方法增加一個許可證,這可能會釋放一個阻塞的acquire方法。然而,其實並沒有實際的許可證這個對象,Semaphore只是維持了一個可獲得許可證的數量。

Semaphore經常用於限制獲取某種資源的線程數量

需求: 一個廁所只有3個坑位,但是有10個人來上廁所,那怎麼辦?假設10個人的編號分別為1-10,並且1號先到廁所,10號最後到廁所。那麼1-3號來的時候必然有可用坑位,順利如廁,4號來的時候需要看看前面3人是否有人出來了,如果有人出來,進去,否則等待。同樣的道理,4-10號也需要等待正在上廁所的人出來後才能進去,並且誰先進去這得看等待的人是否有素質,是否能遵守先來先上的規則。

class Parent implements Runnable {

private String name;

private Semaphore wc;

public Parent(String name,Semaphore wc){

this.name=name;

this.wc=wc;

}

@Override

public void run() {

try {

// 剩下的資源(剩下的茅坑)

int availablePermits = wc.availablePermits();

if (availablePermits > 0) {

System.out.println(name+"天助我也,終於有茅坑了...");

} else {

System.out.println(name+"怎麼沒有茅坑了...");

}

//申請茅坑 如果資源達到3次,就等待

wc.acquire();

System.out.println(name+"終於輪我上廁所了..爽啊");

Thread.sleep(new Random().nextInt(1000)); // 模擬上廁所時間。

System.out.println(name+"廁所上完了...");

wc.release();

} catch (Exception e) {

}

}

}

public class TestSemaphore02 {

public static void main(String[] args) {

Semaphore semaphore = new Semaphore(3);

for (int i = 1; i <=10; i++) {

Parent parent = new Parent("第"+i+"個人,",semaphore);

new Thread(parent).start();

}

}

}

阿里P7告訴你什麼是java併發包、線程池、鎖

併發隊列

ConcurrentLinkedQueue

ConcurrentLinkedQueue : 是一個適用於高併發場景下的隊列,通過無鎖的方式,實現了高併發狀態下的高性能,通常ConcurrentLinkedQueue性能好於BlockingQueue.它是一個基於鏈接節點的無界線程安全隊列。該隊列的元素遵循先進先出的原則。頭是最先加入的,尾是最近加入的,該隊列不允許null元素。

add 和offer() 都是加入元素的方法(在ConcurrentLinkedQueue中這倆個方法沒有任何區別) poll() 和peek() 都是取頭元素節點,區別在於前者會刪除元素,後者不會。

public class ConcurrentLinkedQueueTest {

private static ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();

private static int count = 2; // 線程個數

//CountDownLatch,一個同步輔助類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。

private static CountDownLatch latch = new CountDownLatch(count);

public static void main(String[] args) throws InterruptedException {

long timeStart = System.currentTimeMillis();

ExecutorService es = Executors.newFixedThreadPool(4);

ConcurrentLinkedQueueTest.offer();

for (int i = 0; i < count; i++) {

es.submit(new Poll());

}

latch.await(); //使得主線程(main)阻塞直到latch.countDown()為零才繼續執行

System.out.println("cost time " + (System.currentTimeMillis() - timeStart) + "ms");

es.shutdown();

}

/**

* 生產

*/

public static void offer() {

for (int i = 0; i < 100000; i++) {

queue.offer(i);

}

}

static class Poll implements Runnable {

public void run() {

// while (queue.size()>0) {

while (!queue.isEmpty()) {

System.out.println(queue.poll());

}

latch.countDown();

}

}

}

BlockingQueue

阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:

在隊列為空時,獲取元素的線程會等待隊列變為非空。

當隊列滿時,存儲元素的線程會等待隊列可用。

阻塞隊列常用於生產者和消費者的場景,生產者是往隊列裡添加元素的線程,消費者是從隊列裡拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裡拿元素

ArrayBlockingQueue

ArrayBlockingQueue是一個有邊界的阻塞隊列,它的內部實現是一個數組。有邊界的意思是它的容量是有限的,我們必須在其初始化的時候指定它的容量大小,容量大小一旦指定就不可改變。

ArrayBlockingQueue是以先進先出的方式存儲數據,最新插入的對象是尾部,最新移出的對象是頭部。

LinkedBlockingQueue

LinkedBlockingQueue阻塞隊列大小的配置是可選的,如果我們初始化時指定一個大小,它就是有邊界的,如果不指定,它就是無邊界的。說是無邊界,其實是採用了默認大小為Integer.MAX_VALUE的容量 。它的內部實現是一個鏈表

SynchronousQueue

SynchronousQueue隊列內部僅允許容納一個元素。當一個線程插入一個元素後會被阻塞,除非這個元素被另一個線程消費

public class BlockingQueueTest2 {

/**

*

* 定義裝蘋果的籃子

*

*/

public class Basket {

// 籃子,能夠容納3個蘋果

BlockingQueue basket = new LinkedBlockingQueue(3);

// 生產蘋果,放入籃子

public void produce() throws InterruptedException {

// put方法放入一個蘋果,若basket滿了,等到basket有位置

basket.put("An apple");

}

// 消費蘋果,從籃子中取走

public String consume() throws InterruptedException {

// take方法取出一個蘋果,若basket為空,等到basket有蘋果為止(獲取並移除此隊列的頭部)

return basket.take();

}

}

// 定義蘋果生產者

class Producer implements Runnable {

private String instance;

private Basket basket;

public Producer(String instance, Basket basket) {

this.instance = instance;

this.basket = basket;

}

public void run() {

try {

while (true) {

// 生產蘋果

System.out.println("生產者準備生產蘋果:" + instance);

basket.produce();

System.out.println("!生產者生產蘋果完畢:" + instance);

// 休眠300ms

Thread.sleep(300);

}

} catch (InterruptedException ex) {

System.out.println("Producer Interrupted");

}

}

}

// 定義蘋果消費者

class Consumer implements Runnable {

private String instance;

private Basket basket;

public Consumer(String instance, Basket basket) {

this.instance = instance;

this.basket = basket;

}

public void run() {

try {

while (true) {

// 消費蘋果

System.out.println("消費者準備消費蘋果:" + instance);

System.out.println(basket.consume());

System.out.println("!消費者消費蘋果完畢:" + instance);

// 休眠1000ms

Thread.sleep(1000);

}

} catch (InterruptedException ex) {

System.out.println("Consumer Interrupted");

}

}

}

public static void main(String[] args) {

BlockingQueueTest2 test = new BlockingQueueTest2();

// 建立一個裝蘋果的籃子

Basket basket = test.new Basket();

ExecutorService service = Executors.newCachedThreadPool();

Producer producer = test.new Producer("生產者001", basket);

Producer producer2 = test.new Producer("生產者002", basket);

Consumer consumer = test.new Consumer("消費者001", basket);

service.submit(producer);

service.submit(producer2);

service.submit(consumer);

// 程序運行5s後,所有任務停止

// try {

// Thread.sleep(1000 * 5);

// } catch (InterruptedException e) {

// e.printStackTrace();

// }

// service.shutdownNow();

}

}

PriorityBlockingQueue

優先級阻塞隊列,該實現類需要自己實現一個繼承了 Comparator 接口的類, 在插入資源時會按照自定義的排序規則來對資源數組進行排序。 其中值大的排在數組後面 ,取值時從數組頭開始取

public class TestQueue{

static Logger logger = LogManager.getLogger();

static Random random = new Random(47);

public static void main(String args[]) throws InterruptedException

{

PriorityBlockingQueue queue = new PriorityBlockingQueue();

ExecutorService executor = Executors.newCachedThreadPool();

executor.execute(new Runnable()

{

public void run()

{

int i = 0;

while (true)

{

queue.put(new PriorityEntity(random.nextInt(10), i++));

try

{

TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));

}

catch (InterruptedException e)

{

logger.error(e);

}

}

}

});

executor.execute(new Runnable()

{

public void run()

{

while (true)

{

try

{

System.out.println("take-- " + queue.take() + " left:-- [" + queue.toString() + "]");

try

{

TimeUnit.MILLISECONDS.sleep(random.nextInt(3000));

}

catch (InterruptedException e)

{

logger.error(e);

}

}

catch (InterruptedException e)

{

logger.error(e);

}

}

}

});

try

{

TimeUnit.SECONDS.sleep(5);

}

catch (InterruptedException e)

{

logger.error(e);

}

}

static class PriorityEntity implements Comparable

{

private static int count = 0;

private int id = count++;

private int priority;

private int index = 0;

public PriorityEntity(int _priority, int _index)

{

System.out.println("_priority : " + _priority);

this.priority = _priority;

this.index = _index;

}

public String toString()

{

return id + "# [index=" + index + " priority=" + priority + "]";

}

//數字小,優先級高

public int compareTo(PriorityEntity o)

{

return this.priority > o.priority ? 1 : this.priority < o.priority ? -1 : 0;

}

}

}

線程池

開發過程中,合理地使用線程池可以帶來3個好處:

降低資源消耗:通過重複利用已創建的線程降低線程創建和銷燬造成的消耗。

提高響應速度:當任務到達時,任務可以不需要等到線程創建就能立即執行。

提高線程的可管理性:線程是稀缺資源,如果無限制地創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一分配、調優和監控。

線程池作用

線程池作用就是限制系統中執行線程的數量。

根據系統的環境情況,可以自動或手動設置線程數量,達到運行的最佳效果;少了浪費了系統資源,多了造成系統擁擠效率不高。用線程池控制線程數量,其他線程排隊等候。一個任務執行完畢,再從隊列的中取最前面的任務開始執行。若隊列中沒有等待進程,線程池的這一資源處於等待。當一個新任務需要運行時,如果線程池中有等待的工作線程,就可以開始運行了;否則進入等待隊列。

線程池的分類

newCachedThreadPool

創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閒線程,若無可回收,則新建線程

public static ExecutorService newCachedThreadPool() {

return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

60L, TimeUnit.SECONDS,

new SynchronousQueue());

}

public class ThreadPoolExecutorTest {

public static void main(String[] args) {

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

for (int i = 0; i < 10; i++) {

final int index = i;

try {

Thread.sleep(index * 1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

cachedThreadPool.execute(new Runnable() {

public void run() {

System.out.println(index);

}

});

}

}

}

線程池為無限大,當執行第二個任務時第一個任務已經完成,會複用執行第一個任務的線程,而不用每次新建線程

newFixedThreadPool

創建一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。

public static ExecutorService newFixedThreadPool(int nThreads) {

return new ThreadPoolExecutor(nThreads, nThreads,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue());

}

public class ThreadPoolExecutorTest {

public static void main(String[] args) {

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);

for (int i = 0; i < 10; i++) {

final int index = i;

fixedThreadPool.execute(new Runnable() {

public void run() {

try {

System.out.println(index);

Thread.sleep(2000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

});

}

}

}

因為線程池大小為3,每個任務輸出index後sleep 2秒,所以每兩秒打印3個數字。定長線程池的大小最好根據系統資源進行設置。如Runtime.getRuntime().availableProcessors()

newScheduledThreadPool

創建一個定長線程池,支持定時及週期性任務執行。

public ScheduledThreadPoolExecutor(int corePoolSize) {

super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,

new DelayedWorkQueue());

}

public class ThreadPoolExecutorTest {

public static void main(String[] args) {

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);

scheduledThreadPool.schedule(new Runnable() {

public void run() {

System.out.println("delay 3 seconds");

}

}, 3, TimeUnit.SECONDS);

}

}

表示延遲3秒執行

public class ThreadPoolExecutorTest {

public static void main(String[] args) {

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);

scheduledThreadPool.scheduleAtFixedRate(new Runnable() {

public void run() {

System.out.println("delay 1 seconds, and excute every 3 seconds");

}

}, 1, 3, TimeUnit.SECONDS);

}

}

表示延遲1秒後每3秒執行一次

newSingleThreadExecutor

創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。

public static ExecutorService newSingleThreadExecutor() {

return new FinalizableDelegatedExecutorService

(new ThreadPoolExecutor(1, 1,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue()));

}

阿里發佈的 Java開發手冊中強制線程池不允許使用 Executors 去創建,而是通過 ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險

ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue workQueue,

ThreadFactory threadFactory,

RejectedExecutionHandler handler)

corePoolSize - 線程池核心池的大小。

maximumPoolSize - 線程池的最大線程數。

keepAliveTime - 當線程數大於核心時,此為終止前多餘的空閒線程等待新任務的最長時間。

unit - keepAliveTime 的時間單位。

workQueue - 用來儲存等待執行任務的隊列。

threadFactory - 線程工廠。

handler - 拒絕策略。

線程優先級:

corePoolSize > workQueue > maximumPoolSize>handler(拒絕)

拒絕策略:

  1. CallerRunsPolicy :這個策略重試添加當前的任務,他會自動重複調用 execute() 方法,直到成功。
  2. AbortPolicy :對拒絕任務拋棄處理,並且拋出異常。(默認使用的)
  3. DiscardPolicy :對拒絕任務直接無聲拋棄,沒有異常信息。
  4. DiscardOldestPolicy :對拒絕任務不拋棄,而是拋棄隊列裡面等待最久的一個線程,然後把拒絕任務加到隊列。

合理配置線程池

要想合理的配置線程池,就必須首先分析任務特性,可以從以下幾個角度來進行分析:

  • 任務的性質:CPU密集型任務,IO密集型任務和混合型任務。
  • 任務的優先級:高,中和低。
  • 任務的執行時間:長,中和短。
  • 任務的依賴性:是否依賴其他系統資源,如數據庫連接。
  • 任務性質不同的任務可以用不同規模的線程池分開處理。CPU密集型任務配置儘可能少的線程數量,如配置Ncpu+1個線程的線程池。IO密集型任務則由於需要等待IO操作,線程並不是一直在執行任務,則配置儘可能多的線程,如2*Ncpu。混合型的任務,如果可以拆分,則將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐率要高於串行執行的吞吐率,如果這兩個任務執行時間相差太大,則沒必要進行分解。我們可以通過Runtime.getRuntime().availableProcessors()方法獲得當前設備的CPU個數。
  • 優先級不同的任務可以使用優先級隊列PriorityBlockingQueue來處理。它可以讓優先級高的任務先得到執行,需要注意的是如果一直有優先級高的任務提交到隊列裡,那麼優先級低的任務可能永遠不能執行。
  • 執行時間不同的任務可以交給不同規模的線程池來處理,或者也可以使用優先級隊列,讓執行時間短的任務先執行。
  • 依賴數據庫連接池的任務,因為線程提交SQL後需要等待數據庫返回結果,如果等待的時間越長CPU空閒時間就越長,那麼線程數應該設置越大,這樣才能更好的利用CPU。

CPU密集型時,任務可以少配置線程數,大概和機器的cpu核數相當,這樣可以使得每個線程都在執行任務 IO密集型時,大部分線程都阻塞,故需要多配置線程數,2*cpu核數 操作系統之名稱解釋: 某些進程花費了絕大多數時間在計算上,而其他則在等待I/O上花費了大多是時間,前者稱為計算密集型(CPU密集型)computer-bound,後者稱為I/O密集型,I/O-bound。

寫在最後:歡迎留言討論,私信“Java”或“架構資料”有驚喜!加關注,持續更新!!!


分享到:


相關文章: