併發包
java.util.concurrent從jdk1.5開始新加入的一個包,致力於解決併發編程的線程安全問題,使用戶能夠更為快捷方便的編寫多線程情況下的併發程序。
同步容器
同步容器只有包括Vector和HashTable,相比其他容器類只是多用了Synchronize的技術
Vector與ArrayList區別
1.ArrayList是最常用的List實現類,內部是通過數組實現的,它允許對元素進行快速隨機訪問。數組的缺點是每個元素之間不能有間隔,當數組大小不滿足時需要增加存儲能力,就要講已經有數組的數據複製到新的存儲空間中。當從ArrayList的中間位置插入或者刪除元素時,需要對數組進行復制、移動、代價比較高。因此,它適合隨機查找和遍歷,不適合插入和刪除。
2.Vector與ArrayList一樣,也是通過數組實現的,不同的是它支持線程的同步,即某一時刻只有一個線程能夠寫Vector,避免多線程同時寫而引起的不一致性,但實現同步需要很高的花費,因此,訪問它比訪問ArrayList慢
注意: Vector線程安全、ArrayList線程不安全
HasTable與HasMap區別
1.HashMap不是線程安全的
HastMap是一個接口 是map接口的子接口,是將鍵映射到值的對象,其中鍵和值都是對象,並且不能包含重複鍵,但可以包含重複值。HashMap允許null key和null value,而hashtable不允許。
2.HashTable是線程安全的一個Collection。
3.HashMap是Hashtable的輕量級實現(非線程安全的實現),他們都完成了Map接口,主要區別在於HashMap允許空(null)鍵值(key),由於非線程安全,效率上可能高於Hashtable。
HashMap允許將null作為一個entry的key或者value,而Hashtable不允許。
HashMap把Hashtable的contains方法去掉了,改成containsvalue和containsKey。
注意: HashTable線程安全,HashMap線程不安全。
synchronizedMap synchronizedList
Collections.synchronized*(m) 將線程不安全集合變為線程安全集合
Map m = Collections.synchronizedMap(new HashMap());
List l = Collections.synchronizedList(new ArrayList());
ConcurrentHashMap
ConcurrentHashMap內部使用段(Segment)來表示這些不同的部分,每個段其實就是一個小的HashTable,它們有自己的鎖。只要多個修改操作發生在不同的段上,它們就可以併發進行。把一個整體分成了16個段(Segment)也就是最高支持16個線程的併發修改操作。
這也是在重線程場景時減小鎖的粒度從而降低鎖競爭的一種方案。並且代碼中大多共享變量使用volatile關鍵字聲明,目的是第一時間獲取修改的內容,性能非常好。
併發容器
CountDownLatch
CountDownLatch是JAVA提供在java.util.concurrent包下的一個輔助類,可以把它看成是一個計數器,其內部維護著一個count計數,只不過對這個計數器的操作都是原子操作,同時只能有一個線程去操作這個計數器,CountDownLatch通過構造函數傳入一個初始計數值,調用者可以通過調用CounDownLatch對象的cutDown()方法,來使計數減1;如果調用對象上的await()方法,那麼調用者就會一直阻塞在這裡,直到別人通過cutDown方法,將計數減到0,才可以繼續執行。
public class Test002 {
public static void main(String[] args) throws InterruptedException {
System.out.println("等待子線程執行完畢...");
CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("子線程," +Thread.currentThread().getName() + "開始執行...");
countDownLatch.countDown();// 每次減去1
System.out.println("子線程," + Thread.currentThread().getName() + "結束執行...");
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("子線程," + Thread.currentThread().getName() + "開始執行...");
countDownLatch.countDown();
System.out.println("子線程," + Thread.currentThread().getName() + "結束執行...");
}
}).start();
countDownLatch.await();// 調用當前方法主線程阻塞 countDown結果為0, 阻塞變為運行狀態
System.out.println("兩個子線程執行完畢....");
System.out.println("繼續主線程執行..");
}
}
CyclicBarrier
一個同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 很有用。因為該 barrier 在釋放等待線程後可以重用,所以稱它為循環 的 barrier。
使用場景
需要所有的子任務都完成時,才執行主任務,這個時候就可以選擇使用CyclicBarrier。
public class CyclicBarrierTest {
public static void main(String[] args) throws IOException, InterruptedException {
//如果將參數改為4,但是下面只加入了3個選手,這永遠等待下去
//Waits until all parties have invoked await on this barrier.
CyclicBarrier barrier = new CyclicBarrier(3);
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.submit(new Thread(new Runner(barrier, "1號選手")));
executor.submit(new Thread(new Runner(barrier, "2號選手")));
executor.submit(new Thread(new Runner(barrier, "3號選手")));
executor.shutdown();
}
}
class Runner implements Runnable {
// 一個同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)
private CyclicBarrier barrier;
private String name;
public Runner(CyclicBarrier barrier, String name) {
super();
this.barrier = barrier;
this.name = name;
}
@Override
public void run() {
try {
Thread.sleep(1000 * (new Random()).nextInt(8));
System.out.println(name + " 準備好了...");
// barrier的await方法,在所有參與者都已經在此 barrier 上調用 await 方法之前,將一直等待。
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(name + " 起跑!");
}
}
Semaphore
Semaphore是計數信號量。Semaphore管理一系列許可證。每個acquire方法阻塞,直到有一個許可證可以獲得然後拿走一個許可證;每個release方法增加一個許可證,這可能會釋放一個阻塞的acquire方法。然而,其實並沒有實際的許可證這個對象,Semaphore只是維持了一個可獲得許可證的數量。
Semaphore經常用於限制獲取某種資源的線程數量
需求: 一個廁所只有3個坑位,但是有10個人來上廁所,那怎麼辦?假設10個人的編號分別為1-10,並且1號先到廁所,10號最後到廁所。那麼1-3號來的時候必然有可用坑位,順利如廁,4號來的時候需要看看前面3人是否有人出來了,如果有人出來,進去,否則等待。同樣的道理,4-10號也需要等待正在上廁所的人出來後才能進去,並且誰先進去這得看等待的人是否有素質,是否能遵守先來先上的規則。
class Parent implements Runnable {
private String name;
private Semaphore wc;
public Parent(String name,Semaphore wc){
this.name=name;
this.wc=wc;
}
@Override
public void run() {
try {
// 剩下的資源(剩下的茅坑)
int availablePermits = wc.availablePermits();
if (availablePermits > 0) {
System.out.println(name+"天助我也,終於有茅坑了...");
} else {
System.out.println(name+"怎麼沒有茅坑了...");
}
//申請茅坑 如果資源達到3次,就等待
wc.acquire();
System.out.println(name+"終於輪我上廁所了..爽啊");
Thread.sleep(new Random().nextInt(1000)); // 模擬上廁所時間。
System.out.println(name+"廁所上完了...");
wc.release();
} catch (Exception e) {
}
}
}
public class TestSemaphore02 {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <=10; i++) {
Parent parent = new Parent("第"+i+"個人,",semaphore);
new Thread(parent).start();
}
}
}
併發隊列
ConcurrentLinkedQueue
ConcurrentLinkedQueue : 是一個適用於高併發場景下的隊列,通過無鎖的方式,實現了高併發狀態下的高性能,通常ConcurrentLinkedQueue性能好於BlockingQueue.它是一個基於鏈接節點的無界線程安全隊列。該隊列的元素遵循先進先出的原則。頭是最先加入的,尾是最近加入的,該隊列不允許null元素。
add 和offer() 都是加入元素的方法(在ConcurrentLinkedQueue中這倆個方法沒有任何區別) poll() 和peek() 都是取頭元素節點,區別在於前者會刪除元素,後者不會。
public class ConcurrentLinkedQueueTest {
private static ConcurrentLinkedQueue
private static int count = 2; // 線程個數
//CountDownLatch,一個同步輔助類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。
private static CountDownLatch latch = new CountDownLatch(count);
public static void main(String[] args) throws InterruptedException {
long timeStart = System.currentTimeMillis();
ExecutorService es = Executors.newFixedThreadPool(4);
ConcurrentLinkedQueueTest.offer();
for (int i = 0; i < count; i++) {
es.submit(new Poll());
}
latch.await(); //使得主線程(main)阻塞直到latch.countDown()為零才繼續執行
System.out.println("cost time " + (System.currentTimeMillis() - timeStart) + "ms");
es.shutdown();
}
/**
* 生產
*/
public static void offer() {
for (int i = 0; i < 100000; i++) {
queue.offer(i);
}
}
static class Poll implements Runnable {
public void run() {
// while (queue.size()>0) {
while (!queue.isEmpty()) {
System.out.println(queue.poll());
}
latch.countDown();
}
}
}
BlockingQueue
阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:
在隊列為空時,獲取元素的線程會等待隊列變為非空。
當隊列滿時,存儲元素的線程會等待隊列可用。
阻塞隊列常用於生產者和消費者的場景,生產者是往隊列裡添加元素的線程,消費者是從隊列裡拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裡拿元素
ArrayBlockingQueue
ArrayBlockingQueue是一個有邊界的阻塞隊列,它的內部實現是一個數組。有邊界的意思是它的容量是有限的,我們必須在其初始化的時候指定它的容量大小,容量大小一旦指定就不可改變。
ArrayBlockingQueue是以先進先出的方式存儲數據,最新插入的對象是尾部,最新移出的對象是頭部。
LinkedBlockingQueue
LinkedBlockingQueue阻塞隊列大小的配置是可選的,如果我們初始化時指定一個大小,它就是有邊界的,如果不指定,它就是無邊界的。說是無邊界,其實是採用了默認大小為Integer.MAX_VALUE的容量 。它的內部實現是一個鏈表
SynchronousQueue
SynchronousQueue隊列內部僅允許容納一個元素。當一個線程插入一個元素後會被阻塞,除非這個元素被另一個線程消費
public class BlockingQueueTest2 {
/**
*
* 定義裝蘋果的籃子
*
*/
public class Basket {
// 籃子,能夠容納3個蘋果
BlockingQueue
basket = new LinkedBlockingQueue (3); // 生產蘋果,放入籃子
public void produce() throws InterruptedException {
// put方法放入一個蘋果,若basket滿了,等到basket有位置
basket.put("An apple");
}
// 消費蘋果,從籃子中取走
public String consume() throws InterruptedException {
// take方法取出一個蘋果,若basket為空,等到basket有蘋果為止(獲取並移除此隊列的頭部)
return basket.take();
}
}
// 定義蘋果生產者
class Producer implements Runnable {
private String instance;
private Basket basket;
public Producer(String instance, Basket basket) {
this.instance = instance;
this.basket = basket;
}
public void run() {
try {
while (true) {
// 生產蘋果
System.out.println("生產者準備生產蘋果:" + instance);
basket.produce();
System.out.println("!生產者生產蘋果完畢:" + instance);
// 休眠300ms
Thread.sleep(300);
}
} catch (InterruptedException ex) {
System.out.println("Producer Interrupted");
}
}
}
// 定義蘋果消費者
class Consumer implements Runnable {
private String instance;
private Basket basket;
public Consumer(String instance, Basket basket) {
this.instance = instance;
this.basket = basket;
}
public void run() {
try {
while (true) {
// 消費蘋果
System.out.println("消費者準備消費蘋果:" + instance);
System.out.println(basket.consume());
System.out.println("!消費者消費蘋果完畢:" + instance);
// 休眠1000ms
Thread.sleep(1000);
}
} catch (InterruptedException ex) {
System.out.println("Consumer Interrupted");
}
}
}
public static void main(String[] args) {
BlockingQueueTest2 test = new BlockingQueueTest2();
// 建立一個裝蘋果的籃子
Basket basket = test.new Basket();
ExecutorService service = Executors.newCachedThreadPool();
Producer producer = test.new Producer("生產者001", basket);
Producer producer2 = test.new Producer("生產者002", basket);
Consumer consumer = test.new Consumer("消費者001", basket);
service.submit(producer);
service.submit(producer2);
service.submit(consumer);
// 程序運行5s後,所有任務停止
// try {
// Thread.sleep(1000 * 5);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// service.shutdownNow();
}
}
PriorityBlockingQueue
優先級阻塞隊列,該實現類需要自己實現一個繼承了 Comparator 接口的類, 在插入資源時會按照自定義的排序規則來對資源數組進行排序。 其中值大的排在數組後面 ,取值時從數組頭開始取
public class TestQueue{
static Logger logger = LogManager.getLogger();
static Random random = new Random(47);
public static void main(String args[]) throws InterruptedException
{
PriorityBlockingQueue
queue = new PriorityBlockingQueue (); ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(new Runnable()
{
public void run()
{
int i = 0;
while (true)
{
queue.put(new PriorityEntity(random.nextInt(10), i++));
try
{
TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));
}
catch (InterruptedException e)
{
logger.error(e);
}
}
}
});
executor.execute(new Runnable()
{
public void run()
{
while (true)
{
try
{
System.out.println("take-- " + queue.take() + " left:-- [" + queue.toString() + "]");
try
{
TimeUnit.MILLISECONDS.sleep(random.nextInt(3000));
}
catch (InterruptedException e)
{
logger.error(e);
}
}
catch (InterruptedException e)
{
logger.error(e);
}
}
}
});
try
{
TimeUnit.SECONDS.sleep(5);
}
catch (InterruptedException e)
{
logger.error(e);
}
}
static class PriorityEntity implements Comparable
{
private static int count = 0;
private int id = count++;
private int priority;
private int index = 0;
public PriorityEntity(int _priority, int _index)
{
System.out.println("_priority : " + _priority);
this.priority = _priority;
this.index = _index;
}
public String toString()
{
return id + "# [index=" + index + " priority=" + priority + "]";
}
//數字小,優先級高
public int compareTo(PriorityEntity o)
{
return this.priority > o.priority ? 1 : this.priority < o.priority ? -1 : 0;
}
}
}
線程池
開發過程中,合理地使用線程池可以帶來3個好處:
降低資源消耗:通過重複利用已創建的線程降低線程創建和銷燬造成的消耗。
提高響應速度:當任務到達時,任務可以不需要等到線程創建就能立即執行。
提高線程的可管理性:線程是稀缺資源,如果無限制地創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一分配、調優和監控。
線程池作用
線程池作用就是限制系統中執行線程的數量。
根據系統的環境情況,可以自動或手動設置線程數量,達到運行的最佳效果;少了浪費了系統資源,多了造成系統擁擠效率不高。用線程池控制線程數量,其他線程排隊等候。一個任務執行完畢,再從隊列的中取最前面的任務開始執行。若隊列中沒有等待進程,線程池的這一資源處於等待。當一個新任務需要運行時,如果線程池中有等待的工作線程,就可以開始運行了;否則進入等待隊列。
線程池的分類
newCachedThreadPool
創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閒線程,若無可回收,則新建線程
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue
()); }
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int index = i;
try {
Thread.sleep(index * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
cachedThreadPool.execute(new Runnable() {
public void run() {
System.out.println(index);
}
});
}
}
}
線程池為無限大,當執行第二個任務時第一個任務已經完成,會複用執行第一個任務的線程,而不用每次新建線程
newFixedThreadPool
創建一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue
()); }
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
final int index = i;
fixedThreadPool.execute(new Runnable() {
public void run() {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
因為線程池大小為3,每個任務輸出index後sleep 2秒,所以每兩秒打印3個數字。定長線程池的大小最好根據系統資源進行設置。如Runtime.getRuntime().availableProcessors()
newScheduledThreadPool
創建一個定長線程池,支持定時及週期性任務執行。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.schedule(new Runnable() {
public void run() {
System.out.println("delay 3 seconds");
}
}, 3, TimeUnit.SECONDS);
}
}
表示延遲3秒執行
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
public void run() {
System.out.println("delay 1 seconds, and excute every 3 seconds");
}
}, 1, 3, TimeUnit.SECONDS);
}
}
表示延遲1秒後每3秒執行一次
newSingleThreadExecutor
創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue
())); }
阿里發佈的 Java開發手冊中強制線程池不允許使用 Executors 去創建,而是通過 ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險
ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue
workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize - 線程池核心池的大小。
maximumPoolSize - 線程池的最大線程數。
keepAliveTime - 當線程數大於核心時,此為終止前多餘的空閒線程等待新任務的最長時間。
unit - keepAliveTime 的時間單位。
workQueue - 用來儲存等待執行任務的隊列。
threadFactory - 線程工廠。
handler - 拒絕策略。
線程優先級:
corePoolSize > workQueue > maximumPoolSize>handler(拒絕)
拒絕策略:
- CallerRunsPolicy :這個策略重試添加當前的任務,他會自動重複調用 execute() 方法,直到成功。
- AbortPolicy :對拒絕任務拋棄處理,並且拋出異常。(默認使用的)
- DiscardPolicy :對拒絕任務直接無聲拋棄,沒有異常信息。
- DiscardOldestPolicy :對拒絕任務不拋棄,而是拋棄隊列裡面等待最久的一個線程,然後把拒絕任務加到隊列。
合理配置線程池
要想合理的配置線程池,就必須首先分析任務特性,可以從以下幾個角度來進行分析:
- 任務的性質:CPU密集型任務,IO密集型任務和混合型任務。
- 任務的優先級:高,中和低。
- 任務的執行時間:長,中和短。
- 任務的依賴性:是否依賴其他系統資源,如數據庫連接。
- 任務性質不同的任務可以用不同規模的線程池分開處理。CPU密集型任務配置儘可能少的線程數量,如配置Ncpu+1個線程的線程池。IO密集型任務則由於需要等待IO操作,線程並不是一直在執行任務,則配置儘可能多的線程,如2*Ncpu。混合型的任務,如果可以拆分,則將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐率要高於串行執行的吞吐率,如果這兩個任務執行時間相差太大,則沒必要進行分解。我們可以通過Runtime.getRuntime().availableProcessors()方法獲得當前設備的CPU個數。
- 優先級不同的任務可以使用優先級隊列PriorityBlockingQueue來處理。它可以讓優先級高的任務先得到執行,需要注意的是如果一直有優先級高的任務提交到隊列裡,那麼優先級低的任務可能永遠不能執行。
- 執行時間不同的任務可以交給不同規模的線程池來處理,或者也可以使用優先級隊列,讓執行時間短的任務先執行。
- 依賴數據庫連接池的任務,因為線程提交SQL後需要等待數據庫返回結果,如果等待的時間越長CPU空閒時間就越長,那麼線程數應該設置越大,這樣才能更好的利用CPU。
CPU密集型時,任務可以少配置線程數,大概和機器的cpu核數相當,這樣可以使得每個線程都在執行任務 IO密集型時,大部分線程都阻塞,故需要多配置線程數,2*cpu核數 操作系統之名稱解釋: 某些進程花費了絕大多數時間在計算上,而其他則在等待I/O上花費了大多是時間,前者稱為計算密集型(CPU密集型)computer-bound,後者稱為I/O密集型,I/O-bound。
寫在最後:歡迎留言討論,私信“Java”或“架構資料”有驚喜!加關注,持續更新!!!
閱讀更多 Java互聯網架構師 的文章