线程并发——AQS抽象队列同步器

AQS队列同步器英文全称AbstractQueuedSynchronizer,这是一个抽象类,为什么我们今天需要学习这个抽象类呢?这个抽象类它的神奇之处到底是什么呢?我们一起来掀开它的神奇面纱吧!

什么是AQS(AbstractQueuedSynchronizer)?


线程并发——AQS抽象队列同步器

AQS中文翻译为同步器,Lock接口的实现类基本都是通过包含AQS子类对象来完成线程访问控制的。比如说Lock中的获取锁和释放锁操作。说白了就是一把抽象的锁。


线程并发——AQS抽象队列同步器

Lock通过调用AQS子类的lock方法实现获取锁,unlock方法实现释放锁。


线程并发——AQS抽象队列同步器

线程并发——AQS抽象队列同步器

1.1 AQS源码分析

1.1.1类字段分析:

 private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

1)我们可以看到字段中其实最主要的就是一个volatile 修饰的state(状态),state用于标记锁状态。

2)compareAndSetState方法是使用CAS算法修改state保证state修改时的数据安全。

3)head和tail表示队列的头和尾,也就是锁该类将线程和线程状态封装成一个节点Node存入到同步队列中,结构如下:


线程并发——AQS抽象队列同步器

AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

1.1.2方法分析:


线程并发——AQS抽象队列同步器

AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。

tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。

tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。

1.1.3锁类型分析:


线程并发——AQS抽象队列同步器

1)公平锁能保证:老的线程排队使用锁,新线程仍然排队使用锁。 2)非公平锁保证:老的线程排队使用锁;但是无法保证新线程抢占已经在排队的线程的锁。

注意:ReentrantLock初始化默认为非公平锁,非公平锁减少了线程的挂起和恢复运行效率高于公平锁。公平锁可以解决饥饿发生。

1.1.4获取锁操作分析:

final void lock() {//获取锁

if (compareAndSetState(0, 1))//获取锁成功

setExclusiveOwnerThread(Thread.currentThread());//独占锁

else

acquire(1);//其他线程尝试获取锁

}

acquire****方法:


线程并发——AQS抽象队列同步器


1)tryAcquire()尝试直接去获取资源,如果成功则直接返回;

2)addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;

3)acquireQueued()****方法;重新竞争同步锁

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {//自旋
final Node p = node.predecessor();//获取当前节点的上一个节点
if (p == head && tryAcquire(arg)) {//上一节点为头节点和获取尝试获取锁成功
setHead(node);//上一个节点已经获取到锁,将当前阶段设置为头节点,下一次就由自己获取锁
p.next = null; // 头节点获取到锁离开队列
failed = false;
return interrupted;//返回中断状态
}
//不是头节点或者获取不到锁
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

shouldParkAfterFailedAcquire****方法:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)//如果上一节点被唤醒,当前节点就进入等待
return true;
if (ws > 0) {//如果上一节点线程被取消了
do {
node.prev = pred = pred.prev;//当前阶段尝试跳过上一节点,插个队
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//尝试将上一个节点设置为唤醒状态
}
return false;
}

parkAndCheckInterrupt****方法:

private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//线程进入等待状态
return Thread.interrupted();//线程唤醒后,检查自己是否真的处于中断状态,注意:interrupted会清除中断状态
}

1)selfInterrupt(),自我中断,当前线程调用了interrupt方法

流程图如下:


线程并发——AQS抽象队列同步器

16.1.5 释放锁操作分析

release****方法:

public final boolean release(int arg) {
if (tryRelease(arg)) {//尝试释放锁
Node h = head;
if (h != null && h.waitStatus != 0)//头节点不为null并且等待状态不是为没有状态
unparkSuccessor(h);//唤醒头节点的后面一个节点
return true;
}
return false;

}

tryRelease****方法:

protected final boolean tryRelease(int releases) {
int c = getState() - releases;//重入锁state大于1,需要调用两次unlock方法,释放两次
if (Thread.currentThread() != getExclusiveOwnerThread())//如果独占锁不是当前需要释放的线程,抛出异常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);//独占锁设置为null,释放锁
}
setState(c);
return free;
}

unparkSuccessor****方法:

private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)//等待状态,不是被取消的状态
compareAndSetWaitStatus(node, ws, 0);//CAS修改回到最初的状态,没有等待的状态下
Node s = node.next;
if (s == null || s.waitStatus > 0) {//各种等待情况下
s = null;
//从队尾开始找不为空和不是当前节点的节点,直到找到当前节点的后面一个节点位置
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)//如果t为等待状态下
s = t;//s设置为t,拿到t这个节点

}
if (s != null)//唤醒当前节点的下一个节点
LockSupport.unpark(s.thread);
}


分享到:


相關文章: