- <strong>
- <strong>
- <strong>
- <strong>
0. 主要内容
- 文章分为两部分:
- 第一个部分主要讲并发流程控制的各大类的使用及案例
- 第二部分主要是先将AQS的组成及原理,然后结合CountDownLatch、Semaphore等分析源码逻辑
ps: 文章内容比较多
1. 并发流程控制
1.1 什么是并发流程控制
- 并发流程控制,就是让线程之间相互配合完成任务,来满足业务逻辑
- 如:让线程A等待线程B完成后再执行等策略
1.2 并发流程控制的工具
2. CountDownLatch计数门闩
2.1 作用
- 并发流程控制的工具,用于等待数量(我们设定的)足够后再执行某些任务
2.2 主要方法
- CountDownLatch(int count):只有一个构造方法,参数count为需要倒数的值
- await():调用此方法的线程会被挂起,它会等到count值为零的时候才继续执行
- countdown():讲count减1,直到0,等待的线程会被唤醒
2.3 用法一:等待线程执行完毕
<code>/** * @author yiren */public class CountDownLatchExample01 { public static void main(String[] args) throws InterruptedException { AtomicInteger integer = new AtomicInteger(1); CountDownLatch latch = new CountDownLatch(5); ExecutorService executorService = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { executorService.execute(() -> { try { System.out.println(Thread.currentThread().getName()+ " produce ...."); TimeUnit.SECONDS.sleep(1); integer.incrementAndGet(); } catch (InterruptedException e) { e.printStackTrace(); }finally { latch.countDown(); } }); } System.out.println(Thread.currentThread().getName() + " waiting...."); latch.await(); System.out.println(Thread.currentThread().getName() + " finished!"); System.out.println(Thread.currentThread().getName() + " num: " + integer.get()); executorService.shutdown(); }}/<code>
<code>pool-1-thread-1 produce ....pool-1-thread-2 produce ....pool-1-thread-3 produce ....main waiting....pool-1-thread-4 produce ....pool-1-thread-5 produce ....main finished!main num: 6Process finished with exit code 0/<code>
2.4 用法二:多等一
<code>/** * @author yiren */public class CountDownLatchExample02 { public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); ExecutorService executorService = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { executorService.execute(() -> { System.out.println(Thread.currentThread().getName() + " ready!"); try { latch.await(); System.out.println(Thread.currentThread().getName()+ " produce ...."); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }finally { latch.countDown(); } }); } Thread.sleep(10); System.out.println(Thread.currentThread().getName() + " ready!"); latch.countDown(); System.out.println(Thread.currentThread().getName() + " go!"); executorService.shutdown(); }}/<code>
<code>pool-1-thread-1 ready!pool-1-thread-4 ready!pool-1-thread-3 ready!pool-1-thread-2 ready!pool-1-thread-5 ready!main ready!main go!pool-1-thread-1 produce ....pool-1-thread-2 produce ....pool-1-thread-5 produce ....pool-1-thread-3 produce ....pool-1-thread-4 produce ....Process finished with exit code 0/<code>
2.4 注意
- CountDownLatch不仅可以无限等待,还可以给参数,在指定的事件内如果等到就唤醒线程继续执行
- boolean await(long timeout, TimeUnit unit) 复制代码
- CountDownLatch不能重用,如果涉及重新计数,可以使用CyclicBarrier或者新创建CountDownLatch
3. Semaphore信号量
3.1 信号量作用
- Semaphore可以用来限制或管理数量有限的资源使用情况
- 信号量的租用是维护一个许可计数,线程可以获取许可,然后信号量减一;线程也可以释放许可,信号量就加一;如果信号量的许可颁发完了,其他线程想要获取,就需要等待,直到有另外的线程释放了许可。
3.2 信号量使用
- 初始化Semaphore指定许可数量
- 在需要获取许可的代码前面加上acquire()或者acquireUniterruptibly()方法
- 任务执行完成有调用release()释放许可
3.3 主要方法
- Semaphore(int permits, boolean fair)这里设置许可数量,以及是否使用公平策略。 如果传入true那么久吧等待线程放入到FIFO的队列里面。
- aquire()请求许可,可以响应中断
- aquireUnniterruptibly()请求许可不可中断
- tryAcquire()看看现在有没有空闲的许可,如果有那就返回true;这个方法还可以设置等待时间给一个timeout,让线程等待一段时间。
- release()释放许可
3.4 案例演示
<code>/** * @author yiren */public class SemaphoreExample01 { public static void main(String[] args) { Semaphore semaphore = new Semaphore(3, true); ExecutorService executorService = Executors.newFixedThreadPool(10); for (int i = 0; i < 8; i++) { executorService.execute(() -> { try { System.out.println(Thread.currentThread().getName()+" start to get permit"); semaphore.acquire(); Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + " " + LocalDateTime.now() +" finished!"); } catch (InterruptedException e) { e.printStackTrace(); }finally { semaphore.release(); } }); } executorService.shutdown(); }}/<code>
<code>pool-1-thread-1 start to get permitpool-1-thread-4 start to get permitpool-1-thread-3 start to get permitpool-1-thread-2 start to get permitpool-1-thread-5 start to get permitpool-1-thread-6 start to get permitpool-1-thread-7 start to get permitpool-1-thread-8 start to get permitpool-1-thread-3 2020-02-21T19:54:47.392 finished!pool-1-thread-1 2020-02-21T19:54:47.392 finished!pool-1-thread-4 2020-02-21T19:54:47.392 finished!pool-1-thread-6 2020-02-21T19:54:49.396 finished!pool-1-thread-2 2020-02-21T19:54:49.396 finished!pool-1-thread-5 2020-02-21T19:54:49.396 finished!pool-1-thread-8 2020-02-21T19:54:51.401 finished!pool-1-thread-7 2020-02-21T19:54:51.401 finished!Process finished with exit code 0/<code>
3.5 注意点
- 获取和释放的许可证必须一致,acquire和release都是可以传入数值的来确定获取和释放的数量。如果我们获取和释放不一致,就会容易导致程序bug。当然也不是绝对,除非有特殊业务需求,否则都获取释放设置为一样的
- 注意在初始化Semaphore的时候设置公平性,一般设置为true会比较合理。如果插队情况比较严重的话,某些线程可能一直阻塞
- 获取和释放许可对线程并不要求,线程A获取了可以线程B释放。
4. Condition接口
4.1 作用
- 当线程A需要等待某个任务或者某个资源,就可以执行condition.await()方法,然后就会陷入阻塞状态。
- 此时另一个线程B,去获取资源或者执行任务完成后,调用condition.signal()或者signalAll()方法,通知线程A,继续执行
- 这个类似于object.wait()、notify()、notifyAll()
- signal()方法如果遇到多个线程都在等待的时候,会去唤醒等待时间最长的那个
- 在我们ReentrantLock中就可以直接新建Condition。看下面案例
4.2 案例演示
- 普通用法
<code>/** * @author yiren */public class ConditionExample01 { private static ReentrantLock lock = new ReentrantLock(); private static Condition condition = lock.newCondition(); public static void main(String[] args) throws InterruptedException { Thread thread1 = new Thread(() -> { task1(); }); Thread thread2 = new Thread(() -> { task2(); }); thread1.start(); Thread.sleep(100); thread2.start(); } private static void task1() { lock.lock(); try { System.out.println(Thread.currentThread().getName() + " start await()"); condition.await(); System.out.println(Thread.currentThread().getName() + " await finished!"); Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } private static void task2() { lock.lock(); try { System.out.println(Thread.currentThread().getName() + " start signal()"); Thread.sleep(1000); condition.signal(); System.out.println(Thread.currentThread().getName() + " signal finished!"); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }}/<code>
<code>Thread-0 start await()Thread-1 start signal()Thread-1 signal finished!Thread-0 await finished!Process finished with exit code 0/<code>
- 生产者消费者模式
<code>/** * @author yiren */public class ConditionExample02 { private int queueSize = 10; private PriorityQueue<integer> queue = new PriorityQueue<>(queueSize); private Lock lock = new ReentrantLock(); private Condition notFull = lock.newCondition(); private Condition notEmpty = lock.newCondition(); public static void main(String[] args) { ConditionExample02 conditionDemo2 = new ConditionExample02(); Producer producer = conditionDemo2.new Producer(); Consumer consumer = conditionDemo2.new Consumer(); producer.start(); consumer.start(); } class Consumer extends Thread { @Override public void run() { consume(); } private void consume() { while (true) { lock.lock(); try { while (queue.size() == 0) { System.out.println("队列空,等待数据"); try { notEmpty.await(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.poll(); notFull.signalAll(); System.out.println("从队列里取走了一个数据,队列剩余" + queue.size() + "个元素"); } finally { lock.unlock(); } } } } class Producer extends Thread { @Override public void run() { produce(); } private void produce() { while (true) { lock.lock(); try { while (queue.size() == queueSize) { System.out.println("队列满,等待有空余"); try { notFull.await(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.offer(1); notEmpty.signalAll(); System.out.println("向队列插入了一个元素,队列剩余空间" + (queueSize - queue.size())); } finally { lock.unlock(); } } } }}/<integer>/<code>
- 以上使用两个Condition作为队列满和空的通知传递工具在生产者和消费者之间互通
4.3 注意点
- 我们知道Lock可以看做synchronized的替代方案,而Condition就是用来替代object.wait/notify的,在用法上几乎一致。
- 调用await()方法时必须持有Lock锁,否则会抛出异常,并且await()方法会释放当前持有的Lock锁,
- 一个Lock锁可以有多个Condition更加灵活
5. CyclicBarrier循环栅栏
5.1 作用
- CyclicBarrier循环栅栏和CountDownLatch很类似,都能阻塞一组线程
- 当需要多个线程配合完成任务,并最后需要统一汇总时,我们就可以使用CyclicBarrier,当某个线程完成任务后,它先会等待,等到所有线程都执行好了任务,再一起继续执行剩下的任务 比如:同时出去聚餐约在了公司,等大家到公司了一起走过去。
- 但是注意CyclicBarrier是可以重复使用的,这个和CountDownLatch不同
5.2 案例
<code>/** * @author yiren */public class CyclicBarrierExample { public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() { @Override public void run() { System.out.println("所有人都到场了, 大家统一出发!"); } }); for (int i = 0; i < 10; i++) { new Thread(new Task(i, cyclicBarrier)).start(); } } static class Task implements Runnable { private int id; private CyclicBarrier cyclicBarrier; public Task(int id, CyclicBarrier cyclicBarrier) { this.id = id; this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("线程" + id + "现在前往集合地点"); try { Thread.sleep((long) (Math.random() * 10000)); System.out.println("线程" + id + "到了集合地点,开始等待其他人到达"); cyclicBarrier.await(); System.out.println("线程" + id + "出发了"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }}/<code>
<code>线程0现在前往集合地点线程2现在前往集合地点线程3现在前往集合地点线程1现在前往集合地点线程4现在前往集合地点线程5现在前往集合地点线程6现在前往集合地点线程7现在前往集合地点线程8现在前往集合地点线程9现在前往集合地点线程3到了集合地点,开始等待其他人到达线程9到了集合地点,开始等待其他人到达线程8到了集合地点,开始等待其他人到达线程4到了集合地点,开始等待其他人到达线程5到了集合地点,开始等待其他人到达所有人都到场了, 大家统一出发!线程5出发了线程3出发了线程8出发了线程4出发了线程9出发了线程1到了集合地点,开始等待其他人到达线程6到了集合地点,开始等待其他人到达线程0到了集合地点,开始等待其他人到达线程7到了集合地点,开始等待其他人到达线程2到了集合地点,开始等待其他人到达所有人都到场了, 大家统一出发!线程2出发了线程1出发了线程7出发了线程0出发了线程6出发了Process finished with exit code 0/<code>
- 每五个人到了过后,就出发一批
5.3 CountDownLatch和CyclicBarrier`区别
- 作用不同:CountDownLatch使用countDown()是用于事件的,而CyclicBarrier使用await()是用于线程的
- 可重用性不同:CountDownLatch在倒数到0后不能再次重用,除非创建新对象;而CyclicBarrier是可以直接重用的
6. 深入AQS理解J.U.C的根基
6.1 AQS作用及其重要性
- AQS在CountDownLatch等工具内都有使用,全称是:AbstractQueuedSynchronizer是一个抽象类
- 锁和上面的线程并发控制类(Semaphore等)都有类似的地方。 其实他们底层都是使用了AQS作为基类的拓展
- 正因为他们很多工作都类似,JDK就把这部分通用逻辑抽离了出来,提供给他们直接使用,使其不必关注很多深层次的细节,从而完成他们的功能。
- 我们可以大致看一下我们锁用到的这些并发控制的工具类和锁的内部实现 `Semaphore``
<code>
- ReentrantLock
<code>public class ReentrantLock implements Lock, java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L; private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L;....../<code>
- CountDownLatch
<code>public class CountDownLatch { private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); }....../<code>
- 由上源码我们可以看到,里面都有一个内部类,Sync继承自AbstractQueuedSynchronizer
- 那么AQS是用来干些什么事情的呢? J.U.C基本都是是基于AQS实现的,AQS是一个用于构建锁、同步器、线程协作工具类的框架供给子类使用,主要使用模板模式来设计。 它主要工作就是管理线程的阻塞与唤醒,实现同步的管理,以及阻塞线程的队列管理工作
6.2 AQS的组成及内部原理
- AbstractQueuedSynchronizer自JDK1.5加入,是基于FIFO等待队列实现的一个用于同步器的基础框架。
- JDK1.8 继承AQS实现的类:
- 我们可以看到,在可重入锁,读写锁,计数门闩等,信号量里面都是用了AQS的子类,接下来我们就学习一下AQS的内部原理
- AQS的三大部分 state:状态, FIFO队列:线程竞争锁的管理队列 获取和释放方法:需要工具类去实现的方法
- state:状态
<code> /** * The synchronization state. */ private volatile int state; /<code>
- 它的含义并不具体,根据实现的不同而不同,如:Semaphore内是剩余许可数量、CountDownLatch内是还需要倒数的数量,可看做一个计数器,只是不同类的作用及意义不用
<code>
- 状态值的更新,是使用Unsafe的CAS完成
- 在ReentrantLock中:state表示锁的占用情况,可重入的计数,每重入一次就加一,当要释放锁时,它的值就会变成0,表示不被任何线程占有。
- FIFO队列:
<code>
这个队列是用来存放等待的线程的,AQS会对这个队列进行管理。当多个线程竞争锁时,没有拿到锁的,就会被翻到队列中,当前拿到锁的执行任务的线程结束,AQS就会从队列中选一个线程来占有这个锁。 AQS维护一个双向链表的等待队列,把等待线程都放到这个队列里面管理;队列头节点是当前拿到锁的线程;在AQS中保存了这个队列的头尾节点。
获取和释放的方法 获取方法: 获取操作会依赖state变量,经常会阻塞,如:获取不到锁的时候,获取不到许可的时候等 在ReentrantLock中,就是获取锁。state+1 在Semaphore中就是acquire获取许可,state-1,当state==0就会阻塞 在CountDownLatch中就是await方法,就是等待state==0 释放方法: 释放操作不会阻塞 在ReentrantLock中就是unlock方法调用release(1)对应state-1 在Semaphore中就是realease,也是state-1 CountDownLatch中就是countDown方法,也是state-1 一般情况下,实现类都会实现tryAcquire和tryRelease相关方法,以对应各个类的需求
6.3 AQS的用法
- 指定协作逻辑,实现获取和释放方法
- 在内部写一个Sync类继承AQS
- 根据是否独占来决定重写的方法:独占使用tryAcquire/tryRelease、共享使用tryAcquireShared(int acquires)/tryReleaseShared(int releases),在主逻辑里面的获取释放相关方法中调用Sync的方法
7. AQS在CountDownLatch中的源码剖析
- 下面我们以CountDownLatch为例分析源码:
- 构造函数 我们看到内部实现就是初始化一个Sync然后把计数值传入
<code> public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }/<code>
- 我们可以看下面的CountDownlatch中Sync的实现,在构造方法创建的Sync传入的count调用了setState方法传入了AQS的state中
- 在CountDownLatch内部有一个继承AQS的
<code> private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }/<code>
- CountDownLatch的getCount()方法
<code>
我们可以看到getCount实际也是调用Sync的getCount()来获取state并返回
- CountDownLatch的countDown()方法
<code>
CountDownLatch的countDown()方法
<code> public void countDown() { sync.releaseShared(1); }/<code>
- 我们看一看到它直接调用了AQS的releaseShared(1)
<code> public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }/<code>
- 而releaseShared则是回去调用CountDownLatch中实现的tryReleaseShared
<code> protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }/<code>
- 而在tryReleaseShared中则是主要对state的值做-1操作,如果state大于零可以获取到就减一并且用CAS并发更新值,如果最新值为0就返回true
- 返回true过后就doReleaseShared释放锁,唤醒队列里面的等待线程。也就是调用了await()方法的线程
- CountDownLatch的await()方法
<code>
而await则会调用AQS中的默认实现sync.acquireSharedInterruptibly(1);
<code> protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }/<code>
<code> protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }/<code>
- 而在tryReleaseShared中则是主要对state的值做-1操作,如果state大于零可以获取到就减一并且用CAS并发更新值,如果最新值为0就返回true
- 返回true过后就doReleaseShared释放锁,唤醒队列里面的等待线程。也就是调用了await()方法的线程
- AQS在CountDownLatch中使用的一些点: 调用CountDownLatch的await()时,便会尝试获取共享锁,开始时是获取不到锁的,于是就被阻塞 可以获取到的条件就是计数器为0,也就是state==0的时候。 只有每次调用countDown方法才会使得计数器减一,减到0时就回去唤醒阻塞中的线程。
8. AQS在Semaphore中的源码剖析
- 由于上面讲得很细了,接下来就简略一些
- 在Semaphore中state就是许可证的数量
- 主要的操作就是acquire和release,也是借用Sync对state的操作来控制线程的阻塞与唤醒
<code> public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }/<code>
<code> public void release() { sync.releaseShared(1); }/<code>
- 先看下acquire调用的acquireSharedInterruptibly此方法在上面已经说过。
<code> public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }/<code>
- 而在Semaphore中Sync有两个实现:NonfairSync、FairSync
- 在FairSync中tryAcquireShared就会有hasQueuedPredecessors判断,如果不是头节点,那就返回-1,在acquireSharedInterruptibly方法中去调用doAcquireSharedInterruptibly入队并且阻塞线程
<code> protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }/<code>
而在NonfairSync中而是直接调用Sync的nonfairTryAcquireShared
<code>protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires);}/<code>
<code> final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }/<code>
- 可以看到其中并没有对是否阻塞队列的头节点判断,直接去获取值,判断是会否许可足够。
<code> public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }/<code>
<code> protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }/<code>
- 我们可以看到此处就是关于Semaphore的已获取许可的释放 把state加回去然后用CAS更新state
9. AQS在ReentrantLock中的应用
- 源码就不分析了
- 在ReentrantLock中,state主要是重入的次数,加锁的时候state+1 ,而在释放锁的时候,state-1然后判断当前的state==0
- 在ReentrantLock中与AQS相关的有三个类:UnfairSync,FairSync,Sync
- 关于加锁和解锁的逻辑也是AQS中的acquire方法的逻辑(获取锁失败就会放入队列中)和release方法(调用子类的tryRelease来去掉头部,并且唤醒线程)
- 而加锁解锁中的逻辑,主要是公平锁和非公平锁的区别,公平锁会去判断是否在队列头部,如果在才会去执行,而非公平锁则会抢锁。不会管你是不是在队列头部。
- 相信在上面的源码分析过后,分析ReentrantLock是十分简单的。大家可以自行分析。
作者:苡仁
原文链接:https://juejin.im/post/5e5354e6f265da5724466dbf
閱讀更多 追逐仰望星空 的文章