并发编程技术(五)之AbstractQueuedSynchronizer源码分析

上一节我们讲了ReentrantLock的实现解析,详细请回顾《 》,讲ReentrantLock实现解析时也讲到AbstractQueuedSynchronizer,本节开始讲解AbstractQueuedSynchronizer的源码实现。

AbstractQueuedSynchronizer(AQS)是JDK中实现并发编程的核心,它提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架。平时我们工作中经常用到的ReentrantLock,CountDownLatch等都是基于它来实现的,AQS类在java.util.concurrent.locks下面。

AQS也是基于CAS实现的,CAS(CompareAndSet)是最小粒度的操作,保证了原子性,通过硬件指令集实现。

AbstractQueuedSynchronizer数据结构


并发编程技术(五)之AbstractQueuedSynchronizer源码分析


分析类,首先就要分析底层采用了何种数据结构

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
Node() {
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
...
}


说明:AbstractQueuedSynchronizer类底层的数据结构是使用双向链表,是队列的一种实现,故也可看成是队列。队列包括head结点和tail结点。


AQS主要有以下几个方法

1.acquire(int)

独占式模式下获取锁的抽象逻辑,此方法中大部分需要子类实现。可以参考ReentrantLock的实现逻辑,我们在上节内容概要讲述了ReentrantLock。
//独占锁模式获取锁
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}


tryAcquire是由子类实现尝试获得锁
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

若获得锁失败的情况下会执行下面的方法

把当前线程封装成一个node,添加到队列中。若同步器为空的情况下则直接执行enq(node)方法,若有同步器有node,通过compareAndSetTail方法把tail节点指向最后一个节点


//添加mode节点到队列中
private Node addWaiter(Node mode) {
//把当前线程封装成Node
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure

Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//当队列中没有节点时,把当前添加到队列中
enq(node);
return node;
}
compareAndSetTail是通过Unsafe.compareAndSwapObject来实现交换


private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}


通过自旋的方式

//把当前节点添加到队列中
private Node enq(final Node node) {
for (;;) {//通过自旋方式把当前节点添加到队列中
Node t = tail;
if (t == null) { // 必须初始化一个Node节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;//设置当前节点的上个节点
if (compareAndSetTail(t, node)) {//把当前节点设置为tail节点
t.next = node;//设置原tail节点的下一个节点
return t;
}

}
}
}


final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取节点的上一个节点
final Node p = node.predecessor();
//当前节点为head才可以尝试获得锁
if (p == head && tryAcquire(arg)) {
setHead(node);//head节点指向当前节点
p.next = null; // 当前节点的上一个节点会GC回收
failed = false;
return interrupted;
}
//若上面获得锁失败,判断是不是要执行挂起操作
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}


//判断前节点是否需要挂起,当状态为SIGNAL时可以放心挂起,后面线程会唤醒SIGNAL状态的线程
//设置节点为SIGNAL,是防止发生惊群效应

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//若前节点状态为SIGNAL时,将要被线程唤醒
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
//把取消的节点移除掉
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//设置前节点状态为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}


//挂起线程,说明当前线程处于阻塞状态
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();//线程复位
}


//取消锁的获取
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel

// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}


//线程复位
static void selfInterrupt() {
Thread.currentThread().interrupt();
}


以上获取锁的过程就讲完了。下面我们讲释放锁

//释放锁
public final boolean release(int arg) {
//尝试释放锁
if (tryRelease(arg)) {
Node h = head;

//唤醒处于等待下的线程
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//尝试释放锁, 必须子类实现尝试释放过程,可以参考ReentrantLock中的
//释放锁是与获取锁一一对应,获取10次就要释放10次,释放时必须是当前线程的释放
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}


//唤醒处于等待下的线程
private void unparkSuccessor(Node node) {
//
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//对无效节点的清除
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}


AQS的获取锁和释放锁都讲完了,以上是根据个人理解做了分析,如有不正确请留言讨论。

----------




分享到:


相關文章: